windows上面安装rabbitmq-server-3.7.4.exe

首先需要安装otp_win64_20.3.exe

步骤1:安装Erlang

RabbitMQ 它依赖于Erlang,需要先安装Erlang。首先确定你的window电脑是32位还是64位,以下的安装以window 64位电脑举例。

Erlang官网:http://www.erlang.org/

rabbit安装注意事项:需要安装erlang20.3以上的版本

erlang21_win64下载地址:https://download.csdn.net/download/qq_38862234/10965869

环境配置:修改系统属性path下添加erlang的路径

安装的是不能存在中文目录,对于erl配置系统环境变量的时候,计算名称也不能存在中文

这样就表示erl环境变量安装成功了

Rabbitmq安装的时候不能存在中文,Rabbitmq安装成功之后,在windows的程序窗口会看到下面的菜单选项

1、windows安装成功之后,我们要使用Rabbitmq提供的管理控制台,我们需要开启管理控制台的插件命令,我们进入到D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin目目录下,使用命令rabbitmq-plugins.bat安装下,插件安装完成之后,需要对rabbitmq服务进行重启,浏览器输入http://localhost:15672/,登录的用户名和密码都是guest,这样登录成功之后,说明应该就安装安装

d:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.\sbin>rabbitmq-plugins.bat  enable rabbitmq_management

接下来我们要创建用户admin 密码和12345,已经对应的vhost

rabbitmq支持6种消息模式

每一种模式我们都需要进行详细的讲解和分析

简单模式Hello World

使用ConnectionFactory创建连接,实质上,此连接就是一个Socket连接,设置host为localhost来连接本地的RabbitMQ Server,设置Port为5672(默认队列连接端口),设置用户名和密码,如果未设置用户名和密码将默认使用guest/guest口令(当然,此口令仅能在localhost本地使用)。

使用channel.queueDeclare()来定义队列,在RabbitMQ中,队列仅能够创建一次,如果发现已经存在此队列,将会忽略此方法。

功能:一个生产者P发送消息到队列Q,一个消费者C接收

生产者实现思路:

创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。

channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);

queue: 队列名称

durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库

exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景

autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

arguments:
队列中的消息什么时候会自动被删除?

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,

Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

Master locator(x-queue-master-locator)

http://www.voidcn.com/article/p-dcbgmfmb-bpw.html

参加博客:https://www.cnblogs.com/leocook/p/mq_rabbitmq_3.html

第一种

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
  • exchange 交换器名称

  • routingKey 路由键

  • props 有14个成员

我们来看下代码

项目的pom.xml的依赖如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.</modelVersion>
<groupId>com</groupId>
<artifactId>com.rabbitmq.test</artifactId>
<version>0.0.-SNAPSHOT</version> <dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

生产者的代码如下

package com.rabbitmq.test1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class MessageProvider { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01"); //获得连接:Rabbitmq中对于的connection
Connection connection = factory.newConnection();
//从connection中获得对应的channel
Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" 生产消息:‘" + message + "‘"); } }

这里最关键的就是两个地方,第一个就是channel.queueDeclare(QUEUE_NAME, false, false, false, null);

queue: 队列名称

durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库

exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景

autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

arguments:
队列中的消息什么时候会自动被删除?

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,

第二个就是:channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

因为我们使用一个默认exchange,我们使用(””)来标识的。QUEUE_NAME队列的名称

package com.rabbitmq.test1;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费端接收消息:" + message);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

两个消费者,消费者1的代码如下

package com.rabbitmq.test2;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端1接收消息:" + message);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
package com.rabbitmq.test1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class MessageProvider { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01"); //获得连接:Rabbitmq中对于的connection
Connection connection = factory.newConnection();
//从connection中获得对应的channel
Channel channel = connection.createChannel();
//
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" 生产消息:‘" + message + "‘"); } }

我们来看下web管理控制台的展示

队列中可以看到我们创建的hello,点击hello可以看到队列中的具体指标

在创建队列的时候,可以指定队列中的消息是否持久化、队列中消息的具体参数TTL、是否过期、最大长度等

在Exchange交换页面,可以看到系统在对于的虚拟机下面会给我们创建6中默认的交换机类型

我们也可以自己创建一个Exchange

在概览页面可以查看当前消息实例的具体的信息有多少connection、channel、交换机、队列、消费这等信息

connection下面可以查看当前的connection的具体信息,以及当前connection下面存在多少个channel

在channel下面可以查看当前channel的具体流量信息

RabbitMQ学习总结 第三篇:工作队列Work Queue

生产者可以产生多条消息,但是每一条消息默认只能被一个消费者消费,c1和c2默认采用轮询的策略消费

我们模拟生产者产生20条消息

package com.rabbitmq.test2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class MessageProvider { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01"); //获得连接:Rabbitmq中对于的connection
Connection connection = factory.newConnection();
//从connection中获得对应的channel
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//
for(int i = ;i < ;i++){
String message = "hello world"+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" 生产消息:‘" + message + "‘");
Thread.sleep(i); } channel.close();
connection.close(); } }

