一、Routing(路由) (using the Java client)

在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制。

二、Bindings(绑定)

在前面的学习中已经创建了绑定(bindings),代码如下:
  channel.queueBind(queueName, EXCHANGE_NAME, "");    

一个绑定就是一个关于exchange和queue的关系,它可以简单的被理解为:队列是从这个exchange中获取消息的。

绑定可以采取一个额外的routingKey的参数,为了避免与basicPublish参数冲突,称之为一个绑定Key,这是如何创建一个带routingKey的绑定的关键。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

一个绑定Key依赖于exchange的类型,像之前使用fanout类型的exchange,完全忽略了该绑定key的值。

三、Direct exchange(直接交换机)


前面实现的日志记录系统中广播所有的消息给所有的消费者,现在对其进行扩展,允许根据信息的严重程度来对消息进行过滤,比如,希望一个程序写入到磁盘的日志消息只接收错误的消息,而不是浪费磁盘保存所有的日志消息。

为了实现这个目标,使用一个fanout类型的exchange,显然是不能够满足这样的需求的,因为它只能广播所有的消息。

为此将使用一个direct exchange来代替fanout exchange,direct exchange使用简单的路由算法,将消息通过绑定的Key匹配将要到达的队列。

从上面的结构图中可以看出direct exchange X绑定着两个queue(Q1,Q2),第一个queue绑定的routingKey为orange,第二个有两个routingKey被绑定,一个routingKey为black,另外一个routingKey为green.

说明:发送带有routingKey为orange的消息到X(exchange)中,X将该消息路由到Q1中,发送带有routingKey为black和green的消息都将被路由到Q2中,其他所有消息将会被丢弃。

四、Multiple bindings(多绑定)

多个队列绑定相同的routingKey是允许的,在上述实例中,可以把X和Q1用routingKey:black绑定起来,这种情况下,direct exchange将像fanout类型的exchange一样会将消息广播都到所有匹配的queues中,即一个routingKey为black的消息将会被发送到Q1和Q2中。

五、Emitting logs(发送的日志)

使用direct代替fanout类型的exchange,发送消息到一个direct exchange中,将根据消息的重要程度作为routingKey,这样接收程序能够选择它想要接收的日志信息,首先必须先创建一个exchange.

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
其次,发送一条信息:
   channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化程序,将severity设定为info、warning、error三种类型中的一种。

六、Subscribing(订阅消息)

接收者根据自己感兴趣的severity来创建一个新到的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

七、Putting it all together(代码实现)

EmitLogDirect.java代码清单如下:


  1. public class EmitLogDirect {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] argv)
  4. throws java.io.IOException {
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  10. String severity = getSeverity(argv);
  11. String message = getMessage(argv);
  12. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
  13. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  14. channel.close();
  15. connection.close();
  16. }
  17. //..
  18. }

ReceiveLogsDirect代码清单如下:

  1. public class ReceiveLogsDirect {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] argv)
  4. throws java.io.IOException,
  5. java.lang.InterruptedException {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("localhost");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  11. String queueName = channel.queueDeclare().getQueue();
  12. if (argv.length < 1){
  13. System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
  14. System.exit(1);
  15. }
  16. for(String severity : argv){
  17. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  18. }
  19. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  20. QueueingConsumer consumer = new QueueingConsumer(channel);
  21. channel.basicConsume(queueName, true, consumer);
  22. while (true) {
  23. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  24. String message = new String(delivery.getBody());
  25. String routingKey = delivery.getEnvelope().getRoutingKey();
  26. System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
  27. }
  28. }
  29. }
	编译和往常一样(参见以往教程用于编译和类路径的建议)。现在,为了方便起见,我们将使用一个环境变量$CP(%CP%在Windows上)的运行时类路径的例子。
如果你只想保存 “警告”和“错误”(而不是“信息”)日志消息到一个文件,打开一个控制台和type: [*] Waiting for logs. To exit press CTRL+C [x] Sent 'error':'Run. Run. Or it will explode.'
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
$ java -cp $CP ReceiveLogsDirect info warning error
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."

最新文章

  1. mysql select日期格式
  2. eayui datagrid 分页 排序 详解
  3. follow me learning sqlserver transql
  4. LeetCode Permutations II (全排列)
  5. CListView虚拟列表
  6. Win32下 Qt与Lua交互使用(四):在Lua脚本中自由执行Qt类中的函数
  7. hibernate ——helloWorld程序(annotation配置)
  8. 关于css禁止文本复制属性
  9. JAVA课程设计-学生信息管理系统(个人博客)
  10. 状压dp初探
  11. Disconf 分布式配置管理平台(安装配置)
  12. VirtualBox查看虚拟机IP地址
  13. python_6
  14. vue脚手架搭建的具体步骤
  15. select 选中是否包含
  16. 蓝桥杯 大臣的旅费_树的最长度_两次DFS
  17. Tensorflow问题汇总
  18. 使用R内置函数操作数据框
  19. 充电 IC 对 0V 电池充电问题
  20. windows平台tensorboard的配置及使用

热门文章

  1. 转-----------------------js window.open() 操作
  2. muduo总结
  3. SSAO + FXAA
  4. DirectX11 学习笔记4 - 一个完整的封装框架
  5. Oracle多表连接效率,性能优化
  6. Java类的根Object
  7. c++面向对象程序设计 课后题 答案 谭浩强 第四章
  8. 关于content-type
  9. 一名3年工作经验的java程序员应该具备的技能
  10. QT-项目文件说明