今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。
   
      首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html 
   
      下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用:

RabbitMQ.Client.dll //基于的发布订阅消息的功能类   
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类

如下图:
    
   
       接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下:
   
       首先是ProducerMQ:

public   class  ProducerMQ
{
     public   static    void  InitProducerMQ()
    {
        Uri uri  =   new  Uri( " amqp://10.0.4.85:5672/ " );
         string  exchange  =   " ex1 " ;
         string  exchangeType  =   " direct " ;
         string  routingKey  =   " m1 " ;
         bool  persistMode  =   true ;
        ConnectionFactory cf  =   new  ConnectionFactory();
      
        cf.UserName  =   " daizhj " ;
        cf.Password  =   " 617595 " ;
        cf.VirtualHost  =   " dnt_mq " ;
        cf.RequestedHeartbeat  =   0 ;
        cf.Endpoint  =   new  AmqpTcpEndpoint(uri);
         using  (IConnection conn  =  cf.CreateConnection())
        {
             using  (IModel ch  =  conn.CreateModel())
            {
                 if  (exchangeType  !=   null )
                {
                    ch.ExchangeDeclare(exchange, exchangeType); // ,true,true,false,false, true,null); 
                    ch.QueueDeclare( " q1 " ,  true ); // true, true, true, false, false, null); 
                    ch.QueueBind( " q1 " ,  " ex1 " ,  " m1 " ,  false ,  null ); 
                }
                IMapMessageBuilder b  =   new  MapMessageBuilder(ch);
                IDictionary target  =  b.Headers;
                target[ " header " ]  =   " hello world " ;
                IDictionary targetBody  =  b.Body;
                targetBody[ " body " ]  =   " daizhj " ;
                 if  (persistMode)
                {
                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode  =   2 ;
                }
             
                ch.BasicPublish(exchange, routingKey,
                                           (IBasicProperties)b.GetContentHeader(),
                                           b.GetContentBody());             }
        }
    }
}

下面对上面代码进行说明:
    1.  定义要链接的rabbitmq-server地址(基于amqp协议):

Uri uri = new Uri("amqp://10.0.4.85:5672/");

2.  定义交换方式

string  exchange  =   " ex1 " ;
string  exchangeType  =   " direct " ;
string  routingKey  =   " m1 " ;

说明:rabbitmq交换方式分为三种,分别是:
        Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 
        Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 
        Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
        更多内容参见:RabbitMQ 三种Exchange   
        
     3. 是否对消息队列持久化保存

bool  persistMode  =   true ;

4. 使用ConnectionFactory创建连接,虽然创建时指定了多个server address,但每个connection只与一个物理的server进行连接。

       ConnectionFactory cf  =   new  ConnectionFactory();    
         // 使用前文的配置环境信息   
        cf.UserName  =   " daizhj " ; 
        cf.Password  =   " 617595 " ;
        cf.VirtualHost  =   " dnt_mq " ;
        cf.RequestedHeartbeat  =   0 ;
        cf.Endpoint  =   new  AmqpTcpEndpoint(uri);

