如何用RabbitMQ和Swoole实现一个异步任务系统

吾爱主题 阅读:129 2021-11-17 15:59:00 评论:0
目录
  • 系统介绍
  • 事件生产者
  • 任务调度器
  • 消费者
    • 正常任务
    • 延迟任务
  • 自定义调度器

系统介绍

从图中可以看到,我们这个系统是一个基于事件的异步任务系统。就是说当一个事件产生时,生产者将事件抛给调度器,调度器负责查询事件下有哪些任务,然后将这些任务丢到相应的队列中,最后由消费者消费任务队列中的任务。

在整个系统中主要分为三大部分

1.事件生产者,即产生消息事件的一方。

2.任务调度器(Scheduler),负责注册事件并调度任务。

3.消费者(Worker),负责消费任务队列中的任务。

事件生产者

事件生产者很简单,在业务系统中直接调用即可,代码如下。

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <?php   require_once DIR. '/../autoload.php' ;   use Asynclib\Ebats\Event;   try {        $event = new Event( 'order_paied' );  //定义事件        $event ->setOptions([ 'order_id' => 'FB138020392193312' ]); //事件产生的参数        $event ->publish();   } catch (Exception $exc ){        echo $exc ->getMessage();   }

任务调度器

调度器主要做两件事,一是注册事件,另一个是调度任务。

注册事件代码如下:

?
1 2 3 4 5 //注册事件   EventManager::register( 'order_create' , 'closeOrder' , 'demo' , 10); //关闭未付款订单(延迟任务)   EventManager::register( 'order_paied' , 'virtualShipping' , 'demo' ); //虚拟商品自动发货

这样就注册了两个事件,事件下各有一个任务。

具体调度部分代码很简单,就不多赘述,有兴趣的可以去看代码。

消费者

重头戏来了,一个异步任务系统最重要的就是消费端了,现在让我们来看下Worker的流程图。

可以看到,在这里我们采用了两个交换器和两个队列,一个负责处理正常的任务即ntask,另一个负责处理需要延迟执行的任务即dtask。简单描述下一个任务的生命周期。

正常任务

1、task产生,进入正常任务的交换器Exchange[ebats_core_ntask]

2、交换器根据topic将任务分发到对应的队列中

3、子进程ntask阻塞等待成功获取到task,并执行该任务

4、执行失败,需要重试时抛出RetryException,不需要重试时抛出TaskException

5、子进程ntask捕获到重试异常将任务抛给延迟任务的交换器Exchange[ebats_core_dtask]

6、将任务执行信息回调给上层开发者以便保存查看

延迟任务

1、子进程dtask阻塞等待成功获取到task,并执行该任务
2、执行失败,需要重试时抛出RetryException,不需要重试时抛出TaskException
3、子进程dtask捕获到重试异常将任务抛给延迟任务的交换器Exchange[ebats_core_dtask]
4、将任务执行信息回调给上层开发者以便保存查看

消费者代码如下:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 require_once DIR. '/../autoload.php' ;   require_once DIR. '/task/TaskDemoModel.php' ;   use Asynclib\Ebats\Worker;        //执行结果回调函数   $callback = function ( $topic , $taskid , $taskname , $params , $timeuse , $message ){        };   $worker = new Worker( $callback );  //支持多进程消费默认为1   $worker ->setQueue( 'demo' );  //队列名和事件的topic一一对应   $worker ->run();

自定义调度器

一般来说这是一个基于事件的任务系统,那么能不能直接产生任务呢。答案是肯定的。

只需要创建一个自定义调度器,由您自行实现调度逻辑,最终生成一个任务即可。代码如下:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 <?php   require_once DIR. '/../autoload.php' ;   use Asynclib\Ebats\Task;   use Asynclib\Core\Consumer;   use Asynclib\Amq\ExchangeTypes;   use Asynclib\Exception\ExceptionInterface;        /**     * 本示例演示了如何创建一个自定义调度器,开发者可以根据自身需求开发自己的任务调度器     */   try {        $worker = new Consumer();        $worker ->setExchange( 'order_fanout' , ExchangeTypes::TOPIC);        $worker ->setQueue( 'shzf_order_paied' , [ '*.*.WAIT_SELLER_SEND_GOODS' ]);        $worker ->run( function ( $key , $msg ){            $order_data = json_encode( $msg );            echo " [$key] $order_data \n" ;            Task::create( 'demo' , 'orderAsync' , $msg ); //创建任务,之后消息将作为参数由任务接管处理        });   } catch (ExceptionInterface $exc ){        echo $exc ->getMessage();   }

这样,当接收到消息时就会产生一个orderAsync的任务,您只需要启动一个用来消费这个Topic的Worker即可。

也许你会觉得这里直接写业务逻辑的代码就可以了,实际上也确实可以。当你可以忍受一个进程慢慢消费的时候是可以这样做的。但大多数情况下我们还是希望它能够尽快的消费掉,所以建议这里只负责创建任务,具体任务的业务逻辑由worker去执行。

以上就是如何用RabbitMQ和Swoole实现一个异步任务系统的详细内容,更多关于用RabbitMQ和Swoole实现一个异步任务系统的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/a609251438/p/12510476.html

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容