Kafka是一种高吞吐的分布式发布订阅消息系统

kafka安装和简单测试

安装kafka

下载

wget https://www-us.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz

解压

tar -xzvf kafka_2.11-2.1.1.tgz

修改配置文件

cd kafka_2.11-2.1.1/config

zookeeper.properties 是zookeeper的配置文件,默认端口号2181,可不做修改

server.properties 是kafka配置文件,将 zookeeper.connect 这行 改为自己的zookeeper地址和端口号

这样就可以哦

修改完成之后 返回kafka主目录

cd ..

运行zookeeper和kafka

bin/zookeeper-server-start.sh config/zookeeper.properties   运行zookeeper

不要关闭此窗口 再开一个新窗口 重新进入kafka目录

bin/kafka-server-start.sh config/server.properties     运行kafka

运行producer和consumer

跟上步操作一样 不要关闭窗口 重新开 重新进入kafka目录

创建一个topic为test

把ip和port改为自己zookeeper中的    咱们上边就是localhost:2181

bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test

我们这么运行

(bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

)      

运行producer

bin/kafka-console-producer.sh --broker-list ip:port --topic test

(bin/kafka-console-producer.sh  --broker-list  localhost:2181 –topic  test )

跟上步操作一样 不要关闭窗口 重新开 重新进入kafka目录

运行consumer

bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic test --from-beginning

(bin/kafka-console-consumer.sh –bootstrap-server localhost:2181 –topic test –from-beginning)

然后在producer发送信息 会发现 consumer的窗口会出现你发送的消息

cd  /usr/local/下边执行

安装kafka的php扩展

# 先安装rdkfka库文件

git clone https://github.com/edenhill/librdkafka.git

cd librdkafka/

./configure

make

sudo make install

执行完退出到local目录下

git clone https://github.com/arnaud-lb/php-rdkafka.git

cd php-rdkafka

phpize

./configure

make all -j 5

sudo make install

退出到cd /etc/nginx目录下

修改php.ini文件

vim [php]/php.ini

添加 extension=rdkafka.so

如果没有报错,php扩展安装就完成啦!

接下来 创建生产者文件

cd /usr/share/nginx/html/

PHP代码实践

<?php

$conf = new RdKafka\Conf();

$conf->setDrMsgCb(function ($kafka, $message) {

file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);

});

$conf->setErrorCb(function ($kafka, $err, $reason) {

file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);

});

$rk = new RdKafka\Producer($conf);

$rk->setLogLevel(LOG_DEBUG);

$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();

// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset

// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉

$cf->set('request.required.acks', 0);

$topic = $rk->newTopic("test", $cf);

$option = 'qkl';

for ($i = 0; $i < 20; $i++) {

//RD_KAFKA_PARTITION_UA自动选择分区

//$option可选

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);

}

$len = $rk->getOutQLen();

while ($len > 0) {

$len = $rk->getOutQLen();

var_dump($len);

$rk->poll(50);

}

创建完运行生产者

php producer.php

会输出

int(20)
int(20)
int(20)
int(20)
int(0)



你也可以试着输入一些消息



创建消费者文件,同一个目录

cd /usr/share/nginx/html/

comsumer.php

<?php

$conf = new RdKafka\Conf();

$conf->setDrMsgCb(function ($kafka, $message) {

    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);

});

$conf->setErrorCb(function ($kafka, $err, $reason) {

    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);

});

 

//设置消费组

$conf->set('group.id', 'myConsumerGroup');

 

$rk = new RdKafka\Consumer($conf);

$rk->addBrokers("127.0.0.1");

 

$topicConf = new RdKafka\TopicConf();

$topicConf->set('request.required.acks', 1);

//在interval.ms的时间内自动提交确认、建议不要启动

//$topicConf->set('auto.commit.enable', 1);

$topicConf->set('auto.commit.enable', 0);

$topicConf->set('auto.commit.interval.ms', 100);

 

// 设置offset的存储为file

//$topicConf->set('offset.store.method', 'file');

// 设置offset的存储为broker

 $topicConf->set('offset.store.method', 'broker');

//$topicConf->set('offset.store.path', __DIR__);

 

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest

//largest:简单理解为从最新的开始消费,其实等价于上面的 latest

//$topicConf->set('auto.offset.reset', 'smallest');

 

$topic = $rk->newTopic("test", $topicConf);

 

// 参数1消费分区0

// RD_KAFKA_OFFSET_BEGINNING 重头开始消费

// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费

// RD_KAFKA_OFFSET_END 最后一条消费

$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //

//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

 

while (true) {

    //参数1表示消费分区,这里是分区0

    //参数2表示同步阻塞多久

    $message = $topic->consume(0, 12 * 1000);

    if (is_null($message)) {

        sleep(1);

        echo "No more messages\n";

        continue;

    }

    switch ($message->err) {

        case RD_KAFKA_RESP_ERR_NO_ERROR:

            var_dump($message);

            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:

            echo "No more messages; will wait for more\n";

            break;

        case RD_KAFKA_RESP_ERR__TIMED_OUT:

            echo "Timed out\n";

            break;

        default:

            throw new \Exception($message->errstr(), $message->err);

            break;

    }

}

设置完启动,会接收到你刚刚输入的消息

成功了!

 

最新文章

  1. Unity3d刚体Rigidbody与碰撞检测Collider
  2. 如何判断Javascript对象是否存在
  3. python课程第二周重点记录
  4. tomcat连接器
  5. JavaScript笔记——this的取值
  6. BZOJ 3573 米特运输
  7. 自定义异常以及runtime类
  8. 使用grunt运行hintjs任务
  9. (转载)在状态栏即时显示Hint
  10. JSONModel的基本使用
  11. varnish esi出现no esi processing, first char not ‘&lt;’的错误处理方式
  12. ToDoList-学习中看到的知识盲点
  13. Dom中的nodeName、nodeValue 、nodeType
  14. spring 事务无效解决方法
  15. ArcPy 拷贝数据库
  16. mpvue中使用wxParse,解析a标签跳转问题
  17. Java 将容器 Map中的内容保存到数组
  18. DDoS的类型及原理
  19. 关于lis的方案数
  20. java.io.Serializable中serialVersionUID的作用

热门文章

  1. win10更新后出现System.ComponentModel.Win32Exception
  2. kafka connect rest api
  3. [STM32F103]RTC日历
  4. Koa,React和socket.io
  5. solr参数之facet
  6. LeetCode 92. Reverse Linked List II倒置链表2 C++
  7. spring boot tomcat 打本地包成war,通过Tomcat启动时出现问题: ZipException: error in opening zip file
  8. 基于前台vue,后台是spring boot的压缩图片上传
  9. Swift用户通知授权
  10. 对java开发者来说比较好网站客推荐