导读

  最新在接触ActiveMQ,里面有个持久订阅者模块,功能是怎么样也演示不出来效果。配置参数比较简单(配置没几个参数),消费者第一次运行时,需要指定ClientID(此时Broker已经记录离线订阅者信息),在启动提供者,此时消息队列存在一条记录,然后在启动消费者,但是怎么样也获取不到消息,阿西吧~~~什么鬼,百度上一大堆,都是这样步骤,消费者端,指定以下ClientID就好了,可,想要的效果死活不出来。。。。。。

采坑之路

废话不多说,先上代码,后面再分析

消费者端代码

    public void testTopicConsumer2() throws Exception {
//第一步:创建ConnectionFactory
String brokerURL = "tcp://192.168.31.215:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
//第二步:通过工厂,创建Connection
Connection connection = connectionFactory.createConnection();
//设置持久订阅的客户端ID
String clientId = "10086";
connection.setClientID(clientId);
//第三步:打开链接
connection.start();
//第四步:通过Connection创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:通过session创建Consumer
Topic topic = session.createTopic("cyb-topic"); //创建持久订阅的消费者客户端
//第一个参数是指定Topic
//第二个参数是自定义的ClientId
MessageConsumer consumer = session.createDurableSubscriber(topic, clientId);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//第七步:处理信息
if (message instanceof TextMessage){
TextMessage tm=(TextMessage)message;
try{
System.out.println(tm.getText());
}
catch (Exception e){
e.printStackTrace();
}
}
}
});
//session.commit();
//第八步:关闭资源
consumer.close();
session.close();
connection.close(); }

  只需要制定ClientID和创建持久客户端即可

提供者端代码

   public void testTopicProducer() throws Exception {
Connection connection = null;
MessageProducer producer = null;
Session session = null;
try {
//第一步:创建ConnectionFactory,用于连接broker
String brokerURL = "tcp://192.168.31.215:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
//设置
//((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
//第二步:通过工厂,创建Connection
connection = connectionFactory.createConnection();
//第三步:连接启动
connection.start();
//第四步:通过连接获取session会话
//第一个参数:是否启用ActiveMQ事务,如果为true,第二个参数无用
//第二个参数:应答模式,AUTO_ACKNOWLEDGE为自动应答
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:通过session创建destination,两种目的地:Queue、Topic
//参数:消息队列的名称,在后台管理系统中可以看到
Topic topic = session.createTopic("cyb-topic");
//第六步:通过session创建MessageProducer
producer = session.createProducer(topic);
//producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//第七步:创建Message
//方式一
//TextMessage message=new ActiveMQTextMessage();
//message.setText("queue test");
//方式二
TextMessage message1 = session.createTextMessage("topic->博客园地址:https://www.cnblogs.com/chenyanbin/");
//第八步:通过producer发送消息
producer.send(message1);
//session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
//第九步:关闭资源
producer.close();
session.close();
connection.close();
}
}

验证离线订阅者功能

失败的验证

正确的验证方式

  首先明确一点,上面的代码是没有一点问题的。为了节省时间,验证步骤和上面的差不多,不启动前两步了,直接启动第三步,也就是:

  1. 先启动消费者(记录持久订阅者ClientID);
  2. 在启动提供者;
  3. 启动消费者(在下面加个死循环)

问题剖析

  第一次运行消费者时,此时Broker已经记录订阅者ClientID,然后程序一闪而过,进入到蓝色框中的,离线订阅者中,然后在执行提供者,此时,Topic中,已经入队一次,再次运行消费者时,运行是异步获取的,运行一闪而过(鄙人猜测,可能是ActiveMQ机制问题,内部逻辑大概是,先遍历非持久订阅者,然后在查看持久订阅者,问题出在,程序执行太快,还没到查看持久订阅者时,程序就执行完了,所以第二次执行消费者时,加了个死循环,不停监听队列消息,具体ActiveMQ底层代码没看过,有兴趣的可以研究下,底层代码找到相应位置后,记得告诉我哦~~~)

  这个小问题,捣鼓一下午,百度上也说,就这2步骤配置即可,运行结果与初衷相违背,大半夜的都打算洗洗睡了,头脑风暴想出来这个方法,在下面写个死循环,不停监听队列消息,这才有了这篇博客,好啦...好啦,时间不早了,马上都快凌晨1点钟了,明个还得上班,洗洗睡了zZZZZZZZZZ

最新文章

  1. 1900. Brainwashing Device
  2. 前N个自然数的随机置换
  3. svn cleanup failed–previous operation has not finished 解决方法
  4. SQL中 and or优先级问题(转)
  5. Android 事件统计
  6. 通过网络路径获取的图片 btye 图片流互转
  7. React脚手架create-react-app
  8. Android中碎片的添加问题
  9. idea中使用github
  10. ZOJ1994有源汇上下界可行流
  11. Java中Sax解析XML
  12. Linux 磁盘分区,文件系统创建、挂载、开机自动挂载和卸载
  13. CSS中正确理解浮动以及clear:both的关系
  14. js Array​.prototype​.reduce()
  15. PAT 乙级 1083 是否存在相等的差(20 分)
  16. kvm配置USB直通
  17. vm 克隆一台新机器启动网卡报错:device eth0 does not seem to be present, delaying initialization
  18. logstash高速入口
  19. Vue2学习笔记:事件对象、事件冒泡、默认行为
  20. FastJson和Gson和Json数据解析分析和用法

热门文章

  1. SpringBoot系列(八)分分钟学会Springboot多种解决跨域方式
  2. C - Roads in the North DFS+树的直径
  3. springboot集成JdbcTemplate+druid
  4. 一口气带你踩完五个 List 的大坑,真的是处处坑啊!
  5. react-devtools安装调试
  6. 负载均衡服务之HAProxy基础配置(一)
  7. pytorch seq2seq模型训练测试
  8. 进阶 Linux基本命令-2
  9. thinkphp--多表查询
  10. Task Scheduler Error Message: 80041318