rabbitmq消息队列,官网有六种,实战常用的也就如下五种。

下面开始demo讲解

大致三步:1.配置消息队列,2.生产者提供消息给队列,3.消费者监听消费队列消息

源码下载:https://pan.baidu.com/s/119Hf0YFrWiQK9m4hwVrKPQ

1.配置消息队列

package com.qy.mq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* @author 七脉
* 描述:这里主要讲解五种队列消息
* 1.一对一普通队列(hello world)
* 2.一对多工作队列
* 3.fanout广播队列(发布订阅)
* 4.direct定向队列(routing-key)
* 5.topic通配符队列(*、#)
*/
@Configuration
public class RabbitmqConfig { /**hello world普通队列**/
public static final String HELLO_WORLD_QUEUE = "hello_world_queue"; /**work工作队列**/
public static final String WORK_QUEUE = "work_queue"; /**fanout使用队列 one**/
public static final String FANOUT_QUEUE_ONE = "fanout_queue_one"; /**fanout使用队列 two**/
public static final String FANOUT_QUEUE_TWO = "fanout_queue_two"; /**direct routing使用队列ONE**/
public static final String DIRECT_QUEUE_ONE = "direct_queue_one"; /**direct routing使用队列TWO**/
public static final String DIRECT_QUEUE_TWO = "direct_queue_two"; /**topic使用队列**/
public static final String TOPIC_QUEUE_ONE = "topic_queue_one"; /**topic使用队列**/
public static final String TOPIC_QUEUE_TWO = "topic_queue_TWO"; /**fanout交换机**/
public static final String FANOUT_EXCHANGE = "fanout_exchange"; /**direct routing交换机**/
public static final String DIRECT_EXCHANGE = "direct_exchange"; /**topic交换机**/
public static final String TOPIC_EXCHANGE = "topic_exchange"; /**定义routing-key提供给direct交换机使用**/
public static final String ROUTING_KEY = "my_routing_key"; /**定义topic通配符提供给topic交换机使用**/
public static final String TOPICS_ONE = "my_topic.*";//*表示匹配任何一个单词
/**定义topic通配符提供给topic交换机使用**/
public static final String TOPICS_MORE = "my_topic.#";//#表示匹配任何多个单词 /**
* @author 七脉
* 描述:hello world普通队列,不需要绑定交换机
* 官方文档里,点对点, 一个生产者、一个队列、一个消费者。
* @return
*/
@Bean
public Queue helloWorldQueue(){
return new Queue(HELLO_WORLD_QUEUE, true, false, false);
//return new Queue(HELLO_WORLD_QUEUE, true);
} /**
* @author 七脉
* 描述:work工作队列,不需要绑定交换机
* 官方文档里, 一个生产者、一个队列、多个消费者。
* 多个消费者时,会均分接收消息。
* @return
*/
@Bean
public Queue workQueue(){
return new Queue(WORK_QUEUE, true);
} /**
* @author 七脉
* 描述:第一个fanout广播队列,需要绑定Fanout交换机
* fanout交换机会把消息发送到每一个绑定的队列
* 官方:发布订阅
* @return
*/
@Bean
public Queue fanoutQueueOne(){
return new Queue(FANOUT_QUEUE_ONE, true);
} /**
* @author 七脉
* 描述:第二个fanout广播队列,需要绑定Fanout交换机
* fanout交换机会把消息发送到每一个绑定的队列
* 官方:发布订阅
* @return
*/
@Bean
public Queue fanoutQueueTwo(){
return new Queue(FANOUT_QUEUE_TWO, true);
} /**
* @author 七脉
* 描述:第一个direct定向队列,需要绑定Direct交换机,并指定routing-key
* direct交换机会把消息发送到每一个绑定且指定相同routing-key的队列,
* 官方:Routing
* @return
*/
@Bean
public Queue directQueueOne(){
return new Queue(DIRECT_QUEUE_ONE, true);
} /**
* @author 七脉
* 描述:第二个direct定向队列,需要绑定Direct交换机,并指定routing-key
* direct交换机会把消息发送到每一个绑定且指定相同routing-key的队列,
* 官方:Routing
* @return
*/
@Bean
public Queue directQueueTwo(){
return new Queue(DIRECT_QUEUE_TWO, true);
} /**
* @author 七脉
* 描述:第一个topic通配符匹配队列,需要绑定Topic交换机
* topic队列使用*、#的通配符进行匹配topic交换机的消息
* 官方:Topics
* @return
*/
@Bean
public Queue topicQueueOne(){
return new Queue(TOPIC_QUEUE_ONE, true);
} /**
* @author 七脉
* 描述:第二个topic通配符匹配队列,需要绑定Topic交换机
* topic队列使用*、#的通配符进行匹配topic交换机的消息
* 官方:Topics
* @return
*/
@Bean
public Queue topicQueueTwo(){
return new Queue(TOPIC_QUEUE_TWO, true);
} /**
* @author 七脉
* 描述:定义 FanoutExchange 交换机
* FanoutExchange交换机,将消息发送到每一个绑定的消息队列中
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE, true, true);
} /**
* @author 七脉
* 描述:定义 DirectExchange 交换机
* DirectExchange交换机,将消息发送到每一个绑定且对应routing-key的队列中
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE, true, true);
} /**
* @author 七脉
* 描述:定义 TopicExchange 交换机
* TopicExchange交换机,将消息发送到每一个绑定且匹配topic通配符的队列中
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE, true, true);
} /**
* @author 七脉
* 描述:将第一个FanoutQueueOne队列绑定到FanoutExchange交换机
* @param fanoutQueueOne
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutQueueOne(Queue fanoutQueueOne, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueOne).to(fanoutExchange);
} /**
* @author 七脉
* 描述:将第二个FanoutQueueTwo队列绑定到FanoutExchange交换机
* @param fanoutQueueOne
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingFanoutQueueTwo(Queue fanoutQueueTwo, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueTwo).to(fanoutExchange);
} /**
* @author 七脉
* 描述:将第一个DirectQueueOne队列绑定到DirectExchange交换机
* @param directQueueOne
* @param directExchange
* @return
*/
@Bean
public Binding bindingDirectQueueOne(Queue directQueueOne, DirectExchange directExchange){
return BindingBuilder.bind(directQueueOne).to(directExchange).with(ROUTING_KEY);
} /**
* @author 七脉
* 描述:将第二个DirectQueueTwo队列绑定到DirectExchange交换机
* @param directQueueOne
* @param directExchange
* @return
*/
@Bean
public Binding bindingDirectQueueTwo(Queue directQueueTwo, DirectExchange directExchange){
return BindingBuilder.bind(directQueueTwo).to(directExchange).with(ROUTING_KEY);
} /**
* @author 七脉
* 描述:将第一个TopicQueueOne队列绑定到TopicExchange交换机
* @param topicQueueOne
* @param topicExchange
* @return
*/
@Bean
public Binding bindingTopicQueueOne(Queue topicQueueOne, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueueOne).to(topicExchange).with(TOPICS_ONE);
} /**
* @author 七脉
* 描述:将第二个TopicQueueOne队列绑定到TopicExchange交换机
* @param topicQueueOne
* @param topicExchange
* @return
*/
@Bean
public Binding bindingTopicQueueTwo(Queue topicQueueTwo, TopicExchange topicExchange){
return BindingBuilder.bind(topicQueueTwo).to(topicExchange).with(TOPICS_MORE);
} }

