50.RocketMQ (quickstart)
2024-08-27 13:33:21
1. Subscribing to messages
/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.example.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer: subscribes to messages.
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.1.16:9876;192.168.1.17:9876");
        /**
         * Set whether the Consumer, on its first start, consumes from the head or from the tail of the queue.<br>
         * If this is not the first start, it resumes from the position where it last stopped consuming.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Subscribe to every tag ("*") of the topic TopicTest.
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}
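
The comment on setConsumeFromWhere describes a two-step rule: a group that has consumed before resumes from its saved position, and only a brand-new group falls back to the configured head-or-tail choice. The snippet below is a toy model of that rule using only the JDK. It is not RocketMQ code: StartOffsetSketch, StartPolicy, resolveStartOffset, and the minOffset/maxOffset parameters are hypothetical names made up for illustration.

```java
import java.util.OptionalLong;

/** Toy model of how a consumer group picks its starting offset; not RocketMQ code. */
final class StartOffsetSketch {

    /** Hypothetical stand-in for the head-or-tail choice described in the comment. */
    enum StartPolicy { FIRST_OFFSET, LAST_OFFSET }

    /**
     * @param storedOffset position saved by an earlier run of this group, if any
     * @param policy       what to do when no position has been saved yet
     * @param minOffset    oldest offset still available in the queue (assumed input)
     * @param maxOffset    offset the next new message would receive (assumed input)
     */
    static long resolveStartOffset(OptionalLong storedOffset, StartPolicy policy,
                                   long minOffset, long maxOffset) {
        // Not the first start: continue where the group left off.
        if (storedOffset.isPresent()) {
            return storedOffset.getAsLong();
        }
        // First start: head or tail of the queue, depending on the policy.
        return policy == StartPolicy.FIRST_OFFSET ? minOffset : maxOffset;
    }

    public static void main(String[] args) {
        // New group, read from the head.
        System.out.println(resolveStartOffset(OptionalLong.empty(), StartPolicy.FIRST_OFFSET, 0L, 1000L));
        // New group, read only messages that arrive from now on.
        System.out.println(resolveStartOffset(OptionalLong.empty(), StartPolicy.LAST_OFFSET, 0L, 1000L));
        // Existing group: the policy is ignored and the saved position wins.
        System.out.println(resolveStartOffset(OptionalLong.of(42L), StartPolicy.FIRST_OFFSET, 0L, 1000L));
    }
}
```

In practice this is why changing the setConsumeFromWhere argument on a group that has already run appears to have no effect: the saved position takes priority, so a fresh group name is needed to see the head-or-tail behavior again.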
2. Producing messages
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Producer: sends messages.
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.1.16:9876;192.168.1.17:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        ("Hello RocketMQ " + i).getBytes() // body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                // On a failed send, log the error and pause for a second before the next message.
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
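
The producer tags every message with TagA, while the consumer in section 1 subscribes with "*", which accepts all tags. A subscription expression can also list specific tags separated by "||", for example "TagA || TagB". The snippet below is a rough, JDK-only sketch of that matching rule, to show which messages a given expression would let through. It does not reproduce how RocketMQ implements filtering internally, and TagFilterSketch and matches are hypothetical names.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy model of tag-based subscription matching; not RocketMQ code. */
final class TagFilterSketch {

    /**
     * Returns true if a message carrying messageTag would pass a subscription
     * expression such as "*", "TagA", or "TagA || TagB".
     */
    static boolean matches(String subExpression, String messageTag) {
        // A null expression or "*" subscribes to every tag.
        if (subExpression == null || subExpression.trim().equals("*")) {
            return true;
        }
        // Otherwise split on "||" and compare against the listed tags.
        Set<String> wanted = Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));            // true
        System.out.println(matches("TagA || TagB", "TagA")); // true
        System.out.println(matches("TagB", "TagA"));         // false
    }
}
```

With the code from this post, changing the consumer's subscribe call to use "TagB" instead of "*" would therefore leave it idle, since the producer only ever sends TagA.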