一:介绍

1.异步模式介绍

  Channel对象提供ConfirmListener()回调方法只包含deliverTag(当前Channel发出的序列号),我们需要自己为每一个Channel维护一个unconfirm的消息序列集合,没publish一条数据,集合就加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

二:程序

1.生产者

 package com.mq.AsynConfirm;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection; import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet; public class Send {
private static final String QUEUE_NAME="test_queue_confirm_asyn";
public static void main(String[] args)throws Exception{
Connection connection= ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//生产者调用confirmSelect将channel设置为nconfirm模式
channel.confirmSelect();
final SortedSet<Long> confirmSet= Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
//没有问题
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("handleAck multiple");
confirmSet.headSet(deliveryTag+1).clear();
}else{
System.out.println("handleAck false");
confirmSet.remove(deliveryTag);
}
}
//有问题
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("handleNack multiple");
confirmSet.headSet(deliveryTag+1).clear();
}else{
System.out.println("handleNack false");
confirmSet.remove(deliveryTag);
}
}
});
String msg="success";
while (true){
long seqNo=channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
confirmSet.add(seqNo);
} }
}

2.消费者

 package com.mq.AsynConfirm;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class Receive {
private static final String QUEUE_NAME="test_queue_confirm_asyn";
public static void main(String[] args)throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
}
});
}
}

3.现象

  Send:

  

最新文章

  1. 教你看懂 OpenStack 日志 - 每天5分钟玩转 OpenStack(29)
  2. dcraw源码解析
  3. jvm、jre、jdk
  4. 【转】浅谈 C++ 中的 new/delete 和 new[]/delete[]
  5. [转]Java多线程干货系列—(一)Java多线程基础
  6. BZOJ 1036:树的统计Count(树链剖分)
  7. POJ2418Hardwood Species
  8. JSP九个隐式对象及作用域
  9. Asp.Net WebAPI Get提交、Post提交处理
  10. C# 中文转拼音类
  11. Charles_N:HTTP请求响应监听工具
  12. 【C语言探索之旅】 第一部分第五课:运算那点事
  13. [转]SQL Server为啥使用了这么多内存?
  14. 矩阵的奇异值分解(SVD)(理论)
  15. 【处理多服务器日志合并处理问题】多服务器的日志合并统计——apache日志的cronolog轮循
  16. 第三篇:Python字符编码
  17. 关于并发下内存及CPU使用情况的思考
  18. python 科学计算及数据可视化
  19. IDEA或Webstorm设置Ctrl+滚轮调整字体大小
  20. ArcGIS10.6的新功能

热门文章

  1. 绘图QPainter-字体
  2. sonar扫描java、js、jsp技术
  3. linux space/mark设置
  4. 【BARTS计划】【Review_Week1】Google Docs 成为青少年们喜爱的聊天 app
  5. oracle怎么给表和列加注释
  6. js实现页面遮罩层,并且阻止页面body滚动
  7. pl sql 记住用户名密码
  8. unbuntu中如何像Windows一样顺畅的切换中英文输入法
  9. saltStack运维工具的部署及master迁移实现的过程详解
  10. 前端开发必须知道的JS(一) 原型和继承