背景

在kafka的消费者中,如果消费某条消息出错,会导致该条消息不会被ack,该消息会被不断的重试,阻塞该分区的其他消息的消费,因此,为了保证消息队列不被阻塞,在出现异常的情况下,我们一般还是会ack该条消息,再另外对失败的情况进行重试

目标

实现一个完善的重试逻辑,一般需要考虑一下几个因素:

  1. 重试的时间间隔
  2. 最大重试次数
  3. 是否会漏掉消息

实现

扔回队尾

在消息出错时,将消息扔回队尾

优点:

  1. 实现简单,没有别的依赖项

缺点:

  1. 无法控制重试时间间隔

基于数据库任务表的扫描方案

在数据库中增加一个任务的状态表,然后用一个定时任务去扫描任务表中,失败的任务,然后进行重试,其中记录下重试的次数即可

优点:

  1. 实现简单,一般这种离线任务,根据统计的需求,都会有一个任务状态表的,所以仅仅是增加一个定时任务去扫表

缺点:

  1. 性能较差,定时任务,一般都在无意义的扫描,浪费性能

新增重试队列的方案

新增一个重试队列,消费消息出错时,将时间戳和消息发送到重试队列,然后在重试队列中,根据时间,来判断阻塞时间,代码如下:

func handleRetryEvent(ctx context.Context, conf *util.Conf, data []byte) (err error) {
defer common.Recover(ctx, &err)
log := common.Logger(ctx).WithField("Method", "consumer.handleRetryEvent")
retryEvent := &MergeRetryEvent{}
err = json.Unmarshal(data, retryEvent)
if err != nil {
log.WithError(err).Error("failed to unmarshal data")
return nil
}
log.WithField("contact_id", retryEvent.ContactId).Info("receive message")
delaySecond := (retryEvent.CreateTime + SLEEPSECOND) - time.Now().Unix()
if delaySecond <= 0 {
log.Info("send message to account merge event")
err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
return err
} else {
log.Infof("sleep %d seconds", delaySecond)
time.Sleep(time.Duration(delaySecond) * time.Second)
err = SendAccountMergeEventTopic(ctx, retryEvent.ContactId)
return err
}
}

优点:

相对于扫表的方案,改方案没有无意义的扫表操作,性能更好

注意:之前在网上看到一个重试队列的实现,因为害怕开过多的线程(协程),作者用了一个channel来缓存重试消息,然后在一个协程池中去消费消息,消费的逻辑和上面的实例代码差不多,这样做是有风险的,因为channel是在本机的内存中,没有本地存储的,是存在丢消息的风险的(服务重启等情况)

参考链接:

https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

最新文章

  1. php多文件上传
  2. 从request获取远程IP地址
  3. 解决SQLSERVER数据库表被琐死!
  4. iOS深入学习:(UITableView系列3:insertRow)
  5. jQuery 全选 反选 单击行改变背景色
  6. 支持HTML5 SqlLite的AndroidApp
  7. 用Octopress在Github pages上写博客
  8. 跟我学android- 创建运行环境(二)
  9. 模拟jquery底层链式编程
  10. for循环语句
  11. Android初级教程获取手机位置信息GPS与动态获取最佳方式
  12. go get golang.org被墙问题解决
  13. eclipse neon 发布
  14. .ftl文件介绍
  15. TensorFlow迭代速度变慢的问题
  16. express中session的使用
  17. cocos2d-x JS 字符串
  18. 02-body标签中相关标签-1
  19. PHP学习 文件访问和写入
  20. 让Zend Studio联系关系CakePHP模板文件.ctp

热门文章

  1. shell脚本:备份数据库、代码上线
  2. Handler 机制(一)—— Handler的实现流程
  3. AndroidStudio提高编译速度的建议
  4. Docker虚拟化管理:30分钟教你学会用Docker
  5. Javascript基础之-var,let和const深入解析(二)
  6. mac OS 安装淘宝npm镜像
  7. R 语言命令行参数处理
  8. Mysql 字符串拆分 OR 一行转多行
  9. Semaphores
  10. Simple Math Problems