一、发布/订阅(Publish/Subscribe)模式

发布订阅是我们经常会用到的一种模式,生产者生产消息后,所有订阅者都可以收到。RabbitMQ的发布/订阅模型图如下:



1、该模式下生产者并不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列;

2、该模式必须声明交换机,并且设置模式: channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);  

fanout指分发模式(将每一条消息都发送到与交换机绑定的队)。

3、队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

二、发布消息

消息生产者向交换机(exchange)发送消息,代码如下:

   // 定义交换机名称
static string EXCHANGE_NAME = "ps_exchange_fanout";
public static void PublishMessage()
{
try
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定义exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
string msg = "hello ps!";
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(EXCHANGE_NAME, "", null, body);
Console.WriteLine("send msg:" + msg);
channel.Close();
conn.Close();
}
catch (Exception ex)
{
throw ex;
}
}

消息发送成功,截图如下:

三、订阅消息

在这里需要两个消费者,消息发送后,所有的订阅者都可以收到消息;

3.1 消费者1

和轮询分发以及公平分发不同的是,消费者需要将队列绑定到交换机,来订阅消息;实现代码如下:

        static string EXCHANGE_NAME = "ps_exchange_fanout";
static string QUEUE_NAME = "ps_queue_sub1";
/// <summary>
/// 订阅消费者1
/// </summary>
static void SubscribeConsumer1()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定义exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
// 绑定queue
channel.QueueDeclare(queue: QUEUE_NAME);
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义Consumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model,ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"SubscribeConsumer1 收到消息: {message},时间:{DateTime.Now}");
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.WriteLine("Subscribe Consumer1 消费者已启动");
Console.ReadKey();
channel.Dispose();
conn.Close();
}

3.2 消费者2

消费者2的代码和1的基本相同,大家可以将1的修改一下,就可以使用,在此就不重复贴出了;

3.3 接收消息结果

消费者1接收消息截图:



消费者2接收消息截图:



通过上图,我们可以看到,发布者发布消息后,订阅者1、2均受到了相同的消息,至此功能已经完成;

四、小结

4.1 订阅者代码的主要流程

根据消费者的代码,我们可以提炼流程如下

(1)创建连接

(2)声明exchange

(3)绑定队列到exchange

(4)声明消费者

(5)绑定消费者到channel,监听处理消息

(6)关闭连接

4.2 订阅成功后,我们打开mq的管理地址可以看到,有两个queue绑定到exchange上了:



注意:如果我们的exchange没有消费者订阅,发布的消息将不会被保存到任何队列,直接丢失了;

参考链接:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

最新文章

  1. Caffe学习系列(9):solver优化方法
  2. 在 ASP.NET 中使用 jQuery.load() 方法
  3. Java中Properties类的操作配置文件
  4. Tomcat8 配置NIO
  5. iOS国际化支持
  6. python切片练习
  7. Linux(常用命令) 中常用的压缩丶解压缩格式命令和参数详解
  8. 【python学习笔记】2.列表和元组
  9. mysql export mysqldump version mismatch upgrade or downgrade your local MySQL client programs
  10. python 上台阶
  11. 接口约束的另一种方法:Class类的isAssignableFrom
  12. 椭圆曲线密码学ECC
  13. node.js中通过stream模块实现自定义流
  14. oracle完全删除实例
  15. CentOS 7下NFS Server作rootfs时的兼容性问题
  16. python中itertools里的product和permutation
  17. mysql监控工具sqlprofiler,类似sqlserver的profiler工具
  18. RabbitMQ消息队列的小伙伴: ProtoBuf(Google Protocol Buffer) [转]
  19. 20155230 2016-2017-2《Java程序设计》第一周学习总结
  20. 二、Web框架实现

热门文章

  1. [转载]MySQL中int(11)最大长度是多少?
  2. 被折磨致死的heroku——herku部署
  3. SpringBoot与单元测试JUnit的结合
  4. SpringCloud(五)学习笔记之Hystrix
  5. pytorch 文本情感分类和命名实体识别NER中LSTM输出的区别
  6. Python自然语言处理实战核心技术与算法,Python自然语言处理,PyTorch深度学习实战【下载】
  7. java中String StringBuilder StringBuffer比较和效率(性能)测试
  8. 关于php抑错方法
  9. 2019-2020-1 20199326《Linux内核原理与分析》第七周作业
  10. 通达OA-2017版本漏洞复现