JMS学习七(ActiveMQ之Topic的持久订阅)
2024-10-07 05:13:37
非持久化订阅持续到它们订阅对象的生命周期。这意味着,客户端只能在订阅者活动时看到相关主题发布的消息。如果订阅者不活动,它会错过相关主题的消息。如果花费较大的开销,订阅者可以被定义为durable(持久化的)。持久化的订阅者注册一个带有JMS保持的唯一标识的持久化订阅(subscription)。带有相同标识的后续订阅者会再续前一个订阅者的订阅状态。如果持久化订阅没有活动的订阅者,JMS会保持订阅消息,直到消息被订阅接收或者过期。
生产者:
package cn.slimsmart.activemq.demo.topic; import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws JMSException {
// 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.43:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("slimsmart.topic.test");
MessageProducer producer = session.createProducer(topic);
// NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage();
message.setText("topic 消息。");
message.setStringProperty("property", "消息Property");
// 发布主题消息
producer.send(message);
System.out.println("Sent message: " + message.getText());
session.commit();
session.close();
connection.close();
}
}
消费者:
package cn.slimsmart.activemq.demo.topic; import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /**
* 持久订阅设置唯一的客户端ID和订阅者ID。
*/
public class ConsumerPersistent { public static void main(String[] args) throws JMSException {
String clientId = "client_id"; // 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.43:61616");
Connection connection = factory.createConnection();
//客户端ID,持久订阅需要设置
connection.setClientID(clientId);
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic("slimsmart.topic.test");
// 创建持久订阅,指定客户端ID。
MessageConsumer consumer = session.createDurableSubscriber(topic,clientId);
consumer.setMessageListener(new MessageListener() {
// 订阅接收方法
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText()+":"+tm.getStringProperty("property"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}); }
}
注:
1.activemq区分消费者,是通过clientID和订阅客户名称来区分的。
2.消息的生产者,发送消息时用使用持久模式。
3.使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
4.activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。
最新文章
- oracle11g RAC1执行脚本结果
- Spring对象绑定与类型转换
- Git学习记录
- hide(1000)跟show(1000)
- UVA 10608 Friends
- 单台电脑上启动多个Modelsim图形环境窗口的简单办法(windows)
- 关于TCP的三次握手和四次分手(整理)
- 摘抄--全面理解面向对象的 JavaScript
- linux上安装fastdfs+nginx+ngin-module实践并解决多个异常篇
- spark actions 算子
- python 安装第三方包时 read timed out
- ng 的 ng-repeat(对象) 把对象的 key 和value 都拿出来 循环
- bzoj 1014 火星人prefix - 链表 - 分块
- Mybatis(七)-- LRU LFU 算法
- 【mysql】连接的空闲时间超过8小时后 MySQL自动断开该连接解决方案
- CF434D Nanami's Power Plant
- python乱码问题之爬虫篇
- spring mvc之启动过程源码分析
- MySQL 数据库常用命令小结
- 【转载】log4j详解使用
热门文章
- 【转帖】Linux系统上面qemu 模拟arm
- springboot笔记之helloworld
- 【字符串大模拟】潜伏者—— NOIP2009原题
- POJ - 1287 Networking (最小生成树&;并查集
- C++中的自定义内存管理
- array_chunk的用法和php操作大数据
- http://www.pythontutor.com/visualize.html#mode=edit python在线检测代码
- Linux的用户与用户组(权限管理)
- 3424:Candies(差分约束,Dijkstra)(配对堆优化
- 082、数据收集利器 cAdvisor (2019-04-30 周二)