前言

asp.net core版本选择2.2,只是因为个人习惯了vs2017,代码以及设计皆可移植到vs2019,用asp.net core 3.0以及以上运行起来

项目类似选择web api,基础设施选择entity frame core + Masstransit + aspectCore

先赘述一下思路业务,中间通讯以及容错/重试交给masstransit,部分流程的解耦交给aspectCore来完成,这部分包括,错误后通过masstransit发布给错误处理的模块,最后落盘到ef core

整个过程,业务处理的层之间可以通过Masstransit的通讯,可采用rpc的模式,也可以同异步的发布订阅通讯

整体设计是可单体可分布式的理念,可以根据项目变化,可以自行配置,拆分成完成单体到分布式的过程,完成业务分解,但是对于业务使用而言,都是一样的,无感知。

刚开始的文章会有很多代码段说明,属于前置知识的铺垫,可能会有些啰嗦,后续文章会忽略掉无关的大片代码段。

业务演示,我们选择传统的银行转账业务以及电商的支付到下单。


整体crud的设计图

包括了业务层和基础设施层的设计


当前文章的设计图如下

整体采用Mq异步的发布订阅,订阅之间也通过发布订阅通知,完成最终一致性


业务实体

    public class BaseModel
{
[Key]
public int Id { get; set; }
public DateTime CreateTime { get; set; }
}
    public class UserInfo: BaseModel
{ public string NickName { get; set; }
public decimal Money { get; set; }
public DateTime LastOptions { get; set; }
}
    public class PayOrder: BaseModel
{
public int SourceId { get; set; }
public int TargetId { get; set; }
public decimal Money { get; set; }
}

配置EF Core

    public class TransactionContexts:DbContext
{
public DbSet<PayOrder> PayOrders { get; set; }
public DbSet<UserInfo> UserInfos { get; set; } public TransactionContexts(DbContextOptions<TransactionContexts> options):base(options)
{ }
}

配置数据源偷懒就用InMemory了

            services.AddDbContext<TransactionContexts>(build=> {
build.UseInMemoryDatabase("TransactionContexts");
});

实例编写

建立一个交易的Command

    internal class PayOrderCommand
{
public int SourceId { get; set; }
public int TargetId { get; set; }
public decimal Money { get; set; }
}

一个交易的Event

    internal class PayOrderEvent: PayOrderCommand
{ }

Command对外,Event对内,主要是用于项目区分外部流程和内部流程的区别,代码本身是没有硬编码要求的

构建一个交易的服务,对外公开一个转账的接口

    public interface ITransactionService
{
void TransferAccounts(int sourceId, int targetId, decimal money);
}

这个接口完成Publish/Subscribe模式的交易

Publish端

    internal class TransactionService: ITransactionService
{
private IBusControl busControl; public TransactionService(IBusControl busControl)
{
this.busControl = busControl;
} public async void TransferAccounts(int sourceId, int targetId, decimal money)
{
await busControl.Publish(
new PayOrderCommand { SourceId = sourceId, TargetId = targetId, Money = money }
);
}
}

很常见的发布一个命令

Subscribe端

    internal class TransactionConsumer :
IConsumer<PayOrderCommand>,
IConsumer<PayOrderEvent>
{
private TransactionContexts transactionContexts; public TransactionConsumer(TransactionContexts transactionContexts)
{
this.transactionContexts = transactionContexts;
} public async Task Consume(ConsumeContext<PayOrderCommand> context)
{
var value = context.Message; await Console.Out.WriteLineAsync($"PayOrderCommand Before:{DateTime.Now} SourceId:{value.SourceId} TargetId:{value.TargetId} Money:{value.Money}"); await transactionContexts.PayOrders.AddAsync(new PayOrder
{
Id = ,
SourceId = value.SourceId,
TargetId = value.TargetId,
Money = value.Money
});
await transactionContexts.SaveChangesAsync(); await context.Publish(new PayOrderEvent
{
SourceId = value.SourceId,
TargetId = value.TargetId,
Money = value.Money
}); await Console.Out.WriteLineAsync($"PayOrderCommand After:{DateTime.Now}");
} public async Task Consume(ConsumeContext<PayOrderEvent> context)
{
var value = context.Message; await Console.Out.WriteLineAsync($"PayOrderEvent Before:{DateTime.Now} SourceId:{value.SourceId} TargetId:{value.TargetId} Money:{value.Money}"); var source = transactionContexts.UserInfos.First(user => user.Id == value.SourceId); if (source.Money < value.Money)
throw new Exception(); var target = transactionContexts.UserInfos.First(user => user.Id == value.TargetId); source.Money -= value.Money;
target.Money += value.Money; transactionContexts.UserInfos.Update(source);
transactionContexts.UserInfos.Update(target); await transactionContexts.SaveChangesAsync(); await Console.Out.WriteLineAsync($"PayOrderEvent After:{DateTime.Now}");
}
}

