一:介绍

1.模型

  有两种情形,分别是轮训分发与公平分发。

  

2.出现的场景

  考虑到simple queue中的缺点。

  因为生产者发送消息后,消费者消费要花费时间,这个会造成消息的堆积。

二:Round robin--轮循

1.发送程序

  这个与简单程序类似,只是发送多条数据而已。

 package com.mq.work.round;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class RoundWorkSend {
private static final String QUENE_NAME="test_work_queue";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection= ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel=connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null); //消息与发送放入for循环
for (int i=0;i<50;i++){
String msg="hello "+i;
System.out.println("[send msg]:"+msg);
channel.basicPublish("",QUENE_NAME,null,msg.getBytes());
Thread.sleep(i*1);
} //关闭连接
channel.close();
connection.close();
}
}

2.消费者一

 package com.mq.work.round;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class RoundWorkReceive1 {
private static final String QUENE_NAME="test_work_queue";
public static void main(String[] args)throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null);
//创建消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[1]receive msg:"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("done");
}
}
};
//监听队列
boolean autoAck=true;
channel.basicConsume(QUENE_NAME,autoAck,consumer);
}
}

3.消费者二

 package com.mq.work.round;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class RoundWorkReceive2 {
private static final String QUENE_NAME="test_work_queue";
public static void main(String[] args)throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null);
//创建消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[2]receive msg:"+msg);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("done");
}
}
};
//监听队列
boolean autoAck=true;
channel.basicConsume(QUENE_NAME,autoAck,consumer);
}
}

4.现象

  send

  

  receive1:

  

  receive2:

  

三:fair dispatcher

1.介绍

  使用公平分发需要关闭自动应答,改成手动。

  有一种通俗的说法是:能者多劳。 

2.生产者

  需要改动的地方是:每个消费者在得到确认消息之前,消息队列不得发送一个消息给消费者,一次只能处理一个消息。

 package com.mq.work.fair;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class FairWorkSend {
private static final String QUENE_NAME="test_work_queue";
public static void main(String[] args) throws Exception {
//获取一个连接
Connection connection= ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel=connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null); //限制发送给一个消费者不得超过1条
int prefetchCount=1;
channel.basicQos(prefetchCount); //消息与发送放入for循环
for (int i=0;i<50;i++){
String msg="hello "+i;
System.out.println("[send msg]:"+msg);
channel.basicPublish("",QUENE_NAME,null,msg.getBytes());
Thread.sleep(i*1);
} //关闭连接
channel.close();
connection.close();
}
}

3.消费者一

  需要改动的行数,14,18,33,38

 package com.mq.work.fair;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class FairWorkReceive1 {
private static final String QUENE_NAME="test_work_queue";
public static void main(String[] args)throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null); //一次只能发送一个消息
channel.basicQos(1); //创建消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[1]receive msg:"+msg);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("done");
//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//监听队列,不是自动应答
boolean autoAck=false;
channel.basicConsume(QUENE_NAME,autoAck,consumer);
}
}

3.消费者二

  与消费者一不同点在于消费每个消息的时间不同。

 package com.mq.work.fair;

 import com.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class FairWorkReceive2 {
private static final String QUENE_NAME="test_work_queue";
public static void main(String[] args)throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();
//创建队列声明
channel.queueDeclare(QUENE_NAME,false,false,false,null); //一次只能发送一个消息
channel.basicQos(1); //创建消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"utf-8");
System.out.println("[1]receive msg:"+msg);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("done");
//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//监听队列,不是自动应答
boolean autoAck=false;
channel.basicConsume(QUENE_NAME,autoAck,consumer);
}
}

4.现象

  消费者一:

  

  消费者二:

  

最新文章

  1. Java集合专题总结(1):HashMap 和 HashTable 源码学习和面试总结
  2. Oracle RAC客户端tnsnames.ora相关配置及测试
  3. Windows 10一周年更新正式版官方ISO镜像(1607)
  4. 《C和指针(Pointer on c)》 学习笔记(转自:http://dsqiu.iteye.com/blog/1687944)
  5. 简单理解javascript的原型prototype
  6. 【TYVJ】1307 联络员(最小生成树)
  7. linux 常用命令及技巧
  8. Oracle复制表结构和表数据
  9. C语言清空输入缓冲区的N种方法对比
  10. CSS3 transition 动画过度属性
  11. MS SQL 监控磁盘空间告警
  12. location和location.href跳转url的区别
  13. angular-单页面应用程序
  14. Ping IP速度范围
  15. 机器学习随笔01 - k近邻算法
  16. 【shell编程】1、shell编程简介
  17. JS数组迭代方法
  18. MFC中添加了一个dialog,并创建了相应的类,初始化函数没有怎么办?
  19. redis写定时任务获取root权限
  20. python 使用set对列表去重后,保持原来列表的顺序排列

热门文章

  1. Win10 x64 + CUDA 10.0 + cuDNN v7.5 + TensorFlow GPU 1.13 安装指南
  2. Elastic Job入门(1) - 简介
  3. C#中富文本编辑器Simditor带图片上传的全部过程(MVC架构的项目)
  4. Builder建造者模式
  5. Kaggle 泰坦尼克
  6. (并发编程)进程 (multiprocessing--Process实现进程并发)
  7. 修改centos和ubuntu ssh远程连接端口提升系统安全性
  8. 使用ajax上传表单(带文件)
  9. 转载:configure执行流程(1.5.2)《深入理解Nginx》(陶辉)
  10. js----jquery和js的区别