转播给所有订阅这个topic的使用者

package com.study.mq.b7_transaction;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException; /**
* 消费者事务
*
* 消费者开启事务后,接收到消息后,需要手动提交事务,否则broker上的消息不会真正被消费
*/
// http://activemq.apache.org/destination-options.html
public class Consumer {
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory = null;
Connection conn = null;
Session session = null;
MessageConsumer consumer = null; try {
// brokerURL http://activemq.apache.org/connection-configuration-uri.html
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.242:61616");
// 2、创建连接对象
conn = connectionFactory.createConnection("admin", "admin");
conn.start(); // 3、创建会话
// 第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
// 第一个参数为false时,第二个参数的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
// Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
// Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
// DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 4、创建点对点接收的目标
Destination destination = session.createQueue("queue2");
// 创建订阅的目标
// Destination b4_destination = session.createTopic("topic1"); // 5、创建消费者消息 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination); // 6、接收消息
Session finalSession = session;
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message);
}
try {
finalSession.rollback();
} catch (JMSException e) {
e.printStackTrace();
}
}
}); System.in.read();
} catch (JMSException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
} if (session != null) {
try {
session.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
} if (conn != null) {
try {
conn.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
package com.study.mq.b7_transaction;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* 生产者事务
*
* 生产者开启事务后,消息发送后,提交事务后,broker上的消息才能发到消费者
*/
public class Producer {
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory;
Connection conn = null;
Session session = null; try {
// 1、创建连接工厂
// connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "udp://vm1.tony.com:61616");
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.158.129:61616");
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start(); // 3、创建会话
// 第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
// 第一个参数为false时,第二个参数的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
// Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
// Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
// DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 4、创建点对点发送的目标
Destination destination = session.createQueue("queue2");
// 创建发布的目标
// Destination b4_destination = session.createTopic("topic1"); // 5、创建生产者消息
MessageProducer producer = session.createProducer(destination);
// 设置生产者的模式,有两种可选
// DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
// DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 10; i++) {
// 6、创建一条消息
// 有6中消息类型:
// BytesMessage 用来传递字节
// MapMessage 用来传递字节
// ObjectMessage 用来传递序列化对象
// StreamMessage 用来传递文件等
// TextMessage 用来传递字符串
String text = "Hello world! " + i;
TextMessage message = session.createTextMessage(text); // 7、发送消息
producer.send(message); if (i % 3 == 0) { // 3的倍数,发送,但回滚
session.rollback();
} else {
// 在开启持久化模式时,commit后,会同步到磁盘
// 所以当一个原子步骤中发送大批量消息,不建议每条消息发送后提交,而是批量发送完后一次性提交,以最大限度地减少磁盘同步产生的延迟.
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
} if (session != null) {
try {
session.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}

也可以使用spring进行集成

        <!--直接使用spring-boot-starter-activemq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

Jms注解自动完成数据的获取

package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener; @SpringBootApplication
@EnableJms
public class Consumer { @JmsListener(destination = "queue1")
public void receive(String message) {
System.out.println("收到消息:" + message);
} public static void main(String[] args) {
SpringApplication.run(Consumer.class, args);
}
}
package hello;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate; import javax.annotation.PostConstruct; @SpringBootApplication
public class Producer { @Autowired
private JmsTemplate jmsTemplate; @PostConstruct
public void init() {
jmsTemplate.convertAndSend("queue1", "Hello Spring 4");
} public static void main(String[] args) {
SpringApplication.run(Producer.class, args);
}
}

最新文章

  1. js实现文件上传
  2. 为什么js加事件时不要写括号
  3. 2013长沙邀请赛A So Easy!(矩阵快速幂,共轭)
  4. struts2:非表单标签
  5. Percona-Galera-Monitoring-Template监控模板说明
  6. Cookie Session Cache
  7. 文件I/O(不带缓冲)之creat函数
  8. Redis_基本类型介绍和指令___1
  9. 协程coroutine
  10. Linux中的段管理,bss段,data段,
  11. CPU卡中T=0通讯协议的分析与实现
  12. Python urllib和urllib2模块学习(二)
  13. 物理DG主备库切换时遇到ORA-16139: media recovery required错误
  14. drawInRect:withAttributes:
  15. 腾讯文学动作密集 疑为手Q发力移动阅读铺路
  16. 爬虫day 04_01(爬百度页面)
  17. sso示例代码
  18. vultr VPS的购买及搭建ss介绍,支持锐速加速优化
  19. Odoo薪酬管理 公式配置
  20. express和cors跨域

热门文章

  1. C++算法代码——纪念品分组[NOIP2007 普及组]
  2. SpringBoot读取配置文件的内容
  3. 第七届蓝桥杯JavaB组——第6题方格填数
  4. 零基础学Python:数据容器
  5. mysql添加远程连接权限
  6. 【转载】java类加载时机与过程
  7. c#(NPOI)DataTable导出execl,execl(支持解析公式)导入DataTable
  8. 在onBackPress中实现退出拦截时不生效
  9. Kilo 使用教程
  10. FreeBSD Fcitx 输入法框架设置