延迟队列的应用场景也很常见,例如:session的超时过期、自动取消未付款订单等等。redis中有一种数据结构叫做zset,即有序集合。元素类型为String类型,且元素具有唯一性不能重复,每个元素可附带float类型的score即分值。从zset中获取元素的时候可以通过分值进行排序后获取某个分值范围内的元素或所有元素。

public class DelayQueue {

    private String redisHost = "10.5.31.155";
private int redisPort = 6379;
private Jedis redis; private String queueMapKey = "DelayQueueMap";
private String queueSetKey = "DelayQueueSet"; private int delaySecond = 3; @Before
public void before() {
redis = new Jedis(redisHost, redisPort);
} @Test
public void pub() throws InterruptedException {
for (int i = 1; i <= 100000; i++) {
String messageId = UUID.randomUUID().toString().replace("-", "");
String messageBody = "第" + i + "条消息:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
redis.hset(queueMapKey, messageId, messageBody);
redis.zadd(queueSetKey, System.currentTimeMillis() + (delaySecond * 1000), messageId);
Thread.sleep(Math.round(Math.floor(Math.random() * 2000)));
}
} @Test
public void sub() throws InterruptedException {
while (true) {
Set<Tuple> messages = redis.zrangeByScoreWithScores(queueSetKey, System.currentTimeMillis() - (delaySecond * 1000), System.currentTimeMillis());
for (Tuple message : messages) {
Long zrem = redis.zrem(queueSetKey, message.getElement());
if (zrem > 0) {
String messageBody = redis.hget(queueMapKey, message.getElement());
redis.hdel(queueMapKey, message.getElement());
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + ":" + messageBody);
}
}
Thread.sleep(1000);
}
} @After
public void after() {
redis.close();
} }

输出结果:可以看到pub的消息都是延迟3秒被消费的。

2018-09-28 15:02:58.863:第1条消息:2018-09-28 15:02:52.891
2018-09-28 15:02:58.866:第2条消息:2018-09-28 15:02:54.240
2018-09-28 15:02:59.870:第3条消息:2018-09-28 15:02:56.011
2018-09-28 15:02:59.872:第4条消息:2018-09-28 15:02:56.373
2018-09-28 15:02:59.874:第5条消息:2018-09-28 15:02:56.587

上面的代码模拟了一个简单的延迟队列,思路如下:

  1. 在redis中建立一个hash类型数据,用来存储消息id及消息内容
  2. 在redis中建立一个zset类型数据,用来存储消息id及对应的分数(该消息的过期时间)
  3. pub端推送消息时候,先写hash数据,再写zset数据。
  4. sub端定时按照分数从zset中获取消息id,获取到消息id后逐个删除,删除成功后再处理消息。

缺点:

  1. sub端需要定时轮训,所以会出现不及时消费的情况
  2. 如果pub端的生产能力大于sub端的消费能力,则会导致redis内数据越来越多

需要注意的是:

  1. sub端获取消息、删除消息、处理消息不是一个原子操作,在高并发的情况下,获取到的消息可能被其他服务删除。所以在删除、处理消息的时候,不能一次性删除所有获取到的消息,而是要逐条删除后判断是否删除成功再处理消息。以免消息被重复消费。
  2. 为什么要用消息id呢,不能直接把消息内容放在zset里吗?答案是不可以。消息内容是可能重复的,而zset中的String是不能重复的。
  3. 存在消息id及消息内容的数据不能直接使用redis的String数据结构吗?答案是不建议。因为这样会导致redis的key急剧曾多。

最新文章

  1. redis慢查询日志
  2. java基础之——DecimalFormat格式化数字
  3. ubuntun pptpd
  4. MVC中的错误过滤器无法拦截URL路径错误的解决办法
  5. QF——OC的多态,动态绑定及实现原理
  6. Socket 基础解析使用ServerSocket建立聊天服务器
  7. python模块:csv
  8. react组件中的constructor和super小知识
  9. JVM虚拟机查找类文件的顺序
  10. MyBatis全局配置文件mybatis-config.xml
  11. 【libreOJ模板】并查集(输入挂,取模与find优化)
  12. [PHP]PHP缓存机制之Output Control
  13. Android studio的gradle
  14. nginx之location.md
  15. Excel函数使用
  16. package.json参数简单介绍
  17. 一个基于NodeJS开发的APP管理CMS系统
  18. POJ--2449--Remmarguts&amp;#39; Date【dijkstra_heap+A*】第K短路
  19. BZOJ 1192 鬼谷子的钱袋 找规律
  20. iOS开发之--TableViewCell重用机制避免重复显示问题

热门文章

  1. bsearch的使用
  2. 基于RabbitMQ的MQTT协议及应用
  3. VUE - 使用axios数据请求时数据绑定时 报错 TypeError: Cannot set property &#39;xxxx&#39; of undefined 的解决办法
  4. C++编程学习(十二) STL
  5. postProcessBeanFactory方法源码跟踪
  6. 谈谈HashSet的存储原理及为什么重写equals必须重写hashcode方法
  7. python进行md5加密的两种方法
  8. 吴裕雄 Bootstrap 前端框架开发——Bootstrap 字体图标(Glyphicons):glyphicon glyphicon-question-sign
  9. 2020/2/21 fiyocms代码审计
  10. HYSBZ - 1588 营业额统计 (伸展树)