-- 在pom.xml导入依赖

<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

  

-- 在application.yml添加配置

spring:
  #MQTT配置信息
  mqtt:
    username: bywmqtt
    password: RuizxZWrqNBmgk1h7yd4
    #MQTT-服务器连接地址,如果有多个,用逗号隔开 tcp://39.108.67.63:1883
    url: tcp://39.108.67.63:1883
    client:
      id: test001
    default:
      topic: topicTest01
    completionTimeout: 3000

-- 配置消息通道,连接,监听主题

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException; import lombok.extern.slf4j.Slf4j; /**
* mqtt配置
*/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttServerConfig {

    /** Name of the channel bean whose messages are published to the broker. */
    public static final String OUT_BOUND_CHANNEL = "mqttOutboundChannel";
    /** Name of the channel bean that receives messages arriving from the broker. */
    public static final String INPUT_CHANNEL = "mqttInputChannel";
    /** Message header key under which the source topic of an inbound message is read. */
    public static final String RECEIVE_TOPIC = "mqtt_receivedTopic";
    public static final String TOPIC_1 = "TOPIC_1";
    public static final String TOPIC_2 = "TOPIC_2";
    /** Topics the inbound adapter subscribes to. */
    public static final String[] SUB_TOPIC = {TOPIC_1, TOPIC_2};

    @Value("${spring.mqtt.username:}")
    private String username;

    @Value("${spring.mqtt.password:}")
    private String password;

    /** Broker address; several addresses may be given separated by commas. */
    @Value("${spring.mqtt.url:}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id:}")
    private String serviceId;

    @Value("${spring.mqtt.default.topic:}")
    private String defaultTopic;

    /**
     * Completion timeout handed to the inbound adapter. Defaults to 3000 (the value used in
     * the sample configuration) because an empty default cannot be converted to an int.
     */
    @Value("${spring.mqtt.completionTimeout:3000}")
    private int completionTimeout;

    /**
     * Builds the connection options shared by the outbound handler and the inbound adapter.
     *
     * @return options carrying the configured credentials, broker URIs, a keep-alive
     *         interval of 20 and a connection timeout of 30
     * @throws IllegalStateException if {@code spring.mqtt.url} contains no broker address
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // An empty value means "not configured" (the @Value default), so the option is left unset.
        if (!username.isEmpty()) {
            mqttConnectOptions.setUserName(username);
        }
        if (!password.isEmpty()) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }
        mqttConnectOptions.setServerURIs(parseServerUris(hostUrl));
        // Keep-alive (heartbeat) interval.
        mqttConnectOptions.setKeepAliveInterval(20);
        // Connection timeout.
        mqttConnectOptions.setConnectionTimeout(30);
        return mqttConnectOptions;
    }

    /**
     * Creates the Paho client factory used by both the inbound and the outbound side.
     *
     * @return a factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Handler that publishes every message sent to {@link #OUT_BOUND_CHANNEL}.
     *
     * @return an asynchronous handler using the configured client id and default topic,
     *         with retained messages enabled and QoS 1
     */
    @Bean
    @ServiceActivator(inputChannel = OUT_BOUND_CHANNEL)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(serviceId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        // Whether published messages are retained by default.
        messageHandler.setDefaultRetained(true);
        // Default quality of service for published messages.
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    /**
     * Outbound channel; its bean name must equal {@link #OUT_BOUND_CHANNEL}.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Receiving channel; its bean name must equal {@link #INPUT_CHANNEL}.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Configures the inbound client and the topics it listens to.
     *
     * @return an adapter subscribed to {@link #SUB_TOPIC} with QoS 1 that forwards
     *         converted messages to {@link #mqttInputChannel()}
     */
    @Bean
    public MessageProducer inbound() {
        // The inbound client gets its own id, derived from the configured one by a suffix.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(serviceId + "_inbound", mqttClientFactory(), SUB_TOPIC);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Consumes messages arriving on {@link #INPUT_CHANNEL} and logs topic and payload.
     *
     * @return a handler that logs each received message
     */
    @Bean
    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // The header may be absent; String.valueOf avoids a NullPointerException.
                String topic = String.valueOf(message.getHeaders().get(RECEIVE_TOPIC));
                log.info("[{}]主题接收到消息:{}", topic, message.getPayload().toString());
            }
        };
    }

    /**
     * Splits a comma-separated list of broker addresses into individual URIs,
     * trimming whitespace and dropping empty entries.
     *
     * @param urls the raw configuration value
     * @return the non-empty addresses in their original order
     * @throws IllegalStateException if no address remains
     */
    private static String[] parseServerUris(String urls) {
        String[] candidates = (urls == null ? "" : urls).split(",");
        String[] kept = new String[candidates.length];
        int count = 0;
        for (String candidate : candidates) {
            String uri = candidate.trim();
            if (!uri.isEmpty()) {
                kept[count++] = uri;
            }
        }
        if (count == 0) {
            throw new IllegalStateException("No MQTT broker address configured (spring.mqtt.url)");
        }
        String[] uris = new String[count];
        System.arraycopy(kept, 0, uris, 0, count);
        return uris;
    }
}

-- 发送消息

// NOTE(review): MqttGateway is not shown in this snippet; presumably it is a messaging
// gateway bound to the outbound channel - confirm against its definition.
@Autowired
private MqttGateway mqttGateway;

/**
 * Publishes {@code sendData} exactly once.
 *
 * @param topic    target topic; when null or blank the default topic is used
 * @param sendData payload to publish
 * @return the result of {@code ResultBuild.success()}
 */
@RequestMapping("/sendMqtt")
public ReturnResult sendMqtt(String topic, String sendData) {
    if (topic == null || topic.trim().isEmpty()) {
        // No topic requested: send to the default topic.
        mqttGateway.sendToMqtt(sendData);
    } else {
        // Push to the requested topic.
        mqttGateway.sendToMqtt(topic, sendData);
    }
    return ResultBuild.success();
}

  

最新文章

  1. Objective-C中NSInvocation的使用
  2. Bug管理系统 BugFree
  3. 速战速决 (5) - PHP: 动态地创建属性和方法, 对象的复制, 对象的比较, 加载指定的文件, 自动加载类文件, 命名空间
  4. Asynchronous Jobs
  5. iOS打包导出时出现Missing iOS Distribution signing
  6. VS2010使用EventHandler发邮件
  7. Robotium学习笔记三
  8. oe 仓库管理
  9. Canvas之动态波浪效果_陈在真Sunny_chen_新浪博客
  10. shell中exec解析(转)
  11. Oracle-orclEXORIM
  12. 关于MAC设置免费的动态壁纸
  13. Composer之搭建自己的包工具
  14. eclipse安装Freemaker IDE插件
  15. CTEX(LaTeX) 编译 中文
  16. android 首字母迷糊查询 拼音查询 中英文混排查询
  17. vs下C++内存泄露检测
  18. APP的案例分析
  19. python图像处理(2)图像水印和PIL模式转化
  20. Nim游戏学习笔记

热门文章

  1. win10 启用Hyper-V并安装Ubuntu20.10
  2. 2.2 CPU 上下文切换是什么意思?(下)
  3. Linux命令学习—— fdisk -l 查看硬盘及分区信息
  4. ADAS可行驶区域道路积水反光区域的识别算法
  5. ONNX MLIR方法
  6. 将代码生成器带入TVM
  7. 三维点云去噪无监督学习:ICCV2019论文分析
  8. 用OpenCV4实现图像的超分别率
  9. 自监督学习(Self-Supervised Learning)多篇论文解读(下)
  10. 在NVIDIA-Jetson平台上构建智能多媒体服务器