RabbitMQ的使用(五)RabbitMQ Java Client简单生产者、消费者代码示例
2024-08-28 03:38:33
pom文件:
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>2.0.2.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
连接工具类:
- package top.wj.rabbitmq.client.utils;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.util.HashMap;
- import java.util.Map;
- public class ChannelUtils {
- public static Channel getChannelInstance(String connectionDescription) {
- try {
- ConnectionFactory connectionFactory = getConnectionFactory();
- Connection connection = connectionFactory.newConnection(connectionDescription);
- return connection.createChannel();
- } catch (Exception e) {
- throw new RuntimeException("获取Channel连接失败");
- }
- }
- private static ConnectionFactory getConnectionFactory() {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 配置连接信息
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- // 网络异常自动连接恢复
- connectionFactory.setAutomaticRecoveryEnabled(true);
- // 每10秒尝试重试连接一次
- connectionFactory.setNetworkRecoveryInterval(10000);
- return connectionFactory;
- }
- }
创建生产者:
- package top.wj.rabbitmq.client.producer;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import top.wj.rabbitmq.client.utils.ChannelUtils;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.concurrent.TimeoutException;
- public class MessageProducer {
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = ChannelUtils.getChannelInstance("队列消息生产者");
- // 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
- channel.exchangeDeclare("rabbitmq.wj", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
- // 设置消息属性 发布消息 (交换机名, Routing key, 可靠消息相关属性 后续会介绍, 消息属性, 消息体);
- AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
- channel.basicPublish("rabbitmq.wj", "add", false, basicProperties, "body中的消息内容!".getBytes());
- }
- }
创建消费者:
- package top.wj.rabbitmq.client.consumer;
- import com.rabbitmq.client.*;
- import top.wj.rabbitmq.client.utils.ChannelUtils;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.concurrent.TimeoutException;
- public class MessageConsumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = ChannelUtils.getChannelInstance("队列消息消费者");
- // 声明队列 (队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性);
- AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("rabbitmq.wj.add", true, false, false, new HashMap<>());
- // 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
- channel.exchangeDeclare("rabbitmq.wj", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
- // 将队列Binding到交换机上 (队列名, 交换机名, Routing key, 绑定属性);
- channel.queueBind(declareOk.getQueue(), "rabbitmq.wj", "add", new HashMap<>());
- // 消费者订阅消息 监听如上声明的队列 (队列名, 是否自动应答(与消息可靠有关 后续会介绍), 消费者标签, 消费者)
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println(consumerTag);
- System.out.println(envelope.toString());
- System.out.println(properties.toString());
- System.out.println("消息内容:" + new String(body));
- }
- };
- channel.basicConsume(declareOk.getQueue(), true, "消费者标签",defaultConsumer );
- }
- }
控制台打印信息:
- 消费者标签
- Envelope(deliveryTag=1, redeliver=false, exchange=rabbitmq.wj, routingKey=add)
- #contentHeader<basic>(content-type=UTF-8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
- 消息内容:body中的消息内容!
rabbitmq管理界面显示:
最新文章
- Java内存模型深度解析:volatile--转
- python课程第四周重点记录
- 24.编写一个Car类,具有String类型的属性品牌,具有功能drive; 定义其子类Aodi和Benchi,具有属性:价格、型号;具有功能:变速; 定义主类E,在其main方法中分别创建Aodi和Benchi的对象并测试对象的特 性。
- BizTalk开发系列(三十六) Orchestration单实例执行
- R.java 文件内报错:Underscores can only be used with source level 1.7 or greater。
- pclint vc6/linux 工程,测试正常
- jQuery的简单入门练习
- Ip 地址
- 指示灯组与3个复位按钮的介绍Arduino Yun快速入门教程
- 2-sat按照最小字典序输出可行解(hdu1814)
- iOS Architecture Patterns
- HttpWebRequest,HttpWebResponse的用法和用途
- 【动态规划】Vijos P1121 马拦过河卒
- rtmp协议介绍
- pull类型消息中间件-消息服务端(三)
- 【2017-03-30】JS-document对象
- sqlserver笔记
- Vue(基础六)_嵌套路由(续)
- spring注解第04课 @Import
- ******十三 ******、软设笔记【操作系统】-磁盘管理、虚设备与SPOOLing系统