源码Gitub地址:https://github.com/heibaiying/spring-samples-for-all

一、 项目结构说明

1.1 这里采用maven多模块的构建方式,在spring-boot-rabbitmq下构建三个子模块:

  1. rabbitmq-common 是公共模块,用于存放公共的接口、配置和bean,被rabbitmq-producer和rabbitmq-consumer在pom.xml中引用;
  2. rabbitmq-producer 是消息的生产者模块;
  3. rabbitmq-consumer是消息的消费者模块。

1.2 关于rabbitmq安装、交换机、队列、死信队列等基本概念可以参考我的手记《RabbitMQ实战指南》读书笔记,里面有详细的配图说明。

二、关键依赖

在父工程的项目中统一导入依赖rabbitmq的starter(spring-boot-starter-amqp),父工程的pom.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>

    <modules>
        <module>rabbitmq-consumer</module>
        <module>rabbitmq-producer</module>
        <module>rabbitmq-common</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.heibaiying</groupId>
    <artifactId>spring-boot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-rabbitmq</name>
    <description>RabbitMQ project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

三、公共模块(rabbitmq-common)

  • bean 下为公共的实体类。
  • constant 下为公共配置,用静态常量引用。(这里我使用静态常量是为了方便引用,实际中也可以按照情况,抽取为公共配置文件)
package com.heibaiying.constant;

/**
 * @author : heibaiying
 * @description : rabbit 公用配置信息
 */
public class RabbitInfo {

    // queue 配置
    public static final String QUEUE_NAME = "spring.boot.simple.queue";
    public static final String QUEUE_DURABLE = "true";

    // exchange 配置
    public static final String EXCHANGE_NAME = "spring.boot.simple.exchange";
    public static final String EXCHANGE_TYPE = "topic";

    // routing key
    public static final String ROUTING_KEY = "springboot.simple.*";
}

四、服务消费者(rabbitmq-consumer)

4.1 消息消费者配置

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
    # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 为了保证信息能够被正确消费,建议签收模式设置为手工签收,并在代码中实现手工签收
        acknowledge-mode: manual
        # 侦听器调用者线程的最小数量
        concurrency: 10
        # 侦听器调用者线程的最大数量
        max-concurrency: 50

4.2 使用注解@RabbitListener和@RabbitHandler创建消息监听者

  1. 使用注解创建的交换机、队列、和绑定关系会在项目初始化的时候自动创建,但是不会重复创建;
  2. 这里我们创建两个消息监听器,分别演示消息是基本类型和消息是对象时的配置区别。
/**
 * @author : heibaiying
 * @description : 消息是对象的消费者
 */

@Component
@Slf4j
public class RabbitmqBeanConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitBeanInfo.QUEUE_NAME, durable = RabbitBeanInfo.QUEUE_DURABLE),
            exchange = @Exchange(value = RabbitBeanInfo.EXCHANGE_NAME, type = RabbitBeanInfo.EXCHANGE_TYPE),
            key = RabbitBeanInfo.ROUTING_KEY)
    )
    @RabbitHandler
    public void onMessage(@Payload Programmer programmer, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        log.info("programmer:{} ", programmer);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}
@Component
@Slf4j
public class RabbitmqConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitInfo.QUEUE_NAME, durable = RabbitInfo.QUEUE_DURABLE),
            exchange = @Exchange(value = RabbitInfo.EXCHANGE_NAME, type = RabbitInfo.EXCHANGE_TYPE),
            key = RabbitInfo.ROUTING_KEY)
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageHeaders headers = message.getHeaders();
        // 获取消息头信息和消息体
        log.info("msgInfo:{} ; payload:{} ", headers.get("msgInfo"), message.getPayload());
        //  DELIVERY_TAG 代表 RabbitMQ 向该Channel投递的这条消息的唯一标识ID,是一个单调递增的正整数
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 第二个参数代表是否一次签收多条,当该参数为 true 时,则可以一次性确认 DELIVERY_TAG 小于等于传入值的所有消息
        channel.basicAck(deliveryTag, false);
    }

}

五、 消息生产者(rabbitmq-producer)

5.1 消息生产者配置

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
    # RabbitMQ 默认的用户名和密码都是guest 而虚拟主机名称是 "/"
    # 如果配置其他虚拟主机地址,需要预先用管控台或者图形界面创建 图形界面地址 http://主机地址:15672
    username: guest
    password: guest
    virtual-host: /
    # 是否启用发布者确认 具体确认回调实现见代码
    publisher-confirms: true
    # 是否启用发布者返回 具体返回回调实现见代码
    publisher-returns: true
    # 是否启用强制消息 保证消息的有效监听
    template.mandatory: true

