2.6.8 RabbitMQ -- Masstransit 异常处理

  • 异常处理
  • 其他
  • 高级功能

异常处理

  • 异常与重试
  • 重试配置
  • 重试条件
  • 重新投递信息
  • 信箱

异常与重试

Exception

public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
public Task Consume(ConsumeContext<SubmitOrder> context)
{
throw new Exception("Very bad things happened");
}
}

UseMessageRetry

var sessionFactory = CreateSessionFactory();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("rabbitmq://localhost/"); cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseMessageRetry(r => r.Immediate(5)); e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});
});

重试配置

// 立即重试:一共连续重试10次
ep.UseMessageRetry(r => r.Immediate(10)); // 间隔重试:一共重试10次,每次间隔10秒
ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10))); // 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15))); // 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒
ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60))); // 每次叠加50秒
ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));

重试条件

e.UseMessageRetry(r =>
{
r.Handle<ArgumentNullException>();
r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

过滤某些异常类型不进行重试

重新投递信息

cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

消息冲队列移除之后,在一定时间之后重新投入消息队列。需要配置调度模块(scheduling)

信箱

cfg.ReceiveEndpoint("submit-order", e =>
{
e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
e.UseMessageRetry(r => r.Immediate(5));
e.UseInMemoryOutbox(); e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
});

有些消息是在 consume 方法中发送或发布的,如果在发送之后 consume 中产生了异常,那原来发出去的消息就需要撤回,如果使用信箱之后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去

其他

  • Fault
  • Consuming Faults
  • Error Pipe
  • Dead-Letter Pipe

Fault

public interface Fault<T>
where T : class
{
Guid FaultId { get; }
Guid? FaultedMessageId { get; }
DateTime Timestamp { get; }
ExceptionInfo[] Exceptions { get; }
HostInfo Host { get; }
T Message { get; }
}

Fault 消息在异常的时候会发布出来

Consuming Faults

public class DashboardFaultConsumer :
IConsumer<Fault<SubmitOrder>>
{
public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
{
// update the dashboard
}
}

Fault 消息也是可以进行订阅的

Error Pipe

cfg.ReceiveEndpoint("input-queue", ec =>
{
ec.DiscardFaultedMessages();
});

默认情况下错误的消息会被投递到了 _error 队列,可以配置直接抛弃错误信息

Dead-Letter Pipe

cfg.ReceiveEndpoint("input-queue", ec =>
{
ec.DiscardSkippedMessages();
});

死信队列:没有消费者的消息会被移到 _skipped 队列,但可以配置为不移到 _skipped 队列

高级功能

  • 持久化
  • Saga 事件串
  • 调度
  • Courier 最终一致性
  • 监控

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。

最新文章

  1. BOOST1.54简化编译
  2. cmake 静态调用 c++ dll 的类的一个例子(Clion IDE)
  3. 三、jQuery--jQuery实践--搜索框制作
  4. linux下 cmatrix的安装和使用
  5. HDU1671 - Phone List(Trie树)
  6. 2:url有规律的多页面爬取
  7. 4.请求方式为application/json时的接口测试要如何做?
  8. Jedis-returnResource使用注意事项
  9. string (KMP+期望DP)
  10. curl的使用基本流程,HTTP的get请求,post请求
  11. Trie树的二三事QWQ
  12. jsp进阶
  13. awk、sed、grep三大shell文本处理工具之awk的应用
  14. Codeforces 269C Flawed Flow (看题解)
  15. Qt5学习记录:QString与int值互相转换
  16. oracle 直接复制表内容到新表
  17. 安装部署 Kubernetes 集群
  18. Hello_Motion_Tracking 任务一:Project Tango采集运动追踪数据
  19. JS获取节点属性个数及值得方法
  20. Linux多电脑ssh免密码登录

热门文章

  1. 【Azure Developer】VS Code运行Java 版Azure Storage SDK操作Blob (新建Container, 上传Blob文件,下载及清理)
  2. AtCoder Regular Contest 107(VP)
  3. 庐山真面目之七微服务架构Consul集群、Ocelot网关集群和IdentityServer4版本实现
  4. C++ cin.ignore() 的使用
  5. qq获取验证码接口
  6. Linux(CentOS7)安装Nginx(附简单配置)
  7. 开源一套原创文本处理工具:Java+Bat脚本实现自动批量处理对账单工具
  8. 面试 16-01.MVVM
  9. js下 Day09、事件(二)
  10. 职场PUA,管理者的五宗罪