Publisher的代码:

  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.DeliveryMode;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MapMessage;
  7. import javax.jms.MessageProducer;
  8. import javax.jms.Session;
  9. import javax.jms.TextMessage;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. public class Publisher {
  12. // 单例模式
  13. // 1、连接工厂
  14. private ConnectionFactory connectionFactory;
  15. // 2、连接对象
  16. private Connection connection;
  17. // 3、Session对象
  18. private Session session;
  19. // 4、生产者
  20. private MessageProducer messageProducer;
  21. public Publisher() {
  22. try {
  23. this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",
  24. "123", "tcp://localhost:61616");
  25. this.connection = connectionFactory.createConnection();
  26. this.connection.start();
  27. // 不使用事务
  28. // 设置客户端签收模式
  29. this.session = this.connection.createSession(false,
  30. Session.AUTO_ACKNOWLEDGE);
  31. this.messageProducer = this.session.createProducer(null);
  32. } catch (JMSException e) {
  33. throw new RuntimeException(e);
  34. }
  35. }
  36. public Session getSession() {
  37. return this.session;
  38. }
  39. public void send1(/* String QueueName, Message message */) {
  40. try {
  41. Destination destination = this.session.createTopic("topic1");
  42. MapMessage msg1 = this.session.createMapMessage();
  43. msg1.setString("name", "张三");
  44. msg1.setInt("age", 22);
  45. MapMessage msg2 = this.session.createMapMessage();
  46. msg2.setString("name", "李四");
  47. msg2.setInt("age", 25);
  48. MapMessage msg3 = this.session.createMapMessage();
  49. msg3.setString("name", "张三");
  50. msg3.setInt("age", 30);
  51. // 发送消息到topic1
  52. this.messageProducer.send(destination, msg1,
  53. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  54. this.messageProducer.send(destination, msg2,
  55. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  56. this.messageProducer.send(destination, msg3,
  57. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  58. } catch (JMSException e) {
  59. throw new RuntimeException(e);
  60. }
  61. }
  62. public void send2() {
  63. try {
  64. Destination destination = this.session.createTopic("topic1");
  65. TextMessage message = this.session.createTextMessage("我是一个字符串");
  66. // 发送消息
  67. this.messageProducer.send(destination, message,
  68. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  69. } catch (JMSException e) {
  70. throw new RuntimeException(e);
  71. }
  72. }
  73. public static void main(String[] args) {
  74. Publisher producer = new Publisher();
  75. producer.send1();
  76. }
  77. }

Subscribe的代码:

  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.MapMessage;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.MessageListener;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. public class Subscriber {
  13. // 单例模式
  14. // 1、连接工厂
  15. private ConnectionFactory connectionFactory;
  16. // 2、连接对象
  17. private Connection connection;
  18. // 3、Session对象
  19. private Session session;
  20. // 4、生产者
  21. private MessageConsumer messageConsumer;
  22. // 5、目的地址
  23. private Destination destination;
  24. public Subscriber() {
  25. try {
  26. this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",
  27. "123", "tcp://localhost:61616");
  28. this.connection = connectionFactory.createConnection();
  29. this.connection.start();
  30. // 不使用事务
  31. // 设置客户端签收模式
  32. this.session = this.connection.createSession(false,
  33. Session.AUTO_ACKNOWLEDGE);
  34. this.destination = this.session.createTopic("topic1");
  35. this.messageConsumer = this.session.createConsumer(destination);
  36. } catch (JMSException e) {
  37. throw new RuntimeException(e);
  38. }
  39. }
  40. public Session getSession() {
  41. return this.session;
  42. }
  43. // 用于监听消息队列的消息
  44. class MyLister implements MessageListener {
  45. @Override
  46. public void onMessage(Message message) {
  47. try {
  48. if (message instanceof TextMessage) {
  49. }
  50. if (message instanceof MapMessage) {
  51. MapMessage ret = (MapMessage) message;
  52. System.out.println(ret.toString());
  53. System.out.println(ret.getString("name"));
  54. System.out.println(ret.getInt("age"));
  55. // 因为设置的是客户端的签收模式,所以要手动的去确认消息的消费
  56. message.acknowledge();
  57. }
  58. } catch (JMSException e) {
  59. throw new RuntimeException(e);
  60. }
  61. }
  62. }
  63. // 用于异步监听消息
  64. public void receiver() {
  65. try {
  66. this.messageConsumer.setMessageListener(new MyLister());
  67. } catch (JMSException e) {
  68. throw new RuntimeException(e);
  69. }
  70. }
  71. public static void main(String[] args) {
  72. Subscriber conmuser = new Subscriber();
  73. conmuser.receiver();
  74. }
  75. }

先启动消费者(先订阅后消费),再启动发布者

最新文章

  1. 利用CSS中的:after、: before制作的边三角提示框
  2. #1094 : Lost in the City
  3. 作业一直"执行"
  4. openstack配置注意事项(主要是网络相关)
  5. apache + tomcat 集群
  6. 如何在Angular2中使用jquery
  7. (转载)Convolutional Neural Networks卷积神经网络
  8. 转:PHP超时处理全面总结
  9. json-lib 使用教程
  10. linux shadow破解
  11. javascript每日一练—运动
  12. SQLSERVER复制优化之一《减少包大小》
  13. Djanto static静态文件配置
  14. vue 路由(1)
  15. PHP中的加强型接口Traits
  16. Java中volatile关键字解析
  17. new和delete操作符
  18. SpringBoot配置(2) slf4j&logback
  19. 探索未知种族之osg类生物---器官初始化一
  20. Django入门与实践-第13章:表单处理(完结)

热门文章

  1. 虚拟机VMware 安装CentOS6.5
  2. Java和Js的高精度计算
  3. Selenium2+python自动化69-PhantomJS使用【转载】
  4. mysql的mysqladmin的用法
  5. php多台服务器实现session共享
  6. Linux下安装Sybase ASE 16
  7. 如何修改wamp目录【成功】
  8. linux查看cpu内存信息
  9. 2017中国大学生程序设计竞赛 - 女生专场C【前后缀GCD】
  10. 差分+树状数组【p4868】Preprefix sum