原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/11794927.html

RMQ Topic

Project Directory

Maven Dependency

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>org.fool.rmq</groupId>
<artifactId>rmq</artifactId>
<version>1.0-SNAPSHOT</version> <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
</parent> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

application.properties

 spring.application.name=rmq
server.port= spring.rabbitmq.host=localhost
spring.rabbitmq.port=
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin #direct part start
rmq.config.exchange=log.direct
rmq.config.queue.info=log.info
rmq.config.queue.info.routing.key=log.info.routing.key
rmq.config.queue.error=log.error
rmq.config.queue.error.routing.key=log.error.routing.key
#direct part end #topic part start
rmq.config.exchange.topic=log.topic
rmq.config.queue.topic.info=log.topic.info
rmq.config.queue.topic.error=log.topic.error
rmq.config.queue.topic.all=log.topic.all rmq.config.queue.user.info.routing.key=user.log.info
rmq.config.queue.user.debug.routing.key=user.log.debug
rmq.config.queue.user.warn.routing.key=user.log.warn
rmq.config.queue.user.error.routing.key=user.log.error rmq.config.queue.product.info.routing.key=product.log.info
rmq.config.queue.product.debug.routing.key=product.log.debug
rmq.config.queue.product.warn.routing.key=product.log.warn
rmq.config.queue.product.error.routing.key=product.log.error rmq.config.queue.order.info.routing.key=order.log.info
rmq.config.queue.order.debug.routing.key=order.log.debug
rmq.config.queue.order.warn.routing.key=order.log.warn
rmq.config.queue.order.error.routing.key=order.log.error
#topic part end

Source Code

Application.java

 package org.fool.rmq;

 import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

RmqConfig.java

 package org.fool.rmq.config;

 import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RmqConfig { @Bean
public Queue queue() {
return new Queue("hello-rmq");
}
}

UserProducer.java

 package org.fool.rmq.topic;

 import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import java.util.Date; @Component
public class UserProducer {
@Autowired
private AmqpTemplate rmqTemplate; @Value("${rmq.config.exchange.topic}")
private String exchange; @Value("${rmq.config.queue.user.info.routing.key}")
private String infoRoutingKey; @Value("${rmq.config.queue.user.debug.routing.key}")
private String debugRoutingKey; @Value("${rmq.config.queue.user.warn.routing.key}")
private String warnRoutingKey; @Value("${rmq.config.queue.user.error.routing.key}")
private String errorRoutingKey; public void send() {
String message = "Hello User";
rmqTemplate.convertAndSend(exchange, infoRoutingKey, message + " info");
rmqTemplate.convertAndSend(exchange, debugRoutingKey, message + " debug");
rmqTemplate.convertAndSend(exchange, warnRoutingKey, message+ " warn");
rmqTemplate.convertAndSend(exchange, errorRoutingKey, message + " error");
}
}

ProductProducer.java

 package org.fool.rmq.topic;

 import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import java.util.Date; @Component
public class ProductProducer {
@Autowired
private AmqpTemplate rmqTemplate; @Value("${rmq.config.exchange.topic}")
private String exchange; @Value("${rmq.config.queue.product.info.routing.key}")
private String infoRoutingKey; @Value("${rmq.config.queue.product.debug.routing.key}")
private String debugRoutingKey; @Value("${rmq.config.queue.product.warn.routing.key}")
private String warnRoutingKey; @Value("${rmq.config.queue.product.error.routing.key}")
private String errorRoutingKey; public void send() {
String message = "Hello Product";
rmqTemplate.convertAndSend(exchange, infoRoutingKey, message + " info");
rmqTemplate.convertAndSend(exchange, debugRoutingKey, message + " debug");
rmqTemplate.convertAndSend(exchange, warnRoutingKey, message + " warn");
rmqTemplate.convertAndSend(exchange, errorRoutingKey, message + " error");
}
}

OrderProducer.java

 package org.fool.rmq.topic;

 import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import java.util.Date; @Component