5. 实例化IConnection对象,并设置交换方式

  using  (IConnection conn  =  cf.CreateConnection())
            {
                 using  (IModel ch  =  conn.CreateModel())
                {
                     if  (exchangeType  !=   null )
                    {
                        ch.ExchangeDeclare(exchange, exchangeType); // ,true,true,false,false, true,null); 
                        ch.QueueDeclare( " q1 " ,  true ); // true, true, true, false, false, null); 
                        ch.QueueBind( " q1 " ,  " ex1 " ,  " m1 " ,  false ,  null ); 
                    }
        ....

6. 构造消息实体对象并发布到消息队列上:

  IMapMessageBuilder b  =   new  MapMessageBuilder(ch);
  IDictionary target  =  b.Headers;
  target[ " header " ]  =   " hello world " ;
  IDictionary targetBody  =  b.Body;
  targetBody[ " body " ]  =   " daizhj " ;
   if  (persistMode)
  {
    ((IBasicProperties)b.GetContentHeader()).DeliveryMode  =   2 ;
  }
   // 简单发布方式 
  ch.BasicPublish(exchange, routingKey,
          (IBasicProperties)b.GetContentHeader(),
          b.GetContentBody());

这样就完成了单条消息的发布。
    
    下面是CustmerMq.cs(用于消费消息)实例代码:

public   class  CustmerMq
    {
         public   static   int  InitCustmerMq()
        {
             string  exchange  =   " ex1 " ;
             string  exchangeType  =   " direct " ;
             string  routingKey  =   " m1 " ;              string  serverAddress  =   " 10.0.4.85:5672 " ;
            ConnectionFactory cf  =   new  ConnectionFactory();
            cf.Address  =  serverAddress;
            cf.UserName  =   " daizhj " ;
            cf.Password  =   " 617595 " ;
            cf.VirtualHost  =   " dnt_mq " ;
            cf.RequestedHeartbeat  =   0 ;

可以看出上面的代码与 ProducerMQ的开头代码类似,下面使用ConnectionFactory来构造链接并接收队列消息:

  using  (IConnection conn  =  cf.CreateConnection())
            {
                 using  (IModel ch  =  conn.CreateModel())
                {
                     // 普通使用方式BasicGet
                     // noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
                     // noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息 
                    BasicGetResult res  =  ch.BasicGet( " q1 " ,  false /* noAck */ );
                     if  (res  !=   null )
                    {
                         bool  t  =  res.Redelivered;
                        t  =   true ;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag,  false );
                    }
                     else 
                    {
                        Console.WriteLine( " No message! " );
                    }  

上面代码比较简单,主要是使用BasicGetResult来进行简单的消息接收,并使用BasicAck方式来告之是否从队列中移除该条消息。这一点很重要,因为在某些应用场景下,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。
  
      当然上面操作只是用于单条数据操作,如果要遍历队列中所有消息,则需要使用如下方式:

while  ( true )
  {
      BasicGetResult res  =  ch.BasicGet( " q1 " ,  false /* noAck */ );
       if  (res  !=   null )
      {
           try 
          {
                bool  t  =  res.Redelivered;
                        t  =   true ;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag,  false );
          }
           catch  { }
      }
       else 
           break ;
  }

另外,在rabbitmq中,获取消息可以使用两种方式,一种是上面提到的主动获取,另一种是基于订阅模式,即让当前获取消息的线程阻塞,用于绑定到指定的队列上,当有新的消息入队之后,该阻塞线程会被运行,从队列中获取新入队的消息,形如:

  // 第二种取法QueueingBasicConsumer基于订阅模式 
 QueueingBasicConsumer consumer  =   new  QueueingBasicConsumer(ch);
 ch.BasicConsume( " q1 " ,  false ,  null , consumer);
  while  ( true )
 {
      try 
     {
         BasicDeliverEventArgs e  =  (BasicDeliverEventArgs)consumer.Queue.Dequeue();
         IBasicProperties props  =  e.BasicProperties;
          byte [] body  =  e.Body;
         Console.WriteLine(System.Text.Encoding.UTF8.GetString(body));
          // ch.BasicAck(e.DeliveryTag, true); 
         ProcessRemainMessage();                          
     }
      catch  (EndOfStreamException ex)  
     {
          //The consumer was removed, either through channel or connection closure, or through the action of IModel.BasicCancel(). 
         Console.WriteLine(ex.ToString());
          break ;
     }
 }

这样,就完成了一个简单的发布,消费消息的示例。在接下来的文章中,将会介绍如果基于WCF来发布RABBITMQ服务,敬请关注:)

最新文章

  1. Cookie和Session的总结
  2. Objective-C中的Block回调模式
  3. Oracle创建主外键
  4. node-webkit中使用sqlite3(MAC平台)
  5. RTP、RTCP
  6. lintcode :Count 1 in Binary 二进制中有多少个1
  7. 第二百五十七天 how can I 坚持
  8. STSdb
  9. Ghost.py 0.1b3 : Python Package Index
  10. HTML5 模拟现实物理效果
  11. xmanager 打开centos7图形化窗口
  12. android 自定义ScrollView实现背景图片伸缩(阻尼效果)
  13. spring上下文和springMVC上下文的关系
  14. django反向解析传参
  15. NOIP2018保卫王国
  16. Ubuntu如何自定义tftp服务根目录
  17. VMware安装CentOS6
  18. iOS的动态代理模式的实现
  19. Wiener’s attack python
  20. 曾经的UCOSii

热门文章

  1. Linux下使用nohup实现在后台运行程序(转)
  2. 时序数据库TSDB简单了解
  3. 关于Lisp和函数式编程 & 各种语言对比 & TIOBE
  4. SolidWorks如何绘制抽壳零件
  5. Missing 'name' key attribute on element activity at AndroidMan
  6. 时间格式 2016-08-15T16:00:00.000Z
  7. Java基础:执行时异常和非执行时异常
  8. 【转载】FAT12格式的引导程序
  9. FIR300M刷openwrt
  10. java使用默认线程池踩过的坑(三)