在应用一中,基本的消息队列使用已经完成了,在实际项目中,一定会出现各种各样的需求和问题,RabbitMQ内置的很多强大机制和功能会帮助我们解决很多的问题,下面就一个一个的一起学习一下。

消息响应机制

应用一的列子,在消费者从指定队列获取消息的时候,把通知参数no_ack给设成true了,这样就不需要给RabbitMQ服务发送已经处理完毕的通知,RabbitMQ把消息发出去后,就会直接删除掉,不去管消费者是否处理成功,这样在实际项目中存在很大的风险,出现代码的健壮性很差的错误。

所以,一定要把no_ack参数设成false,如下

channel.BasicConsume("NewQueue", false, customer);

在接受逻辑全部处理成功后加上一句代码,通知RabbitMQ,接到通知后才会删除

                    while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}

消息持久化

响应保证了消息不会被错误删除,假如RabbitMQ挂了,所有消息全部会丢掉,RabbitMQ一个广泛使用的机制就是可以持久化,做持久化要两步

1.队列持久化

                    // 队列是否持久化
bool durable = true;
// 持久的队列,没有排他性,不自动删除
channel.QueueDeclare("NewQueue", durable, false, false, null);

2.消息持久化,通过设置IBasicProperties.Persistent来做

                    // 消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.DeliveryMode = ; //消息是持久的,存在并不会受服务器重启影响

上面的持久化,大部分时候不会出现问题,但是假如在写入队列的时候RabbitMQ挂了,还是不会持久上,这种情况,我们就要用到我们代码的逻辑来强制进行持久化了。。。。

负载均衡分发消息

如果有两个接收端消费者同时订阅一个队列,会出现不固定的分发流程,某个消费者可能会出现过多的消息流入造成压力,而另一个空闲的蛋疼。所以,如果能公平的接收消息,处理完一个,接受另一个,同时保证压力的均衡。

代码在消费者端设置:

// 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);

上面是几个RabbitMQ比较重要的机制,下面开始是RabbitMQ的核心牛逼的东西路由

这里涉及2个概念:

1.exchange,这是交换机,也叫路由器,在消息生产者发送消息的时候,实际上不是直接发送到queue队列中,因为他不知道发送到哪个队列,他会先发送到路由器中exchange里,exchange再通过路由匹配把消息发送到匹配的队列当中。

2.routingKey这个是路由的匹配规则,当消息发送到exchange里后,会根据routingkey来匹配到底发送到哪个队列,如果没匹配到,则消息丢失

 有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

·        Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

·        Fanout exchange: 会向响应的queue广播,无需匹配routingkey消息会发送到所有队列。

·        Topic exchange: 这个是贪婪匹配,也是最灵活的匹配方式,对key进行模式匹配,有两种符号#,*,

            #符号的意思是比如a_#,可以匹配的队列可以是a_a,a_aa,a_aaaaaa,a_a_b.......多词

             *符号的意识是比如a_*,可以匹配的队列可以是a_a,a_b,a_c.......单词

这个是应用一中发送消息给队列的代码

channel.BasicPublish("", "firstQueue", null, body);

通过查看这个方法的参数中可看到第一个参数是exchange路由,第二个是routingkey匹配规则,而发送的代码第一个参数是"",第二个参数是firstQueue,开始以为是队列实际并不是,原因是如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。我们在创建一个queue的时候,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去,因为在第一个参数选择了默认的exchange,而我们申明的队列叫firstQueue,所以默认的,它在新建一个也叫firstQueue的routingKey,并绑定在默认的exchange上,导致了我们可以在第二个参数routingKey中写firstQueue,这样它就会找到定义的同名的queue,并把消息放进去。

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

基本概念已经差不多了,我是很擅长排版解释,往下是各种匹配规则的代码和运行情况,直接上代码:

1、路由类型direct,匹配规则rroutingKey相同,一个生产者,两个消费者,采用负载均衡方式分发

