假设有如下问题:

  1.如果消费者连接中断,这期间我们应该怎么办?

  2.如何做到负载均衡?

  3.如何有效的将数据发送到相关的接收者?就是怎么样过滤

  4.如何保证消费者收到完整正确的数据

  5.如何让优先级高的接收者先收到数据

一、"Hello RabbitMQ"

如图:P代表生产者,C代表消费者,红色部分为消息队列

二、项目开始

  1.首先创建一个maven项目,然后导入rabbitMQjar包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>testRabbit</groupId>
<artifactId>test</artifactId>
<version>1.0-SNAPSHOT</version> <dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
</project>

  2.创建消费者Producer

public class Producer {
public final static String QUEUE_NAME = "rabbitMQ.test"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息
factory.setHost("localhost"); //创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发送消息到队列中
String message = "hello rabbitMQ";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("Producer Send : " + message); //关闭通道和连接
channel.close();
connection.close();
}
}

queueDeclare第一个参数表示队列名称,第二个参数为是否持久化(true表示是,队列将在服务器重启时生存),第三个参数为是否独占队列(创建者可以使用的私有队列,断开后自动删除),第四个参数为当所有消费者客户端连接断开时是否自动删除队列,第五个参数为队列的其他参数。

basicPublish第一个参数为交换机名称,第二个参数为队列映射的路由key,第三个参数为消息的其他属性,第四个参数为发送消息的主体。

  3.创建消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Customer {
public final static String QUEUE_NAME = "rabbitMQ.test"; public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("客户端等待接受消息"); //DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要哪个频道的信息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer comsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String (body,"UTF-8");
System.out.println("客户端接收:"+message);
}
}; //自动回答队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME,true,comsumer); }
}

该方法用于获取生产者发送的消息,其中envelope主要存放生产者相关的信息(比如交换机、路由key等)body是消息实体。

运行结果如下:

三、实现任务分发

一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这样就需要采用分布机制了。

新建一个生产者NewTask

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class NewTask {
public final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String [] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); //分发消息
for(int i = 0;i<10;i++){
String message = "hello RabbitMQ "+ i;
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("NewTask send :" + message);
} channel.close();
connection.close(); }
}

然后创建2个工作者work1和work2代码一样

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Work1 {
private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { final ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
System.out.println("Work1 等待接受消息"); //每次从队列获取的数量
channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Worker1 接受到消息:"+message);
try{
//throw new Exception();
doWork(message);
}catch (Exception ex){
channel.abort();
}finally {
System.out.println("Work1 完成了");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}; boolean autoAck=false;
//消息消费完成确认
channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer); }
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}

channel.basicQos(1);保证一次只分发一个,autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当时不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生产者,最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。

两个都不抛出异常时:

其中一个设置为异常时,会把消息都发送给另一个正常的。等待异常的程序重启后,才会继续给它发送。

最新文章

  1. springboot 整合Redis
  2. Node.js 手册查询-5-Ejs 方法
  3. Linux操作系统shell与函数详解
  4. Button 对 TreeView1 所有节点的全选
  5. C#学习笔记-----基于AppDomain的&quot;插件式&quot;开发
  6. Struts2拦截器原理以及实例
  7. NYOJ-235 zb的生日 AC 分类: NYOJ 2013-12-30 23:10 183人阅读 评论(0) 收藏
  8. Android问题-DelphiXE5开发Andriod连接Webservice乱码问题
  9. Debug Assertion Failed!
  10. iOS开发——扫描二维码——工具类
  11. DTM initialization: failure during startup recovery, retry failed, check segment status (cdbtm.c:1603)
  12. iOS 9界面适配利器:详解Xcode 7的新特性UIStackView
  13. Tomcat 在 Linux 上的安装和配置
  14. TF-IDF原理与实现
  15. maskrcnn_benchmark代码分析(1)
  16. Angular2-三种样式封装策略的区别
  17. Java的继承和多态
  18. eclipse集成mybatis的generater插件
  19. maven学习6 Eclipse下Tomcat常用设置
  20. HBuilder的常用快捷键

热门文章

  1. codechef营养题 第三弹
  2. poj 1562 简单深搜
  3. codeforces 371D
  4. hdu 1874 dijkstra 队列实现 比数组高效特别在稀疏图
  5. sql自增长和占位符?&quot;相矛盾&quot;的问题
  6. 夜话JAVA设计模式之单例模式(单件模式Singleton)
  7. 洛谷—— P1187 3D模型
  8. HDU——1133 Buy the Ticket
  9. How to Use DHCP Relay over LAN? - DrayTek Corp
  10. 利用scons构建project