public class OrderProducer {
@Autowired
private AmqpTemplate rmqTemplate; @Value("${rmq.config.exchange.topic}")
private String exchange; @Value("${rmq.config.queue.order.info.routing.key}")
private String infoRoutingKey; @Value("${rmq.config.queue.order.debug.routing.key}")
private String debugRoutingKey; @Value("${rmq.config.queue.order.warn.routing.key}")
private String warnRoutingKey; @Value("${rmq.config.queue.order.error.routing.key}")
private String errorRoutingKey; public void send() {
String message = "Hello Order";
rmqTemplate.convertAndSend(exchange, infoRoutingKey, message + " info");
rmqTemplate.convertAndSend(exchange, debugRoutingKey, message + " debug");
rmqTemplate.convertAndSend(exchange, warnRoutingKey, message + " warn");
rmqTemplate.convertAndSend(exchange, errorRoutingKey, message + " error");
}
}

TopicLogInfoConsumer.java

 package org.fool.rmq.topic;

 import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${rmq.config.queue.topic.info}", autoDelete = "true"),
exchange = @Exchange(value = "${rmq.config.exchange.topic}", type = ExchangeTypes.TOPIC),
key = "*.log.info"
))
public class TopicLogInfoConsumer {
@RabbitHandler
public void handle(String message) {
System.out.println("consume info: " + message);
}
}

TopicLogErrorConsumer.java

 package org.fool.rmq.topic;

 import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${rmq.config.queue.topic.error}", autoDelete = "true"),
exchange = @Exchange(value = "${rmq.config.exchange.topic}", type = ExchangeTypes.TOPIC),
key = "*.log.error"
))
public class TopicLogErrorConsumer {
@RabbitHandler
public void handle(String message) {
System.out.println("consume error: " + message);
}
}

TopicLogAllConsumer.java

 package org.fool.rmq.topic;

 import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${rmq.config.queue.topic.all}", autoDelete = "true"),
exchange = @Exchange(value = "${rmq.config.exchange.topic}", type = ExchangeTypes.TOPIC),
key = "*.log.*"
))
public class TopicLogAllConsumer {
@RabbitHandler
public void handle(String message) {
System.out.println("consume all: " + message);
}
}

ApplicationTest.java

 package org.fool.rmq.test;

 import org.fool.rmq.Application;
import org.fool.rmq.direct.LogProducer;
import org.fool.rmq.producer.Producer;
import org.fool.rmq.topic.OrderProducer;
import org.fool.rmq.topic.ProductProducer;
import org.fool.rmq.topic.UserProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationTest { @Autowired
private Producer producer; @Autowired
private LogProducer logProducer; @Autowired
private UserProducer userProducer; @Autowired
private ProductProducer productProducer; @Autowired
private OrderProducer orderProducer; @Test
public void test() {
producer.send();
} @Test
public void testDirect() {
logProducer.send();
} @Test
public void testTopic() {
userProducer.send();
productProducer.send();
orderProducer.send();
}
}

Console Output

RMQ Management

最新文章

  1. tableLayoutPanel的使用
  2. Salesforce 动态审批
  3. css样式 --- CSS hack
  4. C++浅析——继承类中构造和析构顺序
  5. 【循序渐进学Python】12.Python 正则表达式简介
  6. 【转】android-support-v7-appcompat.jar 的安装及相关问题解决 --- 汇总整理
  7. 我的cocos2d-x集成sharesdk之旅(转)
  8. java 时间
  9. 使用语音识别JAVA SDK 的MAVEN源代码制作语音控制智能家居Java APP-------MAVEN工程加载问题解决
  10. mongodb命令行group分组和java代码中group分组
  11. Swift 笔记汇总
  12. python selenium鼠标键盘操作(ActionChains)
  13. linux安装命令出错(could not resolve host mirrorlist.centos.org)
  14. Spring Cloud Vault介绍
  15. 网络 --- 1 c/s (b/s)架构 ip 初始socket
  16. mybatis mapper配置文件 CustomerMapper.xml
  17. [转]在Eclipse中搭建Python开发环境
  18. 转:【衬线字体与无衬线字体】font-family之Serif和Sans-Serif
  19. odoo开发笔记 -- 后台代码什么时候需要注意编码格式
  20. 自定义cnblogs样式小结

热门文章

  1. UEFI手札
  2. MySQL主从复制 报错处理
  3. qbzt day2 上午
  4. list转datatable,SqlBulkCopy将DataTable中的数据批量插入数据库
  5. java开发客户端发送请求到服务器端出现这样:JSON parse error: Unexpected character (&#39;}&#39; (code 125)): was expecting
  6. 错误 warning: LF will be replaced by CRLF in README.md.
  7. python-异常处理总结
  8. mybatis初步理解
  9. Java -Tips
  10. Java第三周总结&amp;实验报告(1)