应用场景:

利用GitHub上的温度传感器的例子作为讲解,实现从云端获取设备终端状态及使用Java模拟设备数据。其实和官网给的视频一样,只需要将终端设备的数据转换为支持MQTT协议传输的数据,云端就可以拿到数据了。

需要使用的文件及技术

1.云端:创建文件,及开启cloudCore

vim device.yaml    //案例中的设备配置,直接使用请删除所有注释
#apiVersion,该属性定义了我们从k8s获取改设备数据的url路径
apiVersion: devices.kubeedge.io/v1alpha1
kind: Device
metadata:
name: temperature3
labels:
description: 'temperature3'
manufacturer: 'test'
spec:
deviceModelRef:
name: temperature3-model #与设备模板名称进行绑定
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: ''
operator: In
values:
- sunsheen-edge #部署该设备的节点
# status中的属性为我们可以定义的属性,属性名为propertyName的属性与初始期望值
status:
twins:
- propertyName: temperatureState
desired:
metadata:
type: string
value: 'on'
- propertyName: temperature
desired:
metadata:
type: string
value: ''
vim devicemodel.yaml   //设备模板文件,直接使用请删除所有注释。
#apiVersion与设备端保持一致
apiVersion: devices.kubeedge.io/v1alpha1
kind: DeviceModel
metadata:
name: temperature3-model
namespace: default
spec:
#属性与设备的保持一致,这里可以设备权限,这里我们只能修改温度状态,无法控制实际温度
properties:
- name: temperatureState
description: Temperature collected from the edge device
type:
string:
accessMode: ReadWrite
defaultValue: 'on'
- name: temperature
description: Temperature collected from the edge device
type:
string:
accessMode: ReadOnly
defaultValue: ''
vim deployment.yaml  //使用deployment控制器(k8s内容), 创建POD,边缘节点会自动去拉取镜像(很慢,建议手动拉取,或配置私有镜像仓库)

apiVersion: apps/v1
kind: Deployment
metadata:
name: temperature3-mapper
labels:
app: temperature
spec:
replicas: 1
selector:
matchLabels:
app: temperature3
template:
metadata:
labels:
app: temperature3
spec:
hostNetwork: true
nodeSelector:
name: "sunsheen-edge"
containers:
- name: temperature3
image: kubeedge-mapper:v2.2 #需要部署的镜像
imagePullPolicy: IfNotPresent
securityContext:
privileged: true

2.边缘端:开启 mosquitto,启动edgeCore

mosquitto -d -p 1883  //边缘端开启mosquitto,用于传输消息

JAVA代码模拟设备推送接收与推送消息:

<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<!-- http请求 -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient-cache</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
</dependency> <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /**
* @author wanchen.chen
* @ClassName KubeedageClient
* @Despriction: MQTTP 连接类,用于推送/订阅 消息
* @date 2020/4/15 9:20
* @Version 1.0
**/
public class KubeedageClient { private MqttMessage message;
private MqttClient client;
private MqttConnectOptions options;
private MqttTopic clientTopic;
private MqttTopic serverTopic;
//定义主题,document为云端反馈的主题;update为边缘向云端推送的主题。temperature3为设备名称,其他都固定。
private static String clientTopicStr ="$hw/events/device/temperature3/twin/update/document";
private static String serverTopicStr ="$hw/events/device/temperature3/twin/update";
private static final String url ="tcp://0.0.0.0:1883";
//我这里是要打包为镜像部署,所有需要配置边缘节点的用户及密码
private static final String userName ="xxx";
private static final String password ="xxx"; private ScheduledExecutorService scheduler; public KubeedageClient(){
} /**
* 初始化
*/
public void start() {
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(url, "KubeEdgeClient", new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调
client.setCallback(new PushCallback());
clientTopic = client.getTopic(clientTopicStr);
serverTopic = client.getTopic(serverTopicStr);
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
// options.setWill(clientTopoc, "close".getBytes(), 2, true);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 订阅主题消息
*/
public void listerData(){
//订阅消息
int[] Qos = {1};
String[] topic1 = {clientTopicStr};
try {
client.subscribe(topic1, Qos);
} catch (MqttException e) {
e.printStackTrace();
}
} /**
* push 消息到主题
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
// System.out.println("message is published completely! "
// + token.isComplete());
} /**
* 发送消息
* @param deviceInfo
*/
public void putData(String deviceInfo){
message = new MqttMessage();
message.setQos(2);
message.setRetained(true);
message.setPayload(deviceInfo.getBytes());
try {
publish(serverTopic,message);
} catch (MqttException e) {
e.printStackTrace();
}
} }
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage; /**
* @author wanchen.chen
* @ClassName PushCallback 发布消息的回调类
* @Despriction: 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
* @date 2020/4/15 9:17
* @Version 1.0
**/
public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
} public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
} public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面,消息只能被消费一次
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收的消息为:"+str);
} }

