1. “ * ”的使用:

生产者:

package com.toov5.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.toov5.utils.MQConnectionUtils; //生产者 交换机类型 producerFanout类型
public class TopicProducer {
//交换机名称
private static final String EXCHANGE_NAME = "my_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//建立MQ连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
//生产者绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //交换机名称 交换机类型
String routingKey="log.email"; //消息只会给邮件类型的
//创建对应的消息
String msString = "my_Routing_destination_msg"+routingKey;
//通过频道 发送消息
System.out.println("生产者投递消息:"+msString);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msString.getBytes());
//关闭通道 和 连接
channel.close();
connection.close();
} }

消费者:

package com.toov5.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils; //邮件消费者
public class ConsumerSMSTopic {
private static final String SMS_QUEUE ="sms_queue_topic";
//交换机名称
private static final String EXCHANGE_NAME = "my_topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
//建立MQ连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel(); //消费者声明队列
channel.queueDeclare(SMS_QUEUE, false, false, false, null);
//消费者队列绑定 路由
channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "log.*");
//消费者监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("短信消费者获取生产者消息"+msg);
}
};
channel.basicConsume(SMS_QUEUE,true, defaultConsumer); //绑定队列 事件监听 }
}
package com.toov5.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils; //邮件消费者
public class ConsumerEmailTopic {
private static final String EMAIL_QUEUE ="email_queue_topic";
//交换机名称
private static final String EXCHANGE_NAME = "my_topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
//建立MQ连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel(); //消费者声明队列
channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
//消费者队列绑定 路由
channel.queueBind(EMAIL_QUEUE, EXCHANGE_NAME, "log.email");
//消费者监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("邮件消费者获取生产者消息"+msg);
}
};
channel.basicConsume(EMAIL_QUEUE,true, defaultConsumer); //绑定队列 事件监听 }
}

可以看到两个消费者都可以接收到

2.换成 “#”

生产者:

package com.toov5.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.toov5.utils.MQConnectionUtils; //生产者 交换机类型 producerFanout类型
public class TopicProducer {
//交换机名称
private static final String EXCHANGE_NAME = "my_topic";
public static void main(String[] args) throws IOException, TimeoutException {
//建立MQ连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel();
//生产者绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //交换机名称 交换机类型
String routingKey="log.email.sms"; //消息只会给邮件类型的
//创建对应的消息
String msString = "my_Routing_destination_msg"+routingKey;
//通过频道 发送消息
System.out.println("生产者投递消息:"+msString);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msString.getBytes());
//关闭通道 和 连接
channel.close();
connection.close();
} }

消费者:

package com.toov5.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils; //邮件消费者
public class ConsumerSMSTopic {
private static final String SMS_QUEUE ="sms_queue_topic";
//交换机名称
private static final String EXCHANGE_NAME = "my_topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动");
//建立MQ连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel(); //消费者声明队列
channel.queueDeclare(SMS_QUEUE, false, false, false, null);
//消费者队列绑定 路由
channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "log.#");
//消费者监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("短信消费者获取生产者消息"+msg);
}
};
channel.basicConsume(SMS_QUEUE,true, defaultConsumer); //绑定队列 事件监听 }
}
package com.toov5.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils; //邮件消费者
public class ConsumerEmailTopic {
private static final String EMAIL_QUEUE ="email_queue_topic";
//交换机名称
private static final String EXCHANGE_NAME = "my_topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动");
//建立MQ连接
Connection connection = MQConnectionUtils.newConnection();
//创建通道
Channel channel = connection.createChannel(); //消费者声明队列
channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);
//消费者队列绑定 路由
channel.queueBind(EMAIL_QUEUE, EXCHANGE_NAME, "log.email");
//消费者监听消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//重写监听方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("邮件消费者获取生产者消息"+msg);
}
};
channel.basicConsume(EMAIL_QUEUE,true, defaultConsumer); //绑定队列 事件监听 }
}

总结 “*” 匹配一个词   “#”匹配多个词

最新文章

  1. ASP.NET通过递归添加树(Treeview)
  2. 页面Button/Link 传参数
  3. Javascript模式(第一章简介)------读书笔记
  4. ubuntu 下安装32位库 ia32-libs方法
  5. Evaluation Clustering methods
  6. Android 数据传递(二)Activity与fragment之间的通信
  7. UML 类关系及画法
  8. python 2.X
  9. [Netty] - Netty入门(最简单的Netty客户端/服务器程序)
  10. C/C++ 控制台字体的变颜变色
  11. mongo中用嵌套结构优势是什么
  12. 表达式语言引擎:Apache Commons JEXL 2.1 发布
  13. PHP如何自定义PHP内置函数
  14. Proxy 代理模式 动态代理 cglib MD
  15. 基于HTML5 SVG和CSS3炫酷蹦床式图片切换特效
  16. MSBuild问题积累
  17. Android设备相关配置
  18. Flume直接对接SaprkStreaming的两种方式
  19. [一] sqlinject bypass
  20. Linux--LAMP平台搭建

热门文章

  1. 自动清理DataGuard备机日志
  2. eclipse maven 依赖jar下载失败解决办法
  3. HDU3351 Seinfeld 【贪心】
  4. 调用http接口耗时过长。
  5. Mysql 复制表数据(表结构相同)
  6. solr6.5的分词
  7. lua例子getglobal()
  8. python 补充:join() , 基本数据类型的增删改查以及深浅拷贝
  9. 6.2.3-AbstractBeanFactory
  10. iOS OC和JS的交互 javaScriptCore方法封装