直连交换机类型为:direct。加入了路由键routingKey的概念。

就是说 生产者投递消息给指定交换机的指定路由键。

只有绑定了此交换机指定路由键的消息队列才可以收到消息。

生产者:

package com.kf.queueDemo.exchange.direct;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; /**
* 路由模式的生产者(带路由键)
* @author kf
*
*/
public class DirectProducer {
//交换机
private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
//路由键
private static String ROUTINGKEY = "SMS"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机类型为路由模式
channel.exchangeDeclare(DIRECTEXCHANGE, "direct"); String msg = "direct_mes"; //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上
channel.basicPublish(DIRECTEXCHANGE, ROUTINGKEY, null, msg.getBytes()); channel.close();
connection.close(); } }

消费者:

package com.kf.queueDemo.exchange.direct;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
/**
* 路由模式的邮件消费者
* @author kf
*
*/
public class DirectEMAILConsumer {
//队列名
private static String EMAILQUEUENAME = "EMAILQUEUENAME";
//路由键名
private static String SMSROUTINGKEY = "SMS";
private static String EMAILROUTINGKEY = "EMAIL";
//交换机
private static String DIRECTEXCHANGE = "DIRECTEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者启动=====");
Connection connection = RabbitConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
channel.queueDeclare(EMAILQUEUENAME, false, false, false, null);
//绑定队列到交换机的指定路由键
channel.queueBind(EMAILQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("进入邮件接收消息的监听");
String s = new String(body, "utf-8");
System.out.println("邮件消费者接收到消息:"+s);
};
}; //参数分别是:队列名,是否自动应答,监听的回调类
channel.basicConsume(EMAILQUEUENAME, true, consumer); } }
package com.kf.queueDemo.exchange.direct;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer; /**
* 路由模式的短信消费者
* @author kf
*
*/
public class DirectSMSConsumer {
//队列名
private static String SMSQUEUENAME = "SMSQUEUENAME";
//路由键名
private static String SMSROUTINGKEY = "SMS";
private static String EMAILROUTINGKEY = "EMAIL";
//交换机
private static String DIRECTEXCHANGE = "DIRECTEXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者启动=====");
Connection connection = RabbitConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
channel.queueDeclare(SMSQUEUENAME, false, false, false, null);
//绑定队列到交换机的指定路由键
channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, SMSROUTINGKEY);
//绑定多个交换机的路由键
channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY); DefaultConsumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("进入短信接收消息的监听");
String s = new String(body, "utf-8");
System.out.println("短信消费者接收到消息:"+s);
};
}; //参数分别是:队列名,是否自动应答,监听的回调类
channel.basicConsume(SMSQUEUENAME, true, consumer); } }

最新文章

  1. python中threading模块详解(一)
  2. 关于LED 流水灯的软件调试方法(非开发板调试)
  3. mysql查询中通配符的使用
  4. 怎样修改Response中的内容
  5. 【英语】Bingo口语笔记(30) - 表示“拒绝”
  6. 【M18】分期摊还预期的计算成本
  7. CAF(C++ actor framework)使用随笔(使用类去构建actor和使用的一些思路)
  8. [swift] NSClassFromString 无法获得该类
  9. perl 调用按钮输出到文本框
  10. TagBuilder 性能如此低下?
  11. 获取Skype用户IP地址
  12. PHP正则式PCRE
  13. JVM性能监控与故障处理命令汇总(jps、jstat、jinfo、jmap、jhat、jstack)
  14. 三,用户交互方式与python基本数据类型
  15. Android 通过onTouchEvent判断是否为双击事件
  16. ubuntu安装jdk,maven,tomcat
  17. ABP框架系列之十五:(Caching-缓存)
  18. teamviewer & commercial-use
  19. BeanWrapper
  20. webstorm的个性化设置settings

热门文章

  1. c++实验4 栈及栈的应用+回文+中、后缀表达式
  2. 29.Combination Sum(和为sum的组合)
  3. linux LVM 磁盘管理
  4. log4j日志+面向切面监控异常
  5. 2019.2.14 t3 车辆销售
  6. Ubuntu16.04装机后处理
  7. Kettle 解决数据锁的问题(事务(进程 ID 51)与另一个进程被死锁在 锁 资源上)
  8. 02. css3有哪些新特性?
  9. css第一篇:元素选择器
  10. 常用sql记录