目的:功能1:判断json文件,如何格式合格则正常传输,否则就不传输

功能2:判断出合格的json文件,并且key值中包含“date”才进行传输

一、创建一个Json的工具类

package com.atguigu.gmall.flume.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Event; public class JSONUtil {
// public static void main(String[] args) {
// String log = "{date:2}";
// String key = "date";
// System.out.println(isJSONValidate(log));
// System.out.println(isContainsKey(log, key));
//// System.out.println(JSONObject.parseObject(log).containsKey(key));
// } // 判断log是否为一个合法的json
public static boolean isJSONValidate(String log) {
try {
JSONObject.parseObject(log); //是将str转化为相应的JSONObject对象
return true;
} catch (Exception e) {
// e.printStackTrace();
return false;
}
} // 判断json中是否包含key值,其中如果log不是合格的json,直接走异常返回false
public static boolean isContainsKey(String log,String key){
try {
if (JSONObject.parseObject(log).containsKey(key)) {
return true;
} else{
return false;
}
} catch (Exception e) {
// e.printStackTrace();
return false;
}
}
}

  二、编写拦截器,只需要重写Event intercept和List<Event> intercept方法即可。如果传输的json文件不是很复杂,只需要重写单个event的intercept方法即可

package com.atguigu.gmall.flume.intercepter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; public class ETLIntercepter implements Interceptor {
@Override
public void initialize() {
} @Override
public Event intercept(Event event) {
// 获取body中的数据
byte[] body = event.getBody();
String log= new String(body, StandardCharsets.UTF_8);
// 1.判断是否为合法的json,是:返回event , 否:返回null
// if(JSONUtil.isJSONValidate(log ){
// return event;
// }else{
// return null;
// }
// 2.判断是否为合法的json,且是都包含key值“date”
if(JSONUtil.isContainsKey(log,"date") ){
return event;
}else{
return null;
}
} @Override
public List<Event> intercept(List<Event> list) {
// 判断事件list中的event是都合格,如果不合格就将event的删掉,返回合格的event列表
// 如果list中的event都不合格,则返回null
Iterator<Event> iterator = list.iterator();
while(iterator.hasNext()){
Event event = iterator.next();
if(intercept(event) == null){
iterator.remove();
}
}
return list;
} @Override
public void close() {
} public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new ETLIntercepter();
}
@Override
public void configure(Context context) {
}
}
}

三、生成rtProj-1.0-SNAPSHOT-jar-with-dependencies.jar包上传到flume的lib目录下,然后再编辑flume的配置文件

a1.sources = r1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = taildir
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1 = /workplace/data/log*.*
a1.sources.r1.positionFile = /workplace/data/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.intercepter.ETLIntercepter$Builder
#i1.type :此地址是拦截器中Builder的地址(右键点击Copy Reference,记得将.Builder中的 . 换成$号) # Use a channel which buffers events in memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=master:9092
a1.channels.c1.kafka.topic = test1 # Bind the source and sink to the channel
a1.sources.r1.channels = c1

四、启动flume开始传输数据

bin/flume-ng agent --conf conf --conf-file ./conf/job/flume_to_kafka2.conf --name a1 -Dflume.root.logger=INFO,consol

五、往文件里写数据,查看fafka中的数据,发现已经能成功过滤出正常json文件,且含有date的key值的数据了

/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.80.128:9092 --from-beginning --topic test1

最新文章

  1. Linux下man安装及使用方法
  2. pro*c调用过程
  3. Python学习笔记09
  4. Hex-Rays decompiler type definitions and convenience macros
  5. linux 多线程基础2
  6. curl向web服务器发送json数据
  7. HIDKomponente使用读写Hid设备(转)
  8. 使用yum安装cmake
  9. javascript的词法作用域
  10. Jupyter(Python)中无法使用Cache原理分析
  11. 【Tarjan缩点】POJ2186 Popular Cows
  12. [SimplePlayer] 4. 从视频文件中提取音频
  13. 从零开始写自己的PHP框架系列教程(一)[core.php]
  14. 【PS技巧】常用概念和功能操作
  15. js——数组操作
  16. 突破拐点:企业成长的S曲线
  17. Handler相关
  18. dom4j 操作总结
  19. 写好shell脚本的13个技巧【转】
  20. HDU 1848 Fibonacci again and again【博弈SG】

热门文章

  1. CCF 201912-2 回收站选址
  2. gradle的配置
  3. vue自定义组件的总结(一)
  4. myForEach
  5. 【转载】Adobe Acrobat XI Pro闪退原因及解决办法
  6. Camstar获取回参
  7. 梦想云图Node.JS服务 (网页CAD,在线CAD )
  8. Scala集合排序
  9. Spring boot 无法加载css样式,image解决办法
  10. 【python基础教程】-10.开箱即用(模块的工作原理,获悉模块的功能以及常用模块)