Flink 从 socket 读取数据并 sink 到 Redis
2024-08-28 07:01:56
package com.lin.flink.stream.customPartition; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Streaming job that reads text records from a socket and pushes each one onto a Redis list.
 *
 * <p>Every incoming record is paired with the fixed key {@code l_words} and handed to a
 * {@link RedisSink} whose mapper selects {@code LPUSH}, so the intended effect per record is
 * {@code LPUSH l_words <record>}.
 */
public class StreamingDemoToRedis {

    /** Host the socket source connects to. */
    private static final String SOCKET_HOST = "node1";

    /** Port the socket source connects to. */
    private static final int SOCKET_PORT = 9000;

    /** Delimiter passed to the socket source to split the incoming text into records. */
    private static final String RECORD_DELIMITER = "\n";

    /** Host of the Redis server the sink writes to. */
    private static final String REDIS_HOST = "node1";

    /** Port of the Redis server the sink writes to. */
    private static final int REDIS_PORT = 6379;

    /** Redis key that every record is pushed under. */
    private static final String LIST_KEY = "l_words";

    /** Job name handed to {@code execute}. */
    private static final String JOB_NAME = "StreamingDemoToRedis";

    /**
     * Builds the socket -> map -> Redis pipeline and submits it for execution.
     *
     * @param args command-line arguments (not read)
     * @throws Exception propagated from {@code execute}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text =
                env.socketTextStream(SOCKET_HOST, SOCKET_PORT, RECORD_DELIMITER);

        // Target command is "lpush l_words word": wrap each String into a
        // Tuple2<String, String> of (key, value) for the sink's mapper.
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new ListEntryMapper());

        // Redis connection configuration.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_HOST)
                .setPort(REDIS_PORT)
                .build();

        // Redis sink that writes the (key, value) tuples using MyRedisMapper.
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);

        env.execute(JOB_NAME);
    }

    /** Pairs each incoming record with the fixed list key. */
    private static class ListEntryMapper implements MapFunction<String, Tuple2<String, String>> {

        /**
         * @param value one record read from the socket
         * @return a tuple of ({@code l_words}, {@code value})
         */
        @Override
        public Tuple2<String, String> map(String value) throws Exception {
            return Tuple2.of(LIST_KEY, value);
        }
    }

    /**
     * Tells the Redis sink how to turn a (key, value) tuple into an {@code LPUSH} command.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        /**
         * Extracts the Redis key to operate on from the received element.
         *
         * @param data the (key, value) tuple
         * @return the tuple's first field
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        /**
         * Extracts the Redis value to operate on from the received element.
         *
         * @param data the (key, value) tuple
         * @return the tuple's second field
         */
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        /**
         * @return a command description selecting {@link RedisCommand#LPUSH}
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
最新文章
- 关于php插件pdo_mysql的安装
- 【JavaScript】常用方法
- PSAM卡与CPU(用户卡)的操作过程
- linux下拷贝整个目录
- DBA的那些事
- 什么是ObjCTypes?
- Spring中的循环依赖
- CLR 协变、逆变
- mybatis 动态sql 插入报错
- golang变量的注意
- 再谈Lasso回归 | elastic net | Ridge Regression
- 工作总结(一):Linux C
- element-ui inputNumber、Card 、Breadcrumb组件源码分析整理笔记(三)
- 对jquery新增加的class绑定事件 jquery 对相同class 绑定事件
- 单纯形法MATLAB实现
- 1、RabbitMQ入门
- martin/docker-cleanup-volumes
- 关于BigDecimal类型在jsp页面中进行除法运算问题
- Smobiler实现扫描条码和拍照功能(开发日志八)
- hadoop22---wait,notify