TTL概念

TTL是Time To Live的缩写,也就是生存时间。
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。

TTL是Time To Live的缩写,也就是生存时间。RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。

RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.

RabbitMQ允许您为消息和队列设置TTL(生存时间)。 这可以使用可选的队列参数或策略来完成(建议使用后一个选项)。 可以对单个队列,一组队列强制执行消息TTL,也可以为单个消息应用消息TTL。

​ ——摘自 RabbitMQ 官方文档

1.消息的 TTL

我们在生产端发送消息的时候可以在 properties 中指定 expiration属性来对消息过期时间进行设置,单位为毫秒(ms)。

     /**
* deliverMode 设置为 2 的时候代表持久化消息
* expiration 意思是设置消息的有效期,超过10秒没有被消费者接收后会被自动删除
* headers 自定义的一些属性
* */
//5. 发送
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("myhead1", "111");
headers.put("myhead2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("100000")
.headers(headers)
.build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());

我们也可以后台管理页面中进入 Exchange 发送消息指定expiration

2.队列的 TTL

队列属性:x-message-ttl
可以控制被publish到queue中的message 被丢弃前能够存活的时间

我们也可以在后台管理界面中新增一个 queue,创建时可以设置 ttl,对于队列中超过该时间的消息将会被移除。

//Java代码:message在该queue的存过时间最大为60秒
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

死信队列

死信队列:没有被及时消费的消息存放的队列

消息没有被及时消费的原因:

  • a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

  • b.TTL(time-to-live) 消息超时未消费

  • c.达到最大队列长度

1.DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性
2.当这个队列中有死信时,RabbitMQ就会自动将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列
3.可以监听这个队列的消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

实现死信队列步骤

  • 首先需要设置死信队列的 exchange 和 queue,然后进行绑定:

    Exchange: dlx.exchange
    Queue: dlx.queue
    RoutingKey: # 代表接收所有路由 key
  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在普通队列加上一个参数即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )
  • 这样消息在过期、requeue失败、 队列在达到最大长度时,消息就可以直接路由到死信队列!

创建生产者

package com.dwz.rabbitmq.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange";
String routingkey = "dlx.save"; for(int i = 0; i < 1; i++) {
String msg = "Hello rabbitmq dlc-"+i+" message!"; Map<String, Object> headers = new HashMap<>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("utf-8")
.expiration("10000")
.headers(headers)
.build();
channel.basicPublish(exchangeName, routingkey, properties, msg.getBytes());
} channel.close();
connection.close();
}
}

创建消费者

package com.dwz.rabbitmq.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange";
String routingkey = "dlx.#";
String queueName = "test_dlx_queue"; //这是一个普通的交换机、队列和路由
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare(queueName, true, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingkey); channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#"); DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body); try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println("消费端:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}; //手工签收必须关闭autoAck设置为false
channel.basicConsume(queueName, false, consumer);
}
}

测试步骤

1.先启动consumer,在管控平台上看到test_dlx_exchange、test_dlx_queue和dlx.exchange、dlx.queue都创建完成然后停止consumer。

2.启动producer,10秒钟前发现消息在普通队列test_dlx_queue中,10秒钟后发现消息出现在死信队列dlx.queue中。

总结

DLX也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的DLX Exchange 上去,进而被路由到另一个队列。可以监听这个队列中消息做相应的处理。

相关文章

RabbitMQ 消费端限流、TTL、死信队列

RabbitMQ Queue中Arguments属性参数过期队列,过期消息,超时队列的声明

最新文章

  1. RealTimePerformanceDemoView
  2. android studio怎么分享项目到Git@OSC托管
  3. iOS - Mac Apache WebServer 服务器配置
  4. [转]理解android.intent.category.LAUNCHER 具体作用
  5. iOS 关于僵尸对象和僵尸指针的那些事儿
  6. 用Jenkins配置自动化构建
  7. 第三方登录之qq登录(转载)
  8. linux下开发板网络速度测试记录
  9. NSSet和NSMutableSet 确保数据的唯一性--备
  10. java 操作POI参考文章
  11. AngularJS是为了克服HTML在构建应用上的不足而设计的
  12. 设计模式 - 观察者模式(Observer Pattern) 详细解释
  13. 学习笔记GAN001:生成式对抗网络,只需10步,从零开始到调试
  14. 了解Scala反射
  15. mysql 5.7~默认sql_mode解读
  16. Windows系统资源监控
  17. codeforces 1082G - Petya and Graph 最大权闭合子图 网络流
  18. 使用 notify.js 桌面提醒
  19. Oracle 实例名/服务名 请问SID和Service_Name有什么区别啊?
  20. HDU-1078.FatMouseandCheese(线性dp + dfs)

热门文章

  1. asp.net 13 缓存,Session存储
  2. X-Router软路由设置
  3. Unity鼠标移动到物体上显示信息
  4. JS基础_逻辑运算符
  5. JavaScript例子3-对多选框进行操作,输出选中的多选框的个数
  6. form表单中的enctype 属性以及post请求里Content-Type方式
  7. js之运算符(关系运算符)
  8. 制作linux云主机镜像
  9. 【异常】Phoenix异常:java.lang.ArithmeticException: Rounding necessary
  10. urllib.parse:很底层,但是是一个处理url路径的好模块