采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

根据需求,首先定义以下3大要素

l  采集源,即source——监控文件目录 :  spooldir

l  下沉目标,即sink——HDFS文件系统  :  hdfs sink

l  source和sink之间的传递通道——channel,可用file channel 也可以用内存memory channel

配置文件编写:

vi spooldir-hdfs-sink.conf


#定义三大组件的名称

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

# 配置source组件

agent1.sources.source1.type = spooldir

agent1.sources.source1.spoolDir = /root/data/

agent1.sources.source1.fileHeader = false

#配置拦截器

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = timestamp

# 配置sink组件

agent1.sinks.sink1.type = hdfs

agent1.sinks.sink1.hdfs.path =/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

#滚动生成的文件按大小生成

agent1.sinks.sink1.hdfs.rollSize = 102400

#滚动生成的文件按行数生成

agent1.sinks.sink1.hdfs.rollCount = 1000000

#滚动生成的文件按时间生成

agent1.sinks.sink1.hdfs.rollInterval = 60

#开启滚动生成目录

agent1.sinks.sink1.hdfs.round = true

#以10为一梯度滚动生成

agent1.sinks.sink1.hdfs.roundValue = 10

#单位为分钟

agent1.sinks.sink1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

agent1.channels.channel1.keep-alive = 120

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

添加数据

aaa.txt

13601249301     100     200     300     400     500     600     700
13601249302 100 200 300 400 500 600 700
13601249303 100 200 300 400 500 600 700
13601249304 100 200 300 400 500 600 700
13601249305 100 200 300 400 500 600 700

执行命令

 bin/flume-ng agent -c conf -f conf/spooldir-hdfs-sink.conf -n agent1  -Dflume.root.logger=INFO,console   

flume的source采用spoodir时! 目录下面不允许存放同名的文件,否则报错!

Channel参数解释:

capacity:默认该通道中最大的可以存储的event数量

trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

keep-alive:event添加到通道中或者移出的允许时间

其他组件:Interceptor(拦截器)

用于Source的一组Interceptor,按照预设的顺序在必要地方装饰和过滤events。

内建的Interceptors允许增加event的headers比如:时间戳、主机名、静态标记等等

定制的interceptors可以通过内省event payload(读取原始日志),实现自己的业务逻辑(很强大)

最新文章

  1. OSG3.40 编译时,无法打开输入文件“optimized.lib”
  2. OpenCASCADE BRepTools
  3. cookie记住密码功能
  4. Bzoj1449 [JSOI2009]球队收益
  5. poj1056 (Trie入门)寻找字符串前缀
  6. Git的撤消操作 - 重置, 签出 和 撤消(转载)
  7. Duilib学习笔记《03》— 控件使用
  8. [noi2011]道路修建 树形dp
  9. android143 360 短信电话拦截
  10. mysql 查询每个分组前N条记录
  11. Android应用之基本的组件(一)
  12. Keepalived+LVS+Nginx负载均衡之高可用
  13. mysql中出现没有权限访问或者查看全部数据库的问题
  14. mysql存储过程中in、out、inout参数使用实际案例
  15. Java类加载和卸载的跟踪
  16. vue新手入门——vue-cli搭建
  17. bzoj 1426 收集邮票
  18. javaScript设计模式之面向对象编程(object-oriented programming,OOP)(二)
  19. mybaties插件生成代码
  20. ARTS打卡计划第二周-Algorithm

热门文章

  1. POM报错Failure to transfer org.apache.maven.plugins:maven-resources-plugin:pom:2.6 from
  2. apache http server2.2 + tomcat5.5 性能调优
  3. 基于Java实现的冒泡排序算法
  4. canvas基础绘制-一个小球的坠落、反弹
  5. pre-network android 网络优化预加载框架
  6. array_keys
  7. linux centos 中目录结构的含义
  8. autoHeight.vue 高度自适应
  9. Linux关闭命令行正在执行的程序
  10. hibernate5.x版本org.hibernate.MappingException: Unknown entity问题