模型图

为什么会出现 work queues?

前提:使用 simple 队列的时候 (上一篇博客)
我们应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),而消费者接收完消息
后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用

那么怎么让消费者同事处理多个消息呢?

在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了

使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)
来解决这一问题,使得系统的伸缩性更加容易。

Round-robin(轮询分发)

生产者发送消息

 package cn.wh.work;

 import cn.wh.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Send {
private static final String QUEVE_NAME = "test_work_queue"; public static void main(String[] args) throws Exception { Connection connection = RabbitMqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEVE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String msg = "hello " + i;
System.out.println(msg);
channel.basicPublish("", QUEVE_NAME, null, msg.getBytes());
Thread.sleep(i * 20);
}
channel.close();
connection.close();
}
}

消费者 1

 package cn.wh.work;

 import cn.wh.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class Recv1 {
private static final String QUEVE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("recv1"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(1+"OK");
}
}
};
channel.basicConsume(QUEVE_NAME,true,consumer);
}
}

消费者 2

 package cn.wh.work;

 import cn.wh.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class Recv2 {
private static final String QUEVE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("recv2"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(2+"OK");
}
}
};
channel.basicConsume(QUEVE_NAME,true,consumer);
}
}

测试

备注:消费者 1 我们处理时间是 ;而消费者 2 中处理时间是 2s;但是我们看到的现象并不是 1 处理的多 消费者 2 处理的

消费者 1 中将偶数部分处理掉了

消费者2中将基数部分处理掉了

1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
按道理消费者 1 获取的比消费者 2 要多

这种方式叫做轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个
的分

如果想要代码可以留言 我可以私发

最新文章

  1. [Voice communications] 声音的滤波
  2. Trace2:创建SQL Trace
  3. 转载:《TypeScript 中文入门教程》 10、混入
  4. 关于解析P D X P 协议的心得
  5. R语言画图实例-参考R语言实战
  6. easyui datagrid 可过滤行的数据表格 导出
  7. Ajax异步调用使用
  8. PHP7安装笔记
  9. python 操作 excel
  10. Linux配置支持高并发TCP连接(socket最大连接数)
  11. UVa 10048: Audiophobia
  12. Apache httpd.conf配置详解
  13. Treasure of the Chimp Island
  14. electron通讯
  15. 201771010134杨其菊《面向对象程序设计java》第七周学习总结
  16. centos时区设置
  17. drop redo logfile current报错
  18. windows快速删除大量文件
  19. mysql忘记密码解决方法
  20. Asp.net Ajax(ashx)

热门文章

  1. [LintCode] 合并排序数组
  2. 前端代码tomcat下简单部署
  3. SetForegroundWindow以及 如何将一个某个窗口提到最顶层(转)
  4. Pat 1052 Linked List Sorting (25)
  5. 并发编程6 锁&amp;进程&amp;队列
  6. VC++SDK编程——鼠标的应用示例
  7. python基础之类的内置__setattr__,__delattr__,__getattr__和 二次加工标准类型(包装)
  8. [译]理解Windows消息循环
  9. Python 模块之 xlrd (Excel文件读写)
  10. 0408-服务注册与发现-Eureka常用配置