搭建kafka高级消费 (high-consumer)php7
2024-08-25 08:32:52
说明:有很多同学在服务器上搭建好,kafka,在应用端使用kafka时候出现很多问题,这里提供下我的kafka生产和消费的php函数
环境说明:
1:首先php要有kafka扩展,在命令行中输入 php -m 看是否有rdkafka
没有的话需要安装配置下:
--------------- kafka php客户端安装(php-rdkafka) --------------
1.安装 librdkafka
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make
sudo make install
2.安装php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install
vi /usr/local/lib/php.ini
加入 extension=rdkafka.so
2:在kafka控制器中我直接贴出来我的生产和消费函数:
/**
* 生产单个消息
* @param string $topic
* @param null $post
*/
function kafka_produce($key=null,$post=null)
{
$rk = new \RdKafka\producer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("localhost:9092,localhost:9093,localhost:9094,localhost:9095");
$topics = $rk->newTopic('engine.com');
$topics->produce(1, 0,$post,$key); echo 'kafka_produce success!!!';
}
/**
* 高级消费模式
* @param $topic
* @return int
* @throws Exception
*/
function kafka_high_consume($topic='engine.com'){
$conf = new \RdKafka\Conf();
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break; default:
throw new \Exception($err);
}
});
$conf->set('group.id', '0');
$conf->set('metadata.broker.list', 'localhost:9092,localhost:9093,localhost:9094,localhost:9095');
// 针对低延迟进行了优化的配置。这允许PHP进程/请求尽快发送消息并快速终止
$conf->set('socket.timeout.ms', 50);
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', SIGIO);
} else {
$conf->set('queue.buffering.max.ms', 1);
} $topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('auto.offset.reset', 'smallest');
$topicConf->set('offset.store.path', 'kafka_offset.log');
$conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf);
// $topics->consumeStart(1, RD_KAFKA_OFFSET_STORED); // 更新订阅集(自动分配partitions )
$consumer->subscribe([$topic]); // 指定topic分配partitions使用那个分区
// $consumer->assign([
// new \RdKafka\TopicPartition("zzy8", 0),
// new \RdKafka\TopicPartition("zzy8", 1),
// ]); while (true) {
// 设置120s为超时
$message = $consumer->consume(120 * 1000);
if (!empty($message)) { switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
info('New message received :', $message);
// 拆解对象为数组
$payload = json_decode($message->payload,true);
$Orders = new OrdersController();
$key = $message->key;
// 根据kafka中不同key,调用对应方法传递处理数据
...
...
...
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
var_dump("##################"); break;
default:
var_dump("nothing"); throw new \Exception($message->errstr(), $message->err);
break;
} } else {
var_dump('this is empty obj!!!');
}
} return 0; }
在这里说明下,我放的是我项目中的使用函数,一些参数配置,大家可以根据我的提示自行注释和使用。
最新文章
- java异常处理(父子异常的处理)
- TODO:软件升级的那些事
- IIS 8.5 伪静态去掉index.php thinkphp 3.2.2
- poj 1321 棋盘问题
- innerHTML,innerText,outHTML,outText区别
- ubuntu12.04 gdb安装使用
- github客户端创建仓库
- 【MySql】Linux下更改转移mysql数据库目录
- Lucene学习总结之一:全文检索的基本原理
- js自写字符串 append 方法
- Java 课程设计 ";Give it up";小游戏(团队)
- Activity 的 4 种加载模式
- pop 一个viewController时候会有键盘闪现出来又消失
- hdu2062 Subset sequence----递推
- 关于CSS引入方式的详细见解
- PHP基础介绍
- Vue学习【第二篇】:ES6简单介绍
- zookeeper 集群配置采坑 Connection refused WARN [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:QuorumCnxManager@584] - Cannot open channel to 3 at election address slave2/192.168.127.133:3888
- java项目中显示图表:struts2整合jfreechart
- vue--子组件主动获取父组件的数据和方法
热门文章
- jekyll开发静态网站
- java.lang.IllegalArgumentException: XML fragments parsed from previous mappers does not contain value for
- 几种常见的Windows 服务器无法联网/无法连接远程桌面等故障解决方案
- 创建基于 AFS 的 Docker 容器卷
- MySQL 8.0复制性能的提升(翻译)
- 使用@selector模仿代理功能降低代码耦合度
- 音乐MP4网站 车辆工程 冯大昕
- php测试工具
- IOS XMPP(即时通讯的框架)
- 组合数取模&;&;Lucas定理题集