消息生产者(producers)

       // 1、创建链接工厂,设置目标、用户、密码
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "yekai",
Password = "yekaigogo",
AutomaticRecoveryEnabled = true, // 自动重连
RequestedHeartbeat = UInt16.MaxValue // 心跳超时时间:表示ushort的最大值,65535
}; // 2、开启当前服务设置的用户的链接
using (var connection = factory.CreateConnection())
{
// 开启一个频道
using (var channel = connection.CreateModel())
{ // 创建一个队列
// 队列是否持久化
bool durable = true; // 已存在的队列,不能再定义持久化(必须用firstQuere的话,需要关闭并重启RabbitMQ服务:rabbitmqctl stop_app、rabbitmqctl reset)
//channel.QueueDeclare("firstQueue", false, false, false, null); // 创建一个新的,持久的交换区
channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null);
// 持久的队列,没有排他性,不自动删除
channel.QueueDeclare("NewQueue", durable, false, false, null);
// 绑定队列到交换区
channel.QueueBind("NewQueue", "NewExchange", "NewRoutingKey");
// 消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.DeliveryMode = ; //消息是持久的,存在并不会受服务器重启影响 byte[] body = null; // 消息主体
// 消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化然后转化为二进制数组
for (int i = ; i < ; i++)
{
body = Encoding.UTF8.GetBytes(string.Format("这是第------{0}------条消息", i));
// 发布消息
//channel.BasicPublish("", "firstQueue", null, body);
channel.BasicPublish("NewExchange", "NewRoutingKey", properties, body); Console.WriteLine("成功发送第-----" + i + "-----条消息!");
}
Console.ReadKey();
}
}

消息消费者A(consumer)

            // 1、创建链接工厂,设置目标、用户、密码
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.UserName = "yekai";
factory.Password = "yekaigogo"; // 2、开启当前服务设置的用户的链接
using (var connection = factory.CreateConnection())
{
// 链接开启一个频道
using (var channel = connection.CreateModel())
{
// 2.1、创建一个新的,持久的交换区
channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null);
// 2.2、连接到指定队列
// 参数说明:队列名,是否持久化,独占的queue,不使用时是否自动删除,其他参数
channel.QueueDeclare("NewQueue", true, false, false, null);
// 2.3、将队列绑定到交换区
channel.QueueBind("NewQueue", "NewExchange", "NewRoutingKey"); // 定义消息接受者
var customer = new RabbitMQ.Client.QueueingBasicConsumer(channel); // 从指定队列获取数据
//channel.BasicConsume("firstQueue", true, customer);
// 中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
channel.BasicConsume("NewQueue", false, customer);
// 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);
// 开始不断循环出队列的消息
while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}
}
}

消息消费者B(consumer)

            // 1、创建链接工厂,设置目标、用户、密码
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.UserName = "yekai";
factory.Password = "yekaigogo"; // 2、开启当前服务设置的用户的链接
using (var connection = factory.CreateConnection())
{
// 链接开启一个频道
using (var channel = connection.CreateModel())
{
// 2.1、创建一个新的,持久的交换区
// channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null); // 2.2、连接指定到同一个NewQueue队列
channel.QueueDeclare("NewQueue", true, false, false, null);
// 2.3、将队列绑定到交换区
//channel.QueueBind("NewQueue", "NewExchange", "NewRoutingKey"); // 定义消息接收者
var customer = new RabbitMQ.Client.QueueingBasicConsumer(channel); // 从指定队列获取数据
// 中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
channel.BasicConsume("NewQueue", false, customer);
// 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);
// 开始不断循环出队列的消息
while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}
}
}

先启动消息接收者服务端A和B,再运行发送端,运行结果:

 2、路由类型direct,1个生产者,2个消费者,匹配规则routingKey相同,匹配不同的队列,一次发送到2个队列各个消费者取出各自的队列消息

