1. Starting the producer and consumer processes with 127.0.0.1:

1) Start the producer process:

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

Enter a message:

this is msg

The producer process reports the following warnings:

[2016-06-03 11:33:47,934] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:33:49,554] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:33:51,177] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:33:53,398] WARN Bootstrap broker 127.0.0.1:9092 disconnected (org.apache.kafka.clients.NetworkClient)
 
 

2) Start the consumer process:

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

The consumer process reports the following errors:

[2016-06-03 11:34:53,574] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-03 11:34:53,651] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more
[2016-06-03 11:35:14,916] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-03 11:35:14,918] WARN [console-consumer-72675_zzs-1464924871670-2192d80a-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more
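
The trace shows the consumer trying to reach the broker at 218.30.64.194:9092, not at the 127.0.0.1 address given on the command line. One way to confirm that this address is the problem is to test whether it accepts TCP connections at all. The following is a minimal sketch using only Python's standard library; the helper name `can_connect` and the 3-second default timeout are illustrative choices, not part of Kafka.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

Calling `can_connect("127.0.0.1", 9092)` and `can_connect("218.30.64.194", 9092)` on the broker machine would show whether each address is reachable. A successful TCP connection only proves that something is listening there, not that the Kafka protocol exchange will succeed.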

2. Starting the producer and consumer processes with localhost:

1) Start the producer process:

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is msg
[2016-06-03 11:44:16,932] WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,255] WARN Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,648] WARN Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,801] WARN Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:18,928] WARN Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:19,035] WARN Error while fetching metadata with correlation id 5 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:19,180] WARN Error while fetching metadata with correlation id 6 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-06-03 11:44:19,308] WARN Error while fetching metadata with correlation id 7 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

2) Start the consumer process:

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
[2016-06-03 11:45:18,330] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-03 11:45:18,541] WARN [console-consumer-42554_zzs-1464925496436-ae2ee9c7-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,218.30.64.194,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more
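
Both consumer runs fail against the same broker endpoint, whether 127.0.0.1 or localhost was typed on the command line. To pull the endpoints out of a saved client log, a small regular-expression helper is enough. This sketch assumes the `BrokerEndPoint(id,host,port)` text format seen in the logs above; `broker_endpoints` is a hypothetical helper name.

```python
import re

# Matches e.g. "BrokerEndPoint(0,218.30.64.194,9092)" as printed in the logs above.
_ENDPOINT = re.compile(r"BrokerEndPoint\((\d+),([^,()]+),(\d+)\)")


def broker_endpoints(log_text):
    """Return the distinct (broker_id, host, port) tuples found in log_text, sorted."""
    found = {(int(b), host, int(port)) for b, host, port in _ENDPOINT.findall(log_text)}
    return sorted(found)


sample = (
    "WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] "
    "from broker [BrokerEndPoint(0,218.30.64.194,9092)] failed (kafka.client.ClientUtils$)"
)
print(broker_endpoints(sample))  # [(0, '218.30.64.194', 9092)]
```

If the host printed here differs from the one passed to the client, the client is being redirected by the broker's registered address rather than failing on the address it was given.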

3. Solving the problem

1) Check the Kafka configuration file with cat config/server.properties:

zookeeper.connect=localhost:2181

The broker connects to ZooKeeper at localhost, so the producer and consumer processes should be started with localhost as well.

2) Check the Kafka startup log, which contains this line:

Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(218.30.64.194,9092,PLAINTEXT) (kafka.utils.ZkUtils)

Why is the broker registered with the IP 218.30.64.194?

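==> The host that Kafka listens on was never set explicitly. With no listeners entry in server.properties, the broker registers the address that the machine's hostname resolves to, and clients are sent to that address once they fetch metadata. That is why the consumer traces show 218.30.64.194 even though localhost was given on the command line.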
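
According to the Kafka broker configuration documentation for this version, when no listener or advertised host is configured the broker falls back to the value of java.net.InetAddress.getCanonicalHostName(). The sketch below approximates that lookup from Python to show which address the machine's hostname resolves to. It is only an approximation: Python's resolver calls are not guaranteed to return exactly what the JVM returns, and `resolved_host_address` is a hypothetical helper name.

```python
import socket


def resolved_host_address():
    """Return (hostname, ip) for this machine; ip is None if the name does not resolve."""
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except OSError:
        # The hostname has no entry in /etc/hosts or DNS.
        ip = None
    return hostname, ip


hostname, ip = resolved_host_address()
print(hostname, ip)
```

If the printed IP is an external address rather than 127.0.0.1, that is likely the address the broker will register in ZooKeeper when no listener is configured.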

vi config/server.properties

listeners=PLAINTEXT://localhost:9092
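
To check a server.properties file for this situation before starting the broker, the listener-related keys can be read with a few lines of code. This is a minimal sketch: it handles only simple `key=value` lines and `#` comments, not the full Java properties syntax, and `listener_settings` is a hypothetical helper. The keys it looks for (`listeners`, `advertised.listeners`, `host.name`, `advertised.host.name`) are broker settings in Kafka 0.10; the sample text is made up for illustration.

```python
LISTENER_KEYS = ("listeners", "advertised.listeners", "host.name", "advertised.host.name")


def listener_settings(properties_text):
    """Return {key: value} for the listener-related keys that are set."""
    settings = {}
    for raw in properties_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and lines without a key=value pair
        key, value = line.split("=", 1)
        if key.strip() in LISTENER_KEYS:
            settings[key.strip()] = value.strip()
    return settings


# Illustrative sample: a commented-out listener, then the fix from this article.
before = "zookeeper.connect=localhost:2181\n#listeners=PLAINTEXT://:9092\n"
after = before + "listeners=PLAINTEXT://localhost:9092\n"
print(listener_settings(before))  # {}
print(listener_settings(after))   # {'listeners': 'PLAINTEXT://localhost:9092'}
```

An empty result means none of these keys is set, which is the state the broker was in when it registered 218.30.64.194.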

3) Restart ZooKeeper, Kafka, the consumer, and the producer:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

./bin/kafka-server-start.sh ./config/server.properties

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Messages entered in the producer are now received by the consumer:

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is msg
this is msg2

[root@zzs kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is msg
this is msg2
this is msg3
this is mgs4

Problem solved!
