flume 学习总结
flume 总结
1 下载、配置、安装
1.1 下载
在官网 http://flume.apache.org/download.html
下载 apache-flume-1.7.0-bin.tar.gz
文件,解压缩。
1.2 配置、安装
进入解压缩文件夹下的 conf
文件夹,执行 cp flume-env.sh.template flume-env.sh
,在 flume-env.sh
中添加 JAVA_HOME=jdk_path
jdk_path 为 java 绝对路径。
执行 /flume-1.7.0-bin/bin/flume-ng version
出现版本信息证明配置安装成功。
2 flume 架构
flume 架构是基于 agent,agent 是由 source、channel、sink 组成。其中 source 相当于生产者,channel 相当于消息队列,sink 相当于消费者。
数据在 flume 中是以 event 为单位的,source 将数据打包为 event 传入 channel、sink 从 channel 中取数据。event 从 source 到 channel 和从 channel 到 sink 都是事务级别的。
由于 flume 是基于 agent 的,所以 flume 支持多级结构、扇入、扇出
3 agent 配置
一个 agent 中至少包括 source、channel、sink 各一个。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
- 1
- 2
- 3
a1 为 agent 的名称。
以下内容均不完全,详情参考 http://flume.apache.org/FlumeUserGuide.html
。
3.1 source 配置
本文只列出了几种类型。
3.1.1 监听网络端口
avro source : 以 avro 格式处理数据
# 必须配置
a1.sources.r1.type = avro
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 3366
a1.sources.r1.channels = c1
#可选配置
# ssl 认证
a1.sources.r1.ssl = false
a1.sources.r1.keystore =
a1.sources.r1.keystore-password =
# ip 过滤
a1.sources.r1.ipFilter = false
a1.sources.r1.ipFilterRules
……
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
thrift source :以 thrift 格式处理数据
# 必须配置
a1.sources.r1.type = thrift
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 3366
a1.sources.r1.channels = c1
#可选配置
# ssl 认证
a1.sources.r1.ssl = false
a1.sources.r1.keystore =
a1.sources.r1.keystore-password =
……
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
还有 http 类型等详见 http://flume.apache.org/FlumeUserGuide.html
3.1.2 监控文件
spooldir source :监控文件夹下新增文件
# 必须配置
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = directory
a1.sources.r1.channels = c1
#可选配置
#是否带头部信息
a1.sources.r1.fileHeader = true
#忽略后缀文件
a1.sources.r1.ignorePattern = .*mv
#最大一行大小
a1.sources.r1.deserializer.maxLineLength = 50000
# 发现新文件 5s 后处理
a1.sources.r1.pollDelay = 5000
#对完成文件添加后缀 .COMPLETED
#a1.sources.r1.fileSuffix = .COMPLETED
#完成文件后是否删除 never or immediate
#a1.sources.r1.deletePolicy = never
……
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
**exec source** :监控文件夹下新增文件
- 1
# 必须配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F file
a1.sources.r1.channels = c1
#可选配置
#挂掉后是否重启
a1.sources.r1.restart = false
# stderr 是否写入日志
a1.sources.r1.logStdErr = false
……
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
3.2 channel 配置
memory channel
# 必须配置
a1.channels.c1.type = memory
# channel 队列最大长度
a1.channels.c1.capacity = 1000
# 事务队列最大长度
a1.channels.c1.transactionCapacity = 100
# 超时时间
a1.channels.c1.keep-alive = 3
a1.channels.c1.byteCapacityBufferPercentage = 20
# 最大内存
a1.channels.c1.byteCapacity = 800000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
**file channel**
- 1
# 必须配置
a1.channels.c1.type = file
# 记录传输进度文件目录
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
# 记录日志目录
a1.channels.c1.dataDirs = /mnt/flume/data
# 事务队列最大长度
a1.channels.c1.transactionCapacity = 10000
# 每个文件最大大小
a1.channels.c1.maxFileSize = 2146435071
# 小于此空间大小将停止传输
a1.channels.c1.minimumRequiredSpace = 524288000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
3.3 sink 配置
avro sink
# 必须配置
a1.sinks.s1.type = avro
a1.sinks.s1.hostname = 127.0.0.1
a1.sinks.s1.port = 3366
a1.sinks.s1.channels = c1
#可选配置
# ssl 认证
a1.sinks.s1.ssl = false
a1.sinks.s1.truststore =
a1.sinks.s1.truststore-password =
……
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
thrift sinks :以 thrift 格式处理数据
# 必须配置
a1.sinks.s1.type = thrift
a1.sinks.s1.hostname = 127.0.0.1
a1.sinks.s1.port = 3366
a1.sinks.s1.channels = c1
#可选配置
# ssl 认证
a1.sinks.s1.ssl = false
a1.sinks.s1.truststore =
a1.sinks.s1.truststore-password =
……
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
**file roll sink**
- 1
# 必须配置
a1.sinks.s1.type = file_roll
a1.sinks.k1.sink.directory = /var/log/flume
#可选配置
# 每 30 秒生成一个文件
a1.sinks.k1.sink.rollInterval = 30
# 事务队列大小
a1.sinks.k1.sink.batchSize = 100
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
4 failover 和 load balancing
flume 的 failover 和 load balancing 都是针对 sink 的。
failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
- 1
- 2
- 3
- 4
- 5
- 6
在 sinkgroups 中,先启用的 priority 大的 sink。只有在当前的 sink 挂掉后,flume 才会启用 sinkgroups 中其余最大的。
load balancing
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
# balance : round_robin, random
a1.sinkgroups.g1.processor.selector = random
- 1
- 2
- 3
- 4
- 5
- 6
5 扇出
对 source 的扇出有 replicating
、multiplexing
两种方式。
replicating
在 source 配置后添加以下配置
#以复制方式扇出
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
- 1
- 2
- 3
**multiplexing**
- 1
在 source 配置后添加以下配置。
复用方式需要在消息中附加头部消息,对头部内容进行匹配后扇出。
#以复用方式扇出
a1.sources.r1.selector.type= multiplexing
a1.sources.r1.channels= c1 c2
# 判断 header 中 state 字段
a1.sources.r1.selector.header= state
# state 字段为 CZ 发送到 c1
a1.sources.r1.selector.mapping.CZ= c1
# state 字段为 US 发送到 c2
a1.sources.r1.selector.mapping.US= c2
# 未匹配的数据发送到 c1
a1.sources.r1.selector.default= c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
6 二次开发
sink 开发
public class MySink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(TimeFileSink.class );
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
在开发过程中只要重写 configure
、process
函数就可。
其中 configure
函数是从配置文件获取所需的配置,使用 context.getString
获取字符串,context.getInteger
获取数字。
在新建的类中,会不断的调用 process
函数
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
// 获取事件
Event event = ch.take();
//提交事务
txn.commit();
//回滚事务
txn.rollback();
//关闭
txn.close();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
完成开发后把 .class
文件打包为 .jar
,把 .jar
包放入 plugins.d/personal/lib/
目录下(不存在自建,personal
随意)。在配置文件中 a1.sinks.s1.type
后添加自定义的完整包路径。
source 开发
同 sink 开发。
7 启动 flume
bin/flume-ng agent -c conf/ -f conf/flume_spool.conf -n a1 -Dflume.root.logger=INFO,console
-c 配置文件目录
-f 加载的配置文件
-n 配置文件中 agent 名称
-Dflume.root.logger=INFO,console 将 flume 运行中产生的大于 INFO 级别的消息输出到控制台
-Dflume.root.logger=INFO,LOGFILE 将 flume 运行中产生的大于 INFO 级别的消息输出到日志,详细在 conf 目录下的 log4j.properties 文件
- 1
- 2
- 3
- 4
- 5
总结
有些东西并没有写入,有问题可以给我留言,看到会第一时间回复的。
本人第一次写博客,有很多不完美的地方请大家指出,O(∩_∩)O谢谢。
---------------------
本文来自 lazyun 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/Phoenix_Lzy/article/details/70477519?utm_source=copy
最新文章
- 【分享】标准springMVC+mybatis项目maven搭建最精简教程
- Linux命令全集
- python json学习之路2-认识python种的json模块
- 判断数据库内容,在页面显示自定义数据case when
- Eclipse的中文字体设置
- wp8.1 全球化解决办法
- NewSQL——优化的SQL存储引擎(TokuDB, MemSQL)+?
- IO流+数据库课后习题
- 在Mac pro上如何配置adb命令?
- easui tree载入时自动展开无子节点的节点
- split分割字符串时的一些特殊分隔符
- 如何为开发项目编写规范的README文件(windows),此文详解
- hbase学习一 shell命令操作
- UI(一)
- ajax上传表单的俩种方式
- SQL1:基础
- EDAS字体嵌入问题解决方法
- [剑指Offer] 62.二叉搜索树的第k个结点
- [HNOI2007][BZOJ1185] 最小矩形覆盖 [凸包+旋转卡壳]
- 【LeetCode】011 Container With Most Water