半同步半异步模式的实现 - MSMQ实现

所谓半同步半异步是指,在某个方法调用中,有些代码行是同步执行方式,有些代码行是异步执行方式,下面我们来举个例子,还是以经典的PlaceOrder来说,哈哈。

PlaceOrder的主要逻辑:

public bool PlaceOrder(OrderInfo order)
{
//验证Order合法性 //OrderInfo增加到仓储 //生成order的pdf //通知客户,email方式
}

我们假设做出如下决定:

public bool PlaceOrder(OrderInfo order)
{
//验证Order合法性 (同步执行) //OrderInfo增加到仓储 (异步执行,考虑到锁,要放到消息队列中,让后台Worker来执行具体的sql操作) //生成order的pdf (同步执行,其实也可以异步,此处demo意图,因此做成同步执行) //通知客户,email方式 (同步执行,同上)
}

如上面所示,如果我们只是在"OrderInfo增加到仓储"这里通过Async方式(无论是多线程,或者是msmq、rabbitq),如果只是触发这个异步执行,那么到函数返回时,很可能这个异步操作还没有执行完成,但是UI层(又或者其他函数)却需要某些信息,比如OrderID。因此,在这个函数中,除了发起异步调用外,还需要wait,直到异步调用执行完成,其实这个函数相当于完成了2个线程并行执行,但是最终返回的条件必须是2个线程都执行完成。

我们来看下

        EventWaitHandle signal = new EventWaitHandle(true, EventResetMode.ManualReset);   //多线程之间的阻塞机制,需要这个变量
OrderInfo returnedOrderInfo = null; //从多线程那边返回的变量会保存在这里
public bool PlaceOrder(OrderInfo order)
{
//验证Order合法性 (同步执行) //OrderInfo增加到仓储 (异步执行,考虑到锁,要放到消息队列中,让后台Worker来执行具体的sql操作)
string msgId=SendOrder2Queue(order); //发送order到msmq,并且获取相应的消息ID,因为后面会用到这个消息ID //生成order的pdf //会在这里阻塞,直到应答队列出现相应msgId的消息为止
signal.Reset(); //初始化变量
returnedOrderInfo = null; //初始化变量
ThreadPool.QueueUserWorkItem(new WaitCallback(CheckResponseQueue), msgId); //调用ms的ThreadPool进行多线程
signal.WaitOne(); //阻塞住,直到子线程发出Set命令 //通知客户,email方式 //收尾逻辑
if (this.returnedOrderInfo != null && !this.returnedOrderInfo.OrderID.Equals(Guid.Empty))
{
CloneOrder(order, this.returnedOrderInfo);
return true;
}
return false;
}

需要注意的地方是,发送email那里,必须放在最后面,想象下这种情况:插入数据库失败了,此时msmq的应答队列就会返回相应的失败消息,此时就需要根据结果来判断,是否需要发送email了(上面代码没有考虑到这点)。

考虑到断电等情况,需要将msmq创建为Transaction类型的queue,如下:

public static class Config
{
public static readonly string OrderQueueConnectionString = ".\\private$\\Order";
public static readonly string OrderResponseQueueConnectionString = ".\\private$\\OrderResponse";
public static void PrepareMSMQ()
{
if (MessageQueue.Exists(OrderQueueConnectionString))
MessageQueue.Delete(OrderQueueConnectionString);
if (MessageQueue.Exists(OrderResponseQueueConnectionString))
MessageQueue.Delete(OrderResponseQueueConnectionString); MessageQueue.Create(OrderQueueConnectionString, true); //true,代表Transaction的队列
MessageQueue.Create(OrderResponseQueueConnectionString, true); //同上
}
}

大家已经看到了,其实有2个队列需要建立,一个是发送队列,另外一个是应答队列;ThreadPool中的子线程就是用来Monitor这个应答队列中是否有相应消息的,我们来看看代码:

