20140310补充:

  rabbitmq有requeue属性,可以选择消息是否返回队列,另,本文的解决方式非常之山寨,只能应用于发送和接收方式。  

  这几天在折腾消息队列,在.Net环境下有基于RabbitMQ有很多有API的选择,最后选择了比较简单的EasyNetQ(http://easynetq.com/)

  在测试使用的时候发现一个问题,对于处理错误的消息,EasyNetQ默认会放到一个错误队列中并提供了一个工具可以对错误队列的消息进行重新分发。RabiitMQ是有一个事务功能的,但是貌似在EasyNetQ的实现中,没有发现相关的操作方式

  现在的需求是,在某种特定的情况下,需要将处理不成功的消息,保留在消息原队列

  做了一个变通处理,出现错误消息的时候,将其再送回原队列。从客户端上可以直接send回队列,但这样不是一个很好的方式

  研究了一下EasyNetQ的相关代码,发现其定义了一个IConsumerErrorStrategy接口,我们只要自己自行实现,并注册一个自定义方法就可以

  ComponentRegistration.cs原注册方法相关,默认错误消息处理方式在DefaultConsumerErrorStrategy.cs中

     public class ComponentRegistration
{
public static void RegisterServices(IContainer container)
{
Preconditions.CheckNotNull(container, "container"); // Note: IConnectionConfiguration gets registered when MQContext.CreateBus(..) is run.
#region DefaultConsumerErrorStrategy
container
.Register(_ => container)
.Register<IMessageQueueQLogger, ConsoleLogger>()
.Register<ISerializer, JsonSerializerN>()
.Register<IConventions, Conventions>()
.Register<IEventBus, EventBus>()
.Register<ITypeNameSerializer, TypeNameSerializer>()
.Register<Func<string>>(x => CorrelationIdGenerator.GetCorrelationId)
.Register<IClusterHostSelectionStrategy<ConnectionFactoryInfo>, DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>>()
.Register<IConsumerDispatcherFactory, ConsumerDispatcherFactory>()
.Register<IPublishExchangeDeclareStrategy, PublishExchangeDeclareStrategy>()
.Register<IPublisherConfirms, PublisherConfirms>()
.Register<IConsumerErrorStrategy, DefaultConsumerErrorStrategy>()
.Register<IHandlerRunner, HandlerRunner>()
.Register<IInternalConsumerFactory, InternalConsumerFactory>()
.Register<IConsumerFactory, ConsumerFactory>()
.Register<IConnectionFactory, ConnectionFactoryWrapper>()
.Register<IPersistentChannelFactory, PersistentChannelFactory>()
.Register<IClientCommandDispatcherFactory, ClientCommandDispatcherFactory>()
.Register<IHandlerCollectionFactory, HandlerCollectionFactory>()
.Register<IAdvancedBus, RabbitAdvancedBus>()
.Register<IRpc, Rpc>()
.Register<ISendReceive, SendReceive>()
.Register<IBus, RabbitBus>();
#endregion
}
}

  对DefaultConsumerErrorStrategy的代码进行研究后发现,EasyNetQ是产生一个处理错误消息队列的Exchanges,将消息二次封装后推送到默认的错误队列

  只要将消息队列改为源消息队列,取消消息二次封装,直接推送到队列

  队列绑定

         private string DeclareErrorExchangeAndBindToDefaultErrorQueue(IModel model, ConsumerExecutionContext context)
{
var originalRoutingKey = context.Info.RoutingKey; return errorExchanges.GetOrAdd(originalRoutingKey, _ =>
{
var exchangeName = conventions.ErrorExchangeNamingConvention(context.Info);
model.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: true);
//更改第一个参数
model.QueueBind(originalRoutingKey, exchangeName, originalRoutingKey);
return exchangeName;
});
}

  消息处理

         public virtual PostExceptionAckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
{
Preconditions.CheckNotNull(context, "context");
Preconditions.CheckNotNull(exception, "exception"); try
{
Connect(); using (var model = connection.CreateModel())
{
var errorExchange = DeclareErrorExchangeQueueStructure(model, context);
var messageBody = context.Body;
var properties = model.CreateBasicProperties();
context.Properties.CopyTo(properties);
properties.Type = context.Properties.Type;
//消息持久化
properties.SetPersistent(true); model.BasicPublish(errorExchange, context.Info.RoutingKey, properties, messageBody); }
}
catch (Exception unexpectedException)
{
// Something else unexpected has gone wrong :(
logger.ErrorWrite("EasyNetQMessageQueue Consumer Error Handler: Failed to publish error message\nException is:\n"
+ unexpectedException);
}
return Consumer.PostExceptionAckStrategy.ShouldAck;
}

  因为EasyNetQ在CreateBus的时候就会将相关事件注册,所以我们只需最后自行在config中加入有关配置,在注册事件的位置加入判断即可让异常抛回

  由于刚接触RabbitMQ,对于一些概念的理解可能还不是非常到位,这是我目前所能找到的解决方案

  当然可能以后的版本中,EasyNetQ会加入对事务的操作,现在EasyNetQ支持了一种叫Publisher Confirms的方法,貌似还是不是我想要的

												

最新文章

  1. elasticsearch suggest 的几种使用-completion 的基本 使用
  2. 如何写出优雅的Python
  3. auto dock
  4. ed编辑器使用
  5. 你想建设一个能承受500万PV/每天的网站吗?服务器每秒要处理多少个请求才能应对?
  6. 以守护进程方式启动firefly
  7. 那些年,我们一起被坑的H5音频
  8. poj1981 Circle and Points 单位圆覆盖问题
  9. JavaWeb核心编程之(三.6)HttpServlet
  10. HDU2008-数值统计
  11. HDU1305 Immediate Decodability(水题字典树)
  12. win10 UWP 发邮件
  13. Struts2运行流程-源码剖析
  14. Unix:关于一个file在file system和disk中占用空间
  15. React问题集序
  16. 带着新人学springboot的应用13(springboot+热部署)
  17. P4174 [NOI2006]最大获利(网络流)
  18. python模块之xml
  19. 纯css实现元素下出现横线动画(background-image)
  20. exp自动备份在bat中不执行

热门文章

  1. 深入理解java虚拟机(十二) Java 语法糖背后的真相
  2. LightOJ 1044 Palindrome Partitioning(简单字符串DP)
  3. C++ 内敛函数
  4. MlskincolorButton使用方法
  5. Android开发环境包下载地址
  6. java-02 JDK安装与环境变量配置&amp;安装编程IDE
  7. Linux下配置Apache为多端口
  8. WPF Viewport3D 解决透视模式时窗体模糊
  9. leetcode 存在重复元素
  10. C# try catch finally