RabbitMQ系列(五)--高级特性
在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更详细的说明一下
一、Confirm机制
Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否成功送达,是保证
消息可靠性投递的核心保障
Producer代码如下,只需要修改Producer端,而Consumer端不需要修改
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect(); //5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
} @Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
结果:
-------ack!-----------
只要Producer能把消息发送给Broker,就会返回handlerAck中,返回到NAck的可能很小,例如MQ出现异常,queue的容量达到上限
二、Return消息机制
Return Listener用于处理一些不可路由的消息
Producer:
public class Producer { public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("139.196.75.238");
factory.setPort(5672); //2 获取Connection
Connection connection = factory.newConnection(); //3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel(); String exchangeName = "exchange_topic";
String routingKey = "fdasfdsafsadf4543453"; //6 添加一个return监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); }
}
Producer Return
结果:
---------handle return----------
replyCode: 312
replyText: NO_ROUTE
exchange: exchange_topic
routingKey: fdasfdsafsadf4543453
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ Send confirm message!
注意:
channel.basicPublish参数里面一定要把Mandatory设置为true,才能收到监听不可达的消息(创建exchange、routingKey不匹配等问题
,导致不可达),然后进行后续处理,如果为false,broker自动删除该消息,上面例子就是routingKey设置不匹配,Consumer的代码就不给了
三、消息端限流
限流一般无法从生产端,只能在消费端处理
在Consumer端设置:
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
qos:
服务质量保证,在非自动确认情况下,一定数目的消息没有确认,不进行消费新的消息,通过producer/consumer设置qos的值
channel.basicQos(prefetchSize, prefetch_count, global);
注意:
prefetchSize和global,rabbitMQ没有实现,默认0表示对单条message的大小没有限制、false(非channel级别,consumer级别)
channel.basicConsume中自动签收一定要设置成false
prefetch_count表示一次给几条进行消费,直到返回ack,才能继续给prefetch_count条message
在MyConsumer中手动签收
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
} @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
四、TTL
五、死信队列
未完待续。。。
最新文章
- Coreseek 安装指南
- c#大数加法
- Unity 脚本生命周期流程图
- NET RichTextBox控件如何可以插入图像
- php读取sql2000的image字段,被截断的问题
- 云计算和大数据时代网络技术揭秘(十七)VOQ机制
- 邻接矩阵c源码(构造邻接矩阵,深度优先遍历,广度优先遍历,最小生成树prim,kruskal算法)
- JavaScript基础14——js的Math对象
- URAL 1069 Prufer Code(模拟)
- ASP.NET动态加载Js代码到Head标签中(三种方法)
- sikuli
- 谈使用Eclipse与DDMS调试Android程序的方法
- No DEFAULT or UI configuration directive found!
- Delphi代码中嵌入ASM代码
- hbase 0.98.1集群安装
- [论文阅读] ImageNet Classification with Deep Convolutional Neural Networks(传说中的AlexNet)
- Numpy 基础
- CF1131F Asya And Kittens(Kruskal重构树,启发式合并)
- 自己实现字符串转整数(不使用JDK的字符串转整数的方法)
- andorid 表格布局
热门文章
- 网站访问分析对SEO的好处
- 嵌入式开发之函数解析---ip地址2进制转字符inet_ntoa 调用中只生效一次
- Codeforces 690 C3. Brain Network (hard) LCA
- Window 无法启动此程序,因为计算机中丢失api-ms-win-crt-runtime-l1-1-0.dll。尝试重新安装该程序以解决此问题。
- LED全彩显示屏色度空间
- bzoj4873
- 什么是需求Bug、设计Bug、功能bug?
- CodeForces 515C Drazil and Factorial (水题)
- bzoj 1648: [Usaco2006 Dec]Cow Picnic 奶牛野餐【dfs】
- Left Join 时筛选条件在on后与where后的区别