背景

  工作中经常会有定时任务的需求,常见的做法可以使用Timer、Quartz、Hangfire等组件,这次想尝试下新的思路,使用RabbitMQ死信队列的机制来实现定时任务,同时帮助再次了解RabbitMQ的死信队列。

交互流程

  

  1. 用户创建定时任务

  2. 往死信队列插入一条消息,并设置过期时间为首个任务执行时间

  3. 死信队列中的消息过期后,消息流向工作队列

  4. 任务执行消费者监听工作队列,工作队列向消费者推送消息

  5. 消费者查询数据库,读取任务信息

  6. 消费者确认任务有效(未被撤销),执行任务

  7. 消费者确认有下个任务,再往死信队列插入一条消息,并设置过期时间为任务执行时间

  8. 重复2-7的步骤,直到所有任务执行完成或任务撤销

环境准备

  请自行完成MongoDB和RabbitMQ的安装,Windows、Linux、Docker皆可,以下提供Windows的安装方法:

  MongoDB:https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/

  RabbitMQ:https://www.rabbitmq.com/install-windows.html

核心代码

  1. (WebApi)创建任务,并根据设置创建子任务,把任务数据写入数据库

    var task = new Task
{
Name = form.Name,
StartTime = form.StartTime,
EndTime = form.EndTime,
Interval = form.Interval,
SubTasks = new List<SubTask>()
}; var startTime = task.StartTime;
var endTime = task.EndTime; while ((endTime - startTime).TotalMinutes >= )
{
var sendTime = startTime;
if (sendTime <= endTime && sendTime > DateTime.UtcNow)
{
task.SubTasks.Add(new SubTask { Id = ObjectId.GenerateNewId(), SendTime = sendTime });
} startTime = startTime.AddMinutes(task.Interval);
} await _mongoDbContext.Collection<Task>().InsertOneAsync(task);

  2. (WebApi)往死信队列中写入消息

    var timeFlag = task.SubTasks[].SendTime.ToString("yyyy-MM-dd HH:mm:ssZ");
var exchange = "Task";
var queue = "Task"; var index = ;
var pendingExchange = "PendingTask";
var pendingQueue = $"PendingTask|Task:{task.Id}_{index}_{timeFlag}"; using (var channel = _rabbitConnection.CreateModel())
{
channel.ExchangeDeclare(exchange, "direct", true);
channel.QueueDeclare(queue, true, false, false);
channel.QueueBind(queue, exchange, queue); var retryDic = new Dictionary<string, object>
{
{"x-dead-letter-exchange", exchange},
{"x-dead-letter-routing-key", queue}
}; channel.ExchangeDeclare(pendingExchange, "direct", true);
channel.QueueDeclare(pendingQueue, true, false, false, retryDic);
channel.QueueBind(pendingQueue, pendingExchange, pendingQueue); var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
["index"] = index,
["id"] = task.Id.ToString(),
["sendtime"] = timeFlag
}; properties.Expiration = ((int)(task.SubTasks[].SendTime - DateTime.UtcNow).TotalMilliseconds).ToString(CultureInfo.InvariantCulture);
channel.BasicPublish(pendingExchange, pendingQueue, properties, Encoding.UTF8.GetBytes(string.Empty));
}

  其中:

  PendingTask为死信队列Exchange,死信队列的队列名(Queue Name)会包含Task、index、timeFlag的信息,帮助跟踪队列和子任务,同时也起到唯一标识的作用。

  task.id为任务Id

  index为子任务下标

  timeFlag为子任务执行时间

  3. (消费者)处理消息

    var exchange = "Task";
var queue = "Task"; _channel.ExchangeDeclare(exchange, "direct", true);
_channel.QueueDeclare(queue, true, false, false);
_channel.QueueBind(queue, exchange, queue); var consumer = new EventingBasicConsumer(_channel);
   //监听处理
