9.RabbitMQ Topic类型交换机
2024-09-05 08:28:59
RabbitMQ消息服务中,Topic类型交换机根据通配符路由消息:*代表一个单词,#代表0个或多个单词。
生产者
消费者
代码
Producer.java
package com.test.topic2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class Producer {
public static void main(String[] args)
{
try
{
//1.
ConnectionFactory cf = new ConnectionFactory();
cf.setUsername("admin");
cf.setPassword("admin");
cf.setHost("192.168.169.142");
cf.setPort(5672);
//2.
Connection con = cf.newConnection();
//3.
Channel channel = con.createChannel();
//4.
String queue1 = "topic_queue1";
String queue2 = "topic_queue2";
String queue3 = "topic_queue3";
//5.
channel.queueDeclare(queue1, false, false, false, null);
channel.queueDeclare(queue2, false, false, false, null);
channel.queueDeclare(queue3, false, false, false, null);
//6.
String exg = "topic_exg";
channel.exchangeDeclare(exg, "topic", false);
//7.
channel.queueBind(queue1, exg, "*.test");//Binding key
channel.queueBind(queue2, exg, "#.test");//Binding key
channel.queueBind(queue3, exg, "my.user.test");//Binding
key
key
//8.
String message = "Hello Topic Message";
channel.basicPublish(exg, "user.test",
MessageProperties.TEXT_PLAIN, message.getBytes());
MessageProperties.TEXT_PLAIN, message.getBytes());
//9.
channel.close();
con.close();
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
Customer.java
package com.test.topic2;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
public class Customer implements
com.rabbitmq.client.Consumer{
com.rabbitmq.client.Consumer{
public static void main(String[] args)
{
try
{
//1.
ConnectionFactory cf = new ConnectionFactory();
cf.setUsername("admin");
cf.setPassword("admin");
cf.setHost("192.168.169.142");
cf.setPort(5672);
//2.
Connection con = cf.newConnection();
//3.
Channel channel = con.createChannel();
//4.
String queue1 = "topic_queue1";
channel.queueDeclare(queue1, false, false, false, null);
com.test.topic2.Customer cust = new
com.test.topic2.Customer();
com.test.topic2.Customer();
channel.basicConsume(queue1, true, cust);
Thread.sleep(5000);
channel.close();
con.close();
}
catch(Exception e)
{
e.printStackTrace();
}
}
@Override
public void handleConsumeOk(String consumerTag) {
// TODO Auto-generated method stub
}
@Override
public void handleCancelOk(String consumerTag) {
// TODO Auto-generated method stub
}
@Override
public void handleCancel(String consumerTag) throws
IOException {
IOException {
// TODO Auto-generated method stub
}
@Override
public void handleDelivery(java.lang.String consumerTag,
Envelope
envelope,
AMQP.BasicProperties properties,
byte[]
body) throws IOException {
// TODO Auto-generated method stub
System.out.println("receive=" + new String(body));
}
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
ShutdownSignalException sig) {
// TODO Auto-generated method stub
}
@Override
public void handleRecoverOk(String consumerTag) {
// TODO Auto-generated method stub
}
}
最新文章
- [开源 .NET 跨平台 数据采集 爬虫框架: DotnetSpider] [二] 基本使用
- tcpdump用法
- js限制文本框只可以输入数字
- HDU3657 Game(最小割)
- python实现的json数据以HTTP GET,POST,PUT,DELETE方式页面请求
- 分段统计与Oracle的分析函数、逻辑判断等知识点的综合运用
- HW3.27
- Aspose.cells异步读写操作
- ST-4
- Perf工具
- Socket通信例子
- Shell 脚本合集
- Alpha冲刺(2/10)——追光的人
- 字符编码——python学习
- 如何查找消耗资源较大的SQL
- Zabbix Windos agent 安装
- 前端之HTML和CSS
- 包嗅探和包回放简介-tcpdump,tcpreplay
- Microsoft Data Access Components 2.8
- 数据库sql语句的exists和in的区别
热门文章
- Magento笔记/记录(1)
- 【dart学习】-- dart 安装开发环境
- 关于Web前端密码加密是否有意义的总结
- springboot接口:CommandLineRunner
- split("\\s+") 和 split(" +") 有什么区别?
- linux jps命令
- IntelliJ快捷键记录
- 0 ‘+new Array(017)’ 输出? js+相当于Number()类型转换
- Java 8 终于支持 Docker !
- [USACO06JAN]牛的舞会The Cow Prom