消费者2的代码如下

package com.rabbitmq.test2;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer2 { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端2接收消息:" + message);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

我们来看下日志的打印

生产者产生的20条消息,消费者1消费了10条,消费者2默认消费了10条,使用的是轮询的策略,默认情况下,RabbitMQ会把每个消息以此轮询发到各个消费者那,把消息平均的发到各个消费者那。这种分配管理的方式叫轮询,还可以测试多个worker的情形。

但是上面的这种work模式存在一定的问题,

你可能会注意到有的时候RabbitMQ不能像你预想中的那样分发消息。例如有两个worker,第奇数个消息对应的任务都很耗时,第偶数个消息对应的任务都很快就能执行完。这样的话其中有个worker就会一直都很繁忙,另外一个worker几乎不做任务。RabbitMQ不会去对这种现象做任何处理,依然均匀的去推送消息。

这是因为RabbitMQ在消息被生产者推送过来后就被推送到消费者端,它不会去查看未接收到消费者确认的消息数量。它只会把N个消息均与的分发到N个消费者那。

为了能解决这个问题,我们可以使用basicQos放来来设置消费者最多会同时接收多少个消息。这里设置为1,表示RabbitMQ同一时间发给消费者的消息不超过一条。这样就能保证消费者在处理完某个任务,并发送确认信息后,RabbitMQ才会向它推送新的消息,在此之间若是有新的消息话,将会被推送到其它消费者,若所有的消费者都在处理任务,那么就会等待。,这里我们来看下消费者代码的修改

我们来看下代码,注意修改消费者的代码

package com.rabbitmq.test2;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端1接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

消费者的代码需要修改下面的三个地方

1、将消费的自动ack修改为需要人为的手动ack

channel.basicConsume(QUEUE_NAME, false, consumer);

2、增加手动ack消息的代码,ack不需要批量回复

//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);

3、增加代码 channel.basicQos(1);

修改之后我们来看下日志的打印

RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)。

接下来我们实现发布一条消息,多个消费者都能够接受到

生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息时应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义。

有一些可用的exchange类型:direct, topic, headers和fanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为logs、类型为fanout的exchange:

channel.exchangeDeclare("logs", "fanout");

fanout类型的exchange是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中

我们来看下代码

发送日志消息的生产者程序和之前的程序没有太多的差别。最大的区别就是我们把消息推送到一个命名的exchange上,而不是之前未命名的默认exchange。在我们发送消息时需要提供一个routingKey,但对于fanout类型的exchange可以忽略。下边是生产者的代码EmitLog.java:

package com.rabbitmq.test3;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class MessageProvider { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker";
private static final String EXCHANGE_NAME = "public-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01"); //获得连接:Rabbitmq中对于的connection
Connection connection = factory.newConnection();
//从connection中获得对应的channel
Channel channel = connection.createChannel();
//channel发送消息到交换机,采用fanout类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//
for(int i = ;i < ;i++){
String message = "hello world"+i;
channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());
System.out.println(" 生产消息:‘" + message + "‘");
Thread.sleep(i); } channel.close();
connection.close(); } }

改动点有下面的三处:

1、channel与交换机进行绑定,数据channel发送到Exchange中,交换机采用fanout类型,把消息发送到全部和改Exchange绑定的队列中

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

2、发送消息的时候,是发送给交换机,不再是发送给消息队列

channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());

我们再来看消费者的代码

1、消费者1的队列是fanout1,需要将fanout1队列和生产者的Exchange进行绑定

2、消费者2的队列是fanout2,需要将fanout2队列和生产者的Exchange进行绑定

2、生产者发布了5条消息,消费者1和消费者2都能够收到这5条消息,我们来看下代码

package com.rabbitmq.test3;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "fanout1";
private static final String EXCHANGE_NAME = "public-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//把队列绑定到交换机中
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, ""); Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端1接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
package com.rabbitmq.test3;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer2 { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "fanout2";
private static final String EXCHANGE_NAME = "public-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//设置消费者预取得消费的数量
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//把队列绑定到交换机中
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "");
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端2接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//这里第二个参数要设置为false,表示取消自动ack,需要手动设置ack
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

我们来看下日志的打印信息如下

RabbitMQ学习总结 第五篇:路由Routing

上面的publish模式中,Exchange会把全部的消息发送给与之绑定的队列中,下面我们可以采用路由的模式,Exchange只把消息发送到指定的队列中,使用direct模式

这里生产者需要做下面的修改,就是需要发送消息的时候,需要指定交换机依据那种bingkey进行发送,这里制定bingkey为error

package com.rabbitmq.test4;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class MessageProvider { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker";
private static final String EXCHANGE_NAME = "public-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01"); //获得连接:Rabbitmq中对于的connection
Connection connection = factory.newConnection();
//从connection中获得对应的channel
Channel channel = connection.createChannel();
//channel发送消息到交换机,采用fanout类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//
for(int i = ;i < ;i++){
String message = "hello world"+i;
channel.basicPublish(EXCHANGE_NAME,"error", null, message.getBytes());
System.out.println(" 生产消息:‘" + message + "‘");
Thread.sleep(i); } channel.close();
connection.close(); } }

