RocketMQ 如何保证消息不丢失

Producer

  1. 提供SYNC的发送消息方式,等待broker处理结果。

  2. 发送消息如果失败或者超时,则重新发送。

    // 同步发送消息,如果5秒内没有发送成功,则重试3次 DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer"); producer.setRetryTimesWhenSendFailed(3); producer.send(msg, 5000L);

  3. broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

总结:

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。

Broker

  1. 默认采用异步刷盘,现在改成同步刷盘

  2. 提供主从模式,同时主从支持同步双写

    即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。

    因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。

    此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证休息不丢失。

总结:

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。

RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。

Consmer

  1. consumer默认提供的是At least Once机制

    Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。

    通常消费消息的ack机制一般分为两种思路:

    先提交后消费;

    先消费,消费成功后再提交;

    思路一可以解决重复消费的问题但是会丢失消息,因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

  1. 消费消息重试机制

    当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。

总结

consumer端要保证消费消息的可靠性,主要通过At least Once+消费重试机制保证。

如何保证消息不被重复消费

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

最新文章

  1. 表单元素的submit()方法和onsubmit事件
  2. python第二天-linux权限管理
  3. <转>提高iOS开发效率的方法和工具
  4. JPA SQL 的复杂查询createNamedQuery
  5. springMVC工作原理图
  6. kickStart脚本
  7. 如何解决 SogouIinput not enough space for thread data ?? 虚拟内存
  8. 使用idea Live Template实现eclipse syso自动提示代码功能
  9. 解决eclipse的自动换行问题。
  10. 第一册:lesson thirty five。
  11. TP内部方法访问
  12. centos7安装mysql客户端
  13. springboot + @KafkaListener 手动提交及消费能力优化
  14. c++——基本概念
  15. 本地IP,掩码,网关,DNS设置
  16. 透过摩拜和ofo,看产品从0到1时如何取舍需求(转)
  17. Hadoop基础-HDFS的API实现增删改查
  18. winows下使用sourcetree的问题
  19. 高性能Web服务器Nginx的配置与部署研究(7)核心模块之主模块的非测试常用指令
  20. Python中的global和nonlocal

热门文章

  1. Docker安装SqlServer、Mysql、MariaDB
  2. vue项目部署在nodejs+express
  3. C# Replace:一个熟悉而又陌生的替换
  4. SpringMVC的表单组件、国际化
  5. Destroying Roads
  6. 自己动手从零写桌面操作系统GrapeOS系列教程——7.计算机组成与运行原理
  7. Spring Cloud Alibaba 整合 Seata 实现分布式事务
  8. NLP知识栈
  9. typescript - 学习档案
  10. K8s集群版本升级