http://riddickbryant.iteye.com/blog/441890

【发送端】

session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE);

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

 import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author LIN NP
*/
public class JmsSender{ private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null; private static final int SEND_NUMBER = 1; public void init()
{
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616"); // ActiveMQ默认使用的TCP连接端口是61616 try{
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //第一种方式:Queue // destination = session.createQueue("xkey"); // "xkey"可以取其他的。
// producer = session.createProducer(destination); // 得到消息生成者【发送者】 //第二种方式:Topic
Topic topic = session.createTopic("xkey.Topic");
producer = session.createProducer(topic); // 持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session,producer); //session.commit();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
try
{
connection.close();
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} private void sendMessage(Session session,MessageProducer producer) throws JMSException
{
for (int i = 1; i <= SEND_NUMBER; i ++)
{
TextMessage message = session.createTextMessage("发送消息" + i); System.out.println("发送消息" + i); // 发送消息
producer.send(message); }
}
/**
* @param args
*/
public static void main(String[] args)
{
// TODO Auto-generated method stub
JmsSender jms = new JmsSender();
jms.init();
}
}

【接收端】

connection = connectionFactory.createConnection();
connection.setClientID("bbb");
connection.start();

session = connection.createSession(Boolean.FALSE,  Session.AUTO_ACKNOWLEDGE);

consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅

 import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author LIN NP
*/
public class JmsReceiver
{
private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private Destination destination = null;
public void init()
{
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616"); // ActiveMQ默认使用的TCP连接端口是61616
try
{
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.setClientID("bbb");
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
/**
* 第一种方式:Queue
*/
// destination = session.createQueue("xkey");
// consumer = session.createConsumer(destination);
/**
* 第二种方式:Topic
*/ Topic topic = session.createTopic("xkey.Topic");
//consumer = session.createConsumer(topic);
consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅 /**
*
*/
while (true)
{
//设置接收者接收消息的时间,为了便于测试,这里设定为500s
TextMessage message = (TextMessage) consumer.receive(500);
if (null != message)
{
System.out.println("Receiver: " + message.getText());
}
else
{
break;
}
}
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
try
{
connection.close();
}
catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* @param args
*/
public static void main(String[] args)
{
// TODO Auto-generated method stub
JmsReceiver jms = new JmsReceiver();
jms.init();
}
}
 //创建JMS连接和会话
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();
connection.setClientID(Constant.JMS_CLIENT_ID);
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建消息发送主题和发送者
Topic jmsSendTopic = session.createTopic(sendTopic);
sendTopicProducer = session.createProducer(jmsSendTopic);
sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
// 创建消息接收主题和接收者
Topic jmsReceiveTopic = session.createTopic(receiveTopic);
receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
receiveTopicConsumer.setMessageListener(this);
connection.start();

解答:问题原因在于这段代码在接收到JMS消息时不会向ActiveMQ服务器确认消息的接收,故而ActiveMQ服务器一直认为该消息没有成功发送给接收者,因而每次接收者重启之后就会收到ActiveMQ服务器发送过来的消息。在这里要解释一下session的创建。

session = connection.createSession(true,Session.Auto_ACKNOWLEDGE);

当createSession第一个参数为true时,表示创建的session被标记为transactional的,确认消息就通过确认和校正来自动地处理,第二个参数应该是没用的。

当createSession的第一个参数为false时,表示创建的session没有标记为transactional,此时有三种用于消息确认的选项:
**AUTO_ACKNOWLEDGE session将自动地确认收到的一则消息;
**CLIENT_ACKNOWLEDGE 客户端程序将确认收到的一则消息,调用这则消息的确认方法;
**DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散的”确认消息传递,可以想到,这将导致消息提供者传递的一些复制消息可能出错。

JMS有两种消息传递方式。标记为NON_PERSISTENT的消息最多传递一次,而标记为PERSISTENT的消息将使用暂存后再转发的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失,但是得等到这个服务恢复联机的时候才会被传递。所以默认的消息传递方式是非持久性的,虽然使用非持久性消息可能降低内存和需要的存储器,但这种传递方式只有当你不需要接收所有消息时才使用。
因此正确的代码只需改动一处就行了,即将true改为false

 //创建JMS连接和会话
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();
connection.setClientID(Constant.JMS_CLIENT_ID);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息发送主题和发送者
Topic jmsSendTopic = session.createTopic(sendTopic);
sendTopicProducer = session.createProducer(jmsSendTopic);
sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
// 创建消息接收主题和接收者
Topic jmsReceiveTopic = session.createTopic(receiveTopic);
receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME);
receiveTopicConsumer.setMessageListener(this);
connection.start();

最新文章

  1. Jquery 关于span标签的取值赋值用法
  2. petapoco定制,比较SQL事务,存储过程,分布式事务(MSDTC)的区别和场景
  3. 【java基础学习】字符串
  4. soapUI 在多个测试套件 testsuite 里,多个testcase里传值如何实现
  5. 《高性能javascript》读书笔记:P1减少跨作用域的变量访问
  6. 一幅图概括Android测试的方方面面
  7. C++学习笔记:不用sizeof判断int类型占用几个字节
  8. [GeekBand] C++学习笔记(2)——BigThree、OOP
  9. disconf实践(二)
  10. codevs 2241 排序二叉树
  11. UESTC_酱神寻宝 2015 UESTC Training for Dynamic Programming&lt;Problem O&gt;
  12. pyqt样式表语法笔记(上) --原创
  13. zlib报“LNK2001:无法解析的外部符号”错误
  14. Mysql----关于内联,左联,右联,全联的使用和理解
  15. 分分钟解决MySQL查询速度慢与性能差
  16. Android JS 交互出现 Uncaught Error: Error calling method on NPObject
  17. qhfl-3 Course模块
  18. 如何启动Intel VT-X及合理利用搜索
  19. Ansi与Unicode编码
  20. Java9都快发布了,Java8的十大新特性你了解多少呢?

热门文章

  1. XAMPP 的安装配置
  2. 让层遮挡select(ie6下的问题)
  3. 嵌入式 hi3518平台多路码流添加osd
  4. [转]Java Web乱码过滤器
  5. kali 安装完成后,无法进入界面
  6. 20+非常棒的Photoshop卡通设计教程
  7. 使用Log.isLoggable方法
  8. 使用Eclipse的几个必须掌握的快捷方式(能力工场小马哥收集)
  9. [札记]IL经典指令解析之方法调度
  10. 转】MyEclipse使用总结——设置MyEclipse使用的Tomcat服务器