说明:有很多同学在服务器上搭建好,kafka,在应用端使用kafka时候出现很多问题,这里提供下我的kafka生产和消费的php函数

环境说明:

1:首先php要有kafka扩展,在命令行中输入 php -m  看是否有rdkafka

没有的话需要安装配置下:

--------------- kafka php客户端安装(php-rdkafka) --------------
1.安装 librdkafka
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make
sudo make install

2.安装php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vi /usr/local/lib/php.ini
加入 extension=rdkafka.so

2:在kafka控制器中我直接贴出来我的生产和消费函数:

 /**
* 生产单个消息
* @param string $topic
* @param null $post
*/
function kafka_produce($key=null,$post=null)
{
$rk = new \RdKafka\producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("localhost:9092,localhost:9093,localhost:9094,localhost:9095");
$topics = $rk->newTopic('engine.com');
$topics->produce(1, 0,$post,$key); echo 'kafka_produce success!!!';
}
/**
* 高级消费模式
* @param $topic
* @return int
* @throws Exception
*/
function kafka_high_consume($topic='engine.com'){
$conf = new \RdKafka\Conf();
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break; default:
throw new \Exception($err);
}
});
$conf->set('group.id', '0');
$conf->set('metadata.broker.list', 'localhost:9092,localhost:9093,localhost:9094,localhost:9095');
// 针对低延迟进行了优化的配置。这允许PHP进程/请求尽快发送消息并快速终止
$conf->set('socket.timeout.ms', 50);
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', SIGIO);
} else {
$conf->set('queue.buffering.max.ms', 1);
} $topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('auto.offset.reset', 'smallest');
$topicConf->set('offset.store.path', 'kafka_offset.log');
$conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf);
// $topics->consumeStart(1, RD_KAFKA_OFFSET_STORED); // 更新订阅集(自动分配partitions )
$consumer->subscribe([$topic]); // 指定topic分配partitions使用那个分区
// $consumer->assign([
// new \RdKafka\TopicPartition("zzy8", 0),
// new \RdKafka\TopicPartition("zzy8", 1),
// ]); while (true) {
// 设置120s为超时
$message = $consumer->consume(120 * 1000);
if (!empty($message)) { switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
info('New message received :', $message);
// 拆解对象为数组
$payload = json_decode($message->payload,true);
$Orders = new OrdersController();
$key = $message->key;
// 根据kafka中不同key,调用对应方法传递处理数据
...
               ...
                ...
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
var_dump("##################"); break;
default:
var_dump("nothing"); throw new \Exception($message->errstr(), $message->err);
break;
} } else {
var_dump('this is empty obj!!!');
}
} return 0; }

  在这里说明下,我放的是我项目中的使用函数,一些参数配置,大家可以根据我的提示自行注释和使用。

最新文章

  1. java异常处理(父子异常的处理)
  2. TODO:软件升级的那些事
  3. IIS 8.5 伪静态去掉index.php thinkphp 3.2.2
  4. poj 1321 棋盘问题
  5. innerHTML,innerText,outHTML,outText区别
  6. ubuntu12.04 gdb安装使用
  7. github客户端创建仓库
  8. 【MySql】Linux下更改转移mysql数据库目录
  9. Lucene学习总结之一:全文检索的基本原理
  10. js自写字符串 append 方法
  11. Java 课程设计 "Give it up"小游戏(团队)
  12. Activity 的 4 种加载模式
  13. pop 一个viewController时候会有键盘闪现出来又消失
  14. hdu2062 Subset sequence----递推
  15. 关于CSS引入方式的详细见解
  16. PHP基础介绍
  17. Vue学习【第二篇】:ES6简单介绍
  18. zookeeper 集群配置采坑 Connection refused WARN [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@584] - Cannot open channel to 3 at election address slave2/192.168.127.133:3888
  19. java项目中显示图表:struts2整合jfreechart
  20. vue--子组件主动获取父组件的数据和方法

热门文章

  1. jekyll开发静态网站
  2. java.lang.IllegalArgumentException: XML fragments parsed from previous mappers does not contain value for
  3. 几种常见的Windows 服务器无法联网/无法连接远程桌面等故障解决方案
  4. 创建基于 AFS 的 Docker 容器卷
  5. MySQL 8.0复制性能的提升(翻译)
  6. 使用@selector模仿代理功能降低代码耦合度
  7. 音乐MP4网站 车辆工程 冯大昕
  8. php测试工具
  9. IOS XMPP(即时通讯的框架)
  10. 组合数取模&&Lucas定理题集