前言:博主才疏学浅,此方案仅供参考,如有更优方案请大佬评论区告知,十分感谢✿✿ヽ(°▽°)ノ✿

问题背景:同一个服务中存在多个不同业务的rabbitmq的消费者,其中一个推送业务的消费者需要加死信队列作为推送失败补偿机制,并且需要根据推送成功与否来判断该消息是否进死信队列,这就需要手动ACK控制,但由于项目配置文件中配置了retry,所以默认是全局自动ack。如果只是在该推送消费者中写手动ack,其他消费者不做改动,会导致其他消费者因没有ack而消息堵塞。

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=2
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms

处理方案:

不同消费者使用不同配置 SimpleRabbitListenerContainerFactory

@Configuration
public class ConsumerConfig {
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryManual(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
}

/**
* 需要手动ack的队列消费
*/
@RabbitListener(containerFactory = "rabbitListenerContainerFactoryManual",queues ="send.subscribe",concurrency="${spring.rabbitmq.listener.custom.concurrency}")
public void handleSendSubscribe(Channel channel, Message message, SendSubscribeMsg msg)  throws IOException {
  //推送
boolean sendResult = this.sendHandler.prepareSend(msg);
  //根据推送结果判断是否进死信队列
if (!sendResult) {
log.warn("send to dead letter .");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}else{
log.info("success send");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}
}

/**
*需要自动ack的队列消费
*/
  @RabbitListener(containerFactory = "rabbitListenerContainerFactory",queues ="sys.property.post",concurrency="${spring.rabbitmq.listener.custom.concurrency}")
  public void handleDevicePropsPost(DevicePropertiesPostMsg attr) {
//业务逻辑处理
    ......

}




最新文章

  1. docker 初探之简单安装 ----Windows10
  2. 非对称加密算法RSA
  3. summary of k Sum problem and solutions in leetcode
  4. android 利用View自身的setAnimation来实现动画
  5. POJ C++程序设计 编程作业—类和对象 编程题#1
  6. NSPoint
  7. Dom学习笔记-(一)
  8. Android应用开发性能优化完全分析
  9. 了解ANSI编码
  10. 【PAT L2-001】最短路计数
  11. (转)经典线程同步 互斥量Mutex
  12. AngularJS模块的详解
  13. 常用http响应报文分析
  14. MySQL的备份与还原以及常用数据库查看命令
  15. java设计模式自我总结---代理模式
  16. C# 导出Excel Table td 样式
  17. ceph-deploy部署过程
  18. vmware-vcsa6.5 基本管理
  19. weblogic基本目录介绍,位数查看,启动与发布项目,修改JVM参数,设置项目为默认项目
  20. 【湖南师范大学2018年大学生程序设计竞赛新生赛 L】【HDOJ2476】【区间DP】

热门文章

  1. 驱动开发:内核R3与R0内存映射拷贝
  2. Linux中CentOS 7版本安装JDK、Tomcat、MySQL、lezsz、maven软件详解
  3. 01-MySQL8主从详解
  4. 43.Permission源码解析和自定义权限类
  5. 文本挖掘与NLP笔记——代码向:分词
  6. 二、docker安装
  7. Django开发汇总
  8. 线上Electron应用具备哪些特征?
  9. 【题解】CF374C Inna and Dima
  10. 【ASP.NET Core】MVC控制器的各种自定义:应用程序约定的接口与模型