消息生产者(producers),创建一个交换区,创建一个队列

            // 1、创建链接工厂,设置目标、用户、密码
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "yekai",
Password = "yekaigogo",
AutomaticRecoveryEnabled = true, // 自动重连
RequestedHeartbeat = UInt16.MaxValue // 心跳超时时间:表示ushort的最大值,65535
}; // 2、开启当前服务设置的用户的链接
using (var connection = factory.CreateConnection())
{
// 开启一个频道
using (var channel = connection.CreateModel())
{ // 创建一个队列
// 队列是否持久化
bool durable = true;
// 创建一个全新的,持久的交换区
channel.ExchangeDeclare("BrandNewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null);
// 持久的队列,没有排他性,不自动删除
channel.QueueDeclare("NewQueue_a", durable, false, false, null);
// 绑定队列到交换区
channel.QueueBind("NewQueue_a", "BrandNewExchange", "BrandNewRoutingKey");
// 消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.DeliveryMode = ; //消息是持久的,存在并不会受服务器重启影响 byte[] body = null; // 消息主体
// 消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化然后转化为二进制数组
for (int i = ; i < ; i++)
{
body = Encoding.UTF8.GetBytes(string.Format("这是第------{0}------条消息", i));
// 发布消息
//channel.BasicPublish("", "firstQueue", null, body);
channel.BasicPublish("BrandNewExchange", "BrandNewRoutingKey", properties, body); Console.WriteLine("成功发送第-----" + i + "-----条消息!");
}
Console.ReadKey();
}
}

消息消费者A(consumer),创建一个新队列,绑定到和生产者同一个交换区,读取刚刚创建的新队列数据

                // 链接开启一个频道
using (var channel = connection.CreateModel())
{
// 2.1、创建一个新的,持久的交换区
//channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null);
// 2.2、连接到指定队列
// 参数说明:队列名,是否持久化,独占的queue,不使用时是否自动删除,其他参数
channel.QueueDeclare("NewQueue_a_b", true, false, false, null);
// 2.3、将队列绑定到交换区
channel.QueueBind("NewQueue_a_b", "BrandNewExchange", "BrandNewRoutingKey"); // 定义消息接受者
var customer = new RabbitMQ.Client.QueueingBasicConsumer(channel); // 从指定队列获取数据
// 中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
channel.BasicConsume("NewQueue_a_b", false, customer);
// 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);
// 开始不断循环出队列的消息
while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}
}
}

消息消费者B(consumer),直接读取生产者创建的queue_a队列消息

            // 1、创建链接工厂,设置目标、用户、密码
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.UserName = "yekai";
factory.Password = "yekaigogo"; // 2、开启当前服务设置的用户的链接
using (var connection = factory.CreateConnection())
{
// 链接开启一个频道
using (var channel = connection.CreateModel())
{
// 2.1、创建一个新的,持久的交换区
// channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null); // 2.2、连接指定到同一个NewQueue队列
//channel.QueueDeclare("NewQueue", true, false, false, null);
// 2.3、将队列绑定到交换区
//channel.QueueBind("NewQueue", "NewExchange", "NewRoutingKey"); // 定义消息接收者
var customer = new RabbitMQ.Client.QueueingBasicConsumer(channel); // 从指定队列获取数据
// 中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
channel.BasicConsume("NewQueue_a", false, customer);
// 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);
// 开始不断循环出队列的消息
while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}
}
}

好,接下来让我们看看运行效果

消息生产者(producers)向RabbitMQ Server发送Message后,查看NewQueue_a和NewQueue_a_b消息数分别为100

可以看见,通过路由匹配,一次发送消息,发送到匹配到的两个队列中,两个消费者各自读取各自的队列。

3、篇幅有限,再来一个路由类型为Topic的代码例子

消息生产者(producers)

由于已经创建了一个queueexChange类型为direct的交换区,不能更改类型,所以重新创建一个交换区

                    // 创建一个队列
// 队列是否持久化
bool durable = true;
// 创建一个全新的,持久的交换区
channel.ExchangeDeclare("QueueTopicExchange", RabbitMQ.Client.ExchangeType.Topic, true, false, null);
// 持久的队列,没有排他性,不自动删除
channel.QueueDeclare("NewQueue_Topic", durable, false, false, null);
// 绑定队列到交换区
channel.QueueBind("NewQueue_Topic", "QueueTopicExchange", "TopicRoutingKey");
// 消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.DeliveryMode = ; //消息是持久的,存在并不会受服务器重启影响 byte[] body = null; // 消息主体
// 消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化然后转化为二进制数组
for (int i = ; i < ; i++)
{
body = Encoding.UTF8.GetBytes(string.Format("这是第------{0}------条消息", i));
// 发布消息
//channel.BasicPublish("", "firstQueue", null, body);
channel.BasicPublish("BrandNewExchange", "BrandNewRoutingKey", properties, body); Console.WriteLine("成功发送第-----" + i + "-----条消息!");
}

