RabbitMQ消费端配置

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
# acknowledge-mode: manual # 手动确定(默认自动确认)
concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
max-concurrency: 10 # 消费端的监听最大个数
prefetch: 10
connection-timeout: 15000 # 超时时间

在消费端,配置prefetchconcurrency参数便可以实现消费端MQ并发处理消息,那么这两个参数到底有什么含义??

1. prefetch

每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。如有必要,将增加此值以匹配txSize或messagePerAck。从2.0开始默认为250;设置为1将还原为以前的行为。

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。

对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1

模拟:
生产者每次生产10条消息:

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
//直接向队列中发送数据
@GetMapping("send")
public String send() {
for (int i = 0; i < 10; i++) {
String content = "Date:" + System.currentTimeMillis();
content = content + ":::" + i;
rabbitTemplate.convertAndSend("kinson", content);
}
return "success";
}
}

控制页面:

 
消费端直接预取了10条消息.png

2. concurrency

上面配置中,concurrency =1,即每个Listener容器将开启一个线程去处理消息。

在2.0版本后,可以在注解中配置该参数:

@Component
@Slf4j
public class CustomerRev {
//会覆盖配置文件中的参数。
@RabbitListener(queues = {"kinson"},concurrency = "2")
public void receiver(Message msg, Channel channel) throws InterruptedException { // Thread.sleep(10000);
byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【消3】:{}", message);
}
} }

启动服务:

 
可以看到MQ有两个消费者.png

即该Listener容器产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。

 

最新文章

  1. SPOJ GSS2 Can you answer these queries II
  2. Spark 实时计算整合案例
  3. linux下创建库函数
  4. wuzhicms 无规律推荐位标签的嵌套使用
  5. Open CASCADE 基础类(Foundation Classes)
  6. Codeforces 474E - Pillars
  7. The summary of Interview
  8. UNIX环境高级编程——管道读写规则和pipe Capacity、PIPE_BUF
  9. 文本不能被选中的css
  10. java8 LocalDateTime转unix时间戳(带毫秒,不带毫秒)
  11. python之OpenCv
  12. 【linux】ssh无法root免密解决
  13. WIN10下,JAVA安装及环境变量配置(cmd可以运行java,却不能运行javac)
  14. 计数排序/Counting Sort
  15. Linux常用基本命令(tail )
  16. iOS开发多线程篇—GCD简介
  17. CS小分队第二阶段冲刺站立会议(6月1日)
  18. Android-MediaPlayer-音频播放-异步准备
  19. Python2 读取表格类型文件
  20. 去死吧!USB转串口!!!

热门文章

  1. Edit Static Web File Http Header Metadata of AWS S3 from SDK | SDK编程方式编辑储存在AWS S3中Web类文件的Http Header元数据
  2. GAN网络从入门教程(三)之DCGAN原理
  3. 题解:2018级算法第五次上机 C5-图2
  4. 1-The next outbreak we&#39;re not ready
  5. bzoj3446[Usaco2014 Feb]Cow Decathlon*
  6. P5198 [USACO19JAN]Icy Perimeter S (洛谷) (水搜索)
  7. PyQt5多线程和定时器
  8. CentOS7上安装Hadoop
  9. Java继承多态
  10. Zookeeper ----- 系统模型