常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。我们在本次课程中介绍 RabbitMQ的使用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

(5)RocketMQ 阿里巴巴

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

​ Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

已经配置好了ssm的开发环境

1.导入依赖

<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.3</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.3.RELEASE</version>
</dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>

2.编写生产者

2.1配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan base-package="cn.test.rabbitmq.spring"/> <!-- 配置连接工厂 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
host="127.0.0.1" port="5672" username="saas" password="saas" />
<!-- 定义mq管理 -->
<rabbit:admin connection-factory="connectionFactory" /> <!-- 声明队列 -->
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" /> <!-- 定义交换机绑定队列(路由模式) -->
<rabbit:direct-exchange name="spring.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="spring.test.queue" key="user.insert" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 定义交换机绑定队列(路由模式)使用匹配符
<rabbit:topic-exchange id="springTestExchange" name="spring.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="spring.test.queue" pattern="#.#" />
</rabbit:bindings>
</rabbit:topic-exchange>
-->
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 定义模版 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
exchange="spring.test.exchange"
message-converter="jsonMessageConverter"/> </beans>

2.2 发送方代码

这里是往RabbitMQ队列中放入任务,让消费者去取

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; @Component
public class MqSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(){
//根据key发送到对应的队列
amqpTemplate.convertAndSend("user.insert","spring整合RabbitMQ消息");
System.out.println("发送成功........");
}
}

2.3 测试代码

package cn.test.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-send.xml")
public class MqSendDemo { @Autowired
private MqSender mqSender;
@Test
public void test(){
//根据key发送到对应的队列
mqSender.sendMessage();
}
}

3.编写消费者

3.1 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 配置连接工厂 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
host="127.0.0.1" port="5672" username="saas" password="saas" />
<!-- 定义mq管理 -->
<rabbit:admin connection-factory="connectionFactory" /> <!-- 声明队列 -->
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" /> <!-- 定义消费者 -->
<bean id="testMqListener" class="cn.test.rabbitmq.spring.MqListener" /> <!-- 定义消费者监听队列 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="testMqListener" queues="spring.test.queue" />
</rabbit:listener-container> </beans>

3.2 监听代码

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; public class MqListener implements MessageListener { public void onMessage(Message message) {
try {
System.out.println(message.getBody());
String ms = new String(message.getBody(), "UTF-8");
System.out.println(ms);
} catch (Exception e) {
e.printStackTrace();
}
}
}

3.3 测试代码

package cn.itcast.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-receive.xml")
public class MqReceiveDemo { @Test
public void test(){
//等待队列中放入任务,如果有任务,立即消费任务
while (true){
}
}
}

最新文章

  1. SQL 循环语句 while 介绍 实例
  2. Terra Vista 6.2
  3. js基础知识:变量
  4. iOS运行时Runtime浅析
  5. 昨天的这个先补上--这个是关于 JQ 的移动 和 渐变特效的点击事件
  6. 【转】利用Ajax.BeginForm提交文件
  7. WinForm----DataGridview---连接数据库,以及双击一条数据,显示信息到Label控件,也可以是TextBox控件。
  8. SQL Server Insert操作中的锁
  9. 制作自己的docker镜像
  10. Django 1.11.7学习,配置MySQL数据库(python3.5)
  11. POJ 3278&amp;&amp;2049&amp;&amp;3083
  12. Django的跨域请求
  13. nginx+jwplayer配置flv/MP4点播系统, 视频拖动支持
  14. Python全栈开发之14、Javascript
  15. opencv学习笔记——图像缩放函数resize
  16. gitLab创建自己的私有库
  17. 「七天自制PHP框架」应用:JSON生成器
  18. SQL server 存储过程 C#调用Windows CMD命令并返回输出结果 Mysql删除重复数据保留最小的id C# 取字符串中间文本 取字符串左边 取字符串右边 C# JSON格式数据高级用法
  19. jQuery实现淡入淡出二级下拉导航菜单的方法
  20. 通过Ollydbg定位私有协议通信明文

热门文章

  1. 微信小程序开发之https服务器搭建三步曲
  2. oh my zsh 如何启用插件
  3. Yacc - 一个生成 LALR(1) 文法分析器的程序
  4. robotframework+python3+selenium之创建第一个项目---第三集
  5. html input 上capture 参数在 安卓 苹果上的异同
  6. Linux系统升级
  7. composer(作曲家)安装php-ml
  8. Java-Class-@I:org.springframework.beans.factory.annotation.Autowired
  9. spring boot部署到阿里云碰到的总总问题
  10. web前端好书推荐 CSS权威指南《第3版,Bootstrap实战,精通CSS 高级Web标准解决方案 第2版 中文