订阅模式

公众号-->订阅之后才会收到相应的文章。

解读:
1.一个生产者,多个消费者
2.每个消费者都有自己的队列
3.生产者没有将消息直接发送到队列里,而是发送给了交换机(转发器)exchange
4.每个队列都要绑定到交换机(转发器)上
5.生产者发送的消息记过交换机然后到达队列,然后就能实现被多个消费者消费

图例:

     |-------------|-----Q-----C3

P------------X-------------|-----Q-----C3

     |-------------|-----Q-----C3

注册--->发邮件--->发短信

MQ工厂类Connection

 package com.mmr.rabbitmq.util;

 import java.io.IOException;

 import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtils {
/**
* @desc 获取Mq 的链接
* @author zp
* @throws IOException
* @date 2018-7-19
*/
public static Connection getConnection() throws IOException {
// 1.定义一个链接工厂
ConnectionFactory factroy = new ConnectionFactory(); // 2.设置服务地址
factroy.setHost("127.0.0.1"); // 3.设置端口号
factroy.setPort(5672); // 4.vhost 设置数据库
factroy.setVirtualHost("vhtest"); // 5.设置用户名
factroy.setUsername("jerry"); // 6. 设置密码
factroy.setPassword("123456"); // 7.返回链接
return factroy.newConnection();
}
}

消息生产者类Send,这个时候,运行代码再到控制台去查看,并没有发现我们的消息,因为在MQ中只有队列可以存储消息,而交换机不可以存储消息,下面这段代码并没有将交换机和队列进行绑定,所以数据就丢失了。

 package com.mmr.rabbitmq.ps;

 import java.io.IOException;

 import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Send {
private static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException {
// 创建连接
Connection connection = ConnectionUtils.getConnection(); // 获取通道
Channel channel = connection.createChannel(); // 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// fanout 分发 // 发送消息
String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close(); }
}

代码运行后的控制台:

由于交换机不能存储数据,那么我们就需要考虑如何将交换机和队列进行绑定。因为只要将两者进行绑定之后,那么数据存储问题就迎刃而解。

消费者Recv1 Recv2

 package com.mmr.rabbitmq.ps;

 import java.io.IOException;

 import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Recv1 {
private static final String QUEUE_NAME_STRING="test_queue_fanout_email";
private static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException {
// 创建连接
Connection connection = ConnectionUtils.getConnection(); // 创建通道
final Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME_STRING, false, false, false, null); // 绑定队列,绑定到交换机/转发器
channel.queueBind(QUEUE_NAME_STRING, EXCHANGE_NAME, ""); // 保证每次只分发一个
channel.basicQos(1); // 定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
String msg = new String(body,"utf-8");
System.out.println("[1]Recv msg:"+msg);
try {
// 每次休息一会儿
Thread.sleep(2000);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}finally{
System.out.println("recv1 done");
//回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;// 不自动应答
channel.basicConsume(QUEUE_NAME_STRING, autoAck,consumer); }
}
 package com.mmr.rabbitmq.ps;

 import java.io.IOException;

 import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Recv2 {
private static final String QUEUE_NAME_STRING="test_queue_fanout_sms";
private static final String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws IOException {
// 创建连接
Connection connection = ConnectionUtils.getConnection(); // 创建通道
final Channel channel = connection.createChannel(); // 声明队列
channel.queueDeclare(QUEUE_NAME_STRING, false, false, false, null); // 绑定队列,绑定到交换机/转发器
channel.queueBind(QUEUE_NAME_STRING, EXCHANGE_NAME, ""); // 保证每次只分发一个
channel.basicQos(1); // 定义一个消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
String msg = new String(body,"utf-8");
System.out.println("[2]Recv msg:"+msg);
try {
// 每次休息一会儿
Thread.sleep(2000);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}finally{
System.out.println("recv2 done");
//回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;// 不自动应答
channel.basicConsume(QUEUE_NAME_STRING, autoAck,consumer); }
}

运行上述代码进行监听,再通过运行Send发送消息,我们可以在MQ-管理平台上看到:

进过这样的使用,我们的消息订阅就完成了。

最新文章

  1. sql 入门经典(第五版) Ryan Stephens 学习笔记  第四部分:建立复杂的数据库查询/
  2. 辅助的写与数据库交互的XML文件的类
  3. SSI框架中配置log4j
  4. c功能实战
  5. 新的疑问(未解决):VC项目的配置,不是都能在Project -- Properties里设置解决的
  6. scheme 宏macro写法
  7. ASN.1 Encode an Object Identifier (OID) with OpenSSL
  8. linux 内核的futex - requeue 以及 requeue-pi
  9. python3 selenium 鼠标悬停操作
  10. commons-pool与commons-pool2连接池(Hadoop连接池)
  11. Backbone简单示例
  12. string转QBytearray
  13. Where are your from!!!!!!!!!!!! !Baby! {封装}
  14. 全栈开发工程师微信小程序-中
  15. C# 利用Log4Net进行日志记录
  16. ASP.NET -- WebForm -- Session的使用
  17. 架构师成长之路4.4-多维监控体系_zabbix
  18. js json转对象
  19. Jmeter 如何让变量中包含变量
  20. 扩展gcd算法

热门文章

  1. Altium Designer 17 ------ 多层板设计
  2. lucene的普通搜索(二)
  3. M1-Flask-Day2
  4. Spring Cloud构建微服务架构(六)高可用服务注册中心
  5. MySQL中的主键,外键有什么作用详解
  6. 本地服务器上挂载A目录到B目录
  7. java 可设置最大内存
  8. js中html拼接
  9. Spark源码剖析 - 任务提交与执行
  10. Linux 下装逼技巧