发送的消息结构:

.yaml文件,用于json数据结构

event_id: 0
timestamp: 0
twin:
temperature:
actual:
value: 0
metadata:
type: Updated
temperatureState:
actual:
value: height
metadata:
type: Updated

通过Java代码将其转换为JSON,将数据put进JSON中就可以发送了:

import org.yaml.snakeyaml.Yaml;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
import java.util.Map; /**
* @author wanchen.chen
* @ClassName AnalysisYAML
* @Despriction: 解析YAML文件的内容
* @date 2020/4/28 9:32
* @Version 1.0
**/
public class AnalysisYAML { /**
* 传参解析
* @param urlStr
* @return
*/
public Map<String,Object> getYamlData(String urlStr){
URL url = AnalysisYAML.class.getClassLoader().getResource(urlStr);
return analysisData(url);
} /**
* 默认解析
* @return
*/
public Map<String,Object> getYamlData(){
URL url = AnalysisYAML.class.getClassLoader().getResource("attribute.yaml");
return analysisData(url);
} /**
* 获取URL 解析内容
* @param url
* @return
*/
public Map<String,Object> analysisData(URL url){
InputStream input = null;
try {
input = new FileInputStream(url.getFile());
} catch (FileNotFoundException e) {
e.printStackTrace();
}
Yaml yaml = new Yaml();
Map<String,Object> map = (Map<String,Object>)yaml.load(input);
return map;
}
}

镜像打包:

通过DockerFile将jar文件打包为镜像:

FROM java:latest
RUN mkdir -p /usr
RUN mkdir -p /usr/local
COPY . /usr/local/
WORKDIR /usr/local
EXPOSE 8892
ENTRYPOINT ["java","-jar","xxx.jar"] //在DockerFile 文件目录下创建镜像
docker build -t kubeedge-mapper:v2.0 .

最新文章

  1. 开源搜索引擎Iveely 0.8.0发布,终见天日
  2. python实现统计你一共写了多少行代码
  3. A C[HDU1570]
  4. 【jQuery UI 1.8 The User Interface Library for jQuery】.学习笔记.6.Dialog控件
  5. angular的post请求,SpringMVC后台接收不到参数值的解决方案
  6. trailingZeroes
  7. [Leetcode][Python]44:Wildcard Matching
  8. 在 Visual Studio 2010 中开发和部署 Windows Azure 应用程序
  9. 前端学PHP之日期与时间
  10. Codeforces Round #464 F. Cutlet
  11. Python编码规范(PEP8)
  12. k64 datasheet学习笔记11---Port Control and Interrupts (PORT)
  13. Python基础-python数据类型之列表(四)
  14. redis分布式工具类 ----RedisShardedPoolUtil
  15. 002.Git日常基础使用
  16. 添加用户到 sudo
  17. unicorn模拟执行学习
  18. 戴尔R720xd服务器系统安装前期环境实现
  19. sqlldr 远程数据库
  20. a标签去掉下划线

热门文章

  1. ZK安装、ZK配置、ZK集群部署踩过的大坑
  2. 【集群实战】inotify
  3. Hybrid Automata 混合自动机入门
  4. idea jdk版本切换
  5. Pig设计模式概要以及与SQL的设计模式的对比
  6. Java之JVM(初学者)
  7. select函数的使用
  8. 2020最新nginx+gunicorn+supervisor部署基于flask开发的项目的生产环境的详细攻略
  9. SQL 文件导入数据库
  10. CC2530ADC应用