Logstash工作原理

由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处::

1 生产者的负载与消费者的负载解耦

2 消费者按照自己的能力fetch数据

3 消费者可以自定义消费的数量

由于broker采用了主题topic–>分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。

Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader–>followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。



由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash–>kafka–>logstash的方案了。

接下来,按照下面的步骤就可以实现logstash与kafka的对接了。

启动zookeeper

  1. $zookeeper/bin/zkServer.sh start

启动kafka

  1. $kafka/bin/kafka-server-start.sh $kafka/config/server.properties &

创建主题

  1. $kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1

查看主题

  1. $kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe

测试环境

执行生存者脚本

  1. $kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello

执行消费者脚本

  1. $kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello

测试输入

  1. input{
  2. stdin{}
  3. }
  4. output{
  5. kafka{
  6. topic_id => "hello"
  7. bootstrap_servers => "192.168.0.4:9092" # kafka的地址
  8. batch_size => 5
  9. }
  10. stdout{
  11. codec => rubydebug
  12. }
  13. }

读取测试

  1. input{
  2. kafka {
  3. codec => "plain"
  4. group_id => "logstash1"
  5. auto_offset_reset => "smallest"
  6. reset_beginning => true
  7. topic_id => "hello"
  8. #white_list => ["hello"]
  9. #black_list => nil
  10. zk_connect => "192.168.0.5:2181" # zookeeper的地址
  11. }
  12. }
  13. output{
  14. stdout{
  15. codec => rubydebug
  16. }
  17. }

最新文章

  1. Hibernate的一级缓存
  2. XtraReport 添加空行的办法,很详细
  3. Visual Studio 2010中的stdafx.h和targetver.h两个头文件是有什么用?
  4. js 去掉字符串前面的0
  5. BarEditItem ContentTemplate
  6. 解决Deprecated: mysql_connect(): The mysql extension is deprecated and will be removed in the future:
  7. C++中const用法总结
  8. 《foreach循环示例》
  9. 2. scala中的数组
  10. 【Xamarin开发 Android 系列 3】循序渐进的学习顺序
  11. [C++程序设计]基于对象的程序设计 基于对象的程序设计
  12. 论文阅读笔记 - YARN : Architecture of Next Generation Apache Hadoop MapReduceFramework
  13. js原生继承之——构造函数式继承实例
  14. 【node】使用nvm管理node版本
  15. NLog在asp.net中的使用
  16. BZOJ_3110_[Zjoi2013]K大数查询_整体二分+树状数组
  17. 6.JAVA-链表实例
  18. python之模块colorsys颜色转换模块 暂不了解
  19. Spark中的IsNotNull函数怎么用
  20. linux,centOS,用LNMP搭建wordpress,更新固定连接--全流程

热门文章

  1. chrome调试如何禁用浏览器缓存
  2. OpenShift-EFK日志管理
  3. docker 端口映射错误解决方法
  4. 20155308『网络对抗技术』Exp7:网络欺诈防范
  5. 20155338《网络对抗》Exp6 信息搜集与漏洞扫描
  6. 关于Trie的一些算法
  7. Spark(Python) 从内存中建立 RDD 的例子
  8. 洛咕 P4474 王者之剑
  9. Android 模拟输入那点事
  10. 10、Dockerfile实战-PHP