配置:

spring:
  rabbitmq:
    addresses:
    connection-timeout:
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true

依赖:

  <!--rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
<dependency>    <groupId>org.projectlombok</groupId>    <artifactId>lombok</artifactId></dependency>

配置类:

package com.jds.rabbitmq;

import com.jds.common.constant.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

    public static final String RBA_QUEUE = "rba.queue";
/*    public static final String QUEUE = "queue";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String HEADER_QUEUE = "header.queue";
    public static final String TOPIC_EXCHANGE = "topicExchage";
    public static final String FANOUT_EXCHANGE = "fanoutxchage";
    public static final String HEADERS_EXCHANGE = "headersExchage";

    *//**
     * Direct模式 交换机Exchange
     * *//*
    @Bean
    public Queue queue() {
        return new Queue(QUEUE, true);
    }

    *//**
     * Topic模式 交换机Exchange
     * *//*
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1, true);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2, true);
    }
    @Bean
    public TopicExchange topicExchage(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
    }
    *//**
     * Fanout模式 交换机Exchange
     * *//*
    @Bean
    public FanoutExchange fanoutExchage(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding FanoutBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
    }
    @Bean
    public Binding FanoutBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
    }
    *//**
     * Header模式 交换机Exchange
     * *//*
    @Bean
    public HeadersExchange headersExchage(){
        return new HeadersExchange(HEADERS_EXCHANGE);
    }
    @Bean
    public Queue headerQueue1() {
        return new Queue(HEADER_QUEUE, true);
    }
    @Bean
    public Binding headerBinding() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("header1", "value1");
        map.put("header2", "value2");
        return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
    }*/

    /**
     * 订单消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单延迟队列队列所绑定的交换机
     */
    @Bean
    DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单实际消费队列
     */
    @Bean
    public Queue orderQueue() {
        return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    /**
     * 订单延迟队列(死信队列)
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
                .build();
    }

    /**
     * 将订单队列绑定到交换机
     */
    @Bean
    Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
        return BindingBuilder
                .bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    /**
     * 将订单延迟队列绑定到交换机
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }

}

发送者:

package com.jds.rabbitmq;

/**
 * @program: red-bag-activity->CancelOrderSender
 * @description:
 * @author: cxy
 * @create: 2019-12-20 17:57
 **/

import com.jds.common.constant.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 取消订单消息的发出者
 *
 */
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}

接受者:

package com.jds.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 取消订单消息的处理
 */
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);

    @RabbitHandler
    public void handle(Long orderId){
        System.err.println(System.currentTimeMillis());
        LOGGER.info("receive delay message orderId:{}",orderId);
        System.err.println(System.currentTimeMillis());
        System.err.println(orderId);
        System.out.println("大傻逼222222");
    }
}

配置调用类:

package com.jds.controller;

import com.jds.rabbitmq.CancelOrderSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @program: red-bag-activity->DelayQController
 * @description:
 * @author: cxy
 * @create: 2019-12-20 18:00
 **/
@RestController
public class DelayQController {
    @Autowired
    CancelOrderSender cancelOrderSender;
    @RequestMapping(value = "/id", method = RequestMethod.GET)
    @ResponseBody
    public void detail() {
         * ;
        //发送延迟消息
        cancelOrderSender.sendMessage(12333333L, delayTimes);
        System.out.println(12333333L);
        System.err.println(delayTimes);
        System.out.println(System.currentTimeMillis());

        System.out.println("  大傻逼");

        }
}

测试结果:

-- ::-exec-] com.jds.rabbitmq.CancelOrderSender [] -| send delay message orderId:

  大傻逼

-- ::-] com.jds.rabbitmq.CancelOrderReceiver [] -| receive delay message orderId:

大傻逼222222

调用:

http://localhost:8082/id,可以看到时间会延迟三秒。

实现主要地方:

new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }

给参数设置延迟时间

最新文章

  1. 7九章算法强化班全解--------Hadoop跃爷Spark
  2. GIS公交查询-flex/java
  3. laravel in centos
  4. SQL如何增删修改字段
  5. AC日记——字符环 openjudge 1.7 30
  6. jQuery ajax同步的替换方法,使用 $.Deferred()对象
  7. ci中与类名相同 的方法 index控制器 下面index方法 会输出两份
  8. 百度或者Google---SEO优化(转载)
  9. 用java实现冒泡排序法
  10. PAT 07-2 A+B和C
  11. javascript实现数据结构与算法系列:队列 -- 链队列和循环队列实现及示例
  12. java vm args
  13. (转载)linux中shell变量
  14. OpenWrt backfire trunk源码下载及编译
  15. php 截取字符串
  16. Java 之final,static小结
  17. 利用vertical-align:middle垂直居中
  18. dojo页面调试出错
  19. django form 组件插件
  20. ML.NET 示例:二元分类之垃圾短信检测

热门文章

  1. python系列——文件操作的代码
  2. Python3学习笔记——异常处理
  3. React-Native基础-安卓篇(二)
  4. shell函数与变量
  5. 数据库索引原理,及MySQL索引类型(转)
  6. 解决 &#39;express&#39; 不是内部或外部命令,也不是可运行的程序
  7. 【串线篇】SpringMvc源码分析
  8. excel acm 高校排名(hdoj)
  9. 【leetcode】316. Remove Duplicate Letters
  10. SCP-bzoj-1069