package com.lin.flink.stream.customPartition;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; public class StreamingDemoToRedis {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("node1", 9000, "\n"); //lpsuh l_words word //对数据进行组装,把string转化为tuple2<String,String>
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<String, String>("l_words", value);
}
}); //创建redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node1").setPort(6379).build(); //创建redissink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<Tuple2<String, String>>(conf, new MyRedisMapper()); l_wordsData.addSink(redisSink); env.execute("StreamingDemoToRedis");
} public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
//表示从接收的数据中获取需要操作的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示从接收的数据中获取需要操作的redis value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
} @Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}

最新文章

  1. 关于php插件pdo_mysql的安装
  2. 【JavaScript】常用方法
  3. PSAM卡与CPU(用户卡)的操作过程
  4. linux下拷贝整个目录
  5. DBA的那些事
  6. 什么是ObjCTypes?
  7. Spring中的循环依赖
  8. CLR 协变、逆变
  9. mybatis 动态sql 插入报错
  10. golang变量的注意
  11. 再谈Lasso回归 | elastic net | Ridge Regression
  12. 工作总结(一):Linux C
  13. element-ui inputNumber、Card 、Breadcrumb组件源码分析整理笔记(三)
  14. 对jquery新增加的class绑定事件 jquery 对相同class 绑定事件
  15. 单纯形法MATALAB实现
  16. 1、RabbitMQ入门
  17. martin/docker-cleanup-volumes
  18. 关于BigDecimal类型在jsp页面中进行除法运算问题
  19. Smobiler实现扫描条码和拍照功能(开发日志八)
  20. hadoop22---wait,notify

热门文章

  1. C# Winform 窗体界面”假死”后台线程阻塞 解决办法–BeginInvoke
  2. C#设计模式:工厂模式
  3. PHP中redis加锁和解锁的简单实现
  4. Linux拷贝、移动、删除
  5. react学习笔记_01-jsx
  6. psfgettable - 从控制台字体中提取出嵌入的Unicode字符表
  7. squid代理与缓存(上)
  8. cassandra集群
  9. 4G手机网络通信是如何被黑客远程劫持的?
  10. pandas模块之读取文件