一.造成重复消费的原因

在于回馈机制。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个ACK确认信息给消息队列(broker),消息队列(broker)就知道该消息被消费了,就会将该消息从消息队列中删除。

不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念。

  造成重复消费的原因?,就是因为网络原因闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。(因为消息重试等机制的原因,如果一个consumer断了,rocketmq有consumer集群,会将该消息重新发给其他consumer)

这个问题针对业务场景来答,分以下三种情况:

(1)比如,你拿到这个消息做数据库的insert操作,那就容易了,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

(2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

(3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录即可。

二.单机环境解决方案

生产者:发送消息同时set一个key做唯一标识

public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 1; i++) {
Thread.sleep(1000); // 每秒发送一次MQ
Message msg = new Message("itmayiedu-topic", // topic 主题名称
"TagA", // tag 临时值
("itmayiedu-6" + i).getBytes()// body 内容
);
//setKey,做唯一标识
msg.setKeys(System.currentTimeMillis() + ""); SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}

消费者:

//保存标识的集合
static private Map<String, String> logMap = new HashMap<>(); public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("itmayiedu-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
String key = null;
String msgId = null;
try {
for (MessageExt msg : msgs) {
key = msg.getKeys();
//判断集合当中有没有存在key,存在就不需要重试,不存在先存key再回来重试后消费消息
if (logMap.containsKey(key)) {
// 无需继续重试。
System.out.println("key:"+key+",无需重试...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgId = msg.getMsgId();
System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
//模拟异常
int i = 1 / 0;
} } catch (Exception e) {
e.printStackTrace();
//重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} finally {
//保存key
logMap.put(key, msgId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}

执行效果:

三.集群环境解决方案

在生产者端要保证幂等性的话,大概可以使用以下两种方式:
    ① RocketMQ支持消息查询的功能,只要去RocketMQ查询一下是否已经发送过该条消息就可以了,不存在则发送,存在则不发送
    ② 引入redis,在发送消息到RocketMQ成功之后,向redis中插入一条数据,如果发生重试,则先去redis中查询一下该条消息是否已经发送过了,存在的话就不重复发送消息了
    生产者的这两种幂等性方案都可以实现,但是都存在一定的缺陷
    方案①,RocketMQ消息查询的性能不是特别好,如果是在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,可能会影响接口的性能
    方案②,在一些极端的场景下,redis也无法保证消息发送成功之后,就一定能写入redis成功,比如写入消息成功而redis此时宕机,那么再次查询redis判断消息是否已经发送过,是无法得到正确结果的

既然在消费者做幂等性的方案都不是特别靠谱,那就再在消费者端来做吧
消息的消费,最后都对应的是数据库的操作,只要在消息消费的时候,判断一下数据库中是否已经消费过了这条消息,就可以保证幂等性了,例如使用唯一索引,保证一条消息只被消费一次。

参考:https://blog.csdn.net/LO_YUN/article/details/104135197

去重原则:1.幂等性 2.业务去重

  幂等性:(处理必须唯一) 无论这个业务请求被(consumer)执行多少次,我们的数据库的结果都是唯一的,不可变的。

  去重策略:去重表机制,业务拼接去重策略(比如唯一流水号)

  1.建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突。

    高并发下去重:采用Redis去重(key天然支持原子性并要求不可重复),但是由于不在一个事务,要求有适当的补偿策略,但是对于很重要的业务,不应该支持补偿

  2.利用redis事务,主键(我们必须把全量的操作数据都存放在redis里,然后定时去和数据库做数据同步)—-即消费处理后,该处理本来应该保存在数据库的,先保存在redis,再通过一定业务方式从redis中取数据进行db持久化

  3.利用redis和关系型数据库一起做去重机制

  4.拿到这个消息做redis的set的操作.redis就是天然幂等性

  5.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将 < id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

消息重复消费是一个非常常见的问题,在很多系统调用频繁的场景下,都可能会出现超时重试的情况,还有就是系统频繁迭代,经常重启系统更新的场景,也会出现消息重复消费
生产者端发送重复的消息到RocketMQ中其实问题不大,消息只是在RocketMQ中重复了,并没有影响到系统的数据,我们只需要在最后修改数据库的时候,保证好幂等性即可

最新文章

  1. trigger事件模拟
  2. IIS7.0部署MVC/WebApi项目,报404.4错误
  3. BZOJ 2132 圈地计划(最小割)
  4. MQ使用几个命令
  5. Android中通过广播方式调起第三方App
  6. ntfs安全权限和共享权限的区别
  7. Python学习教程(learning Python)--3.3.3 Python逻辑关系表达式
  8. 【转】unity3d input输入
  9. 在页面加载后在设置embed 的src 怎么实现?
  10. 通过多线程处理提高Redis性能
  11. POJ 3723
  12. mean
  13. Eclipse开发环境debug模式调试断点从jar跳到源码
  14. &lt;Using ZooKeeper&gt;&lt;Deploy &amp; Use&gt;
  15. Redis和RabbitMQ在项目中的使用
  16. JQuery EasyUI 1.5.1 美化主题大包
  17. Breaking Down Type Erasure in Swift
  18. 安装sybase服务器并连接数据库
  19. QNX的深度嵌入过程
  20. OAF TABLE中第一列添加事件不生效

热门文章

  1. Digital Twin 数字孪生
  2. Python 实现转堆排序算法原理及时间复杂度(多图解释)
  3. 记录KVM虚拟机常用操作管理命令
  4. lua学习之深入函数第二篇
  5. windows下python3使用pip安装scrapy提示安装失败
  6. Java的开发—面向对象的7大原则之开闭原则(一)
  7. centos6.8 安装.net core2.1 sdk 或 .net core2.1 runtime
  8. Hash存储模型、B-Tree存储模型、LSM存储模型介绍
  9. 2020软件工程作业01 Deadline: 2020/03/07 20:00pm
  10. 剑指offer-字符的所有组合,复制复杂链表,二叉树中和为某一值的路径