在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更详细的说明一下

一、Confirm机制

  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否成功送达,是保证

消息可靠性投递的核心保障

Producer代码如下,只需要修改Producer端,而Consumer端不需要修改

//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect(); //5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
} @Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});

结果:

-------ack!-----------

只要Producer能把消息发送给Broker,就会返回handlerAck中,返回到NAck的可能很小,例如MQ出现异常,queue的容量达到上限

二、Return消息机制

Return Listener用于处理一些不可路由的消息

Producer:

public class Producer {

    public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("139.196.75.238");
factory.setPort(5672); //2 获取Connection
Connection connection = factory.newConnection(); //3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel(); String exchangeName = "exchange_topic";
String routingKey = "fdasfdsafsadf4543453"; //6 添加一个return监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes()); }
}

Producer Return

结果:

---------handle  return----------
replyCode: 312
replyText: NO_ROUTE
exchange: exchange_topic
routingKey: fdasfdsafsadf4543453
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ Send confirm message!

注意:

  channel.basicPublish参数里面一定要把Mandatory设置为true,才能收到监听不可达的消息(创建exchange、routingKey不匹配等问题

,导致不可达),然后进行后续处理,如果为false,broker自动删除该消息,上面例子就是routingKey设置不匹配,Consumer的代码就不给了

三、消息端限流

限流一般无法从生产端,只能在消费端处理

在Consumer端设置:

channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));

qos:

  服务质量保证,在非自动确认情况下,一定数目的消息没有确认,不进行消费新的消息,通过producer/consumer设置qos的值

channel.basicQos(prefetchSize, prefetch_count, global);

注意:

  prefetchSize和global,rabbitMQ没有实现,默认0表示对单条message的大小没有限制、false(非channel级别,consumer级别)

  channel.basicConsume中自动签收一定要设置成false

  prefetch_count表示一次给几条进行消费,直到返回ack,才能继续给prefetch_count条message

在MyConsumer中手动签收

public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
} @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}

四、TTL

五、死信队列

未完待续。。。

最新文章

  1. Coreseek 安装指南
  2. c#大数加法
  3. Unity 脚本生命周期流程图
  4. NET RichTextBox控件如何可以插入图像
  5. php读取sql2000的image字段,被截断的问题
  6. 云计算和大数据时代网络技术揭秘(十七)VOQ机制
  7. 邻接矩阵c源码(构造邻接矩阵,深度优先遍历,广度优先遍历,最小生成树prim,kruskal算法)
  8. JavaScript基础14——js的Math对象
  9. URAL 1069 Prufer Code(模拟)
  10. ASP.NET动态加载Js代码到Head标签中(三种方法)
  11. sikuli
  12. 谈使用Eclipse与DDMS调试Android程序的方法
  13. No DEFAULT or UI configuration directive found!
  14. Delphi代码中嵌入ASM代码
  15. hbase 0.98.1集群安装
  16. [论文阅读] ImageNet Classification with Deep Convolutional Neural Networks(传说中的AlexNet)
  17. Numpy 基础
  18. CF1131F Asya And Kittens(Kruskal重构树,启发式合并)
  19. 自己实现字符串转整数(不使用JDK的字符串转整数的方法)
  20. andorid 表格布局

热门文章

  1. 网站访问分析对SEO的好处
  2. 嵌入式开发之函数解析---ip地址2进制转字符inet_ntoa 调用中只生效一次
  3. Codeforces 690 C3. Brain Network (hard) LCA
  4. Window 无法启动此程序,因为计算机中丢失api-ms-win-crt-runtime-l1-1-0.dll。尝试重新安装该程序以解决此问题。
  5. LED全彩显示屏色度空间
  6. bzoj4873
  7. 什么是需求Bug、设计Bug、功能bug?
  8. CodeForces 515C Drazil and Factorial (水题)
  9. bzoj 1648: [Usaco2006 Dec]Cow Picnic 奶牛野餐【dfs】
  10. Left Join 时筛选条件在on后与where后的区别