【rabbitmq】单独配置某一个消费者手动ack,其他消费者自动ack
2024-10-20 16:05:56
前言:博主才疏学浅,此方案仅供参考,如有更优方案请大佬评论区告知,十分感谢✿✿ヽ(°▽°)ノ✿
问题背景:同一个服务中存在多个不同业务的rabbitmq的消费者,其中一个推送业务的消费者需要加死信队列作为推送失败补偿机制,并且需要根据推送成功与否来判断该消息是否进死信队列,这就需要手动ACK控制,但由于项目配置文件中配置了retry,所以默认是全局自动ack。如果只是在该推送消费者中写手动ack,其他消费者不做改动,会导致其他消费者因没有ack而消息堵塞。
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=2
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
处理方案:
不同消费者使用不同配置 SimpleRabbitListenerContainerFactory
@Configuration
public class ConsumerConfig {
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryManual(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
}
/**
* 需要手动ack的队列消费
*/
@RabbitListener(containerFactory = "rabbitListenerContainerFactoryManual",queues ="send.subscribe",concurrency="${spring.rabbitmq.listener.custom.concurrency}")
public void handleSendSubscribe(Channel channel, Message message, SendSubscribeMsg msg) throws IOException {
//推送
boolean sendResult = this.sendHandler.prepareSend(msg);
//根据推送结果判断是否进死信队列
if (!sendResult) {
log.warn("send to dead letter .");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}else{
log.info("success send");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
/**
*需要自动ack的队列消费
*/
@RabbitListener(containerFactory = "rabbitListenerContainerFactory",queues ="sys.property.post",concurrency="${spring.rabbitmq.listener.custom.concurrency}")
public void handleDevicePropsPost(DevicePropertiesPostMsg attr) {
//业务逻辑处理
......
}
最新文章
- docker 初探之简单安装 ----Windows10
- 非对称加密算法RSA
- summary of k Sum problem and solutions in leetcode
- android 利用View自身的setAnimation来实现动画
- POJ C++程序设计 编程作业—类和对象 编程题#1
- NSPoint
- Dom学习笔记-(一)
- Android应用开发性能优化完全分析
- 了解ANSI编码
- 【PAT L2-001】最短路计数
- (转)经典线程同步 互斥量Mutex
- AngularJS模块的详解
- 常用http响应报文分析
- MySQL的备份与还原以及常用数据库查看命令
- java设计模式自我总结---代理模式
- C# 导出Excel Table td 样式
- ceph-deploy部署过程
- vmware-vcsa6.5 基本管理
- weblogic基本目录介绍,位数查看,启动与发布项目,修改JVM参数,设置项目为默认项目
- 【湖南师范大学2018年大学生程序设计竞赛新生赛 L】【HDOJ2476】【区间DP】