一、背景:

我司的系统,用户可以创建任务,启动任务,但任务的运行需要很长的时间,所以采用消息队列的方式,后台异步处理。

这里所用到的是 RabbitMQ ,对应的 Node.js 库为 amqplib ( 这里采用的是回调形式:require("amqplib/callback_api") )。

二、MQ 处理任务的流程


① ② ③ ④ ⑤ :从前端发来 HTTP 请求,被 Producer(express) 处理,经过 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),发送需要处理的任务的 uuid 入 MQ 队列。这时候,还要修改数据库,把该任务的状态从 "new" -> "queue"。

⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 队列吐出来的 message,即任务的 uuid。先修改数据库该任务的状态为"runnning", 然后调用"处理"模块去执行复杂运算,执行完成后,修改数据库该任务的状态是 "success" 还是 ”fail”,然后返回 ack 信号给 MQ 。

注:这次的需求比较简单,所以没有用到 MQ 的交换机功能。

三、问与答


Q:如何做好 MQ 的错误处理?

MQ 的 connection 和 channel 对象都有 "error" 和 "close" 事件,需做好相关的日志记录。尤其是 "error",要加上 reconnect 机制,防止因为某个任务导致的错误或者 MQ 自身的原因,影响到后续任务的处理。

connection.on("error", function(err) {
// reconnect
}); channel.on("error", function(err) {
// reconnect
});

最后可以根据实际需要,在全局加上 try……catch。

Q:如何保证 MQ 自身消息的数据安全?

为了防止 MQ server 的崩溃导致的消息损失,需要对数据做持久化。大致分两块:

队列持久化 + 消息持久化

channel.assertQueue(queue_name, {
durable: true
// 队列持久化
});
channel.sendToQueue(
queue_name,
Buffer.from(uuid),
{
persistent: true
// 消息持久化
},
function(err, ok) { }
);

Q:如何保证 DB 跟 MQ 数据的一致性?

1、发送 message 时

④ 中的 sendToQueue() ,需要在 createConfirmChannel() 的基础下使用,这样 sendToQueue() 的第三个参数才有 MQ 收到 message 成功与否的回调,根据这个,去结合 ② 的 DB 操作, 绑定为事务,来保证数据的一致性。

2、接受 message 时

channel.consume() 需开启 ack 模式,等 Consumer 端一切确认完成后,再通知 MQ 。

channel.consume(
queue_name,
function(msg) {
const uuid = msg.content.toString();
// use uuid todo……
},
{
noAck: false
}
);

Q:如何避免 MQ 多发、少发的问题

从上面的 如何保证 DB 跟 MQ 数据的一致性? 其实就避免了该问题的发生。

但是额外要做的是:

1、重试机制,例如 发送 message 失败,规定重试的次数。

2、善用 MQ 的 Web 控制台,地址形如 http://localhost:15672。除了关注基本的服务器负载状态,还要关注任务队列是否正常吞吐,是否有卡壳。

3、构建运维一体的后台管控系统,比上面的 2 自定义程度更高。

4、提供用户类似"提交工单"/"问题反馈"/"错误上传"的功能,查缺补漏。

最新文章

  1. webstorm快捷键大全
  2. Java Generics and Collections-8.1
  3. PHP中计划任务
  4. poj1131-Octal Fractions(进制转换)
  5. 比较常用的几个maven第三方镜像
  6. switch...case和if...else if的判断应用
  7. oss cmd
  8. 关于 try catch catch
  9. 对于JVM内存配置参数
  10. django-cms 代码研究(六)plugin的深入分析
  11. Scrum会议4(Beta版本)
  12. ios专题 -内存管理 研究
  13. Android面试题目及其答案
  14. java学习笔记2——Eclipse的安装及汉化图解
  15. springMVC源码分析--HandlerMethod
  16. Unity 5.X扩展编辑器之打包assetbundle
  17. redis 设置
  18. Android Studio添加aar依赖的两种方式
  19. python中mysql数据库的操作-sqlalchemy
  20. usb-cam(1)安装

热门文章

  1. asp.net mvc实现微信外H5支付方法
  2. java-mysql(1)
  3. axios和vuex
  4. Webapi实现websocket实时通讯
  5. lodop+art-template实现web端漂亮的小票样式打印
  6. 你一定能看懂的JDK动态代理
  7. Python自学day-9
  8. yii后台模板标签
  9. AntD使用timePiacker封装时间范围选择器(React hook版)
  10. BZOJ 3990 排序