生产者修改的代码如下:

1、指定交换机类型为 channel.exchangeDeclare(EXCHANGE_NAME, "direct");

2、指定交换机和队列绑定的key为error

channel.basicPublish(EXCHANGE_NAME,"error", null, message.getBytes());

消费者的代码需要修改为,当消费者队列与交换机绑定的时候,需要指定对于的bingkey

消费者1的代码为,bingkey为info,  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "info");

package com.rabbitmq.test4;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "fanout1";
private static final String EXCHANGE_NAME = "public-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//把队列绑定到交换机中
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "info"); Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端1接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

消费者2的代码为,bingkey为error,  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "error");

package com.rabbitmq.test3;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer2 { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "fanout2";
private static final String EXCHANGE_NAME = "public-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//设置消费者预取得消费的数量
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//把队列绑定到交换机中
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "info");
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端2接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//这里第二个参数要设置为false,表示取消自动ack,需要手动设置ack
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

因为消费者2指定的bingkey和生产者中的bingkey一样,所以消费者2能够收到生产者发送的数据,消费者1收不到,消费者1没有任何日志的打印

我们来看下日志

RabbitMQ学习总结 第六篇:Topic类型的exchange

上面的交换机中的direct类型中,bingkey都必须是很明确的error或者info,如果bingkey要想使用通配符的模式,能否实现了,可以使用交换机的Topic模式

这些binding可以总结为:

  • Q1对所有橘色的(orange)的动物感兴趣;
  • Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。

一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。

如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。

我们来看下代码

生产者的代码

package com.rabbitmq.test5;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class MessageProvider { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "worker";
private static final String EXCHANGE_NAME = "topic-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01"); //获得连接:Rabbitmq中对于的connection
Connection connection = factory.newConnection();
//从connection中获得对应的channel
Channel channel = connection.createChannel();
//channel发送消息到交换机,采用fanout类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//
for(int i = ;i < ;i++){
String message = "hello world"+i;
channel.basicPublish(EXCHANGE_NAME,"lazy.1", null, message.getBytes());
System.out.println(" 生产消息:‘" + message + "‘");
Thread.sleep(i); } channel.close();
connection.close(); } }

生产者使用topic模式,bingkey为"lazy.1",那么消费者的bingkey为lazy.1和lazy.*都可以收到生产者发送的消息,*表示通配符全部所有的意思

我们来看消费者的全部代码为

package com.rabbitmq.test5;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "fanout1";
private static final String EXCHANGE_NAME = "topic-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//把队列绑定到交换机中
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "lazy.1"); Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端1接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//true 异步接收消息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
package com.rabbitmq.test5;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer2 { private static final String HOST = "127.0.0.1"; private static final int PORT = ; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = ""; private static final String QUEUE_NAME = "fanout2";
private static final String EXCHANGE_NAME = "topic-exchange"; public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost("vhost01");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//设置消费者预取得消费的数量
channel.basicQos();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//把队列绑定到交换机中
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "lazy.*");
Consumer consumer = new DefaultConsumer(channel) { @Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
//模拟业务处理需要200毫秒
try {
Thread.sleep();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("消费端2接收消息:" + message);
//envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
//false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
channel.basicAck(envelope.getDeliveryTag(), false);
} };
//这里第二个参数要设置为false,表示取消自动ack,需要手动设置ack
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

最新文章

  1. Mysql Master-slave 主从配置
  2. Infobright存储引擎的特点
  3. Tautology 分类: POJ 2015-06-28 18:40 10人阅读 评论(0) 收藏
  4. 5.css字体
  5. Disable right click on the website
  6. URL传参中文乱码encodeURI、UrlDecode
  7. Nginx+Php-fpm+MySQL+Redis源码编译安装指南
  8. linux入侵控制与痕迹清理
  9. 如何添加在eclipse 中添加 window Builder
  10. 必须掌握的ES6新特性
  11. Linux时间子系统之(十二):periodic tick
  12. pycharm创建scrapy项目教程及遇到的坑
  13. kubernetes 基础命令及操作
  14. git在本地回退
  15. 浅谈css3长度单位rem,以及移动端布局技巧
  16. git修改提交的用户名
  17. 客户端代码压缩成zip和服务器开启gzip
  18. 大数据时代——为什么用HADOOP?
  19. 高性能mysql 第1,2,3章。
  20. dede的cfg_keywords和cfg_description无法显示

热门文章

  1. MySQL如何有效的存储IP地址
  2. R语言入门二
  3. 【Java8新特性】不了解Optional类,简历上别说你懂Java8!!
  4. 利用Nginx设置跨域的方式
  5. 15期day01编程与计算机硬件
  6. vue 中引入使用jquery
  7. Java实现 蓝桥杯VIP 算法训练 sign函数
  8. java实现Prim算法
  9. 【Jquery】判断宽度跳转
  10. CMD指令和GIT指令