模型

生产者

 package cn.wh;

 import java.io.IOException;
import java.util.concurrent.TimeoutException; import cn.util.RabbitMqConnectionUtil; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Send { private static final String EXCHANGE_NAME="test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg="hello direct!"; String routingKey="error";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send "+msg); channel.close();
connection.close();
}
}

消费者

 package cn.wh;

 import java.io.IOException;
import java.util.concurrent.TimeoutException; import cn.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqConnectionUtil.getConnection();
final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.basicQos(1); //定义一个消费者
Consumer consumer=new DefaultConsumer(channel){
//消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException { String msg=new String(body,"utf-8");
System.out.println("[1] Recv msg:"+msg); try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("[1] done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}; boolean autoAck=false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
} }

消费者2

 package cn.wh;

 import java.io.IOException;
import java.util.concurrent.TimeoutException; import cn.util.RabbitMqConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqConnectionUtil.getConnection();
final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); channel.basicQos(1); //定义一个消费者
Consumer consumer=new DefaultConsumer(channel){
//消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException { String msg=new String(body,"utf-8");
System.out.println("[2] Recv msg:"+msg); try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
System.out.println("[2] done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}; boolean autoAck=false;//自动应答 false
channel.basicConsume(QUEUE_NAME,autoAck , consumer);
} }

Topic模型

 public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "id=1001";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}

消费者

 public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

消费者2

 public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

Exchanges(转发器|交换机)

转发器一方面它接受生产者的消息,另一方面向队列推送消息

Nameless exchange(匿名转发)

之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码

Fanout Exchange

不处理路由键。你只需要将队列绑定到交换机上。发送消息到交换机都会被转发到与该交换机绑定的所有队列

Direct Exchange

处理路由键。
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发 dog.puppy,也不会转发dog.guard,只会转发 dog。
                                                    

Topic Exchange

将路由键和某模式进行匹配。
此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。

最新文章

  1. [LeetCode] Paint House 粉刷房子
  2. H5 表单
  3. IntelliJ IDEA 转移C盘.IntelliJIdea(索引目录)
  4. 用Update Select批量更新某一字段的值[可以跨库]
  5. C语言数据在内存分配
  6. ListIterator的使用
  7. java.io.FileNotFoundException: D:\Program%20Files\Apache%20Software%20Foundation\Tomcat%205.0\webapp
  8. bugku web 头等舱
  9. JQ中的 offsetTop 和 offset().top 的区别
  10. java基础---->java注解的使用(一)
  11. 第七章 ReentrantLock总结
  12. NO.5:自学python之路------标准库,正则表达式
  13. Xcode8出现问题总结
  14. Http扫盲
  15. (推荐JsonConvert )序列化和反序列化Json
  16. 《Language Implementation Patterns》之 语言翻译器
  17. input属性 disabled与readonly的区别
  18. Magic Cast Method in Java Magic Trick In Java
  19. HyperLedger Fabric 1.4 区块链技术形成(1.2)
  20. 解决Postgresql服务启动又关闭的问题

热门文章

  1. 【Django错误】OSError: raw write() returned invalid length 14 (should have been between 0 and 7)
  2. 对比python的进程和线程:多线程是假的
  3. 005-hive概述,计算原理及模型
  4. java-mybaits-00503-延迟加载
  5. python学习笔记(四)random 、json模块
  6. 栈的最大值问题 max问题 min问题 队列的max问题
  7. python 学习笔记(循环,print的几种写法,操作符)
  8. jmeter接口测试实战
  9. linux用户 群组权限
  10. DOM—节点