这个类非常强大,我们可以对他做很多设置,对于消费者的配置项,这个类都可以满足
监听队列(多个队列)、自动启动、自动声明功能
可以设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
可以设置消费者数量、最大最小数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的转换器、消息转换器等
很多基于RabbitMQ的自制定化后端管控台在进行动态配置的时候,也是根据这一特性去实现的。
注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态
修改其消费者数量的大小、接收消息的模式等

SimpleMessageListenerContainer为什么可以进行动态感知设置变更?

package com.dwz.spring;

import java.util.UUID;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration; import com.rabbitmq.client.Channel; @Configuration
@ComponentScan("com.dwz.spring.*")
public class RabbitMQConfig { @Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setVirtualHost("/vhost_dwz");
connectionFactory.setUsername("root_dwz");
connectionFactory.setPassword("123456");
return connectionFactory;
} @Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
System.err.println("RabbitAdmin启动了。。。");
//设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置)
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
} @Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
} /**
* 针对消费者的配置
* 1.设置交换机的类型
* 2.将队列绑定到交换机
* FanoutExchange:将消息分发到所有绑定的队列,无routingkey的概念
* TopicExchange:多关键字匹配
* HeadersExchange:通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
*/
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
} @Bean
public Queue queue001() {
return new Queue("queue001", true);//队列持久化
} @Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
} @Bean
public TopicExchange exchange002() {
return new TopicExchange("topic002", true, false);
} @Bean
public Queue queue002() {
return new Queue("queue002", true);//队列持久化
} @Bean
public Binding binding002() {
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
} @Bean
public TopicExchange exchange003() {
return new TopicExchange("topic003", true, false);
} @Bean
public Queue queue003() {
return new Queue("queue003", true);//队列持久化
} @Bean
public Binding binding003() {
return BindingBuilder.bind(queue003()).to(exchange003()).with("mq.*");
} @Bean
public Queue queue_image() {
return new Queue("image_queue", true);//队列持久化
} @Bean
public Queue queue_pdf() {
return new Queue("pdf_queue", true);//队列持久化
} /*
* 简单消息监听容器
*/
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//同时监听多个队列
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//设置当前的消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
//设置是否重回队列
container.setDefaultRequeueRejected(false);
//设置自动签收
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置监听外露
container.setExposeListenerChannel(true);
//设置消费端标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//设置消息监听
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody(), "utf-8");
System.out.println("-----------消费者:" + msg);
}
});
return container;
}
}

自定义消费端标签策略效果图:

最新文章

  1. nginx rewrite 实现二级域名跳转
  2. Redis 数据结构使用场景
  3. AssemblyInfo.cs的作用
  4. UIScrollView的属性总结
  5. hdu4649Professor Tian
  6. Sql遍历更新脚本
  7. 2014年度辛星html教程夏季版第四节
  8. string之substring的用法
  9. 经典:十步完全理解 SQL
  10. ●BZOJ 4176 Lucas的数论
  11. 2010-01-20 12:09 ubuntu下minicom的安装及使用
  12. 2个byte类型数据相加(转型问题的分析)
  13. Gin框架源码解析
  14. html5 知识点简单总结02
  15. Android JNI 学习(七):Accessing Fields Api
  16. Vue(二)基础
  17. 【iOS】build diff: /../Podfile.lock: No such file or directory
  18. 学习笔记之MobaXterm
  19. 转 DataTorrent 1.0每秒处理超过10亿个实时事件
  20. js遍历数组和遍历对象

热门文章

  1. 转SSL/TLS协议
  2. the specified service is marked as deletion,can not find the file specified
  3. webpack编写一个plugin插件
  4. MFC下调试 出现 Warning: initial dialog data is out of range.
  5. 给没有连接因特网的centos使用yum安装其他软件(转)
  6. 微信小程序iOS下拉白屏晃动,坑坑坑
  7. vue 编辑
  8. Nginx作为静态资源web服务之缓存原理
  9. vbox的四种网络模式
  10. ISO/IEC 15444-12 MP4 封装格式标准摘录 3