[数据同步]Flume 抽取Mysql历史数据
2024-09-01 12:07:29
一.Flume安装目录
1.安装部署目录
[admin@test01 apache-flume-1.9.0-bin]$ pwd
/opt/apache-flume-1.9.0-bin
2.将所需jar包复制到flume的lib目录下
flume-ng-sql-source-json-1.0.jar
mysql-connector-java-5.1.38.jar
二.压缩与解压
- 压缩
tar zcvf flume.tar.tgz apache-flume-1.9.0-bin/
三.Kafka 创建Topic 消费Topic
#创建topic
/bin/kafka-topics --create --zookeeper 10.160.26.85:2181 --replication-factor 2 --partitions 15 --topic flume_mysql_test
#消费topic
/bin/kafka-console-consumer --bootstrap-server 10.160.26.81:9092 --topic flume_mysql_test
四.启动flume服务
[admin@test01 apache-flume-1.9.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/ecarx_dealer.conf --name mfkagent -Dflume.root.logger=INFO,console
五.flume配置文件
mfkagent.sources=mysqlSource
mfkagent.channels=memoryChannel
mfkagent.sinks=activeTopCitySink
#define source
# For each one of the sources, the type is defined
mfkagent.sources.mysqlSource.type = org.keedio.flume.source.SQLSource
mfkagent.sources.mysqlSource.hibernate.connection.url = jdbc:mysql:/id:3306/db_name
# Hibernate Database connection properties
mfkagent.sources.mysqlSource.hibernate.connection.user = db_test
mfkagent.sources.mysqlSource.hibernate.connection.password = password
mfkagent.sources.mysqlSource.hibernate.connection.autocommit = true
mfkagent.sources.mysqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
mfkagent.sources.mysqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#mfkagent.sources.mysqlSource.table = navigation_active_city_top
# Columns to import to kafka (default * import entire row)
#mfkagent.sources.mysqlSource.columns.to.select = *
# Query delay, each configured milisecond the query will be sent
mfkagent.sources.mysqlSource.run.query.delay=100000
# Status file is used to save last readed row
mfkagent.sources.mysqlSource.status.file.path = /var/log/flume-ng
mfkagent.sources.mysqlSource.status.file.name = mysqlSource.status
# Custom query
mfkagent.sources.mysqlSource.start.from = 0
ORDER BY ranking ASC
#mfkagent.sources.mysqlSource.custom.query = select * from table where id>$@$
# 加上$@$的原因是为了去除重复同步
mfkagent.sources.mysqlSource.custom.query = select * from table where id>$@$
#mfkagent.sources.mysqlSource.order.by = id
mfkagent.sources.mysqlSource.batch.size = 1000
mfkagent.sources.mysqlSource.max.rows = 10000
mfkagent.sources.mysqlSource.delimiter.entry = |
mfkagent.sources.mysqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
mfkagent.sources.mysqlSource.hibernate.c3p0.min_size=1
mfkagent.sources.mysqlSource.hibernate.c3p0.max_size=10
#define channel
mfkagent.channels.memoryChannel.type=memory
mfkagent.channels.memoryChannel.capacity=100000
mfkagent.channels.memoryChannel.transactionCapacity=100000
mfkagent.channels.memoryChannel.keep-alive=3
#defeine sinks
mfkagent.sinks.activeTopCitySink.type = org.apache.flume.sink.kafka.KafkaSink
mfkagent.sinks.activeTopCitySink.kafka.topic = flume_mysql_test
mfkagent.sinks.activeTopCitySink.kafka.bootstrap.servers = test02.local:9092,test03.local:9092,test04.local:9092
mfkagent.sinks.activeTopCitySink.kafka.producer.acks = 1
mfkagent.sinks.activeTopCitySink.kafka.producer.linger.ms = 1
mfkagent.sinks.activeTopCitySink.kafka.flumeBatchSize = 100
# The channel can be defined as follows.
mfkagent.sources.mysqlSource.channels = memoryChannel
mfkagent.sinks.activeTopCitySink.channel = memoryChannel
最新文章
- 探讨js字符串数组拼接的性能问题
- [转]Redis实现分析
- 【转】${sessionScope.user}的使用方法
- commondline 之三 执行jar文件
- Android 动态改变布局属性RelativeLayout.LayoutParams.addRule()
- ACM - ICPC World Finals 2013 B Hey, Better Bettor
- C语言使用clock进行计时
- checkbox/input文本框与文字对齐
- C#文件和字节流的转换方法
- python实现ssh远程登录
- 2012-2014 三年浙江 acm 省赛 题目 分类
- 面向对象+JAVA基础
- 百度api查询多个地址的经纬度的问题
- Collections工具类
- WinForm窗体下Excel的导入
- ionic 页面加载事件及loading动画
- MoreEffectiveC++Item35(操作符)(条款5-8)
- javascript总结48:正则表达式(RegExp)
- Spring AOP 源码解析
- (1)Smali系列学习之Smali函数调用语句分析