2.生产者提供消息给队列

package com.qy.mq.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author 七脉
* 描述:MQ消息发送类
*/
@Service
public class ProviderService { @Autowired
private AmqpTemplate amqpTemplate; /**
* @author 七脉
* 描述:发送普通队列消息 HelloWorld
* @param msg
*/
public void sendMsgForHelloWorldQueue(String msg){
System.out.println("普通队列HelloWorld-生产者发送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.HELLO_WORLD_QUEUE, msg);
} /**
* @author 七脉
* 描述:发送工作队列消息
* @param msg
*/
public void sendMsgForWorkQueue(String msg){
System.out.println("工作队列-生产者发送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.WORK_QUEUE, msg);
} /**
* @author 七脉
* 描述:发送到FanoutExchange交换机,交换机会发送到绑定的队列
* @param msg
*/
public void sendMsgForFanoutExchange(String msg){
System.out.println("FanoutExchange交换机-生产者发送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.FANOUT_EXCHANGE, null, msg);
} /**
* @author 七脉
* 描述:发送消息到DirectExchange交换机,交换机会发送到绑定且指定routing-key的队列
* @param msg
*/
public void sendMsgForDirectExchange(String msg){
System.out.println("DirectExchange交换机-生产者发送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY, msg);
} /**
* @author 七脉
* 描述:发送消息到TopicExchange交换机,交换机会发送到绑定且匹配通配符的队列
* @param msg
*/
public void sendMsgForTopicExchange(String msg, String wildcard){
System.out.println("TopicExchange交换机-生产者发送:"+msg);
amqpTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE, wildcard, msg);
}
}

3.消费者监听消费队列消息

package com.qy.mq.consumer;

import java.io.IOException;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; /**
* @author 七脉
* 描述:普通队列helloworld,一个生产者、一个队列、一个消费者
*/
@Service
public class ConsumerHelloWorld { @Value("${server.port}")
private String port; @RabbitListener(queues=RabbitmqConfig.HELLO_WORLD_QUEUE)
public void recive(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
System.out.println(msg);
channel.basicAck(deliveryTag, false);//应答
//channel.basicNack(deliveryTag, false, true);//不应答
//channel.basicReject(deliveryTag, true);//拒绝应答
} }

  

最新文章

  1. [JS]笔记12之事件机制--事件冒泡和捕获--事件监听--阻止事件传播
  2. 【转】App架构设计经验谈:接口的设计
  3. Linux 下 YUM 安装 Percona Server 5.6
  4. Tengine
  5. 转载文章----初识Ildasm.exe——IL反编译的实用工具
  6. 【spoj8222】Substrings
  7. [POJ3061]Subsequence(二分,前缀和)
  8. poj 3604 Professor Ben
  9. windows中.msc文件详解
  10. 一周学会Mootools 1.4中文教程:(1)Dom选择器
  11. struts+hibernate 请求数据库增删改查(小项目实例)
  12. 201521123098 《Java程序设计》第10周学习总结
  13. Java 零散笔记
  14. android JNI调用机制
  15. https://doc.opensuse.org/projects/kiwi/doc/
  16. python使用telnetlib
  17. 三星Galaxy S8 刷机经验记录
  18. WebBrowser2控件使用
  19. vim复制粘帖代码
  20. C#中 == 与 Equals的简单理解

热门文章

  1. BFS实现8数码问题,思考与总结
  2. 关于qt creator各种假死的问题
  3. 使用 udev 进行动态内核设备管理(转自suse文档)
  4. 【计算机视觉】ImageNet介绍
  5. 屏蔽打开文件时提示“您尝试打开的文件xxx.xls的格式与文件扩展名指定的格式不一致。打开文件前请验证文件没有损坏且来源可信。是否立即打开该文 件?”
  6. docker 学习操作记录 1
  7. Lua table的remove函数
  8. U9数据权限分配枚举值方法
  9. NOIP2018 填数游戏 搜索、DP
  10. python操作jenkins、python-jenkins api