系统重构解耦的过程涉及不同领域服务分拆,或同一服务下实时响应部分和非响应部分分拆,分解后的各部分通过异步消息的流转传递,完成整体的业务逻辑,但是频繁的在业务层面直接调用不同消息队列的SDK,个人感觉不够简洁,最近开源一个中间件OSS.Dataflow,希望能帮到看到的同学。

  OSS.Dataflow主要实现异步消息传递的过程抽象,在业务层面提供消息发布订阅的统一抽象接口,在业务逻辑分支之间,以简单的调用完成消息的传递,和具体的消息存储触发实现无关。同时,在底层的存储和触发层面提取接口,能够在系统的全局适配具体的消息基础设施。(在这些接口之上,还实现了事件处理器,通过消息的重复投放,实现事件执行的容错补充机制,这个后边文章再介绍,源代码单元测试有示例。)

一. 消息业务侧使用

  OSS.Dataflow 的代码可以通过GiteeGitHub获取,使用时可以通过Nuget直接安装,也可以通过命令行:Install-Package OSS.DataFlow

  组件的使用非常简单,只需要关注:

  1. 消息发布者接口,由组件注册时返回,供业务方法调用传入消息体。
  2. 消息订阅(消费)者接口实现或委托方法,在组件注册时传入。

  具体示例:

  1. 消息的发布订阅独立调用示例
       // 全局初始化,注入订阅者实现
const string msgPSKey = "Publisher-Subscriber-MsgKey";
DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) =>
{
// 当前通过注入消费的委托方法,也可通过接口实现
// DoSomething(data);
return true;
}); // 获取发布者接口
private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher(); // 业务方法中发布消息
await publisher.Publish(msgPSKey,new MsgData() {name = "test"});

  2. 消息的流式调用示例

    // 直接注册消费实现并获取消息发布接口
private static readonly IDataPublisher _delegateFlowpusher =
DataFlowFactory.RegisterFlow<MsgData>("delegate_flow",async (data) =>
{
// 当前通过注入消费的委托方法,也可通过接口实现
// DoSomething(data);
return true;
}); // 业务方法中发布消息
await _delegateFlowpusher.Publish("normal_flow",new MsgData() {name = "test"});

  如上,只需要获取发布者,并注入消费实现,即可完成整个消息的异步消费处理,同一个消息key可以注册多个消费实现,当有消息进入消费时,会并发处理。

二. 消息底层存储适配扩展

  前边介绍了业务接口的使用,和具体消息队列或数据库等隔离,这是对接业务层面的使用。因为业务场景不同,不同的项目对消息的响应速度和处理机制又各有需求,所以 OSS.DataFlow 同样提供了对接消息产品的扩展接口,方便使用者适配已有消息基础设施。

1. 消息存储适配接口

  对于事件消息处理,需要关注两件事情:接收存储 和 消费触发。在类库中提供了 DataFlowManager 消息流管理类,用户可以通过实现IDataPublisherProvider接口,完成具体的存储实现。

  同时在不同的消息产品触发消费时(比如数据库定时任务或者RabbitMQ消费), 调用通知方法(NotifySubscriber ),来触发通过类库注册的具体的业务订阅处理。

    // 消息流核心部件管理者
public static class DataFlowManager
{
/// <summary>
/// 自定义 数据流发布(存储)实现的 提供者
/// </summary>
public static IDataPublisherProvider PublisherProvider { get; set; } /// <summary>
/// 通过自定义消息触发机制通知订阅者
/// 调用时请做异常拦截,防止脏数据导致 msgData 类型错误
/// </summary>
/// <param name="msgDataKey"></param>
/// <param name="msgData">消息内容,自定义触发时,请注意和注册订阅者的消费数据类型转换安全</param>
/// <returns></returns>
public static Task<bool> NotifySubscriber(string msgDataKey, object msgData)
{
....
}
}

  关于 IDataPublisherProvider

   public interface IDataPublisherProvider
{
/// <summary>
/// 数据发布者
/// </summary>
/// <param name="option"></param>
/// <returns> 返回消息发布接口实现 </returns>
IDataPublisher CreatePublisher(DataPublisherOption option);
}   /// <summary>
/// 数据的发布者
/// </summary>
public interface IDataPublisher
{
/// <summary>
/// 推进数据(存储具体消息队列或者数据库实现)
/// </summary>
/// <param name="dataKey"></param>
/// <param name="data"></param>
/// <returns>是否推入成功</returns>
Task<bool> Publish<TData>(string dataKey,TData data);
}

  可以看到 IDataPublisher 接口负责具体的存储实现,可以根据 DataPublisherOption 的 source_name 业务属性实现对不同业务需求返回不同的具体实现。

2. 默认实现介绍

  借助.Net 自身的内存消息队列,在类库中提供了默认的内部消息存储转发实现(内存级别),使用者可以自行实现扩展相关接口并进行全局配置。

  内置的.Net Core消息队列, 设置了默认1个队列,最大并发为32线程。 如果需要可以通过设置DataPublisherOption的source_name,类库将会为每个source_name 创建独立的内存队列。

如果你已经看到这里,并且感觉还行的话可以在下方点个赞,或者也可以关注我的公总号(见二维码)


最新文章

  1. jQuery 上传头像插件Jcrop的实例
  2. Java并发编程实现概览
  3. [iOS基础控件 - 4.2] APP列表 字典转模型Model
  4. Ubuntu使用apt-get安装本地deb包
  5. Java基础知识强化之IO流笔记60:打印流 之 改进复制文本文件的案例
  6. iOS 开发技巧
  7. Apache Thrift的简单使用
  8. Spring中的FactoryBean
  9. php运行
  10. MySQL 查询重复的数据,以及部分字段去重和完全去重
  11. 201521123095 《Java程序设计》第2周学习总结
  12. web缓存之--http缓存机制
  13. ubuntu 18.04启动samba图形管理界面
  14. 使用webpack将es6 es7转换成es2015
  15. OneZero产品视频
  16. Docker安装ActiveMQ
  17. 隐藏控件--HiddenField控件
  18. Numpy函数库基础
  19. 用原生JavaScript写AJAX
  20. 阿里云搭建hadoop集群服务器,内网、外网访问问题(详解。。。)

热门文章

  1. Oracle基本入门
  2. 自动化测试报告----allure2(一)
  3. ubuntu 安装 gightingale
  4. CF39C-Moon Craters【dp】
  5. 查看显卡报错:NVIDIA-SMI has failed because it couldn&#39;t communicate with the NVIDIA driver. Make sure that the latest NVIDIA driver is installed and running.
  6. 精准容量、秒级弹性,压测工具 + SAE 方案如何完美突破传统大促难关?
  7. 题解 CF833D Red-Black Cobweb
  8. 告别Vuex,发挥compositionAPI的优势,打造Vue3专用的轻量级状态
  9. Flink Sql 之 Calcite Volcano优化器(源码解析)
  10. 【Java技术专题】「性能优化系列」针对Java对象压缩及序列化技术的探索之路