ActiveMQ Topic使用示例
2024-09-01 14:25:36
一、非持久的Topic
Topic 发送
public class NoPersistenceSender { public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
Connection connection = connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination topic=session.createTopic("myTopic"); MessageProducer producer=session.createProducer(topic); for(int i=0 ; i<3 ; i++){
TextMessage message=session.createTextMessage("message"+i);
//message.setStringProperty("queue", "queue"+i);
//message.setJMSType("1");
producer.send(message);
}
session.commit();
session.close(); connection.close(); } }
Topic 接收
public class NoPersistenceRecever { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
Connection connection = connectionFactory.createConnection();
connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination topic=session.createTopic("myTopic"); MessageConsumer consumer = session.createConsumer(topic); Message message=consumer.receive();
while (message !=null){
TextMessage textMessage=(TextMessage) message;
//System.out.println(message.getStringProperty("queue"));
System.out.println(textMessage.getText());
session.commit();
message = consumer.receive(1000L);
} session.close();
connection.close(); } }
二、持久化得Topic
Topic 发送
public class PersistenceSender { public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
Connection connection = connectionFactory.createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination topic=session.createTopic("myTopic1"); MessageProducer producer=session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start(); for(int i=0 ; i<3 ; i++){
TextMessage message=session.createTextMessage("message"+i);
//message.setStringProperty("queue", "queue"+i);
//message.setJMSType("1");
producer.send(message);
}
session.commit();
session.close(); connection.close(); } }
- 要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
- 一定要设置完成后,再start 这个 connection
Topic 接收
public class PersistenceRecever { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
Connection connection = connectionFactory.createConnection(); connection.setClientID("cc1");
Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic topic=session.createTopic("myTopic1"); TopicSubscriber ts = session.createDurableSubscriber(topic, "t1"); connection.start(); Message message=ts.receive();
while (message !=null){
TextMessage textMessage=(TextMessage) message;
//System.out.println(message.getStringProperty("queue"));
System.out.println(textMessage.getText());
session.commit();
message = ts.receive(1000L);
} session.close();
connection.close(); } }
- 需要在连接上设置消费者id,用来识别消费者
- 需要创建TopicSubscriber来订阅
- 要设置好了过后再start 这个 connection
- 一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
最新文章
- Xamarin Studio在Mac环境下的配置和Xamarin.iOS常用控件的示例
- 【POJ】2096 Collecting Bugs
- FR #2题解
- 课堂所讲整理:HTML--6运算符、类型转换
- MARKDOWN--介绍http://www.jianshu.com/p/q81RER
- Perceptron Learning Algorithm (PLA)
- winform保存登录cookie
- 快速排序算法(C#实现)
- 浅谈一下SSI+Oracle框架的整合搭建
- velocity 高亮显示
- MySQL中 InnoDB 和 MyISAM 小结
- 在windows下运行spark
- python+selenium自动化软件测试(第13章):selenium面试题
- 转 Java输入输出流详解(非常详尽)
- Mysql SQL Mode详解
- SDE与shapefile之间的数据导入与导出
- 【Netty】(7)---搭建websocket服务器
- C++入门篇十一
- tcp的连接数量
- springboot实现自定义的错误页面展示
热门文章
- C++利用openssl进行公钥解密
- 表单Content-Type为multipart/form-data时,后台数据的接收
- OpenSL ES: 利用OpenSL ES播放一个存在于SDcard上的PCM文件
- 编译grub时报告";grub_script.yy.c:19:22: error: statement with no effect [-Werror=unused-value]";怎么处理?
- 【转载】 AutoML相关论文
- osg MatrixManipulator CameraManipulator
- elasticsearch _search结果解析
- 一秒 解决 ERROR 1044 (42000): Access denied for user &#39;&#39;@&#39;localhost&#39; to database &#39;mysql 问题
- 根据DELTA自动生成SQL语句
- redis未设置idle超时时间导致连接过多