.NET 常用的 MQTT 类库有两个:M2Mqtt 和 MQTTnet

当然了,这两个库的教程网上一搜一大把

但能搜到的 MQTTnet 教程全是 2.7 及以下版本的,而 3.0 版的语法已不再向下兼容,直接升级会导致很多问题。今天成功完成了升级,现记录下来

参考文档地址:https://github.com/chkr1011/MQTTnet/wiki/Client

上代码:

// Open-source library: https://github.com/chkr1011/MQTTnet
// Client documentation: https://github.com/chkr1011/MQTTnet/wiki/Client
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MqttServerTest
{
    /// <summary>
    /// WinForms test tool that connects to an MQTT broker with MQTTnet 3.x,
    /// publishes and subscribes to topics, and writes a running log into
    /// <c>txtReceiveMessage</c>.
    /// </summary>
    public partial class mqtt测试工具 : Form
    {
        /// <summary>Seconds to wait before a reconnect attempt after a disconnect.</summary>
        private const int ReconnectDelaySeconds = 5;

        /// <summary>The single client instance; created lazily on the first login.</summary>
        private IMqttClient mqttClient = null;

        /// <summary>True while the user wants the connection kept alive; cleared on logout.</summary>
        private bool isReconnect = true;

        /// <summary>Creates the form and its designer-generated controls.</summary>
        public mqtt测试工具()
        {
            InitializeComponent();
        }

        /// <summary>Form load handler; intentionally empty.</summary>
        private void Form1_Load(object sender, EventArgs e)
        {
        }

        /// <summary>Publish button handler: publishes the message currently typed in the form.</summary>
        private async void BtnPublish_Click(object sender, EventArgs e)
        {
            await Publish();
        }

        /// <summary>Subscribe button handler: subscribes to the topic currently typed in the form.</summary>
        private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
        {
            await Subscribe();
        }

        /// <summary>
        /// Publishes the text of <c>txtSendMessage</c> to the topic in <c>txtPubTopic</c>
        /// as a retained message with exactly-once QoS. Failures are written to the log
        /// instead of being thrown.
        /// </summary>
        private async Task Publish()
        {
            string topic = txtPubTopic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("发布主题不能为空!");
                return;
            }

            // The client only exists after the first login, so guard against null as well.
            if (mqttClient == null || !mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT客户端尚未连接!");
                return;
            }

            string inputString = txtSendMessage.Text.Trim();
            try
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(inputString)
                    .WithExactlyOnceQoS()
                    .WithRetainFlag()
                    .Build();

                await mqttClient.PublishAsync(message);
            }
            catch (Exception ex)
            {
                AppendLog("发布主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Subscribes to the topic in <c>txtSubTopic</c> with at-most-once QoS and logs
        /// the result. Failures are written to the log instead of being thrown.
        /// </summary>
        private async Task Subscribe()
        {
            string topic = txtSubTopic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("订阅主题不能为空!");
                return;
            }

            // The client only exists after the first login, so guard against null as well.
            if (mqttClient == null || !mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT客户端尚未连接!");
                return;
            }

            try
            {
                await mqttClient.SubscribeAsync(new TopicFilterBuilder()
                    .WithTopic(topic)
                    .WithAtMostOnceQoS()
                    .Build());

                AppendLog($"已订阅[{topic}]主题{Environment.NewLine}");
            }
            catch (Exception ex)
            {
                // This runs under an async void click handler, so an escaping
                // exception would otherwise take down the application.
                AppendLog("订阅主题失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Creates the MQTT client on first use and connects it using the server, port,
        /// credentials and client id entered in the form. Does nothing when the client is
        /// already connected. Failures (including a non-numeric port) are written to the log.
        /// </summary>
        private async Task ConnectMqttServerAsync()
        {
            try
            {
                if (mqttClient == null)
                {
                    mqttClient = new MqttFactory().CreateMqttClient();

                    // Register the handler before connecting so that messages delivered
                    // immediately after the connection is established are not missed.
                    mqttClient.UseApplicationMessageReceivedHandler(e =>
                    {
                        AppendLog("收到订阅消息!" + DecodePayload(e.ApplicationMessage.Payload) + Environment.NewLine);
                    });
                }

                if (mqttClient.IsConnected)
                {
                    // Already online; connecting again would only fail.
                    return;
                }

                var options = CreateOptionsBuilder().Build();
                await mqttClient.ConnectAsync(options, CancellationToken.None);

                AppendLog("连接到MQTT服务器成功!" + txtIp.Text + Environment.NewLine);
            }
            catch (Exception ex)
            {
                AppendLog("连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Legacy (MQTTnet 2.x style) connected callback: clears the log and reports the
        /// connection. NOTE(review): nothing in this file subscribes this handler.
        /// </summary>
        private void MqttClient_Connected(object sender, EventArgs e)
        {
            if (InvokeRequired)
            {
                Invoke(new Action(() => MqttClient_Connected(sender, e)));
                return;
            }

            txtReceiveMessage.Clear();
            txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
        }

        /// <summary>
        /// Legacy (MQTTnet 2.x style) disconnected callback: logs the disconnect and, while
        /// <see cref="isReconnect"/> is set, schedules a reconnect attempt.
        /// NOTE(review): nothing in this file subscribes this handler.
        /// </summary>
        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            // Everything below touches controls, so hop to the UI thread first.
            if (InvokeRequired)
            {
                Invoke(new Action(() => MqttClient_Disconnected(sender, e)));
                return;
            }

            txtReceiveMessage.Clear();
            txtReceiveMessage.AppendText($">> [{DateTime.UtcNow.ToLongTimeString()}]");
            txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine);

            if (!isReconnect)
            {
                txtReceiveMessage.AppendText("已下线!" + Environment.NewLine);
                return;
            }

            txtReceiveMessage.AppendText("正在尝试重新连接" + Environment.NewLine);

            // Fire and forget: ReconnectAsync handles its own exceptions.
            Task reconnect = ReconnectAsync();
        }

        /// <summary>
        /// Legacy (MQTTnet 2.x style) message callback: logs topic, payload, QoS and retain
        /// flag of a received message. NOTE(review): nothing in this file subscribes this handler.
        /// </summary>
        private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            var text = new StringBuilder()
                .Append($">> {"### RECEIVED APPLICATION MESSAGE ###"}{Environment.NewLine}")
                .Append($">> Topic = {e.ApplicationMessage.Topic}{Environment.NewLine}")
                .Append($">> Payload = {DecodePayload(e.ApplicationMessage.Payload)}{Environment.NewLine}")
                .Append($">> QoS = {e.ApplicationMessage.QualityOfServiceLevel}{Environment.NewLine}")
                .Append($">> Retain = {e.ApplicationMessage.Retain}{Environment.NewLine}")
                .ToString();

            // One marshalled call instead of five separate round-trips to the UI thread.
            AppendLog(text);
        }

        /// <summary>Login button handler: enables auto-reconnect and connects to the broker.</summary>
        private async void btnLogIn_Click(object sender, EventArgs e)
        {
            isReconnect = true;

            // Awaited directly (no Task.Run) so the form's text boxes are read on the UI thread.
            await ConnectMqttServerAsync();
        }

        /// <summary>Logout button handler: disables auto-reconnect and disconnects from the broker.</summary>
        private async void btnLogout_Click(object sender, EventArgs e)
        {
            isReconnect = false;

            if (mqttClient == null)
            {
                // Never logged in; nothing to disconnect.
                return;
            }

            try
            {
                await mqttClient.DisconnectAsync();
            }
            catch (Exception ex)
            {
                AppendLog("断开MQTT连接失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Waits <see cref="ReconnectDelaySeconds"/> seconds and then tries to connect again
        /// with a clean session. Never throws; a failed attempt is written to the log.
        /// </summary>
        private async Task ReconnectAsync()
        {
            try
            {
                var options = CreateOptionsBuilder()
                    //.WithTls()
                    .WithCleanSession()
                    .Build();

                await Task.Delay(TimeSpan.FromSeconds(ReconnectDelaySeconds));

                if (!isReconnect)
                {
                    // The user logged out while we were waiting.
                    return;
                }

                await mqttClient.ConnectAsync(options, CancellationToken.None);
            }
            catch (Exception)
            {
                AppendLog("### RECONNECTING FAILED ###" + Environment.NewLine);
            }
        }

        /// <summary>
        /// Starts an options builder from the values entered in the form.
        /// Must be called on the UI thread; throws <see cref="FormatException"/> or
        /// <see cref="OverflowException"/> when the port text is not a valid integer.
        /// </summary>
        private MqttClientOptionsBuilder CreateOptionsBuilder()
        {
            return new MqttClientOptionsBuilder()
                .WithClientId(txtClientId.Text)
                .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text))
                .WithCredentials(txtUsername.Text, txtPsw.Text);
        }

        /// <summary>Decodes a UTF-8 payload; a null payload (empty message) yields an empty string.</summary>
        private static string DecodePayload(byte[] payload)
        {
            return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
        }

        /// <summary>
        /// Appends <paramref name="text"/> to the log box, marshalling to the UI thread when
        /// called from another thread. Silently does nothing once the form is disposed or
        /// before its handle exists, because Invoke would throw in those states.
        /// </summary>
        private void AppendLog(string text)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (InvokeRequired)
            {
                Invoke(new Action(() => txtReceiveMessage.AppendText(text)));
            }
            else
            {
                txtReceiveMessage.AppendText(text);
            }
        }
    }
}

最新文章

  1. sql中 truncate 、delete与drop区别
  2. google jquery用不了啦,你准备好了吗
  3. Log4Net组件的应用详解
  4. 由于C++编译器的分析机制所导致的声明问题
  5. PoEdu- C++阶段班【Po学校】-Lesson03_构造函数精讲 - 第5天
  6. sql server 分布式查询 和 主从服务器搭建
  7. getStyle(),修改样式属性
  8. (6) 深入理解Java Class文件格式(五)
  9. 【转载】Securing Kibana + Elasticsearch
  10. 自己写ORM框架 DBUtils_DG Java(C#的写在链接里)
  11. My Rules of Information
  12. HTML中动态图片切换JQuery实现
  13. adaboost学习资料收集
  14. es6新特性:
  15. Python重写C语言程序100例--Part9
  16. 两张图解读Java异常与断言
  17. JavaScript的计时器对象
  18. struts2 里escape="false"的问题?
  19. 无锁编程 - Double-checked Locking
  20. Django框架的使用教程--环境的搭建和项目的创建[一]

热门文章

  1. dfs 序 欧拉序
  2. Java知识体系框架
  3. Docker学习(一)环境准备安装centos7
  4. C# 根据年月日计算周次
  5. python3三元运算
  6. doT 这个模板 是怎么实现的?
  7. C语言寒假大作战02
  8. Day8-Python3基础-Socket网络编程
  9. mysql数据库my.ini配置文件中文详解
  10. Nginx(一) 简介