server:
  port: 8090

5.2 创建消息生产者

/**
 * @author : heibaiying
 * @description : 消息生产者
 */
@Component
@Slf4j
public class RabbitmqProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendSimpleMessage(Map<String, Object> headers, Object message,
                                  String messageId, String exchangeName, String key) {
        // 自定义消息头
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        // 创建消息
        Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
        /* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
           如果发送时候指定的交换器不存在 ack就是false 代表消息不可达 */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
            if (!ack) {
                System.out.println("进行对应的消息补偿机制");
            }
        });
        /* 消息失败的回调
         * 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE
         */
        rabbitTemplate.setReturnCallback((message1, replyCode, replyText, exchange, routingKey) -> {
            log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
                    message1, replyCode, replyText, exchange, routingKey);
        });
        // 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, key, msg, correlationData);
    }
}

5.3 以单元测试的方式发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqProducerTests {

    @Autowired
    private RabbitmqProducer producer;

    /***
     * 发送消息体为简单数据类型的消息
     */
    @Test
    public void send() {
        Map<String, Object> heads = new HashMap<>();
        heads.put("msgInfo", "自定义消息头信息");
        // 模拟生成消息ID,在实际中应该是全局唯一的 消息不可达时候可以在setConfirmCallback回调中取得,可以进行对应的重发或错误处理
        String id = String.valueOf(Math.round(Math.random() * 10000));
        producer.sendSimpleMessage(heads, "hello Spring", id, RabbitInfo.EXCHANGE_NAME, "springboot.simple.abc");
    }

    /***
     * 发送消息体为bean的消息
     */
    @Test
    public void sendBean() {
        String id = String.valueOf(Math.round(Math.random() * 10000));
        Programmer programmer = new Programmer("xiaoMing", 12, 12123.45f, new Date());
        producer.sendSimpleMessage(null, programmer, id, RabbitBeanInfo.EXCHANGE_NAME, RabbitBeanInfo.ROUTING_KEY);
    }

}

六、项目构建的说明

因为在项目中,consumer和producer模块均依赖公共模块,所以在构建consumer和producer项目前需要将common 模块安装到本地仓库,依次父工程common模块执行:

mvn install -Dmaven.test.skip = true

consumer中 pom.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.heibaiying</groupId>
        <artifactId>spring-boot-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>rabbitmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-consumer</name>
    <description>RabbitMQ consumer project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.heibaiying</groupId>
            <artifactId>rabbitmq-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

producer中 pom.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.heibaiying</groupId>
        <artifactId>spring-boot-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>rabbitmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-producer</name>
    <description>RabbitMQ producer project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.heibaiying</groupId>
            <artifactId>rabbitmq-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

附:源码Gitub地址:https://github.com/heibaiying/spring-samples-for-all

最新文章

  1. Android ooVoo Apk附件关联分析
  2. 2017《时间的朋友》罗振宇跨年演讲ppt
  3. C# Winform中无焦点状态下获取键盘输入或者USB扫描枪数据
  4. neutron 同一虚拟网卡的多个IP设置
  5. innobackupex err2
  6. Android高级之第十一讲Hybird开发
  7. [LeetCode]题解(python):032-Longest Valid Parentheses
  8. 【转】目前最细致清晰的NSDictionary以及NSMutableDictionary用法总结 -- 不错
  9. ubuntu13.04 Thinkpad W520安装nvidia显卡驱动
  10. go iota
  11. 利用Rsync在windows和linux之间同步数据
  12. 【CSS】display: inline-block,内联元素
  13. KNN邻近分类算法
  14. webpack+vue项目实战(四,前端与后端的数据交互和前端展示数据)
  15. 【SpringBoot笔记】SpringBoot如何正确关闭应用
  16. 并发编程之wait()、notify()
  17. nginx——优化 Nginx 连接超时时间
  18. 接口性能测试(Jmeter)操作总结
  19. 不停止nginx服务,使配置文件生效
  20. CSS3 background-size属性兼容

热门文章

  1. PHP采集类:Snoopy.class.php
  2. 在.net core的web项目中使用kindeditor
  3. POJ 1328 Radar Installation(经典贪婪)
  4. 《得知opencv》注意事项——矩阵和图像处理——cvAdd、cvAddS and cvAddWeighted
  5. arcserver开发小结(二)
  6. matlab 基础知识class &amp;lt; superclass_name
  7. WPF 实现拖动工具箱效果
  8. maven私服nexus安装
  9. IOS开发之把 Array 和 Dictionaries 序列化成 JSON 对象
  10. Asp UserInfoList 方法二