flink连接器-流处理-读写redis
2024-08-30 07:12:43
写入redis
resultStream.addSink(new RedisSink(FlinkUtils.getRedisSinkConfig(parameters),new MyRedisMapper()));
getRedisSinkConfig
public static FlinkJedisSentinelConfig getRedisSinkConfig(ParameterTool parameterTool){
String redisHosts = parameterTool.get(PropertiesUtil.REDIS_HOSTS);
Set<String> hosts = new HashSet<String>(Arrays.asList(redisHosts.split(",")));
FlinkJedisSentinelConfig redisProduceConfig = new FlinkJedisSentinelConfig.Builder()
.setSentinels(hosts)
.setMasterName(parameterTool.get(PropertiesUtil.REDIS_MASTER))
.setPassword(parameterTool.get(PropertiesUtil.REDIS_PASSWORD))
.setMaxIdle(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXIDEL))
.setMaxTotal(parameterTool.getInt(PropertiesUtil.REDIS_POOL_MAXTOTAL))
.setConnectionTimeout(parameterTool.getInt(PropertiesUtil.REDIS_TIMEOUT)).build();
return redisProduceConfig;
}
MyRedisMapper
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* @Auther WeiJiQian
* @描述 Redis 存储的key和value
*/
public class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
/**
* 设置使用的redis数据结构类型,和key的名词
* 通过RedisCommand设置数据结构类型
* Returns descriptor which defines data type.
*
* @return data type descriptor
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SETEX, Constant.REDIS_KEY_TTL);
}
/**
* 设置value中的键值对 key的值
* Extracts key from data.
*
* @return key
*/
@Override
public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
return stringStringTuple2.f0;
}
/**
* 设置value中的键值对 value的值
* Extracts value from data.
*
* @return value
*/
@Override
public String getValueFromData(Tuple2<String, String> tuple2) {
return tuple2.f1;
}
}
最新文章
- UML九种图作用简介
- MyBatis学习(四)、MyBatis配置文件
- 编译nginx时,编译参数注意点
- JAVA/Android Map与String的转换方法
- bzoj4337: BJOI2015 树的同构
- c++获取系统时间(引用别人的博文)
- JS学习笔记10_Ajax
- WCF 入门(19)
- JAVA 中BIO,NIO,AIO的理解
- 为网站加入Drupal星球制作RSS订阅源
- 玩转Android之手摸手教你DIY一个抢红包神器!
- web框架之Spring-MVC环境搭建
- 20160211.CCPP体系详解(0021天)
- 【清北学堂2018-刷题冲刺】Contest 2
- IntelliJ IDEA关于logger的live template配置
- 2F+1模式才是高可用 途牛旅游网 还是通过proxy层
- editplus配置csharp
- springmvc跨域+token验证
- 使用Lua的扩展库LuaSocket用例
- 2.Access the mongo Shell Help-官方文档摘录