title : webflux延迟队列逻辑更改过程记录
author : simonLee
date : 2022/11/22 10:26

webflux延迟队列逻辑更改过程记录

一、问题背景

一个业务使用了webflux.delayElement(Duration delay)来发送延迟消息,由于是异步,发送出去无法通过别的途径拿到该延迟消息(webflux没有相关api),但新需求功能上需要对发出去的延迟消息提供终止功能,且规定时间内可重复无限次

老功能简单来说就是有一个数字,数字在选择时间内按照正态分布有规律的一定时间间隔发送延迟消息,并对消息进行后续处理。例:设定数字10000,在半小时内,按一定时间间隔平分10000发送延迟消息。

新功能就是在老功能的基础上,支持中途中断往后的所有消息。

二、当前实现代码

可以见到,在delayElement后使用了doOnNext()做后续操作以及flatMap重新封装流对象,而想要做中途终止,必须能拿到延迟队列。"理解是第一步",我们需要先了解这几行代码的底层逻辑,才能对其进行修改。

三、逻辑详解

3.1创建事件链

方法开头使用了一个Mono对象作为事件链的头,由于webflux都是使用流式编程,使用一个对象便可把所有遍历对象连接起来。

3.2delayElement

先附上delayElement源码

该入参是一个Duration类,意为多久之后消费。在内部又调用了一个重载方法,入参为Duration和Scheduler,时间以及reactor的调度器。

Schedulers.parallel()是调度器的一个线程模型。

从这里我们大概可以知道webflux封装的delayElement也是通过线程延时执行的

3.3publishOn

下一步的publishOn则是将链中的调度器调整为boundedElastic,是一个有界可复用线程池,可动态调节。该线程池可适当解决"且规定时间内可重复无限次"产生的内存问题。

该线程池容量是CPU核心数的10倍,最多提交10万条任务。如果是延时调度,那么延时开始时间是在有线程可用时才开始计算。

3.4doOnNext

再下一步的doOnNext是在不对序列造成改变的情况下,发出元素,相当于做一些额外的逻辑。

在这里doOnNext里是做调用腾讯云操作

3.5flapMap

最后一步是flapMap,即重新执行逻辑修改元素内容。这里还做了落库操作并返回落库方法返回的元素。

四、代码总结

// 由调用链可知道
/*
代码相当于做了这样的操作,封装流,交由webflux执行
*/
eventChain.
.delayElement()
.publishOn()
.doOnNext()
.flatMap() .delayElement()
.publishOn()
.doOnNext()
.flatMap() .delayElement()
.publishOn()
.doOnNext()
.flatMap() ...重复

从Mono模型上来看,当链路中有一个元素终止或者抛错那么整条链路就会中断

五、逻辑修改总结

由于该方法是封装整个链路给到webflux,所以我们只需在封装前给定一个批次号用作是否终止判断,在发送延迟消息后加上一个终止条件判断(暂定redisKey)的元素,若是已终止则在此元素抛出流异常,则整个流会捕获异常结束被gc掉,内存也得以保障。

最新文章

  1. mysql 5.7修改密码
  2. Effective Objective-C 2.0 学习记录
  3. websocket++编译过程
  4. style、currentStyle、getComputedStyle区别介绍
  5. Pandas简易入门(三)
  6. window redis 安装配置
  7. SCTP 关联的建立和终止
  8. ios 测试工程是否内存泄漏
  9. Android------自定义ListView详解
  10. python基础操作_集合_三元运算
  11. JavaScript+svg绘制的一个饼状图
  12. JAVA多线程---高并发程序设计
  13. UNIX环境高级编程——线程私有数据
  14. Java NIO Channel to Channel Transfers通道传输接口
  15. 【bzoj2432】【NOI2011】兔农
  16. 课堂小记---JavaScript(3)
  17. react_结合 redux - 高阶函数 - 高阶组件 - 前端、后台项目打包运行
  18. wpf 加载资源文件
  19. HDOJ2099_整数的尾数
  20. PHP常用工具类积累

热门文章

  1. SCI论文写作指南
  2. Oracle基础知识汇总一
  3. PAT (Basic Level) Practice 1018 锤子剪刀布 分数 20
  4. 《吐血整理》高级系列教程-吃透Fiddler抓包教程(26)-Fiddler如何抓取Android7.0以上的Https包-上篇
  5. 前端程序员学习 Golang gin 框架实战笔记之一开始玩 gin
  6. 【疫情动态条形图】用Python开发全球疫情排名动态条形图bar_chart_race
  7. Mac Mojave 10.14.5安装python tesserocr
  8. javaweb 导出文件名乱码的问题解决方案
  9. 【Spring boot】整合tomcat底层原理
  10. vulnhub靶场|NAPPING: 1.0.1