消息确认机制---confirm异步
2024-10-18 05:40:05
一:介绍
1.异步模式介绍
Channel对象提供ConfirmListener()回调方法只包含deliverTag(当前Channel发出的序列号),我们需要自己为每一个Channel维护一个unconfirm的消息序列集合,没publish一条数据,集合就加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
二:程序
1.生产者
package com.mq.AsynConfirm; import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection; import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet; public class Send {
private static final String QUEUE_NAME="test_queue_confirm_asyn";
public static void main(String[] args)throws Exception{
Connection connection= ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//生产者调用confirmSelect将channel设置为nconfirm模式
channel.confirmSelect();
final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
//没有问题
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("handleAck multiple");
confirmSet.headSet(deliveryTag+1).clear();
}else{
System.out.println("handleAck false");
confirmSet.remove(deliveryTag);
}
}
//有问题
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("handleNack multiple");
confirmSet.headSet(deliveryTag+1).clear();
}else{
System.out.println("handleNack false");
confirmSet.remove(deliveryTag);
}
}
});
String msg="success";
while (true){
long seqNo=channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
confirmSet.add(seqNo);
} }
}
2.消费者
package com.mq.AsynConfirm; import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class Receive {
private static final String QUEUE_NAME="test_queue_confirm_asyn";
public static void main(String[] args)throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
}
});
}
}
3.现象
Send:
最新文章
- 教你看懂 OpenStack 日志 - 每天5分钟玩转 OpenStack(29)
- dcraw源码解析
- jvm、jre、jdk
- 【转】浅谈 C++ 中的 new/delete 和 new[]/delete[]
- [转]Java多线程干货系列—(一)Java多线程基础
- BZOJ 1036:树的统计Count(树链剖分)
- POJ2418Hardwood Species
- JSP九个隐式对象及作用域
- Asp.Net WebAPI Get提交、Post提交处理
- C# 中文转拼音类
- Charles_N:HTTP请求响应监听工具
- 【C语言探索之旅】 第一部分第五课:运算那点事
- [转]SQL Server为啥使用了这么多内存?
- 矩阵的奇异值分解(SVD)(理论)
- 【处理多服务器日志合并处理问题】多服务器的日志合并统计——apache日志的cronolog轮循
- 第三篇:Python字符编码
- 关于并发下内存及CPU使用情况的思考
- python 科学计算及数据可视化
- IDEA或Webstorm设置Ctrl+滚轮调整字体大小
- ArcGIS10.6的新功能