Source 是负责接收数据到 Flume Agent 的组件。

Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。

官方也提供了自定义 source 的接口:https://flume.apache.org/FlumeDeveloperGuide.html#source

根据官方说明自定义 Source 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。

实现相应方法:

getBackOffSleepIncrement();

getMaxBackOffSleepInterval();

// 初始化 context(读取配置文件内容)
configure(Context context); // 获取数据封装成 event 并写入 channel,这个方法将被循环调用
process();

使用场景:读取 MySQL 数据或者其他文件系统。

这里使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。

一、创建自定义 Source

1.添加 pom 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com</groupId>
<artifactId>flume</artifactId>
<version>1.0-SNAPSHOT</version> <dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>

2.编写自定义的 Source 类

package source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定义配置文件将来要读取的字段
private Long delay;
private String field; // 初始化配置信息
@Override
public void configure(Context context) {
delay = context.getLong("delay");
field = context.getString("field", "Hello!");
} @Override
public Status process() throws EventDeliveryException {
try {
// 创建事件头信息
HashMap<String, String> hearderMap = new HashMap<>();
// 创建事件
SimpleEvent event = new SimpleEvent();
// 循环封装事件
for (int i = 0; i < 5; i++) {
// 给事件设置头信息
event.setHeaders(hearderMap);
// 给事件设置内容
event.setBody((field + i).getBytes());
// 将事件写入 channel
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
} @Override
public long getBackOffSleepIncrement() {
return 0;
} @Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}

二、打包测试

1.打包上传

参考:https://www.cnblogs.com/jhxxb/p/11582804.html

2.编写 flume 配置文件

mysource.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = source.MySource
# 代码中要获取的配置信息
a1.sources.r1.delay = 1000
# a1.sources.r1.field = jhxxb # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动

cd /opt/apache-flume-1.9.-bin
bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/source/mysource.conf -Dflume.root.logger=INFO,console

最新文章

  1. [WCF编程]12.事务:服务事务编程(上)
  2. HTML5本地存储——IndexedDB(一:基本使用)
  3. 国内其他的maven库
  4. C# 压缩文件与字节互转
  5. linux下启动AP热点时出错
  6. WebGL on iOS8 终于等到了这一天
  7. Java中Properties类的使用
  8. [Regionals 2012 :: Asia - Tokyo ]
  9. ExtJS4.2 Ext.grid.panel Store更改后刷新表格
  10. Javascript数据类型共有六种
  11. Mysql 范围查询优化
  12. 四则运算题目生成(python版)
  13. C# 插入超链接到PDF文档(3种情况)
  14. 洛谷[LnOI2019]长脖子鹿省选模拟赛 简要题解
  15. Windows下使用Diskpart格式化U盘
  16. Controller层aop
  17. R0~R16寄存器作用
  18. es6的解构赋值用途
  19. html 绘制矩形轨迹,选中区域
  20. 2014年可用的TRACKER服务器大全

热门文章

  1. 解决ios8下coreData没有NSPersistentContainer的问题
  2. OpenCV手工实现灰度及RGB直方图
  3. Personalize Oracle Applications Home Page Browser Window Title
  4. git——日常保险操作
  5. The openssl extension is missing, which means that secure HTTPS transfers are impossible
  6. 阿里高级架构师教你使用Spring JMS处理消息事务源码案例
  7. Dubbo:1
  8. springboot中访问html页面
  9. EL表达式,JSP内置对象
  10. Socket嵌套字通讯