基础知识

  1. 虚拟主机 (Virtual Host): 每个 virtual host 拥有自己的 exchanges, queues 等 (类似 MySQL 中的库)
  2. 交换器 (Exchange): 生产者产生的消息并不是直接发送给 queue 的,而是要经过 exchange 路由, exchange 类型如下:
    1. fanout: 把所有发送到该 exchange 的消息路由到所有与它绑定的 queue 中
    2. direct: 把消息路由到 binding key 与routing key 完全匹配的 queue 中
    3. topic: 模糊匹配 (单词间使用”.”分割,”*” 匹配一个单词,”#” 匹配零个或多个单词)
    4. headers: 根据发送的消息内容中的 headers 属性进行匹配
  3. 信道 (Channel): 建立在真实的 TCP 连接之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过 channel 完成的

使用示例

RabbitMQ 安装参考: docker 安装rabbitMQ

新建 Spring Boot 项目,添加配置:

spring:
rabbitmq:
host: 192.168.30.101
port: 5672
username: admin
password: admin
virtual-host: my_vhost logging:
level:
com: INFO

1. 基本使用

Queue

@Configuration
public class RabbitmqConfig { @Bean
public Queue hello() {
return new Queue("hello");
} }

Producer

@Component
@EnableAsync
public class SenderTask { private static final Logger logger = LoggerFactory.getLogger(SenderTask.class); @Autowired
private RabbitTemplate rabbitTemplate; @Autowired
private Queue queue; @Async
@Scheduled(cron = "0/1 * * * * ? ")
public void send(){
String message = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); rabbitTemplate.convertAndSend(queue.getName(), message); logger.info(" [x] Sent '" + message + "'");
}
}

Consumer

@Component
@RabbitListener(queues = "hello")
public class ReceiverTask { private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class); @RabbitHandler
public void receive(String in){
logger.info(" [x] Received '" + in + "'");
}
}

2. fanout

Exchange, Queue, Binding

@Configuration
public class RabbitmqConfig { @Bean
public FanoutExchange fanout() {
return new FanoutExchange("fanoutExchangeTest");
} @Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();// 创建一个非持久的,独占的自动删除队列
} @Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
} @Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
} @Bean
public Binding binding2(FanoutExchange fanout,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
}
}

Producer

@Component
@EnableAsync
public class SenderTask { private static final Logger logger = LoggerFactory.getLogger(SenderTask.class); @Autowired
private RabbitTemplate rabbitTemplate; @Autowired
private FanoutExchange fanoutExchange; @Async
@Scheduled(cron = "0/1 * * * * ? ")
public void send(){
String message = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", message); logger.info(" [x] Sent '" + message + "'");
}
}

Consumer

@Component
public class ReceiverTask { private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class); @RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in){
receive(in, 1);
} @RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in){
receive(in, 2);
} public void receive(String in, int receiver){
logger.info("instance " + receiver + " [x] Received '" + in + "'");
}
}

3. direct

Exchange, Queue, Binding

@Configuration
public class RabbitmqConfig { @Bean
public DirectExchange direct() {
return new DirectExchange("directExchangeTest");
} @Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();// 创建一个非持久的,独占的自动删除队列
} @Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
} @Bean
public Binding binding1a(DirectExchange direct,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("orange");
} @Bean
public Binding binding1b(DirectExchange direct,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(direct).with("green");
} @Bean
public Binding binding2a(DirectExchange direct,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(direct).with("green");
} @Bean
public Binding binding2b(DirectExchange direct,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(direct).with("black");
}
}

Producer

@Component
@EnableAsync
public class SenderTask { private static final Logger logger = LoggerFactory.getLogger(SenderTask.class); @Autowired
private RabbitTemplate rabbitTemplate; @Autowired
private DirectExchange directExchange; private final String[] keys = {"orange", "black", "green"}; @Async
@Scheduled(cron = "0/1 * * * * ? ")
public void send(){ Random random = new Random(); String key = keys[random.nextInt(keys.length)]; String message = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+ " to: " + key; rabbitTemplate.convertAndSend(directExchange.getName(), key, message); logger.info(" [x] Sent '" + message + "'");
}
}

Consumer

@Component
public class ReceiverTask { private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class); @RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in){
receive(in, 1);
} @RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in){
receive(in, 2);
} public void receive(String in, int receiver){
logger.info("instance " + receiver + " [x] Received '" + in + "'");
}
}

4. topic

Exchange, Queue, Binding

@Configuration
public class RabbitmqConfig { @Bean
public TopicExchange topic() {
return new TopicExchange("topicExchangeTest");
} @Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();// 创建一个非持久的,独占的自动删除队列
} @Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
} @Bean
public Binding binding1a(TopicExchange topic,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(topic).with("*.orange.*");
} @Bean
public Binding binding1b(TopicExchange topic,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(topic).with("*.*.rabbit");
} @Bean
public Binding binding2a(TopicExchange topic,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(topic).with("lazy.#");
} }

Producer

@Component
@EnableAsync
public class SenderTask { private static final Logger logger = LoggerFactory.getLogger(SenderTask.class); @Autowired
private RabbitTemplate rabbitTemplate; @Autowired
private TopicExchange topicExchange; private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"}; @Async
@Scheduled(cron = "0/1 * * * * ? ")
public void send(){ Random random = new Random(); String key = keys[random.nextInt(keys.length)]; String message = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+ " to: " + key; rabbitTemplate.convertAndSend(topicExchange.getName(), key, message); logger.info(" [x] Sent '" + message + "'");
}
}

Consumer

@Component
public class ReceiverTask { private static final Logger logger = LoggerFactory.getLogger(ReceiverTask.class); @RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in){
receive(in, 1);
} @RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in){
receive(in, 2);
} public void receive(String in, int receiver){
logger.info("instance " + receiver + " [x] Received '" + in + "'");
}
}

完整代码:GitHub

最新文章

  1. Mono.Ceil 无法保存Silverlight 程序集
  2. laravel administrator 一款通用的后台插件(PHP框架扩展)
  3. Apache Solr查询语法(转)
  4. POJ 2524 Ubiquitous Religions
  5. object-c面向对象1
  6. sharepoint 开发
  7. DevExpress GridControl GridView 导出到 Excel 类
  8. 《Linux命令行大全》系列(一、shell是什么)
  9. ajax请求dotnet webservice格式
  10. 对VC++6.0爱得深沉(二)个性工具的定制
  11. Tutorial 03_分布式数据库HBASE
  12. springMVC 多文件上传前后台demo
  13. [CodeForces - 447B] B - DZY Loves Strings
  14. 基于JavaScript 声明全局变量的三种方式详解
  15. Linux 学习日记 2 (常用命令 + deb包的安装)
  16. [gj]HK一行所见闻
  17. [流水账]搜索与web-container版本匹配的jar包
  18. CentOS怎样安装Python3.6
  19. 2019.03.29 读书笔记 关于params与可选参数
  20. Xcode打包提交至itunes connect后,提交审核成功,随后出现二进制文件无效

热门文章

  1. vim-配置教程+源码
  2. 【Kata Daily 190908】How Much?
  3. js练习题之查找数组中的位子
  4. kubernetes-1.18.2集群安装-02
  5. Core WebApi项目快速入门(一):环境部署
  6. Flutter(75):Sliver组件之SliverFixedExtentList
  7. 数字取证autopsy——性能优化(二)
  8. 邻居子系统 arp 状态图
  9. close与shutdown
  10. 《.NET 5.0 背锅案》第5集-案情大转弯:都是我们的错,让 .NET 5.0 背锅