(二)RocketMq入门之消息发送和接收
2024-08-27 20:35:53
一、消息产生、发送
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("172.18.4.114:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = ; i < ; i++) {
Thread.sleep(); //每5秒发送一次MQ
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + " Hello RocketMQ ,QuickStart" + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
二、消息接收、消费
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("172.18.4.114:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getTopic()));
System.out.println(new String(msg.getTags()));
System.out.println("=== " + new String(msg.getBody()));
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
最新文章
- 【vijos】P1514天才的记忆
- Git 学习01
- SQL Server里PIVOT运算符的”红颜祸水“
- Android 文件夹命名规范 国际化资源
- 2. VS使用---HelloWorld
- loadrunner协议的选择
- mac下app store 无法完成您的购物操作
- 关于 JavaScript 数据类型判断
- 如何在Eclipse中查看Android API源码以及support包源码
- CSS 3 属性学习大纲
- Oracle-11g 基于 NBU 的 rman 冷备份及恢复
- centos6.X安装jdk
- RESTful规范建议
- C# ADO.NET的SqlDataReader对象,判断是否包含指定字段
- windows下Go升级及GoLand的安装激活
- mysql string 列类型
- 两个非空的<;div>;元素inline-block化后出现空白部分解决办法
- Android.DebugOnDevices
- Java使用String类格式化当前日期
- iview里select组件搜索后选中的数据和展示内容不一样
热门文章
- SQL2005,错误 0xc00470fe 数据流任务 产品级别对于 组件“源 - 2009_txt”(1) 而言不足
- 如何在Linux上面安装GCC 4.1.2
- 【转】十个经典的C开源项目代码
- SpringMVC -jquery实现分页
- [Javascript] Closure Cove, Common mistake
- 压缩 MongoDB 的数据文件
- 触发器三(行级DML触发器)(学习笔记)
- css before和after伪元素应用
- jquery 保留两个小数的方法
- oracle 快速备份表数据