private void CheckResponseQueue(object state)
{
string msgId = (string)state;
string sMessageConnectionString_ResponseQueue = Config.OrderResponseQueueConnectionString;
MessageQueue mq_response = new MessageQueue(sMessageConnectionString_ResponseQueue);
mq_response.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) }); //这个很重要,要传输什么样的消息,就要写相应的格式化程序
while (true)
{
System.Threading.Thread.Sleep(200);
Message[] msgs = mq_response.GetAllMessages(); //这句会获取所有的消息,但是不会从队列中去掉,类似于Peek的效果 string foundMsgId = string.Empty;
foreach(Message msg in msgs)
{
if (msg.Label == msgId)
{
foundMsgId = msg.Id;
break;
}
}
if (foundMsgId != string.Empty)
{
Message msg=mq_response.ReceiveById(foundMsgId); //找到msgId后,这句才会真正从队列中移除消息
OrderInfo replyOrderInfo=(OrderInfo)msg.Body; returnedOrderInfo = new OrderInfo(); //赋值给返回变量
returnedOrderInfo.FirstName = replyOrderInfo.FirstName;
returnedOrderInfo.LastName = replyOrderInfo.LastName;
returnedOrderInfo.BuyWhat = replyOrderInfo.BuyWhat;
returnedOrderInfo.OrderID = replyOrderInfo.OrderID;
break;
}
}
signal.Set(); //发出信号,代表完成,让主线程继续往下执行
}

下面再来看看消息发送后,真正的后台处理程序(是个Console程序):

class Program
{
static void Main(string[] args)
{
System.Threading.Thread.Sleep(2000);
string msmq = Core.Config.OrderQueueConnectionString;
if (!MessageQueue.Exists(msmq))
{
Console.WriteLine("msmq not exist.");
return;
} MessageQueue mq = new MessageQueue(msmq);
MessageQueueTransaction tx = new MessageQueueTransaction(); //事务性队列必须用这个才能正确插入消息
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) });
while (true)
{
Message msg = mq.Receive();
Console.WriteLine("processing {0}", msg.Id);
OrderInfo order = (OrderInfo)msg.Body; order.FirstName +=", processed on "+DateTime.Now.ToString();
order.OrderID = Guid.NewGuid(); if (msg.ResponseQueue != null) //由于在发送消息的时候已经指定了应答队列,因此此处只是简单的判断这个属性就可以了
{
Message msg_reply = new Message();
msg_reply.Body = order;
msg_reply.Label = msg.Id;
msg_reply.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(OrderInfo) }); tx.Begin(); //很重要
msg.ResponseQueue.Send(msg_reply, tx);
tx.Commit(); //同上
}
Console.WriteLine("done");
}
}
}

我们再来看下主程序:

class Program
{
static void Main(string[] args)
{
Config.PrepareMSMQ(); //准备msmq资源,比如create等 OrderInfo order = new OrderInfo();
order.FirstName = "aaron";
order.LastName = "dai";
order.BuyWhat = "Car"; Console.WriteLine("Old OrderID--->" + order.OrderID);
Console.WriteLine("Old FirstName--->" + order.FirstName); OrderService srv = new OrderService();
bool success=srv.PlaceOrder(order); Console.WriteLine("*******************************");
Console.WriteLine("Processed OrderID--->" + order.OrderID);
Console.WriteLine("Processed FirstName--->"+order.FirstName);
Console.WriteLine("*******************************");
Console.ReadLine();
}
}

就要好了,来看看效果图:

Code

自省推动进步,视野决定未来。
心怀远大理想。
为了家庭幸福而努力。
用A2D科技,服务社会。
 

最新文章

  1. HDU-1233 还是畅通工程
  2. 理解RESTful
  3. 如何在Rails中执行Get/Post/Put请求
  4. CSS BOX模型
  5. HBase的属性
  6. List应用举例
  7. Matlab网格划分
  8. Mac上pod install一直停住的解决办法
  9. Spring-data-redis: 分布式队列
  10. CentOS 6.0找不到ifcfg-eth0解决方案
  11. Node.js web快速入门 -- KoaHub.js组件koa-static-server
  12. Java 开发中如何正确踩坑
  13. 集合框架之Queue接口
  14. NLP自然语言处理原理及名词介绍
  15. Linux 下的 Docker 安装与使用
  16. canutils上板测试问题记录
  17. loadsh常用函数
  18. C# windows服务:创建Windows服务(Windows Services)的一般步骤
  19. Oracle PLSQL读取(解析)Excel文档
  20. 批量导出VBA工程中的Source

热门文章

  1. 【动态规划】leetcode - Maximal Square
  2. 我在Github上的flare-spark项目
  3. linux添加静态路由表,重新启动继续有效
  4. Asp.net MVC + EF + Spring.Net 项目实践(二)
  5. HTTP 错误500.19 -Internal Server Error
  6. s2sh三大框架整合过程(仅供参考)
  7. linux监控命令全覆盖(图文说明)
  8. linux下编译安装mysql5.5以上版本
  9. 使用JavaCompiler编译java源文件
  10. Android slidingmenu详细解释 滑动的优化