配置依赖注入流程

MassTransit的通讯选择MassTransit.RabbitMQ,这个库,也支持很多MQ,个人图方便就选的rabbitmq,后期要更换,修改一下依赖注入的配置即可

            services.AddScoped<TransactionConsumer>();

            services.AddMassTransit(c =>
{
c.AddConsumer<TransactionConsumer>();
c.AddBus(serviceProvider =>
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst =>
{
hst.Username("guest");
hst.Password("guest");
}); cfg.ReceiveEndpoint("Transaction", config =>
{
config.ConfigureConsumer<TransactionConsumer>(serviceProvider);
});
});
});
});

这样就完成了引入Masstransit做数据通讯部分,Masstransit支持Rpc模式,也支持Publish/Subscribe,后续的文章会混搭Rpc模式和发布订阅的模式,主要根据业务场景的选择做调整


编写演示例子

在Configure里面配置初始化数据

            using (var serviceScoped = app.ApplicationServices.CreateScope())
{
var serviceProvider = serviceScoped.ServiceProvider;
var context = serviceProvider.GetRequiredService<TransactionContexts>(); context.UserInfos.Add(new UserInfo
{
Id = ,
NickName = "Form",
CreateTime = DateTime.Now,
LastOptions = DateTime.Now,
Money =
});
context.UserInfos.Add(new UserInfo
{
Id = ,
NickName = "To",
CreateTime = DateTime.Now,
LastOptions = DateTime.Now,
Money =
}); context.SaveChanges();
}
#endregion

给Configure方法增加一个注入的接口

public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
{
//。。。
var busControl = app.ApplicationServices.GetRequiredService<IBusControl>(); lifetime.ApplicationStarted.Register(busControl.Start);
lifetime.ApplicationStopped.Register(busControl.Stop);
}

依旧是省事儿的Configure方法里面写的一个管道

            app.Run(async (context) =>
{
var serviceProvider = context.RequestServices; var bus = serviceProvider.GetRequiredService<IBusControl>(); await bus.Publish(new PayOrderCommand
{
SourceId = 1,
TargetId = 2,
Money = 2000
}); await context.Response.WriteAsync("Hello World!");
});

后话

写在默认的index管道,会触发一个有意思的BUG,会两次执行,但是主键是唯一的,这样重复执行会抛出异常,提示主键已存在

这个例子是先铺垫一下,很多有意思的实现还没开展

1、某个业务需要执行过程和下游强一致性,要么一起完成,要么当场失败

2、某个业务完成最终一致性,这个过程,如果失败需要重试/限流/熔断

3、某个业务完成最终一致性,失败了则触发补偿

先铺垫一下,后空余时间逐一编写示例演示

现在的代码一点都不优雅,还有很强的面向于具体实现的,后续会逐步高度抽象化,从相对麻烦的代码,变成纯crud拿来主义,业务层 和后续代码的流程无感

打个小广告

如果有技术交流可以加NCC的群 24791014、436035237,我在群里,有任何关于asp.net core/Masstransit的问题或者建议都可以与我交流,非常欢迎

示例代码:

https://github.com/htrlq/Crud.Sample

最新文章

  1. hadoop-2.7.1基于QMJ高可用安装配置
  2. ios -- cell的图片下载
  3. Mysql笔记【3】-SQL约束
  4. mysql修改字符集 转载
  5. C#构造函数里的base和this的区别
  6. 解决asp.net中“从客户端中检测到有潜在危险的Request.Form值”的错误
  7. mybatis逆向工程使用步骤详解
  8. python基础——抽象类
  9. Error creating bean
  10. springBoot 随笔(一)
  11. 将应用代码由eclipse导入Android studio的方法NDK-Build和Cmake两种方法(以android_serialport_api为例)
  12. python基础学习笔记(十三)
  13. Hibernate的CRUD以及junit测试
  14. OpenCV 学习笔记 04 深度估计与分割——GrabCut算法与分水岭算法
  15. Mybatis:通过MapperScannerConfigurer进行mapper扫描
  16. redis.conf配置详解(转)
  17. B1015 德才论 (25 分)
  18. nginx 安装手记
  19. iOS的AssetsLibrary框架访问所有相片
  20. &lt;转&gt;MYSQL数据库数据拆分之分库分表总结

热门文章

  1. python中pymysql executemany 批量插入数据
  2. Pop!_OS安装与配置(二):基础配置
  3. SSTI(模板注入)
  4. Scala 面向对象(二):package 包 (一) 入门
  5. scrapy 基础组件专题(一):scrapy框架中各组件的工作流程
  6. 如何写一个自己的HashMap
  7. toad for oracle 小技巧
  8. 【python大牛分享】python——接口自动化测试框架环境的使用
  9. Ethical Hacking - NETWORK PENETRATION TESTING(3)
  10. Oracle DataGuard主备切换(switchover)