前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据。

  其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.topic = TRAFFIC_LOG
agent1.sinks.kafkaSink.brokerList = 10.208.129.3:,10.208.129.4:,10.208.129.5:
agent1.sinks.kafkaSink.metadata.broker.list = 10.208.129.3:,10.208.129.4:,10.208.129.5:
agent1.sinks.kafkaSink.producer.type=sync
agent1.sinks.kafkaSink.serializer.class=kafka.serializer.DefaultEncoder
agent1.sinks.kafkaSink.channel = memoryChannel

  那么当Flume的channel收到数据的时候,会根据配置文件主动把数据event发送到Kafka的broker上,所以只要安装好Kafka就可以消费收据了。

Step 1: Download the code

下载安装包并解压

 > tar -xzf kafka_2.11-0.10.0.0.tgz

 > cd kafka_2.11-0.10.0.0

Step 2: Start the server

Kafka是基于Zookeeperl来实现分布式协同的,因此先启动Zookeeper:

 > %Zookeeper_Home%/bin/zkServer.sh start

在配置文件server.properties中把下面一句前面的注释去掉,然后启动Kafka服务器

 > #listeners=PLAINTEXT://:9092

 > bin/kafka-server-start.sh config/server.properties

接下来启动其他两个broker:

 > cp config/server.properties config/server-1.properties

 > cp config/server.properties config/server-2.properties

 修改配置文件,broker.id不能重复

config/server-1.properties:
broker.id=1
config/server-2.properties:
broker.id=2

Step 3: Create a topic

创建一个TRAFFIC_LOG主题的broker,复制因子为3(因为有3个Kafka服务器集群),分区个数为1

 > bin/kafka-topics.sh --create --zookeeper 10.208.129.4:2181 --replication-factor 3 --partitions 1 --topic TRAFFIC_LOG

Step 5: Start a consumer

 > bin/kafka-console-consumer.sh --zookeeper 10.208.129.4:2181/kafka --topic TRAFFIC_LOG --from-beginning

topic一定要写正确了,否则消费不到数据

如果在终端看到之前接入的Thrift Source输出,那么整个Flume+Kafka算是跑通了

这里已经引入了Flume和Kafka,下一篇将介绍Kafka以及Flume和Kafka的区别

最新文章

  1. C#制作简易屏保
  2. WPF中RadioButton绑定数据的正确方法
  3. NSUserDefault的使用
  4. asp.net的JSONHelper 类
  5. hdoj 1016 Prime Ring Problem
  6. 通过IP的方式建立PSSession
  7. 设置navigation baritem方法
  8. ASP.NET&AJAX&JSON - 动态读取数据
  9. C# 反射类型转换
  10. docker 中运行 sshd 服务
  11. Android权限机制
  12. [Everyday Mathematics]20150106
  13. 强大DevExpress,Winform LookUpEdit 实现多列查询 gridview弹出下拉选择 z
  14. linux下64位汇编的系统调用系列
  15. 什么是WEBserver? 经常使用的WEBserver有哪些?
  16. 用Pyton玩转数据练习题---第二周
  17. 执行sql时出现错误 extraneous input ';' expecting EOF near '<EOF>'
  18. Winform开发框架中工作流模块的业务表单开发
  19. mysql 主从同步 mysql代理服务器
  20. 设置Nginx+php-fpm显示错误信息

热门文章

  1. android布局ui
  2. Doing Homework(HDU 1074状压dp)
  3. 【ruby on rail 项目之 VPS下载机】
  4. LeetCode题解——Add Two Numbers
  5. win7 开wifi热点
  6. tomcat memory leak
  7. leetcode@ [79/140] Trie树应用 Word Search / Word Search II
  8. 第九章、文件与文件系统的压缩与打包 Linux 系统常见的压缩命令
  9. [iOS基础控件 - 4.2] APP列表 字典转模型Model
  10. Objective-C Runtime 运行时之五:协议与分类