package pfs.y2017.m11.mq.activemq.demo07;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Random; public class Client implements MessageListener {
private static int ackMode;
private static String clientQueueName; private boolean transacted = false;
private MessageProducer producer; static {
clientQueueName = "client.messages";
ackMode = Session.AUTO_ACKNOWLEDGE;
} public Client() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(transacted, ackMode);
Destination adminQueue = session.createQueue(clientQueueName); // Setup a message producer to send message to the queue the server is consuming
// from
this.producer = session.createProducer(adminQueue);
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Create a temporary queue that this client will listen for responses on then
// create a consumer
// that consumes message from this temporary queue...for a real application a
// client should reuse
// the same temp queue for each message to the server...one temp queue per
// client
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest); // This class will handle the messages to the temp queue as well
responseConsumer.setMessageListener(this); // Now create the actual message you want to send
TextMessage txtMessage = session.createTextMessage();
// 设置信息
txtMessage.setText("MyProtocolMessage"); // Set the reply to field to the temp queue you created above, this is the queue
// the server
// will respond to
txtMessage.setJMSReplyTo(tempDest); // Set a correlation ID so when you get a response you know which sent message
// the response is for
// If there is never more than one outstanding message to the server then the
// same correlation ID can be used for all the messages...if there is more than
// one outstanding
// message to the server you would presumably want to associate the correlation
// ID with this
// message somehow...a Map works good
String correlationId = this.createRandomString();
txtMessage.setJMSCorrelationID(correlationId);
this.producer.send(txtMessage);
} catch (JMSException e) {
// Handle the exception appropriately
}
} private String createRandomString() {
Random random = new Random(System.currentTimeMillis());
long randomLong = random.nextLong();
return Long.toHexString(randomLong);
} public void onMessage(Message message) {
String messageText = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
messageText = textMessage.getText();
System.out.println("响应内容 = " + messageText);
}
} catch (JMSException e) {
// Handle the exception appropriately
}
} public static void main(String[] args) {
new Client();
}
}
package pfs.y2017.m11.mq.activemq.demo07;

public class MessageProtocol {
public String handleProtocolMessage(String messageText) {
String responseText;
// 判断是否是client传过来的信息,在这里就可以做些解析
if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
responseText = "我收到了信息";
} else {
responseText = "我不知道你传的是什么: " + messageText;
} return responseText;
}
}
package pfs.y2017.m11.mq.activemq.demo07;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Server implements MessageListener {
private static int ackMode;
private static String messageQueueName;
private static String messageBrokerUrl; private Session session;
private boolean transacted = false;
private MessageProducer replyProducer;
private MessageProtocol messageProtocol; static {
messageBrokerUrl = "tcp://localhost:61616";
messageQueueName = "client.messages";
ackMode = Session.AUTO_ACKNOWLEDGE;
} public Server() {
try {
// This message broker is embedded
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
} catch (Exception e) {
// Handle the exception appropriately
} // Delegating the handling of messages to another class, instantiate it before
// setting up JMS so it
// is ready to handle messages
this.messageProtocol = new MessageProtocol();
this.setupMessageQueueConsumer();
} private void setupMessageQueueConsumer() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(this.transacted, ackMode);
Destination adminQueue = this.session.createQueue(messageQueueName); // Setup a message producer to respond to messages from clients, we will get the
// destination
// to send to from the JMSReplyTo header field from a Message
this.replyProducer = this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Set up a consumer to consume messages off of the admin queue
MessageConsumer consumer = this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
} catch (JMSException e) {
// Handle the exception appropriately
}
} public void onMessage(Message message) {
try {
TextMessage response = this.session.createTextMessage();
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String messageText = txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
} // Set the correlation ID from the received message to be the correlation id of
// the response message
// this lets the client identify which message this is a response to if it has
// more than
// one outstanding message to the server
response.setJMSCorrelationID(message.getJMSCorrelationID()); // Send the response to the Destination specified by the JMSReplyTo field of the
// received message,
// this is presumably a temporary queue created by the client
this.replyProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
// Handle the exception appropriately
}
} public static void main(String[] args) {
new Server();
}
}

最新文章

  1. BZOJ 1927: [Sdoi2010]星际竞速
  2. mybatis 中#{}与${}的区别
  3. (转)大数据时代下的SQL Server第三方负载均衡方案----Moebius测试
  4. 【POJ】1811 Prime Test
  5. Network client/server
  6. Ruby On Rails环境搭建
  7. VIM_插件
  8. 初识ASP.NET---若干常见错误
  9. About Restful Web Api Something.
  10. CocoaPods安装和使用及问题:Setting up CocoaPods master repo-b
  11. 标准的数据获取 -ios
  12. CSS中float属性和clear属性的一些笔记
  13. php学习记录
  14. 【java】打印流的基本实现及java.io.PrintStream、java.io.PrintWriter示例
  15. zabbix报错gd、freetype、png、jpeg
  16. js datagrid 移动去重
  17. android 版本号大小比较
  18. Go语言生成随机数
  19. Spring Boot 构建电商基础秒杀项目 (四) getotp 页面
  20. 铺放骨牌 uva11270

热门文章

  1. Matplotlib基本图形之直方图
  2. Leetcode 376.摆动序列
  3. 只操作git(cmd)就可以使用git将项目上传到github
  4. 新浪微博error:redirect_uri_mismatch的解决方法 [
  5. BZOJ 2829 信用卡凸包 ——计算几何
  6. BZOJ 3230 相似子串 ——后缀数组
  7. SPOJ LCS2 Longest Common Substring II ——后缀自动机
  8. readonly和disabled的异同
  9. java面
  10. Snoop resynchronization mechanism to preserve read ordering