初始化消费者和生产者

  • 生产者 设置rocketmq的accesskey 和secretkey 以及rocketmq的 binder server。

    首先 编辑一个配置类,将关于配置rocketmq的东西写在配置类中

`

@Component

@Getter

@Setter

@Slf4j

public class RocketMqConfig {

@Value("${spring.cloud.stream.rocketmq.binder.secret-key}")
private String secretKey;
@Value("${spring.cloud.stream.rocketmq.binder.access-key}")
private String accessKey;
@Value("${spring.cloud.stream.rocketmq.binder.name-server}")
private String nameServe; private static final String TOPIC = "delay";
private static final String GROUP_ID = "GID_live_service_update_status";
private static final String TAG = "mq_delay_tag";
private Properties properties = null; public String getTopic() {
return TOPIC;
} public String getTag() {
return TAG;
} public String getGroupId() {
return GROUP_ID;
} public Properties getProperties() {
log.info("accessKey:" + getAccessKey());
log.info("secretKey:" + getSecretKey());
log.info("naemServer:" + getNameServe());
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, GROUP_ID);
properties.setProperty(PropertyKeyConst.AccessKey, getAccessKey());
properties.setProperty(PropertyKeyConst.SecretKey, getSecretKey());
properties.put(PropertyKeyConst.NAMESRV_ADDR, getNameServe());
return properties;
}

}

  • 生产者初始化

    `

    import com.aliyun.openservices.ons.api.ONSFactory;

    import com.aliyun.openservices.ons.api.Producer;

    import lombok.extern.slf4j.Slf4j;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;

/**

  • @author

  • @Date 2021/11/3.

    */

    @Component

    @Slf4j

    public class RocketMqProducerInit {

    @Autowired

    private RocketMqConfig mqConfig;

    private static Producer producer;

    @PostConstruct

    public void init(){

    log.info("启动RocketMq生产者!");

    producer = ONSFactory.createProducer(mqConfig.getProperties());

    // 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown

    producer.start();

    }

    /**

    • 初始化生产者
    • @return

      */

      public Producer getProducer(){

      return producer;

      }

}

`

`

  • 编写生产者

    `

    import cn.hutool.core.date.DateUtil;

    import com.aliyun.openservices.ons.api.Message;

    import com.aliyun.openservices.ons.api.SendResult;

    import com.aliyun.openservices.ons.api.exception.ONSClientException;

    import lombok.extern.slf4j.Slf4j;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Component;

import static cn.hutool.core.convert.Convert.longToBytes;

@Component

@Slf4j

public class ProducerSendMessageHandler {

@Autowired

private RocketMqConfig config;

@Autowired
private RocketMqProducerInit producer; public void sendGetLiveStatus(long periodId) { Message message = new Message(config.getTopic(), config.getTag(), longToBytes(periodId));
message.setStartDeliverTime(System.currentTimeMillis() + 100000);
try {
SendResult send = this.producer.getProducer().send(message);
log.info("发送生产消息时间:"+DateUtil.now() + "发送延时消息成功! Topic is:" + config.getTopic() + "msgId is: " +
send.getMessageId());
} catch (ONSClientException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}

}

`

  • 监听者

`

import cn.hutool.core.date.DateUtil;

import com.aliyun.openservices.ons.api.*;

import com.dapeng.cloud.service.live.application.period.PeriodManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import static cn.hutool.core.convert.Convert.bytesToLong;

@Component

@Slf4j

public class LiveMessageStreamListener {

@Autowired
private RocketMqConfig mqConfig; private static Consumer consumer;
@Autowired
private PeriodManager periodManager; @PostConstruct
public void init(){
log.info("消费者启动!");
consumer = ONSFactory.createConsumer(mqConfig.getProperties());
//监听第一个topic,new对应的监听器
consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) { log.info("消费者接收到MQ消息时间: "+ DateUtil.now()+" -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), bytesToLong(message.getBody()));
try {
//调用场次接口
long periodId = bytesToLong(message.getBody());
log.info("获取到 场次 id =" +periodId);
log.info("开始执行调用异常回调处理");
// 调用自己的业务代码进行操作
log.info("执行调用异常回调处理结果:"+b);
//消费成功,继续消费下一条消息
return Action.CommitMessage;
} catch (Exception e) {
log.error("消费MQ消息失败! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
//消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
return Action.ReconsumeLater;
}
}
});
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
}
/**
* 初始化消费者
* @return consumer
*/
public Consumer getConsumer(){
return consumer;
}

}

`

最新文章

  1. [转]:Delphi 中的哈希表(1): THashedStringList
  2. (分享)根据IP获取地理位置(百度API)
  3. 设置arc/非arc
  4. 城堡 (spfa+cheng)
  5. Nessus基本命令
  6. Cython:基础教程(1) 语法
  7. android编程常见问题-程序真机中不显示
  8. 公交CPU卡原理
  9. mac下的home键、end键以及insert键的替代
  10. AttributeError at /home/home/ Exception Type: AttributeError at /home/home/
  11. oracle里的优化器
  12. C#中DBNull.Value和Null的用法和区别
  13. mac 开发新户攻略-brew
  14. Windows下安装MySQL5.7.18的方法
  15. Elasticsearch 集群和索引健康状态及常见错误说明
  16. MySQL外键使用详解
  17. nodejs & npm & gulp 安装和配置
  18. Intro to Jedis – the Java Redis Client Library
  19. php的ob缓存详解
  20. 各JAVA开发框架版本及对应信息

热门文章

  1. 「SOL」打扫笛卡尔cartesian (模拟赛)
  2. Python自动发邮件(QQ为例)
  3. Centos 7 环境 安装todesk异常
  4. Babel与webpack
  5. Win10下如何添加“中文简体(美式键盘)”输入法
  6. 批量统一调整PDF页面尺寸大小
  7. C# IObservable与IObserver通知机制 观察者模式(推式模型)
  8. C++11之线程库
  9. py09函数简介
  10. python之路3:文件操作和函数基础