支持下国产开源。

MQTT物联网传输控制协议:《MQTT-3.1.1-CN.pdf

下载:emqttd-centos64-v2.0-rc.2-20161019.zip

安装:

$ unzip emqttd-centos64-v2.0-rc.2-20161019.zip -d /data/

$ mv /data/emqttd /data/emqttd-centos64-v2.0-rc.2-20161019

$ ln -s /data/emqttd-centos64-v2.0-rc.2-20161019 /data/emqttd

系统优化配置:

# ulimit -n 1048576

$ sudo sysctl -w fs.file-max=2097152

$ sudo sysctl -w fs.nr_open=2097152

$ sudo sysctl -w net.core.somaxconn=65535

$ sudo sysctl -p

修改配置文件(node.cookie必须每台都要一样,node.name的@后面必须是ip地址或者fqdn方式的主机名):

$ cd /data/emqttd

$ vi etc/emq.conf

## Node name

node.name = emqttd@192.168.60.58

## Cookie for distributed node

node.cookie = emq_dist_cookie_533d99ckd9ji475

## Erlang Process Limit

node.process_limit = 2000000

## Sets the maximum number of simultaneously existing ports for this system

node.max_ports = 1000000

## Size of acceptor pool

mqtt.listener.tcp.acceptors = 64

## Maximum number of concurrent clients

mqtt.listener.tcp.max_clients = 1000000

## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

## mqtt.listener.tcp.rate_limit = 100,10

## TCP Socket Options

mqtt.listener.tcp.backlog = 262144

## Distributed node port range

node.dist_listen_min = 6000

node.dist_listen_max = 6999

## 如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。

## Expired after 1 day:

## w - week

## d - day

## h - hour

## m - minute

## s - second

mqtt.session.expired_after = 2w

# 上面为持久会话到期时间,从客户端断开算起,超时后客户端没有收到的消息会丢弃(不想丢失消息的话,该值就要设置的很大)。

## Console log. Enum: off, file, console, both

log.console = both

## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency

log.console.level = info

## Console log file

log.console.file = log/console.log

## Error log file

log.error.file = log/error.log

## Enable the crash log. Enum: on, off

log.crash = on

log.crash.file = log/crash.log

修改下启动脚本(在stop处增加一行):

$ vi bin/emqttd

...

stop)

# Wait for the node to completely stop...

PID="$(relx_get_pid)"

if ! relx_nodetool "stop"; then

exit 1

fi

while $(kill -s 0 "$PID" 2>/dev/null);

do

sleep 1

done

killall epmd

;;

启动:

$ cd emqttd

直接进入控制台模式:

$ ./bin/emqttd console

后台运行模式:

$ ./bin/emqttd start

$ ./bin/emqttd_ctl status

$ ./bin/emqttd stop

开启防火墙:

端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。

sudo firewall-cmd                      --zone=public --add-port=1883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

sudo firewall-cmd                      --zone=public --add-port=8883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=8883/tcp

sudo firewall-cmd                      --zone=public --add-port=8083/tcp

sudo firewall-cmd --permanent --zone=public --add-port=8083/tcp

sudo firewall-cmd                      --zone=public --add-port=18083/tcp

sudo firewall-cmd --permanent --zone=public --add-port=18083/tcp

sudo firewall-cmd                      --zone=public --add-port=4369/tcp

sudo firewall-cmd --permanent --zone=public --add-port=4369/tcp

sudo firewall-cmd                      --zone=public --add-port=6000-6999/tcp

sudo firewall-cmd --permanent --zone=public --add-port=6000-6999/tcp

sudo firewall-cmd                      --zone=public --list-all

sudo firewall-cmd --permanent --zone=public --list-all

查看各台的启动状态:

http://192.168.60.55:8083/status

http://192.168.60.55:18083/      用户名/密码: admin / public

$ vi data/configs/vm.xxx.args

$ vi data/configs/app.xxx.conf

LOG位置:

$ vi log/

把节点加入集群:

在各个节点上执行(重复执行也没关系,其中192.168.60.55这台会提示错误:cannot_join_with_self,这个没关系,自己不用加入自己):

$ ./bin/emqttd_ctl cluster join emqttd@192.168.60.55

$ ./bin/emqttd_ctl cluster status

Cluster status: [{running_nodes,['emqttd@192.168.60.58',

'emqttd@192.168.60.56',

'emqttd@192.168.60.57',

'emqttd@192.168.60.55']}]

把节点退出集群:

本机退出集群:

$ ./bin/emqttd_ctl cluster leave

把某节点退出集群:

$ ./bin/emqttd_ctl cluster remove emqttd@192.168.60.56

测试一下:

下载安装MTQQ协议的客户端:https://mosquitto.org/download/

$ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm

$ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm

订阅:

$ mosquitto_sub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -c -i 1111

发布:另开一个ssh或者另外机器上执行:

$ mosquitto_pub -h 192.168.60.57 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 7"

参数:

-h: 连接到哪台broker。

-p: 连接端口。

-t: topic名字,topic类似文件系统的组织方式,以"/"分隔符来分层,订阅者订阅时可以使用通配符"+"和"#",发布者不能使用通配符。

-q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。

-c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。

-i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。

-r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -r -n  -u user -P 123456。

-m: 该topic的一条消息内容。

加入MariaDB的认证:

修改配置:

$ vi ./etc/emq.conf

## Allow Anonymous authentication

mqtt.allow_anonymous = false

## Default ACL File

mqtt.acl_file = etc/acl.conf

$ vi ./data/loaded_plugins

emq_dashboard.

emq_auth_mysql.

修改plugins配置:

$ vi etc/plugins/emq_auth_mysql.conf

## Mysql Server

auth.mysql.server = 192.168.60.60:3306

## Mysql Pool Size

auth.mysql.pool = 8

## Mysql Username

auth.mysql.username = root

## Mysql Password

auth.mysql.password = 123456

## Mysql Database

auth.mysql.database = mqtt

## Variables: %u = username, %c = clientid

## Authentication Query: select password only

auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1

## Password hash: plain, md5, sha, sha256, pbkdf2

auth.mysql.password_hash = plain

## %% Superuser Query

auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

## ACL Query Command

auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

## ACL nomatch

auth.mysql.acl_nomatch = deny

往MariaDB插入初始化数据库和表(引擎需要改成InnDB):

认证的用户表也可以共用其他系统的用户表,由上面的emq_auth_mysql.conf来配置:auth.mysql.auth_query、auth.mysql.super_query、auth.mysql.acl_query:

建立mqtt数据库:

$ mysql -uroot -p123456 -hxxxx

MariaDB [test]> create database mqtt;

用户表:

MariaDB [test]> CREATE TABLE `mqtt_user` (

`id` int(11) unsigned NOT NULL AUTO_INCREMENT,

`username` varchar(100) DEFAULT NULL,

`password` varchar(100) DEFAULT NULL,

`salt` varchar(20) DEFAULT NULL,

`is_superuser` tinyint(1) DEFAULT 0,

`created` datetime DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `mqtt_username` (`username`)

) DEFAULT CHARSET=utf8;

用户表插入测试数据:

MariaDB [mqtt]> INSERT INTO mqtt_user (id, username, password, salt, is_superuser, created)

VALUES

(1,'superuser','123456','123456',True,'2016-10-26 10:00:00'),

(2,'user','123456','123456',False,'2016-10-26 10:01:00');

acl表:

MariaDB [mqtt]> CREATE TABLE `mqtt_acl` (

`id` int(11) unsigned NOT NULL AUTO_INCREMENT,

`allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',

`ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',

`username` varchar(100) DEFAULT NULL COMMENT 'Username',

`clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',

`access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',

`topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',

PRIMARY KEY (`id`)

) DEFAULT CHARSET=utf8;

acl表插入默认数据(注意带$SYS为broker保留的特殊topic用来统计使用,username为$all表示所有在user表中的用户):

MariaDB [mqtt]> INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)

VALUES

(1,1,NULL,'$all',NULL,2,'#'),

(2,1,NULL,'$all',NULL,1,'#'),

(3,1,NULL,'$all',NULL,3,'#'),

(4,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),

(5,1,'127.0.0.1',NULL,NULL,2,'#'),

(6,1,NULL,'dashboard',NULL,1,'$SYS/#');

测试,重启每台服务器后执行:

$ mosquitto_sub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -c -i 1112  -u user -P 123456

$ mosquitto_pub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 1" -u user -P 123456

重新初始化emq:

$ bin/emqttd stop

$ rm -rf data/mnesia/*

$ rm -rf data/configs/*

$ rm -rf log/*

$ bin/emqttd start

$ bin/emqttd_ctl cluster join emqttd@192.168.60.55

使用Haproxy来实现负载分担:

因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。

$ vi /etc/haproxy/haproxy.cfg

################## EMQ ######################

listen emq_emqttd *:1883

mode tcp

balance source

log global

timeout connect         25s

timeout client          0

timeout server          0

option tcpka

option tcplog

option tcp-check

server emq1 192.168.60.55:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

server emq2 192.168.60.56:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

server emq3 192.168.60.57:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

server emq4 192.168.60.58:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

开启haproxy上的防火墙:

sudo firewall-cmd                      --zone=public --add-port=1883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

sudo firewall-cmd                      --zone=public --list-all

sudo firewall-cmd --permanent --zone=public --list-all

使用:

broker操作:

显示broker状态:

$ ./bin/emqttd_ctl status

显示broker版本、启用时间等:

$ ./bin/emqttd_ctl broker

显示broker的pubsub进程状态、内存、队列长度、规约数等:

$ ./bin/emqttd_ctl broker  pubsub

显示broker的统计信息:客户端连接数、会话数、主题数、订阅数、路由数等:

$ ./bin/emqttd_ctl broker stats

显示broker的测量信息:底层流量、MQTT报文数、消息数等:

$ ./bin/emqttd_ctl broker metrics

Cluster操作:

$ ./bin/emqttd_ctl cluster

cluster join <Node>                             # Join the cluster

cluster leave                                   # Leave the cluster

cluster remove <Node>                           # Remove the node from cluster

cluster status                                  # Cluster status

Client操作:

$ ./bin/emqttd_ctl clients

clients list                                    # List all clients

clients show <ClientId>                         # Show a client

clients kick <ClientId>                         # Kick out a client

Sessions操作:

$ ./bin/emqttd_ctl sessions

sessions list                                   # List all sessions

sessions list persistent                        # List all persistent sessions ,桥接Bridge的时候才会用到(即clean_sess=false的类型)

sessions list transient                         # List all transient sessions

sessions show <ClientId>                        # Show a session

Routes操作:

$ ./bin/emqttd_ctl routes

routes list                                     # List all routes

routes show <Topic>                             # Show a route

Topics操作:

$ ./bin/emqttd_ctl topics

topics list                                     # List all topics

topics show <Topic>                             # Show a topic

Subscription订阅者操作:

$ ./bin/emqttd_ctl subscriptions

subscriptions list                              # List all subscriptions

subscriptions show <ClientId>                   # Show subscriptions of a client

subscriptions add <ClientId> <Topic> <QoS>      # Add a static subscription manually

subscriptions del <ClientId>                    # Delete static subscriptions manually

subscriptions del <ClientId> <Topic>            # Delete a static subscription manually

Plugins插件操作:

$ ./bin/emqttd_ctl plugins

plugins list                                    # Show loaded plugins

plugins load <Plugin>                           # Load plugin

plugins unload <Plugin>                         # Unload plugin

$ ./bin/emqttd_ctl plugins list

Plugin(emq_auth_clientid, version=2.0, description=Authentication with ClientId/Password, active=false)

Plugin(emq_auth_http, version=2.0, description=Authentication/ACL with HTTP API, active=false)

Plugin(emq_auth_ldap, version=2.0, description=Authentication/ACL with LDAP, active=false)

Plugin(emq_auth_mongo, version=2.0, description=Authentication/ACL with MongoDB, active=false)

Plugin(emq_auth_mysql, version=2.0, description=Authentication/ACL with MySQL, active=false)

Plugin(emq_auth_pgsql, version=2.0, description=Authentication/ACL with PostgreSQL, active=false)

Plugin(emq_auth_redis, version=2.0, description=Authentication/ACL with Redis, active=false)

Plugin(emq_auth_username, version=2.0, description=Authentication with Username/Password, active=false)

Plugin(emq_coap, version=0.2, description=CoAP Gateway, active=false)

Plugin(emq_dashboard, version=2.0, description=Dashboard, active=true)

Plugin(emq_mod_rewrite, version=2.0, description=Rewrite Module, active=false)

Plugin(emq_plugin_template, version=2.0, description=EMQ Plugin Template, active=false)

Plugin(emq_recon, version=2.0, description=Recon Plugin, active=false)

Plugin(emq_reloader, version=2.0, description=Reloader Plugin, active=false)

Plugin(emq_sn, version=0.2, description=MQTT-SN Gateway, active=false)

Plugin(emq_stomp, version=2.0, description=Stomp Protocol Plugin, active=false)

Bridges桥接操作:

$ ./bin/emqttd_ctl bridges

bridges list                                    # List bridges

bridges options                                 # Bridge options

bridges start <Node> <Topic>                    # Start a bridge

bridges start <Node> <Topic> <Options>          # Start a bridge with options

bridges stop <Node> <Topic>                     # Stop a bridge

vm虚机(erlang虚机)性能查看:

$ ./bin/emqttd_ctl vm all

cpu/load1               : 0.05

cpu/load5               : 0.05

cpu/load15              : 0.07

memory/total            : 166404232

memory/processes        : 32564328

memory/processes_used   : 32563952

memory/system           : 133839904

memory/atom             : 959633

memory/atom_used        : 954350

memory/binary           : 46088

memory/code             : 28250535

memory/ets              : 5741416

process/limit           : 2097152

process/count           : 288

io/max_fds              : 1000000

io/active_fds           : 1

ports/count           : 25

ports/limit           : 1048576

端口使用情况查看:

$ ./bin/emqttd_ctl listeners

mnesia数据库信息查看:

$ bin/emqttd_ctl mnesia

管理dashboard用户:

$ ./bin/emqttd_ctl admins

admins add <Username> <Password> <Tags>         # Add dashboard user

admins passwd <Username> <Password>             # Reset dashboard user password

admins del <Username>                           # Delete dashboard user

追踪 ,EMQ消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。

追踪客户端(Client):

./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" "trace_clientid.log"

追踪主题(Topic):

./bin/emqttd_ctl trace topic "topic1" "trace_topic.log"

查询追踪:

./bin/emqttd_ctl trace list

停止追踪:

./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" off

./bin/emqttd_ctl trace topic "topic1" off

钩子(Hook)扩展,EMQ消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):

钩子                                说明

client.connected             客户端上线

client.subscribe               客户端订阅主题前

client.unsubscribe           客户端取消订阅主题

session.subscribed          客户端订阅主题后

session.unsubscribed      客户端取消订阅主题后

message.publish             MQTT消息发布

message.delivered          MQTT消息送达

message.acked               MQTT消息回执

client.disconnected        客户端连接断开

钩子使用例子:

-module(emqttd_plugin_template).

-export([load/1, unload/0]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

load(Env) ->

emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),

emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),

emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

on_message_publish(Message, _Env) ->

io:format("publish ~s~n", [emqttd_message:format(Message)]),

{ok, Message}.

on_message_delivered(ClientId, _Username, Message, _Env) ->

io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),

{ok, Message}.

on_message_acked(ClientId, _Username, Message, _Env) ->

io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),

{ok, Message}.

unload() ->

emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),

emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4),

emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4).

MQTT协议的客户端库:

https://github.com/mqtt/mqtt.github.io/wiki/libraries

http://www.eclipse.org/paho/downloads.php

做IM即时通信协议时的考虑:

MQTT作为发布、订阅系统,作为消息推送是很好的。

如果作为IM即时通信,则可以考虑把每个用户id做成一个topic,每个用户订阅名为自己id的topic(比如"USER/1111"),发送方如果需要发布消息给某一个用户,则发布该用户id的topic消息,对方自然就会收到。另外每个用户还要订阅几个系统类的topic(比如"ADMIN/broadcast","ADMIN/1111"),以便后台系统发布各类消息。至于群组消息,每加入一个群,则增加订阅一个名为群id的topic(比如"GROUP/1111")。

--------------------- 本文来自 王树民ITDATA 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/wangshuminjava/article/details/80589899?utm_source=copy

最新文章

  1. C#在二维码中添加圆角logo
  2. Launch Mode
  3. $event 获取对象
  4. Collection框架
  5. 模拟搭建Web项目的真实运行环境(一)
  6. 数值分析之QR因子分解篇
  7. [No00001C]不想背单词患者的福音!-快来定制你的个性词包-不想记、记不牢,这可怎么办?
  8. Firefox上运行自动化测试脚本提示元素无法点击“WebDriverException: Message: Element is not clickable at point“解决方法
  9. 几个容易被忽略的mysql知识
  10. Jquery EasyUI DataGrid .net实例
  11. 第一次作业---安卓开发工具Android studio发展演变
  12. java web 实现验证码
  13. Java编译原理
  14. 访问System x3650 IMM2的几种方式
  15. 当桌面的快捷方式图标左下角出现一个X(叉)的时候应该怎么去掉
  16. 【数论】卢卡斯定理模板 洛谷P3807
  17. asp.net 按钮执行前后台方法——前台弹出提示信息,确认后继续执行后台方法,取消则不执行后台方法
  18. php记录
  19. select2插件 多选框动态初始化值
  20. js 限制输入框只能输入数字的问题

热门文章

  1. 洛谷 P3383 【模板】线性筛素数-线性筛素数(欧拉筛素数)O(n)基础题贴个板子备忘
  2. dfs序七个经典问题[转]
  3. 揭秘响应式web设计
  4. Android中的MVC,MVP和MVVM
  5. [Android]Android 布局中如何让图片和文字居中显示?
  6. 利用hsdis和JITWatch查看分析HotSpot JIT compiler生成的汇编代码
  7. Delphi 使窗体Showmodal后可以操作其他窗体
  8. Layui 使用问题汇总
  9. t-sql 笔记(2)
  10. Mysql的共享锁和排他锁(转载)