在之前的一篇博客RabbitMQ入门:认识并安装RabbitMQ(以Windows系统为例)中,我们安装了RabbitMQ并且对其也有的初步的认识,今天就来写个入门小例子来加深概念理解并了解代码怎么实现。

本篇博客围绕下面几个方面展开:

  1. 代码前的理论热身
  2. 代码实例:Hello RabbitMQ
  3. 运行代码并调试问题

Now, Let's begin !

一、代码前的理论热身

我们来看张图:

Publisher(生产者)生成消息,然后publish(发布)消息到exchange(路由器,也有资料翻译成交换机),然后根据路由规则将消息传递到Queue(队列),最终交由Consumer(消费者)进行消费处理。

这里的生产者和消费者都是我们的应用,因此我们的代码中要实现这两个部分。

中间的节点就是RabbitMQ 提供的内容,需要再生产者和消费者里面调用其接口来定义和使用这些节点。

二、代码实例:Hello RabbitMQ

  1. 首先来实现生产者,这里我没有用Publisher做类名,而是用的Provider,没有特别的用意,就是在起名字的时候不小心写成了这样,不需要在意这个细节,O(∩_∩)O。

    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory; public class Provider { //定义队列名
    static String QUEUE_NAME = "helloRabbit"; public static void main(String[] args) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = null;
    Channel channel = null;
    try {
    //1.创建连接和通道
    connection = factory.newConnection();
    channel = connection.createChannel(); //2.为通道声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null); //3.发布消息
    String msg = " hello rabbitmq, welcome to sam's blog.";
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    System.out.println("provider send a msg: " + msg);
    } catch (IOException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    } finally {
    //4.关闭连接
    if (channel != null) {
    try {
    channel.close();
    } catch (IOException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    } if (connection != null) {
    try {
    connection.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    } } }

    在第2步中,channel.queueDeclare 用来创建队列,有5个参数:String queue, 队列名; boolean durable, 该队列是否需要持久化; boolean exclusive,该队列是否为该通道独占的(其他通道是否可以消费该队列); boolean autoDelete,该队列不再使用的时候,是否让RabbitMQ服务器自动删除掉; Map<String, Object> arguments 其他参数。第3步中,channel.basicPublish 发布消息(用在生产者),有4个参数:String exchange, 路由器(有的资料翻译成交换机)的名字,即将消息发到哪个路由器; String routingKey, 路由键,即发布消息时,该消息的路由键是什么; BasicProperties props, 指定消息的基本属性; byte[] body 消息体,也就是消息的内容,是字节数组。 可能你会疑惑,为什么没有exchange呢?因为如果声明了队列,可以不声明路由器。

  2. 接着来实现消费者,消费者实现和生产者过程差不多,但是在这里并没有关闭连接和通道,是因为要消费者一直等待随时可能发来的消息。代码如下:
    package com.sam.hello_rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope; public class HelloConsumer { public static void main(String[] args) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = null;
    Channel channel = null;
    try {
    // 1.创建连接和通道
    connection = factory.newConnection();
    channel = connection.createChannel(); // 2.为通道声明队列
    channel.queueDeclare(Provider.QUEUE_NAME, false, false, false, null);
    System.out.println(" **** keep alive ,waiting for messages, and then deal them");
    // 3.通过回调生成消费者
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
    com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { //获取消息内容然后处理
    String msg = new String(body, "UTF-8");
    System.out.println("*********** HelloConsumer" + " get message :[" + msg +"]");
    }
    }; //4.消费消息
    channel.basicConsume(Provider.QUEUE_NAME, true, consumer); } catch (IOException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    }
    }

    在第4步中,channel.basicConsume 用来接收消息,用在消费者,有3个参数:String queue, 队列名字,即要从哪个队列中接收消息; boolean autoAck, 是否自动确认,默认true; Consumer callback 消费者,即谁接收消息。

三、运行代码并调试问题

代码写好了,接下来进行测试,

  1. 先来执行下Provider.java,发现报错了:

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:952)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at com.sam.hello_rabbitmq.Provider.main(Provider.java:36)
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)
    ... 3 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:509)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
    at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:345)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:286)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:600)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:527)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
    at com.sam.hello_rabbitmq.Provider.main(Provider.java:60)
    关键堆栈信息是:inequivalent arg 'durable' for queue 'helloRabbit' in vhost '/': received 'false' but current is 'true',说是helloRabbit这个队列durable(是否需要持久化)
    参数已经设定成了true 但是代码中指定的是false,冲突了,纳尼?访问RabbitMQ管理页面:http://localhost:15672/#/queues 发现已经存在一个队列helloRabbit,

    点helloRabbit的链接,发现队列的durable属性确实是true。哦,原来我之前在做别的练习的时候,创建过一个叫这个名字的队列了,而且属性值刚好为true.

    那么接下来删掉这个既存的队列

    再去执行Provider.java,后台打印了内容,并且队列中有了一条ready的消息。

    问题解决!

  2. 执行HelloConsumer.java,预想的结果是在启动后,控制台直接打印出log并且RabbitMQ管理页面没有ready的消息:

    结果符合预期。

到此,全部工作完美结束。

最新文章

  1. Selenium2(WebDriver)_如何判断WebElement元素对象是否存在
  2. Java 数组基础
  3. ural 1251. Cemetery Manager
  4. 【JAVA】Quartz中时间表达式的设置
  5. zoj 1203 Swordfish prim算法
  6. UVa 458 - The Decoder
  7. text-align:justify小例子
  8. 【译】typeof null的前世今生
  9. 【转】RTSP协议学习笔记
  10. 2017-3-2 C#基础 集合
  11. error: stray &#39;\357&#39; in program编程出错的总结
  12. 关于AJAX异步请求的那些事儿(2)
  13. python2.7入门---内置函数
  14. [Swift]LeetCode242. 有效的字母异位词 | Valid Anagram
  15. Habits of Considerate People
  16. 【JAVA集合框架一 】java集合框架官方介绍 Collections Framework Overview 集合框架总览 翻译 javase8 集合官方文档中文版
  17. Reactjs-JQuery-Omi-Extjs-Angularjs对比
  18. OpenGL教程一
  19. [dpdk] SDK编译配置
  20. 如何配置官方peerDroid,使其运行起来

热门文章

  1. 【hibernate学习杂记】维护关系的一方和不维护关系的一方的区别与联系
  2. 缓冲区溢出基础实践(一)——shellcode 与 ret2libc
  3. Kubernetes-dns 服务搭建
  4. IntelliJ IDEA使用技巧一览表
  5. centos安装swoole
  6. 基于AppDomain的&quot;插件式&quot;开发
  7. $NOIp$前的日常
  8. java学习笔记-JavaWeb篇三
  9. 图片在线处理 webp!
  10. 用windows或ubuntu访问apfs或mac分区