consumer.Received += (model, ea) =>
{
     //获取消息头信息
var index = (int)ea.BasicProperties.Headers["index"];
var id = (ea.BasicProperties.Headers["id"] as byte[]).BytesToString();
var timeFlag = (ea.BasicProperties.Headers["sendtime"] as byte[]).BytesToString();    //删除临时死信队列
_channel.QueueDelete($"PendingTask|Task:{id}_{index}_{timeFlag}", false, true); var taskId = new ObjectId(id);
var task = _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefault();      //撤销或已完成的任务不执行
if (task == null || task.Status != TaskStatus.Normal)
{
_channel.BasicAck(ea.DeliveryTag, false);
return;
}

     //执行任务
_logger.LogInformation($"[{DateTime.UtcNow}]执行任务...");

//设置子任务已完成
task.SubTasks[index].IsSent = true; if (task.SubTasks.Count > index + ) //还有未完成的子任务,把下个子任务的信息写入死信队列
{
PublishPendingMsg(_channel, task, index + );
}
else
{
task.Status = TaskStatus.Finished; //所有子任务执行完毕,设置任务状态为完成
} _mongoDbContext.Collection<Task>().ReplaceOne(n => n.Id == taskId, task); //更新任务状态
_channel.BasicAck(ea.DeliveryTag, false);
};
_channel.BasicConsume(queue, false, consumer);

  4. (WebApi)撤销任务,更新任务状态即可

    var taskId = new ObjectId(id);
var task = await _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefaultAsync();
if (task == null)
{
return NotFound(new { message = "任务不存在!" });
} task.Status = TaskStatus.Canceled;
await _mongoDbContext.Collection<Task>().FindOneAndReplaceAsync(n => n.Id == taskId, task);

效果展示

  1. 先使用控制台把消费者启动起来。

  

  2. 创建任务

  启动WebApi,创建一个任务,开始时间为2019-07-16T07:55:00.000Z,结束时间为2019-07-16T07:59:00.000Z,执行时间间隔1分钟:

  

  任务与相应的子任务也写入了MongoDB,这里假设子任务可能是邮件发送任务:

  

  

  创建了一个临时死信队列,队列名称包含任务Id,子任务下标、以及子任务执行时间,并往其写入一条消息:

  

  3. 执行(子)任务

  从日志内容可以看出,(子)任务正常执行:

  

  子任务状态也标注为已发送

  

  同时也往消息队列写入了下一个子任务的消息:

  

  

  4. 撤销任务

  

  任务状态被置为已撤销:

  

  任务没再继续往下执行:

  

  

  消息队列中的临时队列被删除,消息也被消费完

  

源码地址

  https://github.com/ErikXu/rabbit-scheduler

最新文章

  1. javascript - 状态模式 - 简化分支判断流程
  2. django redis操作
  3. mongodb学习03 操作详解
  4. hdu 1312
  5. ruby -- 进阶学习(十五)friendly_id配置
  6. [ActionScript 3.0] AS3调用百度天气预报查询API
  7. golang初体验
  8. MongoDB基本操作
  9. Java WeakHashMap 源码解析
  10. .NET 编译器(”Roslyn“)介绍
  11. 【JavaScript】轻易改变的背景和字体颜色页面
  12. CoreAnimation 目录
  13. css预处理器less和scss之sass介绍(二)
  14. queue的入门
  15. java--计时器
  16. Leetcode_70_Climbing Stairs
  17. 介绍几个好用的android自定义控件
  18. Java基础随记-不定时更新
  19. Java——对象比较
  20. MySQL排序函数field()详解

热门文章

  1. iOS NSString追加字符串的方法
  2. select ,update 加锁
  3. Database Comparer VCL 6.4.908.0 D5-XE10.1
  4. 还在羡慕BAT等公司的大流量的架构吗,commonrpc 是一个以netty 传输协议框架为基础(支持FTP)
  5. libevent for qt的讨论
  6. Delphi&amp;C#代码模拟“显示桌面”的功能(使用CreateOleObject(&#39;Shell.Application&#39;))
  7. face=&#39;Webdings&#39; 的字体对照表
  8. kafka笔记6
  9. Python连载7-time包的其他函数
  10. centos安装最新版MySQL 8.0教程