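1.7 Flume Case Study 2

Requirement:

After the data is collected, use a Flume interceptor to filter out the fields that are not needed and to encrypt a specified field (here the first one, hashed with MD5), then save the result to HDFS.

Comparison of the raw data and the processed data

Figure 1: Contents of the original file

Figure 2: Processed data collected on HDFS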
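
To make the requirement concrete, here is a minimal stand-alone Java sketch of the same transformation: keep a chosen set of fields and replace one of them with its MD5 digest. It uses no Flume classes. The class name FieldSelectSketch and the methods md5Hex and transform are hypothetical names introduced only for illustration; the tab separator and the indexes 0,1,3,5,6 are taken from the configuration used in this case. Note that MD5 is a one-way hash rather than reversible encryption.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.StringJoiner;
import java.util.regex.Pattern;

// Stand-alone sketch (no Flume classes); all names here are hypothetical.
final class FieldSelectSketch {

    // Returns the hex-encoded MD5 digest of the input, computed over its UTF-8 bytes.
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(input.getBytes(StandardCharsets.UTF_8))) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Keeps only the fields listed in indexes and hashes the one at encryptedIndex.
    static String transform(String line, String separator, int[] indexes, int encryptedIndex) {
        // Pattern.quote makes the separator a literal instead of a regular expression.
        String[] fields = line.split(Pattern.quote(separator));
        StringJoiner out = new StringJoiner(separator);
        for (int index : indexes) {
            out.add(index == encryptedIndex ? md5Hex(fields[index]) : fields[index]);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String line = "13601249301\t100\t200\t300\t400\t500\t600\t700";
        System.out.println(transform(line, "\t", new int[] {0, 1, 3, 5, 6}, 0));
    }
}
```

Running main prints the digest of the phone number followed by the fields 100, 300, 500 and 600, separated by tabs.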

Implementation steps

Step 1: Create a Maven Java project and add the dependency

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0-cdh5.14.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Step 2: Write a custom Flume interceptor

package cn.itcast.iterceptor;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static cn.itcast.iterceptor.CustomParameterInterceptor.Constants.*;

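public class CustomParameterInterceptor implements Interceptor {
    /** Separator between the fields of each line. */
    private final String fields_separator;

    /** Indexes of the fields to keep after each line is split by the separator. */
    private final String indexs;

    /** Separator between the entries of the index list. */
    private final String indexs_separator;

    /**
     * @param fields_separator separator between the fields of each line
     * @param indexs indexes of the fields to keep
     * @param indexs_separator separator between the entries of indexs
     * @param encrypted_field_index index of the field to hash with MD5
     */
    public CustomParameterInterceptor(String fields_separator,
            String indexs, String indexs_separator, String encrypted_field_index) {
        // Note: trim() turns a whitespace-only separator (such as the default " ")
        // into an empty string, so whitespace separators should be configured as
        // escaped code points, as in the configuration file of step 4.
        String f = fields_separator.trim();
        String i = indexs_separator.trim();
        this.indexs = indexs;
        this.encrypted_field_index = encrypted_field_index.trim();
        if (!f.equals("")) {
            f = UnicodeToString(f);
        }
        this.fields_separator = f;
        if (!i.equals("")) {
            i = UnicodeToString(i);
        }
        this.indexs_separator = i;
    }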

    /** Index of the field to hash with MD5. */
    private final String encrypted_field_index;

    /**
     * Replaces every literal Unicode escape in the string (a backslash, the letter u
     * and four hexadecimal digits) with the character it denotes.
     *
     * Typical separators: tab (U+0009), line feed (U+000A), carriage return (U+000D),
     * form feed (U+000C), bell (U+0007), escape (U+001B), space (U+0020).
     *
     * Date: 2015-6-30
     *
     * @param str text that may contain literal Unicode escapes
     * @return the text with the escapes decoded
     */
    public static String UnicodeToString(String str) {
        Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
        Matcher matcher = pattern.matcher(str);
        char ch;
        while (matcher.find()) {
            ch = (char) Integer.parseInt(matcher.group(2), 16);
            str = str.replace(matcher.group(1), ch + "");
        }
        return str;
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
     *
     * Interception logic for a single event.
     */
    @Override
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }
        try {
            String line = new String(event.getBody(), Charsets.UTF_8);
            // String.split treats both separators as regular expressions.
            String[] fields_spilts = line.split(fields_separator);
            String[] indexs_split = indexs.split(indexs_separator);
            StringBuilder newLine = new StringBuilder();
            for (int i = 0; i < indexs_split.length; i++) {
                int parseInt = Integer.parseInt(indexs_split[i]);
                // Hash the field that is configured for encryption
                if (!"".equals(encrypted_field_index)
                        && encrypted_field_index.equals(indexs_split[i])) {
                    newLine.append(StringUtils.GetMD5Code(fields_spilts[parseInt]));
                } else {
                    newLine.append(fields_spilts[parseInt]);
                }
                if (i != indexs_split.length - 1) {
                    newLine.append(fields_separator);
                }
            }
            event.setBody(newLine.toString().getBytes(Charsets.UTF_8));
            return event;
        } catch (Exception e) {
            // On any parsing error the event is passed through unchanged.
            return event;
        }
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
     *
     * Interception logic for a batch of events.
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#initialize()
     */
    @Override
    public void initialize() {
        // Nothing to initialize
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#close()
     */
    @Override
    public void close() {
        // Nothing to release
    }

    /**
     * Factory class for the custom Interceptor.
     * Specify this Builder in the Flume agent configuration file to create the Interceptor.
     * The Builder reads and parses the interceptor's custom parameters from that file:
     * field separator, field indexes, index separator, index of the field to encrypt
     * ...
     */
    public static class Builder implements Interceptor.Builder {

        /** Separator between the fields of each line. */
        private String fields_separator;

        /** Indexes of the fields to keep after each line is split by the separator. */
        private String indexs;

        /** Separator between the entries of the index list. */
        private String indexs_separator;

        /** Index of the field to hash with MD5. */
        private String encrypted_field_index;

        /*
         * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
         */
        @Override
        public void configure(Context context) {
            fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
            indexs = context.getString(INDEXS, DEFAULT_INDEXS);
            indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
            encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX,
                    DEFAULT_ENCRYPTED_FIELD_INDEX);
        }

        /*
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
        @Override
        public Interceptor build() {
            return new CustomParameterInterceptor(fields_separator,
                    indexs, indexs_separator, encrypted_field_index);
        }
    }
    /**
     * Constants: configuration keys and their default values.
     */
    public static class Constants {
        /** The Constant FIELD_SEPARATOR. */
        public static final String FIELD_SEPARATOR = "fields_separator";

        /** The Constant DEFAULT_FIELD_SEPARATOR. */
        public static final String DEFAULT_FIELD_SEPARATOR = " ";

        /** The Constant INDEXS. */
        public static final String INDEXS = "indexs";

        /** The Constant DEFAULT_INDEXS. */
        public static final String DEFAULT_INDEXS = "0";

        /** The Constant INDEXS_SEPARATOR. */
        public static final String INDEXS_SEPARATOR = "indexs_separator";

        /** The Constant DEFAULT_INDEXS_SEPARATOR. */
        public static final String DEFAULT_INDEXS_SEPARATOR = ",";

        /** The Constant ENCRYPTED_FIELD_INDEX. */
        public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";

        /** The Constant DEFAULT_ENCRYPTED_FIELD_INDEX. */
        public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";

        /** The Constant PROCESSTIME. */
        public static final String PROCESSTIME = "processTime";

        /** The Constant DEFAULT_PROCESSTIME. */
        public static final String DEFAULT_PROCESSTIME = "a";
    }
    /**
     * Utility class: MD5 hashing of strings.
     */
    public static class StringUtils {
        // Hexadecimal digits used to render each byte
        private final static String[] strDigits = { "0", "1", "2", "3", "4", "5",
                "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };

        // Returns the byte as two hexadecimal digits
        private static String byteToArrayString(byte bByte) {
            int iRet = bByte;
            if (iRet < 0) {
                iRet += 256;
            }
            int iD1 = iRet / 16;
            int iD2 = iRet % 16;
            return strDigits[iD1] + strDigits[iD2];
        }

        // Returns the byte as an unsigned decimal number
        private static String byteToNum(byte bByte) {
            int iRet = bByte;
            if (iRet < 0) {
                iRet += 256;
            }
            return String.valueOf(iRet);
        }

        // Converts a byte array to a hexadecimal string
        private static String byteToString(byte[] bByte) {
            StringBuffer sBuffer = new StringBuffer();
            for (int i = 0; i < bByte.length; i++) {
                sBuffer.append(byteToArrayString(bByte[i]));
            }
            return sBuffer.toString();
        }

        // Returns the MD5 digest of the string as 32 hexadecimal characters
        public static String GetMD5Code(String strObj) {
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                // md.digest() returns the hash as a byte array;
                // UTF-8 is used so the result does not depend on the platform charset
                return byteToString(md.digest(strObj.getBytes(Charsets.UTF_8)));
            } catch (NoSuchAlgorithmException ex) {
                // Fail loudly instead of silently returning the plain text
                throw new IllegalStateException("MD5 is not available", ex);
            }
        }
    }

}
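
The interceptor above passes an event through unchanged when a line cannot be parsed, for example when it has fewer fields than the largest configured index. If such lines should be discarded instead, Flume's Interceptor contract allows intercept(Event) to return null, and the batch method above already skips null results. The sketch below illustrates that idea with plain strings standing in for events, so it runs without Flume on the classpath. DropMalformedSketch, interceptLine and interceptAll are hypothetical names and are not part of the original interceptor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Stand-alone sketch: plain strings stand in for Flume events; all names are hypothetical.
final class DropMalformedSketch {

    // Returns the selected fields joined by the separator,
    // or null when the line has too few fields.
    static String interceptLine(String line, String separator, int[] indexes) {
        String[] fields = line.split(Pattern.quote(separator), -1);
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < indexes.length; i++) {
            if (indexes[i] >= fields.length) {
                return null; // malformed line: signal "drop" instead of passing it through
            }
            if (i > 0) {
                out.append(separator);
            }
            out.append(fields[indexes[i]]);
        }
        return out.toString();
    }

    // Mirrors the batch method of the interceptor: null results are skipped.
    static List<String> interceptAll(List<String> lines, String separator, int[] indexes) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String result = interceptLine(line, separator, indexes);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a\tb\tc\td", "too\tshort");
        // Only the first line has a field at index 3, so only it survives.
        System.out.println(interceptAll(lines, "\t", new int[] {0, 3}));
    }
}
```

Whether to drop or pass through malformed lines is a design choice: dropping keeps the HDFS output uniform, while passing through keeps the raw line available for later inspection.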

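Step 3: Package the project and upload it to the server

Package the interceptor as a jar and place it in Flume's lib directory

Step 4: Write the Flume configuration file

Write the Flume configuration file on the third machine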

cd  /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim spool-interceptor-hdfs.conf

a1.channels = c1

a1.sources = r1

a1.sinks = s1

#channel

a1.channels.c1.type = memory

a1.channels.c1.capacity=100000

a1.channels.c1.transactionCapacity=50000

#source

a1.sources.r1.channels = c1

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /export/servers/intercept

a1.sources.r1.batchSize= 50

a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2

a1.sources.r1.interceptors.i1.type =cn.itcast.iterceptor.CustomParameterInterceptor$Builder

a1.sources.r1.interceptors.i1.fields_separator=\\u0009

a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6

a1.sources.r1.interceptors.i1.indexs_separator =\\u002c

a1.sources.r1.interceptors.i1.encrypted_field_index =0

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#sink

a1.sinks.s1.channel = c1

a1.sinks.s1.type = hdfs

a1.sinks.s1.hdfs.path =hdfs://192.168.52.100:8020/flume/intercept/%Y%m%d

a1.sinks.s1.hdfs.filePrefix = event

a1.sinks.s1.hdfs.fileSuffix = .log

a1.sinks.s1.hdfs.rollSize = 10485760

a1.sinks.s1.hdfs.rollInterval =20

a1.sinks.s1.hdfs.rollCount = 0

a1.sinks.s1.hdfs.batchSize = 1500

a1.sinks.s1.hdfs.round = true

a1.sinks.s1.hdfs.roundUnit = minute

a1.sinks.s1.hdfs.threadsPoolSize = 25

a1.sinks.s1.hdfs.useLocalTimeStamp = true

a1.sinks.s1.hdfs.minBlockReplicas = 1

a1.sinks.s1.hdfs.fileType =DataStream

a1.sinks.s1.hdfs.writeFormat = Text

a1.sinks.s1.hdfs.callTimeout = 60000

a1.sinks.s1.hdfs.idleTimeout =60
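
The separators in this configuration are written as \\u0009 and \\u002c rather than as raw characters. Assuming the agent file is read with java.util.Properties semantics, the doubled backslash reaches the interceptor as the literal six characters \u0009, which UnicodeToString then decodes into a tab. With a single backslash the loader would decode the tab itself, and the trim() call in the interceptor's constructor would reduce it to an empty string. The sketch below demonstrates both cases with the standard library only; SeparatorEscapeSketch, load and decodeUnicodeEscapes are hypothetical names, and decodeUnicodeEscapes is a simplified stand-in for UnicodeToString.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-alone sketch; all names are hypothetical.
final class SeparatorEscapeSketch {

    // Decodes literal escapes (backslash, u, four hex digits) into characters.
    static String decodeUnicodeEscapes(String text) {
        Matcher matcher = Pattern.compile("\\\\u(\\p{XDigit}{4})").matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            char ch = (char) Integer.parseInt(matcher.group(1), 16);
            matcher.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(ch)));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // Parses properties-format text and returns the raw value stored under key.
    static String load(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // File text with two backslashes before u0009, as in the configuration above.
        String doubled = load("fields_separator=\\\\u0009\n", "fields_separator");
        // File text with a single backslash before u0009.
        String single = load("fields_separator=\\u0009\n", "fields_separator");

        System.out.println("doubled survives trim: " + !doubled.trim().isEmpty());
        System.out.println("doubled decodes to tab: "
                + decodeUnicodeEscapes(doubled.trim()).equals("\t"));
        System.out.println("single survives trim: " + !single.trim().isEmpty());
    }
}
```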

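Step 5: Upload the test data

Upload the test data to the /export/servers/intercept directory, creating the directory first if it does not exist

mkdir -p /export/servers/intercept

The test data is as follows (the fields must be separated by tabs to match the fields_separator setting of \\u0009)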

13601249301 100    200   300   400   500   600   700

13601249302 100    200   300   400   500   600   700

13601249303 100    200   300   400   500   600   700

13601249304 100    200   300   400   500   600   700

13601249305 100    200   300   400   500   600   700

13601249306 100    200   300   400   500   600   700

13601249307 100    200   300   400   500   600   700

13601249308 100    200   300   400   500   600   700

13601249309 100    200   300   400   500   600   700

13601249310 100    200   300   400   500   600   700

13601249311 100    200   300   400   500   600   700

13601249312 100    200   300   400   500   600   700
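
The spooling directory source expects a file to be complete and unchanged once it appears in the watched directory, and file names must not be reused. One way to respect this is to write the file elsewhere and then move it into place. The following sketch generates tab-separated lines shaped like the test data above and moves the finished file into a spool directory. SpoolTestDataSketch and its methods are hypothetical names, and the sketch assumes that the staging and spool directories are on the same file system so that the move can be atomic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch; all names are hypothetical.
final class SpoolTestDataSketch {

    // Builds count tab-separated lines with consecutive phone numbers.
    static List<String> buildLines(long firstPhone, int count) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lines.add(String.join("\t", String.valueOf(firstPhone + i),
                    "100", "200", "300", "400", "500", "600", "700"));
        }
        return lines;
    }

    // Writes the file in stagingDir, then moves it into spoolDir so that
    // the watched directory never contains a half-written file.
    static Path writeToSpoolDir(Path stagingDir, Path spoolDir,
                                String fileName, List<String> lines) {
        try {
            Files.createDirectories(stagingDir);
            Files.createDirectories(spoolDir);
            Path staged = stagingDir.resolve(fileName);
            Files.write(staged, lines, StandardCharsets.UTF_8);
            // Assumption: both directories share a file system (needed for ATOMIC_MOVE).
            return Files.move(staged, spoolDir.resolve(fileName),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A temporary location is used here; point spoolDir at the real
        // spool directory when running on the Flume machine.
        Path base = Paths.get(System.getProperty("java.io.tmpdir"), "spool-sketch");
        Path written = writeToSpoolDir(base.resolve("staging"), base.resolve("intercept"),
                "test-" + System.currentTimeMillis() + ".txt",
                buildLines(13601249301L, 12));
        System.out.println("wrote " + written);
    }
}
```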

Step 6: Start Flume

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -n a1 -Dflume.root.logger=DEBUG,console

Summary: data processing is generally not done in Flume. Processing is normally done in MapReduce (MR); Flume's main job is data collection.