消息消费者A(consumer)

                    // 2.1、创建一个新的,持久的交换区
//channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null);
// 2.2、连接到指定队列
// 参数说明:队列名,是否持久化,独占的queue,不使用时是否自动删除,其他参数
channel.QueueDeclare("NewQueue_Topic_a", true, false, false, null);
// 2.3、将队列绑定到交换区
channel.QueueBind("NewQueue_Topic_a", "QueueTopicExchange", "TopicRoutingKey"); // 定义消息接受者
var customer = new RabbitMQ.Client.QueueingBasicConsumer(channel); // 从指定队列获取数据
// 中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
channel.BasicConsume("NewQueue_Topic_a", false, customer); // 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);
// 开始不断循环出队列的消息
while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}

消息消费者B(consumer)

                    // 2.1、创建一个新的,持久的交换区
// channel.ExchangeDeclare("NewExchange", RabbitMQ.Client.ExchangeType.Direct, true, false, null); // 2.2、连接指定到同一个NewQueue队列
//channel.QueueDeclare("NewQueue", true, false, false, null);
// 2.3、将队列绑定到交换区
//channel.QueueBind("NewQueue", "NewExchange", "NewRoutingKey"); // 定义消息接收者
var customer = new RabbitMQ.Client.QueueingBasicConsumer(channel); // 从指定队列获取数据
// 中间这个参数实际必须打开,为false,意思是是否不通知rabbitm已经处理完毕,我们这里要设成false,要通知
channel.BasicConsume("NewQueue_Topic", false, customer);
// 由于队列分发不公平导致一个压力很大,一个很小,在这设置下,公平q分发,也就是一个消费者处理完通知队列后,才会继续分发一个
channel.BasicQos(, , false);
// 开始不断循环出队列的消息
while (true)
{ RabbitMQ.Client.Events.BasicDeliverEventArgs basicDeliverEventArgs = customer.Queue.Dequeue();
// 将消息二进制转回字符串
string msg = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
// 通知队列,已经处理完毕
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); Console.WriteLine(msg);
}

运行结果:

消息发送者成功把Message传输给了消息消费者

消息消费者A和消息消费者B也成功获取到Message

代码例子就不一一写出来了,还有很多种情况,实际项目根据不同的需求灵活运用,有兴趣的可以自己搭配测试一下。

不要因为篇幅长就不看下去!篇幅长是因为讲的细,持之以恒,加油

最新文章

  1. Openlayers简介
  2. 代码生成AnimatorController
  3. 关于SqlServer的DBHelper类以及数据分页
  4. Struts2笔记——通配符和动态方法调用
  5. Tilera 服务器上OpenJDK的安装尝试
  6. Debian openvpn 配置
  7. 《Java并发编程实战》读书笔记(更新中)
  8. win8连接蓝牙听歌
  9. A.indexOf(array[i])表达的含义
  10. 11个实用但你可能不知道的Python程序库
  11. Leetcode: Spiral Matrix. Java
  12. JS 一条原型链扯到底
  13. mysql必知必会
  14. CentOS下使用命令行Web浏览器Links
  15. jenkins乱码解决问题
  16. Winform开发框架中工作流模块的动态处理
  17. PHP实现微信开发中提现功能(企业付款到用户零钱)
  18. php之常用扩展总结
  19. 【PAT】B1079 延迟的回文数(20 分)
  20. redis基本结构

热门文章

  1. 网关 apache APISIX
  2. darknet 的python接口使用
  3. nginx高级玩法之根据来源ip分流
  4. git---分支的合并
  5. Objective-C轻量级泛型
  6. linux系统执行.exe文件
  7. zookeeper在windows的常用命令
  8. mobile crane 1
  9. Python利用ctypes实现按引用传参
  10. mysql登录指令