消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成,通过提供消息传递消息排队模型,它可以在分布式环境下拓展进程间的通信,对于消息中间件,常见的角色大致也就有Producer(生产者).Consumer(消费者)

MQ     消息中间件     消息队列

Message Queue简称MQ

种类:

1.Apache  ActiveMQ  

  ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现.我们在本次课程中介绍ActiveMQ的使用.

2.阿里  RocketMQ

  

3.Pivotal 开发RabbitMQ

  AMQP协议的领导实现,支持多种场景.淘宝的Mysql集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用.

ZeroMQ

  史上最快的消息队列系统.

kafka:

  Apache下的一个子项目.特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统.适合处理海量数据.

使用场景(为什么使用MQ?):

//TODO

JMS简介

什么是JMS?

    JMS(Java Messaging Service)是java平台上有关面向消息中间件的技术规范(可以使用jmsTemplate),它便于消息系统中的java应用程序进行消息交换,并且通过提供标准的产生,发送,接收消息的接口库简化企业应用的开发.

  JMS本身只定义了一系列的接口规范,是一种与厂商无关的API.用来访问消息收发系统.它类似于JDBC(java Database Connectivity);这里,JDBC是可以用来访问许多不同关系数据库的的API.而JMS则提供同样与厂商无关的访问方法,以访问消息收发服务.许多厂商目前都支持JMS,包括IBM的MQseries.BEA的Weblogic.JMSservice和Progress的SonicMQ,这只是几个例子.JMS使您能够通过消息收发服务(有时称为消息中介程序或者路由器)从一个JMS客户机向另一个JMS客户机发送消息,消息是JMS中的一种类型对象,由两部分组成:报头消息主体.

报头由路由信息以及有关该消息的元数据组成.

消息主体则携带着应用程序的数据或有效负载.

  JMS定义了五种不同的消息正文格式,以及调用的消息类型.允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性.

  TextMessage ---一个字符串对象

  MapMessage---一套名称-值对

  ObjectMessage--一个序列化的java对象

  ByteMessage -- 一个字节的数据流

  StreamMessage --Java原始值的数据流

JMS消息传递类型

  对于消息的传递有两种类型:

    一种是点对点,即一个生产者和一个消费者一一对应.

    

    另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收.

JMS入门小Demo

现在是点对点模式:

  点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接行ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发现接收端,如果没有接收端接收,多个接收端,但是一条消息,只会被一个接收端给接收到,那个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息.

创建的是一个没有使用任何骨架的java工程

引入的依赖为:

   <dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>

代码

/**
* @Auther:qingmu
* @Description:脚踏实地,只为出人头地
* @Date:Created in 16:16 2019/4/23
*/ import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /**
* 点对点模式
* 生产者
*/ public class QueueProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
/**
* AUTO_ACKNOWLEDGE = 1 自动确认
• CLIENT_ACKNOWLEDGE = 2 客户端手动确认
• DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
• SESSION_TRANSACTED = 0 事务提交并确认 */
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close(); }
}

直接运行这个main方法,后可以mq中查看到:

消费者的代码为:

/**
* @Auther:qingmu
* @Description:脚踏实地,只为出人头地
* @Date:Created in 16:23 2019/4/23
*/ import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /**
* 点对点模式
* 消息消费者
*/
public class QueueConsumer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue); //7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close(); }
}

通过上面定义的监听器,可以获取到生产者生产的信息.在控制台上

在进行测试的时候,开启两个以上的消费者,开启一个生产者,然后可以观察到只能在一个消费者的控制台上进行显示,而在另一个消费者的控制台上不能进行打印.

发布和订阅者模式

消费生产者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @Auther:qingmu
* @Description:脚踏实地,只为出人头地
* @Date:Created in 16:32 2019/4/23
*/
public class TopicProducer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close(); }
}

运行完以后的结果为:

消费者为:

/**
* @Auther:qingmu
* @Description:脚踏实地,只为出人头地
* @Date:Created in 16:36 2019/4/23
*/ import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /**
* 订阅
* 多对多
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception{
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
//Queue queue = session.createQueue("test-queue");
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic); //7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close(); }
}

运行测试:

  同时开启2个以上的消费者,再次运行生产者,观察每一个消费者控制台的输出,会发现每个消费者会接收到消息.

最新文章

  1. web 项目 连接mycat 读写分离失效问题,
  2. 【转】Checkpoint--与lazy writer区别
  3. Server Tomcat v7.0 Server at localhost failed to start.临时解决办法
  4. How to stop uwsgi when no pidfile in config?
  5. 关于网上常见的几种MD5加密的区别
  6. License Manager 10.3启动失败解决方法
  7. git cheatsheet小抄本
  8. Intent.ACTION_PICK
  9. python学习笔记25(文件管理 os包)
  10. JavaScript HTML DOM 元素(节点)
  11. effective条款15,在资源管理类中小心copying行为
  12. Tracing JIT
  13. cookie的expires属性和max-age属性
  14. ajax加php实现简单的投票效果
  15. boolean类型相关
  16. Android初级教程启动定时器详解
  17. TLS1.2 only
  18. zombodb 低级api 操作
  19. JS 中对变量类型判断的几种方式
  20. 简单创建一个“嗨新房”的mac客户端

热门文章

  1. Spark入门到精通--(第九节)环境搭建(Hive搭建)
  2. LINUX 配置定时任务,每天凌晨1点定时备份数据库
  3. .net拼接json字符串
  4. idea+maven下jrebel的安装破解
  5. 2018-2019-2 网络对抗技术 20165236 Exp3 免杀原理与实践
  6. Vue框架创建项目常遇到问题
  7. 3.3.1 MyBatis框架介绍
  8. openpyxl.utils.exceptions.IllegalCharacterError
  9. 使用Java代码发送邮件
  10. opatchauto failed with error code 42 补丁目录权限问题