阿里云 rocketMq 延时消息
初始化消费者和生产者
- 生产者 设置rocketmq的accesskey 和secretkey 以及rocketmq的 binder server。
首先 编辑一个配置类,将关于配置rocketmq的东西写在配置类中
`
@Component
@Getter
@Setter
@Slf4j
public class RocketMqConfig {
@Value("${spring.cloud.stream.rocketmq.binder.secret-key}")
private String secretKey;
@Value("${spring.cloud.stream.rocketmq.binder.access-key}")
private String accessKey;
@Value("${spring.cloud.stream.rocketmq.binder.name-server}")
private String nameServe;
private static final String TOPIC = "delay";
private static final String GROUP_ID = "GID_live_service_update_status";
private static final String TAG = "mq_delay_tag";
private Properties properties = null;
public String getTopic() {
return TOPIC;
}
public String getTag() {
return TAG;
}
public String getGroupId() {
return GROUP_ID;
}
public Properties getProperties() {
log.info("accessKey:" + getAccessKey());
log.info("secretKey:" + getSecretKey());
log.info("naemServer:" + getNameServe());
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, GROUP_ID);
properties.setProperty(PropertyKeyConst.AccessKey, getAccessKey());
properties.setProperty(PropertyKeyConst.SecretKey, getSecretKey());
properties.put(PropertyKeyConst.NAMESRV_ADDR, getNameServe());
return properties;
}
}
- 生产者初始化
`
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
@author
@Date 2021/11/3.
*/
@Component
@Slf4j
public class RocketMqProducerInit {@Autowired
private RocketMqConfig mqConfig;private static Producer producer;
@PostConstruct
public void init(){
log.info("启动RocketMq生产者!");
producer = ONSFactory.createProducer(mqConfig.getProperties());
// 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
producer.start();
}
/**- 初始化生产者
- @return
*/
public Producer getProducer(){
return producer;
}
}
`
`
- 编写生产者
`
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static cn.hutool.core.convert.Convert.longToBytes;
@Component
@Slf4j
public class ProducerSendMessageHandler {
@Autowired
private RocketMqConfig config;
@Autowired
private RocketMqProducerInit producer;
public void sendGetLiveStatus(long periodId) {
Message message = new Message(config.getTopic(), config.getTag(), longToBytes(periodId));
message.setStartDeliverTime(System.currentTimeMillis() + 100000);
try {
SendResult send = this.producer.getProducer().send(message);
log.info("发送生产消息时间:"+DateUtil.now() + "发送延时消息成功! Topic is:" + config.getTopic() + "msgId is: " +
send.getMessageId());
} catch (ONSClientException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
`
- 监听者
`
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.*;
import com.dapeng.cloud.service.live.application.period.PeriodManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static cn.hutool.core.convert.Convert.bytesToLong;
@Component
@Slf4j
public class LiveMessageStreamListener {
@Autowired
private RocketMqConfig mqConfig;
private static Consumer consumer;
@Autowired
private PeriodManager periodManager;
@PostConstruct
public void init(){
log.info("消费者启动!");
consumer = ONSFactory.createConsumer(mqConfig.getProperties());
//监听第一个topic,new对应的监听器
consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
log.info("消费者接收到MQ消息时间: "+ DateUtil.now()+" -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), bytesToLong(message.getBody()));
try {
//调用场次接口
long periodId = bytesToLong(message.getBody());
log.info("获取到 场次 id =" +periodId);
log.info("开始执行调用异常回调处理");
// 调用自己的业务代码进行操作
log.info("执行调用异常回调处理结果:"+b);
//消费成功,继续消费下一条消息
return Action.CommitMessage;
} catch (Exception e) {
log.error("消费MQ消息失败! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
//消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
return Action.ReconsumeLater;
}
}
});
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
/**
* 初始化消费者
* @return consumer
*/
public Consumer getConsumer(){
return consumer;
}
}
`
最新文章
- [转]:Delphi 中的哈希表(1): THashedStringList
- (分享)根据IP获取地理位置(百度API)
- 设置arc/非arc
- 城堡 (spfa+cheng)
- Nessus基本命令
- Cython:基础教程(1) 语法
- android编程常见问题-程序真机中不显示
- 公交CPU卡原理
- mac下的home键、end键以及insert键的替代
- AttributeError at /home/home/ Exception Type: AttributeError at /home/home/
- oracle里的优化器
- C#中DBNull.Value和Null的用法和区别
- mac 开发新户攻略-brew
- Windows下安装MySQL5.7.18的方法
- Elasticsearch 集群和索引健康状态及常见错误说明
- MySQL外键使用详解
- nodejs &; npm &; gulp 安装和配置
- Intro to Jedis – the Java Redis Client Library
- php的ob缓存详解
- 各JAVA开发框架版本及对应信息