PHP实现RabbitMQ消息列队的示例代码

吾爱主题 阅读:183 2022-11-08 15:19:00 评论:0

业务场景

项目公司是主php做开发的,框架为thinkphp。众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理。所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务。

当rabbitMQ服务器我们准备好了,建立了一个持久化命名为ceshi的列队,如下:

项目上生产者和消费者的开发我这里全部采用tinkphp6+workerman,为便于管理。这里这么做也是因为发现workerman中对rabbitMQ的文档解释太少了!

所以开始踩坑!

 

1、首先部署好thinkphp6框架

过程去看thinkphp6手册

 

2、安装workerman扩展

过程去看thinkphp6手册

 

3、生产者

配置一个workerman类

创建的Send类代码如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
  //websocket地址,一会用于测试。
  protected $socket = 'websocket://127.0.0.1:2345';

  /**
   * 收到信息
   * @param $connection
   * @param $data
   */
  public function onMessage($connection, $data)
{
      //websocket发送过来的消息
      $connection->send('我收到你的信息了:'.$data);
      //rabbitMQ配置
      $options = [
          'host'=>'127.0.0.1',//rabbitMQ IP
          'port'=>5672,//rabbitMQ 通讯端口
          'user'=>'admin',//rabbitMQ 账号
          'password'=>'123456'//rabbitMQ 密码
      ];
      (new Client($options))->connect()->then(function (Client $client) {
          return $client->channel();
      })->then(function (Channel $channel) {
          /**
           * 创建队列(Queue)
           * name: ceshi         // 队列名称
           * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
           * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
           *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
           * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
           *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
           */
          return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
              return $channel;
          });
      })->then(function (Channel $channel) use($data){
          echo "发送消息内容:".$data."\n";

          /**
           * 发送消息
           * body 发送的数据
           * headers 数据头,建议 ['content_type' => 'text/plain'],这样消费端是springboot注解接收直接是字符串类型
           * exchange 交换器名称
           * routingKey 路由key
           * mandatory
           * immediate
           * @return bool|PromiseInterface|int
           */

          return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
              return $channel;
          });
      })->then(function (Channel $channel) {
          //echo " [x] Sent 'Hello World!'\n";
          $client = $channel->getClient();
          return $channel->close()->then(function () use ($client) {
              return $client;
          });
      })->then(function (Client $client) {
          $client->disconnect();
      });
  }

  /**
   * 当连接建立时触发的回调函数
   * @param $connection
   */
  public function onConnect($connection)
{

  }

  /**
   * 当连接断开时触发的回调函数
   * @param $connection
   */
  public function onClose($connection)
{

  }
  /**
   * 当客户端的连接上发生错误时触发
   * @param $connection
   * @param $code
   * @param $msg
   */
  public function onError($connection, $code, $msg)
{
      echo "error $code $msg\n";
  }

  /**
   * 每个进程启动
   * @param $worker
   */
  public function onWorkerStart($worker)
{


  }
}

上述都OK以后咱们可以项目路径下通过命令启动这个生产者:

php think worker:server

测试发送数据:

通过这个网站

连接【ws://127.0.0.1:2345】后发送数据!

前往rabbitMQ控制台

列队中有一条消息产生并且等待了!

这个时候你可能问,如果我发送数据不想通过ws发送而是接口发送怎么办?

笨思路呗:接口给内置服务器发消息->内置服务去发消息给rabbitMQ

将协议改为tcp

然后重新启动服务

然后去tp6创建一个路由接口

接口代码

<?php
namespace app\controller;

use app\BaseController;

class Index extends BaseController
{
  public function index(string $msg)
{
      //连接本地tcp服务
      $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
      //发送字符串
      fwrite($client, $msg."\n");
      //断开服务
      fclose($client);
      return 'OK';
  }

}

执行结果:

说明接口成功的将数据发送给了本地内置的tcp服务。

同时,内置服务将收到的数据给了rabbitMQ服务列队中。

生产者完成。

 

4、消费者

同生产者一样新创建一个thinkphp6及安装workerman扩展,注意端口别和生产者冲突!这里我设置的是2346端口

创建的Receive类代码如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
  protected $socket = 'tcp://127.0.0.1:2346';

  /**
   * 收到信息
   * @param $connection
   * @param $data
   */
  public function onMessage($connection, $data)
{

  }

  /**
   * 当连接建立时触发的回调函数
   * @param $connection
   */
  public function onConnect($connection)
{

  }

  /**
   * 当连接断开时触发的回调函数
   * @param $connection
   */
  public function onClose($connection)
{

  }
  /**
   * 当客户端的连接上发生错误时触发
   * @param $connection
   * @param $code
   * @param $msg
   */
  public function onError($connection, $code, $msg)
{
      echo "error $code $msg\n";
  }

  /**
   * 每个进程启动
   * @param $worker
   */
  public function onWorkerStart($worker)
{
      //rabbitMQ配置
      $options = [
          'host'=>'127.0.0.1',//rabbitMQ IP
          'port'=>5672,//rabbitMQ 通讯端口
          'user'=>'admin',//rabbitMQ 账号
          'password'=>'123456'//rabbitMQ 密码
      ];
      (new Client($options))->connect()->then(function (Client $client) {
          return $client->channel();
      })->then(function (Channel $channel) {
          /**
           * 创建队列(Queue)
           * name: ceshi         // 队列名称
           * passive: false      // 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建
           * durable: true       // 是否持久化,设置false是存放到内存中RabbitMQ重启后会丢失,
           *                        设置true则代表是一个持久的队列,服务重启之后也会存在,因为服务会把持久化的Queue存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的Queue
           * exclusive: false    // 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
           *  auto_delete: false // 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除
           */
          return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
              return $channel;
          });
      })->then(function (Channel $channel) {
          echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
          $channel->consume(
              function (Message $message, Channel $channel, Client $client) {
                  echo "接收消息内容:", $message->content, "\n";
              },
              'ceshi',
              '',
              false,
              true
          );
      });

  }
}

都OK以后咱们可以项目路径下通过命令启动这个消费者:

php think worker:server

此时应该会自动消费掉rabbitMQ中等待的消息!

到这里消费者也就结束啦!

 

5、整体测试

接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!

至于具体怎么灵活应用自行开拓大脑哦~

比如php项目有些业务吃力,可以去做个java的消费端,让java来完成任务~

以上就是PHP实现RabbitMQ消息列队的示例代码的详细内容,更多关于PHP RabbitMQ消息列队的资料请关注服务器之家其它相关文章!

原文链接:https://mp.weixin.qq.com/s/af_NBSkHVoHyhQQkF4264Q

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容