producer:

package com.toov5.Producer;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.toov5.utils.MQConnectionUtils; public class Producer {
// 队列名称
private static final String UEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException {
// 创建新的连接
Connection connection = MQConnectionUtils.newConnection();
// 创建Channel
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare(UEUE_NAME, false, false, false, null);
channel.basicQos(1); // 保证 取一个消费 队列给消费者发送消息时候 一个消息
for (int i = 0; i < 10; i++) {
// 创建message
String msg = "toov5_message";
System.out.println("生产者投递消息" + msg + i);
// 生产者发送消息
channel.basicPublish("", UEUE_NAME, null, msg.getBytes());
}
// 关闭通道和连接
channel.close();
connection.close(); }
}

Consumer1

package com.toov5.Consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.toov5.utils.MQConnectionUtils; public class Consumer1 { //队列名称
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消费者启动..........1");
//创建新的连接
Connection connection = MQConnectionUtils.newConnection();
//创建Channel
final Channel channel = connection.createChannel();
// 消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
//监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg =new String(body,"UTF-8");
System.out.println("消费者获取生产者消息:"+msg);
try {
//模拟应答等待时间
Thread.sleep(1000);
} catch (Exception e) { }finally {
channel.basicAck(envelope.getDeliveryTag(), false); //手动应答 告诉消息队列服务器 消费成功
}
}
};
//牵手模式设置 默认自动应答模式 true:自动应答模式
channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手动应答 }
}

Consumer2

package com.toov5.Consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.toov5.utils.MQConnectionUtils; public class Consumer2 { //队列名称
private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消费者启动..........2");
//创建新的连接
Connection connection = MQConnectionUtils.newConnection();
//创建Channel
final Channel channel = connection.createChannel();
// 消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
//监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg =new String(body,"UTF-8");
System.out.println("消费者获取生产者消息:"+msg);
try {
//模拟应答等待时间
Thread.sleep(300);
} catch (Exception e) { }finally {
channel.basicAck(envelope.getDeliveryTag(), false); //手动应答 告诉消息队列服务器 消费成功
}
}
};
//牵手模式设置 默认自动应答模式 true:自动应答模式
channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手动应答
}
}

运行结果:

睡眠少的(执行快的) 指定的多

 注意 每个消费者 必须要应答 一下! 队列服务器没有收到应答 就不会发送下一个给消费者~

最新文章

  1. ImportError: cannot import name &#39;_imagingtk&#39;
  2. 银行卡BIN: Bank Identification Number
  3. javascript 设计模式-----策略模式
  4. 黄聪:PHP页面跳转如何实现延时跳转
  5. 再谈C++继承
  6. cf 645F Cowslip Collections 组合数学 + 简单数论
  7. ftp自动上传下载文件脚本
  8. Away3D 的实体收集器流程1
  9. CodeSMART for VS.NET插件工具
  10. Hbase 配置问题(ERROR: org.apache.hadoop.hbase.PleaseHoldException: org.apache.hadoop.hbase.PleaseHoldEx)
  11. 6种基础排序算法java源码+图文解析[面试宝典]
  12. Proxysql读写分离配置
  13. python实现斐波那契数列
  14. (五)JMM的介绍
  15. Win7建立FTP站点
  16. String、StringBuffer与StringBuilder之间区别(转)
  17. 9 云计算系列之Cinder的安装与NFS作为cinder后端存储
  18. 定位性能问题的18个linux命令
  19. maxout激活函数
  20. 添加信任站点和允许ActiveX批处理

热门文章

  1. mac 下 pycharm 快捷键
  2. [ Laravel 5.6 文档 ]laravel数据库操作分页(自定义分页实现和自定义分页样式)
  3. java.String中的方法
  4. macOS10.12部署sonarqube5.6.3
  5. cocos2dx3.0戳青蛙游戏(打地鼠)
  6. Apollo-open-capacity-platform 微服务能力开发平台 (转)
  7. 【Python3.6】之在Windows中安装Python3.6.1
  8. 刨根问底 HTTP 和 WebSocket 协议(下)
  9. run ceph in docker
  10. eclipse tasks