支持异步同步的分布式CommandBus MSMQ实现
2024-10-11 10:42:03
支持异步同步的分布式CommandBus MSMQ实现
先上一张本文所描述的适用场景图
分布式场景,共3台server:
- 前端Server
- Order App Server
- Warehouse App Server
功能:
- 前端Server可以不停的发送Command到CommandBus,然后由CommandBus分配不同的Command到各自的app server去处理。
- 前端Server可以只发送Command而不必等待Response
- 前端Server可以同步等待Response返回
- MSMQ消息超过3.5M会自动转为网络共享方式传输消息
- 对于同一Command的处理,可以通过增加App Server的方式来提高并发处理速度(比如:可以开2个app server instance来同时处理ACommand的处理)
本文目标是用msmq实现分布式CommandBus的应用(已经更新到A2D Framework中了)。
前端Server发送Command的方式(异步):
ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.ResultCatached += new CommandResultCatchedDelegate<ACommandResult>(cmdDistributer_ResultCatached);
cmdDistributer.SendRequest(cmd);
同步方式:
ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.SendRequest(cmd);
ACommandResult result=cmdDistributer.WaitResponse();
配置文件:
<?xml version="1.0" encoding="utf-8" ?>
<CommandBusSetting>
<AutoCreateIfNotExists>true</AutoCreateIfNotExists>
<CommandQueue>PC-20130606HCVP\private$\Commands_{0}</CommandQueue>
<ResponseQueue>PC-20130606HCVP\private$\CommandResponses</ResponseQueue>
<NetworkLocation>\\PC-20130606HCVP\network</NetworkLocation>
</CommandBusSetting>
Command的编写方式:
[QueueName("ACommand")]//这个可选,没有QueueName时,默认对应的msmq队列名为类名,此处为ACommand
public class ACommand : BaseCommand //需要继承自BaseCommand
{
public string Tag { get; set; }//自定义的属性
}
后端App Server的编写
CommandHandlerHost1: Console程序,相当于App Server 1,会处理部分Command
static void Main(string[] args)
{
Thread.Sleep(2000); CommandHandlerListener listener = new CommandHandlerListener();
listener.AddHandler(new TestCommandHandlers());
listener.AddHandler(new Test2CommandHandlers());
listener.Start();
Console.ReadKey();
}
CommandHandlerHost2: Console程序,相当于App Server 2,会处理部分Command
static void Main(string[] args)
{
Thread.Sleep(2000); CommandHandlerListener listener = new CommandHandlerListener();
listener.AddHandler(new Test3CommandHandlers());
listener.Start();
Console.ReadKey();
}
CommandHandlers: 所有的Command处理函数都会在这个项目中实现
public class TestCommandHandlers : ICommandHandlers,
ICommandHandler<ACommand, ACommandResult>,
ICommandHandler<BCommand, BCommandResult>
{
public ACommandResult Handler(ACommand cmd)
{
Console.WriteLine("From [public ACommandResult Handler(ACommand cmd)]: " + cmd.Tag); ACommandResult result = new ACommandResult();
result.Result = "result from ACommand";
return result;
} public BCommandResult Handler(BCommand cmd)
{
Console.WriteLine("From [public BCommandResult Handler(BCommand cmd)]: " + cmd.Tag); BCommandResult result = new BCommandResult();
result.Result = "result from BCommand";
return result;
}
}
下面是目前的性能测试:
下载代码
自省推动进步,视野决定未来。
心怀远大理想。
为了家庭幸福而努力。
A2D科技,服务社会。
A2D Framework(Alpha)
心怀远大理想。
为了家庭幸福而努力。
A2D科技,服务社会。
A2D Framework(Alpha)
- 1. Cache System(本地缓存与分布式缓存共存、支持Memcache和Redis、支持贴标签形式(类似Spring 3.x的Cache形式))
- 2. Event System(本地事件与分布式事件分发)
- 3. IoC(自动匹配功能,实例数量限制功能)
- 4. Sql Dispatcher System(基于Sql server的读写分离系统)
- 5. Session System(分布式Session系统)
- 6. 分布式Command Bus(MSMQ实现,解决4M限制)
最新文章
- The superclass ";javax.servlet.http.HttpServlet"; was not found on the Java 	 Build Path解决方案
- 错误之thinkphp模型使用发生的错误
- 使用开源软件sentry来收集日志
- JAVA 对象内存分析
- hdu 4920 Matrix multiplication(矩阵乘法)2014多培训学校5现场
- hitTest:withEvent:方法流程
- redis源码笔记(一) —— 从redis的启动到command的分发
- 花了一年时间完成的 在线G代码编辑,加工系统 G-Code Editor V1.0
- Django创建通用视图函数
- 洗礼灵魂,修炼python(50)--爬虫篇—基础认识
- C# 爬虫 正则、NSoup、HtmlAgilityPack、Jumony四种方式抓取小说
- 阅读《7 Series FPGAs GTX/GTH Transceivers User Guide》
- 机器学习进阶-图像金字塔与轮廓检测-图像金字塔-(**高斯金字塔) 1.cv2.pyrDown(对图片做向下采样) 2.cv2.pyrUp(对图片做向上采样)
- PAT 1049 数列的片段和(20)(代码+思路分析)
- xss脚本注入后端的防护
- Linux速成(一)
- idea下maven项目增加依赖项目里面没有添加相关依赖jar
- 【洛谷】P3908 异或之和(异或)
- 面向对象的JavaScript-005-Function.prototype.call()的3种作用
- jstree无限级菜单ajax按需动态加载子节点