I. Flume installation directory

1. Installation directory
[admin@test01 apache-flume-1.9.0-bin]$ pwd
/opt/apache-flume-1.9.0-bin
2. Copy the required JARs into Flume's lib directory
flume-ng-sql-source-json-1.0.jar
mysql-connector-java-5.1.38.jar
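
Step 2 could be scripted roughly as follows. This is a minimal sketch: the function name `install_flume_jars` and the assumption that both JARs sit in a single download directory are illustrative only; the JAR names and the Flume path come from the steps above.

```shell
# Sketch: copy the two JARs from a download directory into Flume's lib directory.
install_flume_jars() {
  local jar_dir="$1"                                    # where the JARs were downloaded (assumption)
  local flume_home="${2:-/opt/apache-flume-1.9.0-bin}"  # install directory from step 1
  local jar
  for jar in flume-ng-sql-source-json-1.0.jar mysql-connector-java-5.1.38.jar; do
    # Stop early if a JAR is missing instead of starting Flume without it.
    if [ ! -f "$jar_dir/$jar" ]; then
      echo "missing: $jar_dir/$jar" >&2
      return 1
    fi
    cp "$jar_dir/$jar" "$flume_home/lib/" || return 1
  done
}
```

For example, `install_flume_jars ~/downloads` would copy both files into `/opt/apache-flume-1.9.0-bin/lib/`.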

II. Compressing and extracting

  • Compress
 tar zcvf flume.tar.tgz apache-flume-1.9.0-bin/
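
The heading also mentions extraction, for which no command is shown. A matching sketch, assuming the archive was created with the command above; the function name `unpack_flume` and the target directory are hypothetical:

```shell
# Sketch: extract the archive created above into a target directory.
unpack_flume() {
  local archive="$1"    # e.g. flume.tar.tgz
  local dest="${2:-.}"  # target directory (assumption: defaults to the current directory)
  mkdir -p "$dest" || return 1
  # z: gunzip, x: extract, v: list files, f: archive file; -C switches to dest first
  tar zxvf "$archive" -C "$dest"
}
```

For example, `unpack_flume flume.tar.tgz /opt` would recreate `/opt/apache-flume-1.9.0-bin/` on the target machine.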

III. Kafka: creating and consuming a topic

# Create the topic
/bin/kafka-topics --create --zookeeper 10.160.26.85:2181 --replication-factor 2 --partitions 15 --topic flume_mysql_test
# Consume the topic
/bin/kafka-console-consumer --bootstrap-server 10.160.26.81:9092 --topic flume_mysql_test

IV. Starting the Flume service

[admin@test01 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/ecarx_dealer.conf --name mfkagent -Dflume.root.logger=INFO,console

V. Flume configuration file

mfkagent.sources=mysqlSource
mfkagent.channels=memoryChannel
mfkagent.sinks=activeTopCitySink

# define source
# For each one of the sources, the type is defined
mfkagent.sources.mysqlSource.type = org.keedio.flume.source.SQLSource
mfkagent.sources.mysqlSource.hibernate.connection.url = jdbc:mysql://ip:3306/db_name

# Hibernate Database connection properties
mfkagent.sources.mysqlSource.hibernate.connection.user = db_test
mfkagent.sources.mysqlSource.hibernate.connection.password = password
mfkagent.sources.mysqlSource.hibernate.connection.autocommit = true
mfkagent.sources.mysqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
mfkagent.sources.mysqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#mfkagent.sources.mysqlSource.table = navigation_active_city_top

# Columns to import to kafka (default * import entire row)
#mfkagent.sources.mysqlSource.columns.to.select = *

# Query delay: the query is sent every configured number of milliseconds
mfkagent.sources.mysqlSource.run.query.delay=100000

# Status file is used to save the last row read
mfkagent.sources.mysqlSource.status.file.path = /var/log/flume-ng
mfkagent.sources.mysqlSource.status.file.name = mysqlSource.status

# Custom query
mfkagent.sources.mysqlSource.start.from = 0
# ORDER BY ranking ASC
#mfkagent.sources.mysqlSource.custom.query = select * from table where id>$@$
# $@$ is included so that rows that were already synced are not synced again
mfkagent.sources.mysqlSource.custom.query = select * from table where id>$@$
#mfkagent.sources.mysqlSource.order.by = id
mfkagent.sources.mysqlSource.batch.size = 1000
mfkagent.sources.mysqlSource.max.rows = 10000
mfkagent.sources.mysqlSource.delimiter.entry = |
mfkagent.sources.mysqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
mfkagent.sources.mysqlSource.hibernate.c3p0.min_size=1
mfkagent.sources.mysqlSource.hibernate.c3p0.max_size=10

# define channel
mfkagent.channels.memoryChannel.type=memory
mfkagent.channels.memoryChannel.capacity=100000
mfkagent.channels.memoryChannel.transactionCapacity=100000
mfkagent.channels.memoryChannel.keep-alive=3

# define sinks
mfkagent.sinks.activeTopCitySink.type = org.apache.flume.sink.kafka.KafkaSink
mfkagent.sinks.activeTopCitySink.kafka.topic = flume_mysql_test
mfkagent.sinks.activeTopCitySink.kafka.bootstrap.servers = test02.local:9092,test03.local:9092,test04.local:9092
mfkagent.sinks.activeTopCitySink.kafka.producer.acks = 1
mfkagent.sinks.activeTopCitySink.kafka.producer.linger.ms = 1
mfkagent.sinks.activeTopCitySink.kafka.flumeBatchSize = 100

# The channel can be defined as follows.
mfkagent.sources.mysqlSource.channels = memoryChannel
mfkagent.sinks.activeTopCitySink.channel = memoryChannel
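
How the `$@$` placeholder in `custom.query` avoids re-syncing rows can be illustrated with a small simulation. This is not the source plugin's actual code: the function `render_query` is hypothetical, and it assumes, as the comments in the configuration suggest, that the placeholder is replaced by the last-read value tracked in the status file, beginning with `start.from` on the first run.

```shell
# Sketch: substitute the $@$ placeholder with the last index that was read.
render_query() {
  local template="$1"    # custom.query value containing $@$
  local last_index="$2"  # start.from on the first run, later the saved position (assumption)
  # \$@\$ matches the literal three characters $@$
  printf '%s\n' "$template" | sed 's/\$@\$/'"$last_index"'/g'
}

# First run: start.from = 0
render_query 'select * from table where id>$@$' 0
# prints: select * from table where id>0

# A later run, after rows up to id 1000 have been read
render_query 'select * from table where id>$@$' 1000
# prints: select * from table where id>1000
```

Because each query only asks for rows above the saved position, rows that were already sent to Kafka are not selected again, provided the `id` column increases monotonically.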
