支持异步同步的分布式CommandBus MSMQ实现 - 支持Session传递、多实例处理
2024-08-25 16:44:07
先上一张本文所描述的适用场景图
分布式场景,共3台server:
- 前端Server
- Order App Server
- Warehouse App Server
功能:
- 前端Server可以不停的发送Command到CommandBus,然后由CommandBus分配不同的Command到各自的app server去处理。
- 前端Server可以只发送Command而不必等待Response
- 前端Server可以同步等待Response返回
- MSMQ消息超过3.5M会自动转为网络共享方式传输消息
- 对于同一Command的处理,可以通过增加App Server的方式来提高并发处理速度(比如:可以开2个app server instance来同时处理ACommand的处理)
- 在App server中能读取前端Server的Session value(写Session还不支持)
本文目标是用msmq实现分布式CommandBus的应用(已经更新到A2D Framework中了)。
前端Server发送Command的方式(异步):
ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.ResultCatached += new CommandResultCatchedDelegate<ACommandResult>(cmdDistributer_ResultCatached);
cmdDistributer.SendRequest(cmd);
同步方式:
ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.SendRequest(cmd);
ACommandResult result=cmdDistributer.WaitResponse();
配置文件:
<?xml version="1.0" encoding="utf-8" ?>
<CommandBusSetting>
<AutoCreateIfNotExists>true</AutoCreateIfNotExists>
<WebSessionSupport>true</WebSessionSupport>
<CommandQueue>PC-20130606HCVP\private$\Commands_{0}</CommandQueue>
<ResponseQueue>PC-20130606HCVP\private$\CommandResponses</ResponseQueue>
<NetworkLocation>\\PC-20130606HCVP\network</NetworkLocation>
</CommandBusSetting>
Command的编写方式:
[QueueName("ACommand")]//这个可选,没有QueueName时,默认对应的msmq队列名为类名,此处为ACommand
public class ACommand : BaseCommand //需要继承自BaseCommand
{
public string Tag { get; set; }//自定义的属性
}
后端App Server的编写
CommandHandlerHost1: Console程序,相当于App Server 1,会处理部分Command
static void Main(string[] args)
{
Thread.Sleep(); CommandHandlerListener listener = new CommandHandlerListener();
listener.AddHandler(new TestCommandHandlers());
listener.AddHandler(new Test2CommandHandlers());
listener.Start();
Console.ReadKey();
}
CommandHandlerHost2: Console程序,相当于App Server 2,会处理部分Command
static void Main(string[] args)
{
Thread.Sleep(); CommandHandlerListener listener = new CommandHandlerListener();
listener.AddHandler(new Test3CommandHandlers());
listener.Start();
Console.ReadKey();
}
CommandHandlers: 所有的Command处理函数都会在这个项目中实现
public class TestCommandHandlers : ICommandHandlers,
ICommandHandler<ACommand, ACommandResult>,
ICommandHandler<BCommand, BCommandResult>
{
public ACommandResult Handler(ACommand cmd, ISessionContext session)
{
Console.WriteLine("From [public ACommandResult Handler(ACommand cmd)]: " + cmd.Tag); ACommandResult result = new ACommandResult();
result.Result = "result from ACommand";
return result;
} public BCommandResult Handler(BCommand cmd, ISessionContext session)
{
Console.WriteLine("From [public BCommandResult Handler(BCommand cmd)]: " + cmd.Tag); if (session != null)
{
Console.WriteLine("session not null");
object o = session.GetWebSession("key1");
if (o != null)
{
Console.WriteLine("From Session['key1']: " + Convert.ToString(o));
//session.Set("key1", "changed from command handler: " + DateTime.Now);
}
} BCommandResult result = new BCommandResult();
result.Result = "result from BCommand";
return result;
}
}
下面是目前的性能测试:
下载代码
最新文章
- Visual Studio EventHandler Delegate 和 EventArgs
- HDU 5015
- 【BZOJ 2809】【APIO 2012】dispatching
- css后代选择器(div.class中间不带空格)
- hdu 2837 坑题。
- c#程序中使用";like“查询access数据库语句的问题
- Drupal如何SQL查询传递参数?
- HDU 2586 How far away ? (LCA)
- git 提交远程
- RMQ with Shifts
- Linux 中 crontab 详解及示例
- Number String
- 关于启动文件分析的(MDK-ARM) 【转】
- 在 JPA、Hibernate 和 Spring 中配置 Ehcache 缓存
- Linux系统网络编程中TCP通讯socket--send导致进程被关闭
- 用python语言算π值并且带有进度条
- 树莓派上使用KickThemOut对局域网内的设备进行ARP欺骗
- iOS关于沙盒文件拷贝manager.copyItem的一个坑
- 如何将img垂直居中?
- 如何修改opencart的模版适合为mycncart系统使用