写入redis

resultStream.addSink(new RedisSink(FlinkUtils.getRedisSinkConfig(parameters),new MyRedisMapper()));

getRedisSinkConfig

 public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool){

        String redisHosts = parameterTool.get(PropertiesUtil.REDIS_HOSTS);
Set<String> hosts = new HashSet<String>(Arrays.asList(redisHosts.split(",")));
FlinkJedisSentinelConfig redisProduceConfig = new FlinkJedisSentinelConfig.Builder()
.setSentinels(hosts)
.setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
.setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
.setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
.setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
.setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT)).build();
return redisProduceConfig; }

MyRedisMapper

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; /**
* @Auther WeiJiQian
* @描述 Redis 存储的key和value
*/
public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> { /**
* 设置使用的redis数据结构类型,和key的名词
* 通过RedisCommand设置数据结构类型
* Returns descriptor which defines data type.
*
* @return data type descriptor
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SETEX, Constant.REDIS_KEY_TTL);
} /**
* 设置value中的键值对 key的值
* Extracts key from data.
*
* @return key
*/
@Override
public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
return stringStringTuple2.f0;
} /**
* 设置value中的键值对 value的值
* Extracts value from data.
*
* @return value
*/
@Override
public String getValueFromData(Tuple2<String, String> tuple2) {
return tuple2.f1;
} }

最新文章

  1. UML九种图作用简介
  2. MyBatis学习(四)、MyBatis配置文件
  3. 编译nginx时,编译参数注意点
  4. JAVA/Android Map与String的转换方法
  5. bzoj4337: BJOI2015 树的同构
  6. c++获取系统时间(引用别人的博文)
  7. JS学习笔记10_Ajax
  8. WCF 入门(19)
  9. JAVA 中BIO,NIO,AIO的理解
  10. 为网站加入Drupal星球制作RSS订阅源
  11. 玩转Android之手摸手教你DIY一个抢红包神器!
  12. web框架之Spring-MVC环境搭建
  13. 20160211.CCPP体系详解(0021天)
  14. 【清北学堂2018-刷题冲刺】Contest 2
  15. IntelliJ IDEA关于logger的live template配置
  16. 2F+1模式才是高可用 途牛旅游网 还是通过proxy层
  17. editplus配置csharp
  18. springmvc跨域+token验证
  19. 使用Lua的扩展库LuaSocket用例
  20. 2.Access the mongo Shell Help-官方文档摘录

热门文章

  1. 面试阿里,字节跳动90%会被问到的Java异常面试题集,史上最全系列!
  2. Pinpoint 设置微信或者钉钉预警
  3. CorelDRAW文件损坏的几种解决方法
  4. FL studio系列教程(十八):FL Studio输出监视面板讲解
  5. guitar pro 系列教程(六):Guitar Pro音频导出功能之RSE音源
  6. word查找与替换
  7. api4excel - 接口自动化测试excel篇
  8. java编写规范及注意事项
  9. 【mq读书笔记】消息拉取
  10. 课时一:JS操作数据