使用EventNext实现基于事件驱动的业务处理
事件驱动模型相信对大家来说并不陌生,因为这是一套非常高效的逻辑处理模型,通过事件来驱动接下来需要完成的工作,而不像传统同步模型等待任务完成后再继续!虽然事件驱动有着这样的好处,但在传统设计上基于消息回调的处理方式在业务处理中相对比较麻烦整体设计成本也比较高,所以落地也不容易。EventNext
是一个事件驱动的应用框架,它的事件驱动支持接口调用,在一系列的业务接口调用过程中通过事件驱动调用来完成;简单来说组件驱动的接口行为是由上一接口行为完成而触发执行,接下来介绍详细介绍一下EventNext
和使用。
NextQueue
EventNext
组件有一个核心的事件驱动队列NextQueue
,NextQueue
和传统的线程队列有着很大的区别;传统队列都是线程不停的执行消息,下一个消息都是线程等待上一个消息完成后再继续。但NextQueue
的设计则不是,它的所有消息都基于上一个消息完成来驱动(不管上一个消息的逻辑是同步还是异步)。实际情况是NextQueue
触发任务的消息是启用线程工作外,后面的消息都是基于上一个消息回调执行;NextQueue
上的消息执行线程是不确定性也不需要等待,虽然队列里的消息执行线程不是唯一的,但执行顺序是一致的这也是NextQueue
所带来的好处,在有序的情况下确保线程的利用率更高。
组件使用
在使用组前需要引用组件,Nuget安装如下
Install-Package EventNext
通过组件制定的业务必须以接口的方式来描述,而业务的调用也是通过接口的方式进行;虽然组件支持以消息回调的方式便不建议这样做,毕竟面向业务接口有着更好的易用性和可维护性。为了确保业务接口方式 的行为满足事件驱动队列的要求 ,所有业务行为方法必须以Task作为返回值;非Task返回值的行为方法都不能被组件注册和调用。
接口定义和实现
接口的定义有一定的规则,除了方法返回值是Task
外,也不支持同一名称的函数进行重载,如果有需要可以使用特定的Attribute
来标记对应的名称(out类型参数不被支持)。以下是一个简单的接口定义:
public interface IUserService
{ Task<int> Income(int value); Task<int> Payout(int value); Task<int> Amount(); }
业务实现:
[Service(typeof(IUserService))]
public class UserService : IUserService
{
private int mAmount; public Task<int> Amount()
{
return Task.FromResult(mAmount);
} public Task<int> Income(int value)
{
mAmount += value;
return Task.FromResult(mAmount);
} public Task<int> Payout(int value)
{
mAmount -= value;
return Task.FromResult(mAmount);
} }
需要通过ServiceAttribute
来描这个类提供那些事件驱动的接口行为。
使用
组件通过一个EventCenter
的对象来进行逻辑调用,创建该对象并注册相应业务功能的程序集即可:
EventCenter eventCenter = new EventCenter();
eventCenter.Register(typeof(Program).Assembly);
定义EventCenter
加载逻辑后就可以创建代理接口调用
var service=EventCenter.Create<IUserService>();
await server.Payout();
await server.Income();
事件驱动队列分配
组件针对不同情况的需要,可以给接口实例或方法定义不同的事件队列配置,主要为以下几种情况
默认
由组件内部队列组进行负载情况进行配置,这种分配方式会导致同一接口的方法有可能分配在不同的队列上;在默认分配下接口实例的方法会存在多线程中同时的运行,因此这种模式的应用并不是线程安全。
Actor
Actor
相信大家也很熟悉,一种高性能一致性的调度模型;组件支持这种模型的接口实例创建,只需要在创建接口代理的时候指定Actor
名称即可
henry = EventCenter.Create<IUserService>("henry");
当指定Actor
名称后,这个接口的所有方法调用都会一致性到对应实例的队列中,即所有功能方法线程调用的唯一性;在接口调用返回的时候也会再次切入到其他事件驱动队列,确保Actor
内部的工作队列不受响后的应逻辑影响;当使用这种方式时整个Actor实例都是线程安全的。
ThreadPool
这种配置只适用于接口方法,描述方法无论什么情况都从线程池中执行相关代码,此行为的方法非线程安全
[ThreadInvoke(ThreadType.ThreadPool)]
public Task<int> ThreadInvoke()
{
mCount++;
return mCount.ToTask();
}
SingleQueue
这种配置只适用于接口方法,用于描述方法不管那个实例都一致性到一个队列中,此行为的方法内线程安全,不保证对应实例是线程安全.
[ThreadInvoke(ThreadType.SingleQueue)]
public Task<int> GetID([ThreadUniqueID]string name)
{
if (!mValues.TryGetValue(name, out int value))
{
value = ;
}
else
{
value++;
}
mValues[name] = value;
return value.ToTask();
}
在这配置下还可以再细分,如上面的[ThreadUniqueID]
对不同参数做一致性对列,这个时候name的不同值会一致性到不同的事件队列中。
Actor性能对比
组件默认集成了Actor
模型,可以通过它实现高并发无锁业务集成,EventNext
最大的特点是以接口的方式集成应用,相对于akka.net
基于消息接收的模式来说有着明显的应用优势。在性能上EventNext
基于接口的ask机制也比akka.net
基于消息receive的ask机制要高,以下是一个简单的对比测试
akak.net
public class UserActor : ReceiveActor
{
public UserActor()
{
Receive<Income>(Income =>
{
mAmount += Income.Memory;
this.Sender.Tell(mAmount);
});
Receive<Payout>(Outlay =>
{
mAmount -= Outlay.Memory;
this.Sender.Tell(mAmount);
});
Receive<Get>(Outlay =>
{
this.Sender.Tell(mAmount);
});
}
private decimal mAmount;
}
//invoke
Income income = new Income { Memory = i };
var result = await nbActor.Ask<decimal>(income);
Payout payout = new Payout { Memory = i };
var result = await nbActor.Ask<decimal>(payout);
Event Next
[Service(typeof(IUserService))]
public class UserService : IUserService
{
private int mAmount; public Task<int> Amount()
{
return Task.FromResult(mAmount);
} public Task<int> Income(int value)
{
mAmount += value;
return Task.FromResult(mAmount);
} public Task<int> Payout(int value)
{
mAmount -= value;
return Task.FromResult(mAmount);
}
}
//invoke
var result = await nb.Income(i);
var result = await nb.Payout(i);
详细测试代码https://github.com/IKende/EventNext/tree/master/samples/EventNext_AkkaNet 在默认配置下不同并发下的测试结果
Event Sourcing
由于事件驱动提倡的业务处理都是异步,这样就带来一个业务事务性的问题,如何确保不同接口方法业务处理一致性就比较关键了。由于不同的逻辑在不同线程中异步进行,所以相对比较好解决的就是在业务处理时引入Event Sourcing
.以下就简单介绍一下组件这方面的应用,就不详细介绍了。毕竟 Event Sourcing设计和业务还有着一些关系
public async Task<long> Income(int amount)
{
await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = amount, Value = user.Amount + amount });
user.Amount += amount;
return user.Amount;
} public async Task<long> Pay(int amount)
{
await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = -amount, Value = user.Amount - amount });
user.Amount -= amount;
return user.Amount;
}
组件提供事件信息的读写接口IEventLogHandler
可以通过实现这个接口扩展自己的事件源处理。
使用注意事项
适应async/await
其实整个事件队列都是使用async/await
,通过它大大简化了消息和回调函数间不同数据状态整合的难度。.Net
也现有所异步API都支持async/wait
。
异步化设计你的逻辑
在实现接口逻辑的情况尽可能使和异步逻辑方法,在逻辑实施过程中禁用Task.Wait
或一些线程相关Wait
的方法,特别不带超时的Wait
因为这种操作极容易导致事件驱动队列逻辑被挂起,导致队列无法正常工作;更糟糕的情况可能引起事件队列假死的情况。
传统异步API
由于各种原因,可能还存在旧的异步API不支持async/wait
,出现这情况可以通过TaskCompletionSource
来扩展已经有的异步方法支持async/wait
项目地址
https://github.com/IKende/EventNext
最新文章
- linux连接远程桌面
- Python模块学习笔记
- jquery validate ajax submit form
- 解决linux crontab PHP fgetcsv 读取中文数据为空问题
- 开发报表时将已有User做成下拉列表,第一项为label为ALL,value为null
- wpfのpack协议
- [leetcode]_Remove Nth Node From End of List
- xcode升级或者重新安装后不能编译的解决方法
- windows下安装redis和php的redis扩展
- JQuery在光标位置插入内容
- java 的序列化
- python之--------封装
- 前端JS面试题汇总 Part 2 (null与undefined/闭包/foreach与map/匿名函数/代码组织)
- Spring boot中自动编译配置
- 在 Linux 上使用 VirtualBox 的命令行管理界面
- CF101D Castle 树形DP、贪心
- Python从菜鸟到高手(3):声明变量
- centos6.6安装hadoop-2.5.0(五、部署过程中的问题解决)
- 嵌入式开发之视频压缩比---h264、mjpeg、mpeg4
- TraceLog.cs 累积 C#
热门文章
- metasploit-shellcode生成
- javascript querySelector和getElementById通过id获取元素的区别
- https 调用验证失败 peer not authenticated
- MySQL中日期函数的使用
- redis集群理解
- CVS使用之:先update后commit
- 2019年6月14日 Web框架之Django_07 进阶操作(MTV与MVC、多对多表三种创建方式、前后端传输数据编码格式contentType、ajax、自定义分页器)
- ZZULIoj 1912 小火山的爱情密码
- !!注意!部署出现the requested resource is not available
- [WPF自定义控件库]使用WindowChrome的问题