php测试kafka项目示例

吾爱主题 阅读:152 2021-09-28 11:53:00 评论:0

本文实例讲述了php测试kafka项目。分享给大家供大家参考,具体如下:

概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

安装kafka-php项目依赖

?
1 composer require nmred/kafka-php

produce.php

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 <?php require './vendor/autoload.php' ; date_default_timezone_set( 'PRC' ); $config = \Kafka\ProducerConfig::getInstance(); $config ->setMetadataRefreshIntervalMs(10000); $config ->setMetadataBrokerList( '127.0.0.1:9092' ); $config ->setBrokerVersion( '0.10.2.1' ); $config ->setRequiredAck(1); $config ->setIsAsyn(false); $config ->setProduceInterval(500); $producer = new \Kafka\Producer( function () {   $t = time();   return array (   array (    'topic' => 'test' ,    'value' => $t ,    'key' => $t ,   ),   ); }); $producer ->success( function ( $result ) {   var_export( $result ); }); $producer ->error( function ( $errorCode ) {   var_dump( 'error' , $errorCode ); }); $producer ->send();

consumer.php

?
1 2 3 4 5 6 7 8 9 10 11 12 13 <?php require './vendor/autoload.php' ; date_default_timezone_set( 'PRC' ); $config = \Kafka\ConsumerConfig::getInstance(); $config ->setMetadataRefreshIntervalMs(10000); $config ->setMetadataBrokerList( '127.0.0.1:9092' ); $config ->setGroupId( 'test' ); $config ->setBrokerVersion( '0.10.2.1' ); $config ->setTopics( array ( 'test' )); $consumer = new \Kafka\Consumer(); $consumer ->start( function ( $topic , $part , $message ) {   var_dump( $message ); });

测试生产者

?
1 php produce.php

测试消费者

?
1 php consumer.php

希望本文所述对大家PHP程序设计有所帮助。

原文链接:https://my.oschina.net/qiongtaoli/blog/903889

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容