介绍

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它可以用来做消息队列,进行数据的生产和消费, 公开的 WriterReader api对应消息的生产者和消费者,也让Channel更加的简洁和易用,与Rabbit MQ 等其他队列不同的是,Channel 是进程内的队列。

开始Channel之旅

创建一个 channel 非常简单,Channel 类公开的API支持创建无限容量和有限容量的 channel

 // 创建有限容量的channel
var channel = Channel.CreateBounded<string>(100);
 // 创建无限容量的channel
var channel = Channel.CreateUnbounded<string>();

这里需要注意的是,当你使用一个有限容量的 Channel 时,你需要指定容量的大小,还可以指定一个 BoundedChannelFullMode 的枚举类型,来告诉 channel 达到容量限制的时候,继续写入时应该怎么处理

public enum BoundedChannelFullMode
{
Wait,
DropNewest,
DropOldest,
DropWrite
}
  • Wait 是默认值,当 channel 容量满了以后,写入数据时会返回 false,直到channel有数据被消费了以后,才可以继续写入
  • DropNewest 移除最新的数据,也就是从队列尾部开始移除
  • DropOldest 移除最老的数据,也就是从队列头部开始移除
  • DropWrite 写入数据返回成功,但是转头就把刚才的数据丢了
// 创建有限容量的channel, 并指定容量达到最大的策略
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
});

生产数据

生产数据主要通过 Channel 提供的 Writer api, 常规的写入操作如下:

await channel.Writer.WriteAsync("hello");

Channel 还提供了 TryWrite() 方法,如果写入数据失败时会返回 false,WaitToWriteAsync() 方法会做非阻塞的等待,直到 Channel 允许写入新的数据时返回 true,同样的 Channel 关闭后会返回 false,翻了一下源码发现,WriteAsync() 方法内部其实是调用了 TryWrite()WaitToWriteAsync() 方法。

消费数据

消费数据主要通过 Channel 提供的 Reader api, 常规的读取操作如下:

var item = await channel.Reader.ReadAsync();

同样的,Channel 提供了 TryRead() 尝试读取数据,WaitToReadAsync() 方法会做非阻塞的等待,直到 Channel 可以读取到数据时会返回 true,在 Channel 关闭后会返回 false, ReadAsync() 的方法内部其实是调用了 TryRead()WaitToReadAsync() 方法, 另外你可以通过 channel.Reader.Count 获取队列元素的数量。

在实际的使用场景中,可能需要一些后台任务,长时间的进行消费,那么你可以使用下边的方式

while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out var item))
{
Console.WriteLine(item);
}
}

ReadAllAsync() 方法返回的是一个 IAsyncEnumerable<T> 对象,也可以用 await foreach 的方式来获取数据

await foreach(var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}

单一生产者和消费者

创建 Channel 时,可以设置 ChannelOptions 的 SingleWriterSingleReader,来指定 Channel 时单一的生产者和消费者,默认都是 false,当设置了 SingleWriter = true 时, 会限制同一个时间只能有一个生产者可以写入数据, SingleReader = true 是同样的。

另外,如果只需要一个消费者的话,你应该设置 SingleReader = true, Channel 在内部做了一些优化,在读取时避免了锁操作,性能上有些许的提升。

性能

这里的基准测试我对比了三种类型,Channel, BufferBlock, BlockingCollection,分别写入了10000条数据,然后进行读取,发现 Channel 确实是表现比较好。

总结

Channel 实际上还是使用 ConcurrentQueue做的封装, 使用起来更方便,对异步更友好,另外,.NET 5 其中的 Quic 内部就使用了Channel,CAP 也在新版本中使用 Channel 替换掉了之前的 BlockingCollection,来实现进程内的队列。

官方介绍

https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels

源码

https://github.com/dotnet/runtime/tree/main/src/libraries/System.Threading.Channels/src/System/Threading/Channels

CAP

https://github.com/dotnetcore/CAP

Quic

https://github.com/dotnet/runtime/tree/main/src/libraries/System.Net.Quic

最新文章

  1. 百度广告 高亮 Chrome插件(附源码)
  2. mybatis高级(3)_延迟加载_深度延迟_一级缓存_二级缓存
  3. 由select * from table where 1=1中的1=1说开数据库
  4. 给ListView视图添加行号
  5. FreeIconMaker - 在线创建免费和时尚的图标
  6. Crusher Django Tutorial(5) 使用内置管理员系统
  7. WinPcap编程入门实践
  8. 用QT创建WINDOWS服务程序
  9. ThoughtWorks Merchant&#39;s Guide To The Galaxy
  10. VR应用向导,全球Top10 VR应用排行榜
  11. 读书笔记 effective c++ Item 10 让赋值运算符返回指向*this的引用
  12. 网络编程之TCP编程
  13. JN_0001:在微信朋友圈分享时长大于10s的视频
  14. bgfx入门练习3——编译自定义Shader
  15. Linux文件系统命令 cat
  16. 关于CMS的那点事 I
  17. 【题解】 [NOI2009]变换序列 (二分图匹配)
  18. 百度搜索URL参数含义
  19. slider轮播插件的多种写法
  20. .Net公用代码

热门文章

  1. MySQL之数据查询语言(DQL)
  2. WEB安全防护相关响应头(上)
  3. 【原创】X86下ipipe接管中断/异常
  4. 人工智能训练云燧T10
  5. 白*衡(Color Constancy,无监督AWB):CVPR2019论文解析
  6. 目标检测中特征融合技术(YOLO v4)(下)
  7. Net Core 5.0 部署IIS错误-500.31-Failed to load ASP.NET Core runtime
  8. vue3函数setUp和reactive函数详细讲解
  9. Zab协议 (史上最全)
  10. Spring事务管理详解