http://flume.apache.org/FlumeDeveloperGuide.html#sink


看了 还是比较好上手的,简单翻译一下


sink的作用是从 Channel 提取 Event 然后传给流中的下一个 Flume Agent或者把它们存储在外部的仓库中。在Flume的配置文件中,一个 Sink 和一个唯一的 Channel 关联。有一个 SinkRunner 实例与每一个配好的 Sink 关联,当 Flume 框架调用 SinkRunner 的 start() 方法时,就创建一个新的线程来驱动这个 Sink (使用  SinkRunner 的实现Runnable接口的 PollingRunner 内部静态类来运行)。这个线程管理了 Sink 的生命周期。 Sink 需要实现 start() 和 stop() 方法。Sink 的 start() 方法需要初始化 Sink 并使它能够达到向目的地发送 Event 的状态。 Sink 的 process() 方法是处理从 Channel 传过来的 Event 和 发送 Event 的核心方法。 Sink 的 Stop() 方法需要做必要的清理工作(比如释放某些资源)。 Sink 也需要实现 Configurable 接口来处理自己的一些配置。


官网也给出了模板类:

 public class MySink extends AbstractSink implements Configurable {
private String myProp; @Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method
this.myProp = myProp;
} @Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
} @Override
public void stop() {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
} @Override
public Status process() throws EventDeliveryException {
Status status = null; // Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin(); try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take(); // Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback(); // Log exception, handle individual exceptions as needed
status = Status.BACKOFF; // re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
} return status;
}
}

拿来模板直接填充自己的逻辑代码即可,详细就可以直接参考HDFSSink或者HBaseSink等


最新文章

  1. 各种Android手机Root方法
  2. java视频教程 Java自学视频整理(持续更新中...)
  3. Linux文件处理命令
  4. s2-029 Struts2 标签远程代码执行分析(含POC)
  5. Linux-dd命令详解【转】
  6. oracle查询锁和杀锁
  7. Java笔记——面向接口编程(DAO模式)
  8. [GeekBand] 面向对象的设计模式(C++)(1)
  9. http://www.cnblogs.com/zhwl/p/3642486.html
  10. After Android Studio update: Gradle DSL method not found: 'runProguard()'
  11. 解决 EF 分层查询的一个性能问题[转]
  12. [SQL注入1]From SQL injection to Shell
  13. 编写高性能Javascript
  14. App对接支付宝移动支付功能
  15. MySQL对innodb某一个表进行移动
  16. OC-不可变数组NSArray
  17. 基于FPGA的序列检测器10010
  18. Maven学习 二 Maven环境搭建
  19. java.lang.ClassNotFoundException: org.thymeleaf.spring5.view.ThymeleafViewRe。。。。。。。。。。。
  20. 2-Seventeenth Scrum Meeting-20151217

热门文章

  1. C# -- 泛型(3)
  2. .netcore项目部署IIS问题
  3. (一)springmvc+spring+mybatis+maven框架搭建
  4. 灯塔AOI简易实现
  5. 浅谈 cxx rope
  6. GN算法---《Community structure in social and biological networks》这篇论文讲了什么?
  7. P3705 [SDOI2017]新生舞会 01分数规划+费用流
  8. P3480 [POI2009]KAM-Pebbles 阶梯NIM
  9. Python第四次作业
  10. postgress数据库 出现大写字母 字段名但是提示说不存在