Laravel中Kafka的使用详解

吾爱主题 阅读:196 2021-11-09 15:07:00 评论:0

 

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 <?php namespace App\Tools;   use Illuminate\Config\Repository;   use Illuminate\Support\Facades\DB; use Monolog\Logger; use Monolog\Handler\StreamHandler;   use Illuminate\Http\Request;   class Kafka {    public $broker_list = '127.0.0.1' ; //配置kafka,可以用逗号隔开多个kafka    public $topic = 'test' ; //管道名称    public $partition = 0;      protected $producer = null;    protected $consumer = null;      public function __construct()    {      if ( empty ( $this ->broker_list)) {        throw new InvalidConfigException( "broker not config" );      }      $rk = new \RdKafka\Producer();      if ( empty ( $rk )) {        throw new InvalidConfigException( "producer error" );      }      $rk ->setLogLevel(LOG_DEBUG);      if (! $rk ->addBrokers( $this ->broker_list)) {        throw new InvalidConfigException( "producer error" );      }      $this ->producer = $rk ;    }      /**     * 生产者     * @param array $messages     * @return mixed     */    public function send( $messages = [], $topic )    {      $topic = $this ->producer->newTopic( $topic );      return $topic ->produce(RD_KAFKA_PARTITION_UA, $this ->partition, json_encode( $messages ));    }      /**     * 消费者     */    public function consumer( $object , $callback ){      $conf = new \RdKafka\Conf();      $conf ->set( 'group.id' , 0);      $conf ->set( 'metadata.broker.list' , $this ->broker_list);        $topicConf = new \RdKafka\TopicConf();      $topicConf ->set( 'auto.offset.reset' , 'smallest' );        $conf ->setDefaultTopicConf( $topicConf );        $consumer = new \RdKafka\KafkaConsumer( $conf );        $consumer ->subscribe([ $this ->topic]);        echo "waiting for messages.....\n" ;      while (true) {        $message = $consumer ->consume(120*1000);        switch ( $message ->err) {          case RD_KAFKA_RESP_ERR_NO_ERROR:            echo "message payload...." ;            $object -> $callback ( $message ->payload);            break ;        }        sleep(1);      }    } } ?>

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public function test(){       $topic = 'tool' ; //输入使用管道名称     $data [ 'shop_id' ] = 58;     $data [ 'bar_code' ]=586;     $data [ 'goods_num' ] = 1;     $data [ 'goods_unit' ] = '个' ;   $Kafka = new Kafka(); $Error_Msg = $Kafka ->send( $data , $topic ); //传入数组会自动转换json var_dump( $Error_Msg );        }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 <?php   $conf = new RdKafka\Conf();   $conf ->set( 'group.id' , 'myConsumerGroup' );   $rk = new RdKafka\Consumer( $conf ); $rk ->addBrokers( "localhost:9092" );   $topicConf = new RdKafka\TopicConf(); $topicConf ->set( 'auto.commit.interval.ms' , 100); $topicConf ->set( 'offset.store.method' , 'file' ); $topicConf ->set( 'offset.store.path' , sys_get_temp_dir()); $topicConf ->set( 'auto.offset.reset' , 'smallest' );   $topic = $rk ->newTopic( "tool" , $topicConf ); //读取的管道   // Start consuming partition 0 $topic ->consumeStart(0, RD_KAFKA_OFFSET_STORED);   while (true) {    $message = $topic ->consume(0, 120*10000);    switch ( $message ->err) {      case RD_KAFKA_RESP_ERR_NO_ERROR:      //没有错误打印信息        $message = json_decode(json_encode( $message ),true);        $data = json_decode( $message [ 'payload' ],true);        var_dump( $data );        break ;      case RD_KAFKA_RESP_ERR__PARTITION_EOF:        echo "等待接收信息\n" ;        break ;      case RD_KAFKA_RESP_ERR__TIMED_OUT:        echo "超时\n" ;        break ;      default :        throw new \Exception( $message ->errstr(), $message ->err);        break ;    }   sleep(1); }   ?>

到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/qq_16619361/article/details/90712742

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容