RabbitMQ的发布订阅模式(Publish/Subscribe)
一、发布/订阅(Publish/Subscribe)模式
发布订阅是我们经常会用到的一种模式,生产者生产消息后,所有订阅者都可以收到。RabbitMQ的发布/订阅模型图如下:
1、该模式下生产者并不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列;
2、该模式必须声明交换机,并且设置模式: channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
fanout指分发模式(将每一条消息都发送到与交换机绑定的队)。
3、队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
二、发布消息
消息生产者向交换机(exchange)发送消息,代码如下:
// 定义交换机名称
static string EXCHANGE_NAME = "ps_exchange_fanout";
public static void PublishMessage()
{
try
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定义exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
string msg = "hello ps!";
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(EXCHANGE_NAME, "", null, body);
Console.WriteLine("send msg:" + msg);
channel.Close();
conn.Close();
}
catch (Exception ex)
{
throw ex;
}
}
消息发送成功,截图如下:
三、订阅消息
在这里需要两个消费者,消息发送后,所有的订阅者都可以收到消息;
3.1 消费者1
和轮询分发以及公平分发不同的是,消费者需要将队列绑定到交换机,来订阅消息;实现代码如下:
static string EXCHANGE_NAME = "ps_exchange_fanout";
static string QUEUE_NAME = "ps_queue_sub1";
/// <summary>
/// 订阅消费者1
/// </summary>
static void SubscribeConsumer1()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定义exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
// 绑定queue
channel.QueueDeclare(queue: QUEUE_NAME);
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义Consumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model,ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"SubscribeConsumer1 收到消息: {message},时间:{DateTime.Now}");
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.WriteLine("Subscribe Consumer1 消费者已启动");
Console.ReadKey();
channel.Dispose();
conn.Close();
}
3.2 消费者2
消费者2的代码和1的基本相同,大家可以将1的修改一下,就可以使用,在此就不重复贴出了;
3.3 接收消息结果
消费者1接收消息截图:
消费者2接收消息截图:
通过上图,我们可以看到,发布者发布消息后,订阅者1、2均受到了相同的消息,至此功能已经完成;
四、小结
4.1 订阅者代码的主要流程
根据消费者的代码,我们可以提炼流程如下
(1)创建连接
(2)声明exchange
(3)绑定队列到exchange
(4)声明消费者
(5)绑定消费者到channel,监听处理消息
(6)关闭连接
4.2 订阅成功后,我们打开mq的管理地址可以看到,有两个queue绑定到exchange上了:
注意:如果我们的exchange没有消费者订阅,发布的消息将不会被保存到任何队列,直接丢失了;
参考链接:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
最新文章
- Caffe学习系列(9):solver优化方法
- 在 ASP.NET 中使用 jQuery.load() 方法
- Java中Properties类的操作配置文件
- Tomcat8 配置NIO
- iOS国际化支持
- python切片练习
- Linux(常用命令) 中常用的压缩丶解压缩格式命令和参数详解
- 【python学习笔记】2.列表和元组
- mysql export mysqldump version mismatch upgrade or downgrade your local MySQL client programs
- python 上台阶
- 接口约束的另一种方法:Class类的isAssignableFrom
- 椭圆曲线密码学ECC
- node.js中通过stream模块实现自定义流
- oracle完全删除实例
- CentOS 7下NFS Server作rootfs时的兼容性问题
- python中itertools里的product和permutation
- mysql监控工具sqlprofiler,类似sqlserver的profiler工具
- RabbitMQ消息队列的小伙伴: ProtoBuf(Google Protocol Buffer) [转]
- 20155230 2016-2017-2《Java程序设计》第一周学习总结
- 二、Web框架实现
热门文章
- [转载]MySQL中int(11)最大长度是多少?
- 被折磨致死的heroku——herku部署
- SpringBoot与单元测试JUnit的结合
- SpringCloud(五)学习笔记之Hystrix
- pytorch 文本情感分类和命名实体识别NER中LSTM输出的区别
- Python自然语言处理实战核心技术与算法,Python自然语言处理,PyTorch深度学习实战【下载】
- java中String StringBuilder StringBuffer比较和效率(性能)测试
- 关于php抑错方法
- 2019-2020-1 20199326《Linux内核原理与分析》第七周作业
- 通达OA-2017版本漏洞复现