非持久化订阅持续到它们订阅对象的生命周期。这意味着,客户端只能在订阅者活动时看到相关主题发布的消息。如果订阅者不活动,它会错过相关主题的消息。如果花费较大的开销,订阅者可以被定义为durable(持久化的)。持久化的订阅者注册一个带有JMS保持的唯一标识的持久化订阅(subscription)。带有相同标识的后续订阅者会再续前一个订阅者的订阅状态。如果持久化订阅没有活动的订阅者,JMS会保持订阅消息,直到消息被订阅接收或者过期。

生产者:

package cn.slimsmart.activemq.demo.topic;  

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws JMSException {
// 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.43:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("slimsmart.topic.test");
MessageProducer producer = session.createProducer(topic);
// NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage();
message.setText("topic 消息。");
message.setStringProperty("property", "消息Property");
// 发布主题消息
producer.send(message);
System.out.println("Sent message: " + message.getText());
session.commit();
session.close();
connection.close();
}
}

消费者:

package cn.slimsmart.activemq.demo.topic;  

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 持久订阅设置唯一的客户端ID和订阅者ID。
*/
public class ConsumerPersistent { public static void main(String[] args) throws JMSException {
String clientId = "client_id"; // 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.43:61616");
Connection connection = factory.createConnection();
//客户端ID,持久订阅需要设置
connection.setClientID(clientId);
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("slimsmart.topic.test");
// 创建持久订阅,指定客户端ID。
MessageConsumer consumer = session.createDurableSubscriber(topic,clientId);
consumer.setMessageListener(new MessageListener() {
// 订阅接收方法
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText()+":"+tm.getStringProperty("property"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}); }
}

注:

1.activemq区分消费者,是通过clientID和订阅客户名称来区分的。

2.消息的生产者,发送消息时用使用持久模式。
3.使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
4.activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。

最新文章

  1. oracle11g RAC1执行脚本结果
  2. Spring对象绑定与类型转换
  3. Git学习记录
  4. hide(1000)跟show(1000)
  5. UVA 10608 Friends
  6. 单台电脑上启动多个Modelsim图形环境窗口的简单办法(windows)
  7. 关于TCP的三次握手和四次分手(整理)
  8. 摘抄--全面理解面向对象的 JavaScript
  9. linux上安装fastdfs+nginx+ngin-module实践并解决多个异常篇
  10. spark actions 算子
  11. python 安装第三方包时 read timed out
  12. ng 的 ng-repeat(对象) 把对象的 key 和value 都拿出来 循环
  13. bzoj 1014 火星人prefix - 链表 - 分块
  14. Mybatis(七)-- LRU LFU 算法
  15. 【mysql】连接的空闲时间超过8小时后 MySQL自动断开该连接解决方案
  16. CF434D Nanami's Power Plant
  17. python乱码问题之爬虫篇
  18. spring mvc之启动过程源码分析
  19. MySQL 数据库常用命令小结
  20. 【转载】log4j详解使用

热门文章

  1. 【转帖】Linux系统上面qemu 模拟arm
  2. springboot笔记之helloworld
  3. 【字符串大模拟】潜伏者—— NOIP2009原题
  4. POJ - 1287 Networking (最小生成树&并查集
  5. C++中的自定义内存管理
  6. array_chunk的用法和php操作大数据
  7. http://www.pythontutor.com/visualize.html#mode=edit python在线检测代码
  8. Linux的用户与用户组(权限管理)
  9. 3424:Candies(差分约束,Dijkstra)(配对堆优化
  10. 082、数据收集利器 cAdvisor (2019-04-30 周二)