Flume-自定义 Source
2024-09-05 05:13:42
Source 是负责接收数据到 Flume Agent 的组件。
Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口:https://flume.apache.org/FlumeDeveloperGuide.html#source
根据官方说明自定义 Source 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
getBackOffSleepIncrement(); getMaxBackOffSleepInterval(); // 初始化 context(读取配置文件内容)
configure(Context context); // 获取数据封装成 event 并写入 channel,这个方法将被循环调用
process();
使用场景:读取 MySQL 数据或者其他文件系统。
这里使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。
一、创建自定义 Source
1.添加 pom 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com</groupId>
<artifactId>flume</artifactId>
<version>1.0-SNAPSHOT</version> <dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.编写自定义的 Source 类
package source; import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定义配置文件将来要读取的字段
private Long delay;
private String field; // 初始化配置信息
@Override
public void configure(Context context) {
delay = context.getLong("delay");
field = context.getString("field", "Hello!");
} @Override
public Status process() throws EventDeliveryException {
try {
// 创建事件头信息
HashMap<String, String> hearderMap = new HashMap<>();
// 创建事件
SimpleEvent event = new SimpleEvent();
// 循环封装事件
for (int i = 0; i < 5; i++) {
// 给事件设置头信息
event.setHeaders(hearderMap);
// 给事件设置内容
event.setBody((field + i).getBytes());
// 将事件写入 channel
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
} @Override
public long getBackOffSleepIncrement() {
return 0;
} @Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
二、打包测试
1.打包上传
参考:https://www.cnblogs.com/jhxxb/p/11582804.html
2.编写 flume 配置文件
mysource.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = source.MySource
# 代码中要获取的配置信息
a1.sources.r1.delay = 1000
# a1.sources.r1.field = jhxxb # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动
cd /opt/apache-flume-1.9.-bin
bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/source/mysource.conf -Dflume.root.logger=INFO,console
最新文章
- [WCF编程]12.事务:服务事务编程(上)
- HTML5本地存储——IndexedDB(一:基本使用)
- 国内其他的maven库
- C# 压缩文件与字节互转
- linux下启动AP热点时出错
- WebGL on iOS8 终于等到了这一天
- Java中Properties类的使用
- [Regionals 2012 :: Asia - Tokyo ]
- ExtJS4.2 Ext.grid.panel Store更改后刷新表格
- Javascript数据类型共有六种
- Mysql 范围查询优化
- 四则运算题目生成(python版)
- C# 插入超链接到PDF文档(3种情况)
- 洛谷[LnOI2019]长脖子鹿省选模拟赛 简要题解
- Windows下使用Diskpart格式化U盘
- Controller层aop
- R0~R16寄存器作用
- es6的解构赋值用途
- html 绘制矩形轨迹,选中区域
- 2014年可用的TRACKER服务器大全
热门文章
- 解决ios8下coreData没有NSPersistentContainer的问题
- OpenCV手工实现灰度及RGB直方图
- Personalize Oracle Applications Home Page Browser Window Title
- git——日常保险操作
- The openssl extension is missing, which means that secure HTTPS transfers are impossible
- 阿里高级架构师教你使用Spring JMS处理消息事务源码案例
- Dubbo:1
- springboot中访问html页面
- EL表达式,JSP内置对象
- Socket嵌套字通讯