Flink ETL 实现数据清洗 

  

 一:需求(针对算法产生的日志数据进行清洗拆分)

  1. 算法产生的日志数据是嵌套json格式,需要拆分

  2.针对算法中的国家字段进行大区转换

  3.最后把不同类型的日志数据分别进行储存

二:整体架构 

      这里演示处理从rabbitmq来的数据 进行数据处理 然后发送到rabbitmq                            

 自定义redistSource flink没有redis的source

package com.yw.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException; import java.util.HashMap;
import java.util.Map; /**
* redis中进行数据初始化
* <p>
* 在reids中保存国家和大区关系
* hset areas AREA_IN IN
* hset areas AREA_US US
* hset areas AREA_CT TW,HK
* hset areas AREA_AR PK,KW,SA
*
*
* @Auther: YW
* @Date: 2019/6/15 10:23
* @Description:
*/
public class MyRedisSource implements SourceFunction<HashMap<String, String>> {
private final Logger LOG = LoggerFactory.getLogger(MyRedisSource.class); private boolean isRuning = true;
private Jedis jedis = null;
private final long SLEEP = 60000;
private final long expire = 60; @Override
public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
this.jedis = new Jedis("localhost", 6397);
// 存储国家和地区关系
HashMap<String, String> map = new HashMap<>();
while (isRuning) {
try {
map.clear(); // 老数据清除
Map<String, String> areas = jedis.hgetAll("areas");
for (Map.Entry<String, String> entry : areas.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
String[] splits = value.split(",");
for (String split : splits) {
map.put(split, key);
}
}
if (map.size() > 0) {
// map >0 数据发送出去
ctx.collect(map);
}else {
LOG.warn("获取数据为空!");
}
// 歇6秒
Thread.sleep(SLEEP);
} catch (JedisConnectionException e) {
LOG.error("redis连接异常 重新连接",e.getCause());
// 如果连接异常 重新连接
jedis = new Jedis("localhost", 6397);
}catch (Exception e){
LOG.error("redis Source其他异常",e.getCause());
} }
} @Override
public void cancel() {
isRuning = false;
while (jedis != null) {
jedis.close();
}
}
}
DataClean数据处理
package com.yw;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.yw.source.MyRedisSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector; import java.util.HashMap; /**
* @Auther: YW
* @Date: 2019/6/15 10:09
* @Description:
*/
public class DataClean {
// 队列名
public final static String QUEUE_NAME = "two.aa.in"; public static void main(String[] args) throws Exception {
// 获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 一分钟 checkpoint
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // enableCheckpointing最小间隔时间(一半)
env.getCheckpointConfig().setCheckpointTimeout(10000);// 超时时间
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); final RMQConnectionConfig rmqConf = new RMQConnectionConfig.Builder().setHost("127.0.0.1").setPort(5672).setVirtualHost("/").setUserName("guest").setPassword("guest").build();
// 获取mq数据
DataStream<String> data1 = env.addSource(new RMQSource<String>(rmqConf, QUEUE_NAME, false, new SimpleStringSchema())).setParallelism(1);
//{"dt":"2019-06-10","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.1,"level":"B"},{"type":"s3","score":0.2,"level":"C"}]}
DataStreamSource<HashMap<String, String>> mapData = env.addSource(new MyRedisSource());
// connect可以连接两个流
DataStream<String> streamOperator = data1.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {
// 保存 redis返回数据 国家和大区的映射关系
private HashMap<String, String> allMap = new HashMap<String, String>(); // flatMap1 处理rabbitmq的数据
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
String countryCode = jsonObject.getString("countryCode");
String dt = jsonObject.getString("dt");
// 获取大区
String area = allMap.get(countryCode);
JSONArray jsonArray = jsonObject.getJSONArray("data");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject1 = jsonArray.getJSONObject(i);
jsonObject1.put("area", area);
jsonObject1.put("dt", dt);
out.collect(jsonObject1.toJSONString());
}
} // 处理redis的返回的map类型的数据
@Override
public void flatMap2(HashMap<String, String> value, Collector<String> out) throws Exception {
this.allMap = value;
}
});
streamOperator.addSink(new RMQSink<String>(rmqConf, new SimpleStringSchema(), new RMQSinkPublishOptions<String>() {
@Override
public String computeRoutingKey(String s) {
return "CC";
} @Override
public AMQP.BasicProperties computeProperties(String s) {
return null;
} @Override
public String computeExchange(String s) {
return "test.flink.output";
}
}));
data1.print();
env.execute("etl");
}
}

rabbitmq 模拟数据

package com.yw;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random; /**
* @Auther: YW
* @Date: 2019/6/5 14:57
* @Description:
*/
public class RabbitMQProducerUtil {
public final static String QUEUE_NAME = "two.aa.in"; public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setPort(5672); //创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
// 声明一个队列
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息到队列中
String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\"," +
"{\"type\":\""+getType()+"\",\"score\":"+getScore()+"\"level\":\""+getLevel()+"\"}," +
"{\"type\":\""+getType()+"\",\"score\":"+getScore()+"\"level\":\""+getLevel()+"\"}," +
"{\"type\":\""+getType()+"\",\"score\":"+getScore()+"\"level\":\""+getLevel()+"\"}]}"; //我们这里演示发送一千条数据
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));
System.out.println("Producer Send +'" + message);
} //关闭通道和连接
channel.close();
connection.close();
} public static String getCurrentTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
} public static String getCountryCode() {
String[] types={"US","TN","HK","PK","KW","SA","IN"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
} public static String getType() {
String[] types={"s1","s2","s3","s4","s5"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
} public static String getScore() {
String[] types={"0.1","0.2","0.3","0.4","0.5"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getLevel() {
String[] types={"A","B","C","D","E"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
}

redis 初始化数据

* hset areas AREA_IN IN
* hset areas AREA_US US
* hset areas AREA_CT TW,HK
* hset areas AREA_AR PK,KW,SA

------------最后运行DataClean------------

最新文章

  1. 负载均衡——nginx理论
  2. 学习 shell脚本之前的基础知识
  3. 书柜的尺寸(bzoj 1933)
  4. Three.js学习(相机,场景,渲染,形状)
  5. C#执行存储过程的简化
  6. IIS-Server is too busy _解决方法
  7. g++编译总结
  8. http://www.cnblogs.com/AloneSword/p/3370462.html
  9. poj 3565 二分图最优匹配
  10. [Database]Operators
  11. Javascript:看 Javascript 规范,学 this 引用,你会懂的。
  12. Spring基础学习(二)&mdash;详解Bean(上)
  13. [Apio2012]dispatching 左偏树
  14. 使用 symbolicatecrash 解析崩溃堆栈
  15. eclipse上的maven,添加依赖后无法自动下载相应的jar包
  16. OUTLOOK - Unable to Delete Meetings
  17. Apktool反编译apk资源文件
  18. Xcode下的中文乱码问题
  19. Docker集群管理(三)—— docker swarm mode基础教程
  20. python yaml

热门文章

  1. .NET添加新项目-配置不同环境参数
  2. 手工部署yugabyte的几点说明
  3. Lightning Web Components 来自salesforce 的web 组件化解决方案
  4. A simple dispiction of dijkstra
  5. 初试angularjs动画(animate)
  6. bash循环得到日期目录
  7. 原创:logistic regression实战(一):SGD Without lasso
  8. CFD-Post中截取任意面的数据
  9. Intellij IDEA 智能补全的 10 个姿势,太牛逼了。。
  10. Remind Me