1.简介

  一个java写的mqtt客户端。项目地址:

  https://github.com/fusesource/mqtt-client

2.引入fusesource-mqtt-client库

  • File--->Project Structure--->Dependencies
  • 点绿色+
  • 在弹出的窗口中输入“‘mqtt-client”回车搜索
  • 在结果中选择org.fusesource.mqtt-client:mqtt-client:1.xxx

3.示例代码

3.1 参考代码

activeMQ服务端软件内提供的示例代码 apache-activemq-5.15.0/examples/mqtt/java/
dzone提供的示例 https://dzone.com/articles/android-mqtt-activemq
github上的示例代码 https://github.com/fusesource/mqtt-client#using-the-callbackcontinuation-passing-based-api

3.2 效果

      

3.3 源码

 package com.example.tt.mqtt;

 import android.app.NotificationManager;
 import android.app.PendingIntent;
 import android.content.Context;
 import android.content.Intent;
 import android.os.Bundle;
 import android.support.v4.app.TaskStackBuilder;
 import android.support.v7.app.AppCompatActivity;
 import android.support.v7.app.NotificationCompat;
 import android.util.Log;
 import android.view.View;
 import android.widget.Button;
 import android.widget.CheckBox;
 import android.widget.CompoundButton;
 import android.widget.EditText;
 import android.widget.TextView;
 import android.widget.ToggleButton;

 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.Callback;
 import org.fusesource.mqtt.client.CallbackConnection;
 import org.fusesource.mqtt.client.Listener;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;

 import java.net.URISyntaxException;

 public class MainActivity extends AppCompatActivity implements View.OnClickListener,CompoundButton.OnCheckedChangeListener {

     final static String TAG = "MQTTClient";

     //UI
     ToggleButton    btnConnect;
     Button          btnPublish, btnSubscribe;
     EditText        edtServer,edtMessage,edtTopic,edtClientID;
     TextView        received;
     CheckBox        cbxPersist;

     //MQTT
     final static String clientId    = "android";
     final ;
     final static String host        = "192.168.1.101";
     final static String user        = "guest";
     final ;
     final static String password    = "admin";

     MQTT                mqtt                ;
     Listener            listener            ;
     CallbackConnection  callbackConnection  ;
     Callback<Void>      connectCallback     ;
     Callback<byte[]>    subscribeCallback   ;
     Callback<Void>      publishCallback     ;
     Callback<Void>      disconnectCallback  ;

     {
         connectCallback = new Callback<Void>(){

             @Override
             public void onSuccess(Void value) {
                 Log.d(TAG, "connectCallback : onSuccess");
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("connectCallback success");
                     }
                 });

             }
             @Override
             public void onFailure(Throwable value) {
                 value.printStackTrace();
                 Log.d(TAG, "connectCallback : failure");
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("connectCallback failure");
                     }
                 });
                 System.exit(-);
             }
         };
         disconnectCallback = new Callback<Void>(){

             public void onSuccess(Void value) {
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("disconnect success");
                     }
                 });
             }
             public void onFailure(Throwable e) {
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("disconnect failure");
                     }
                 });
             }
         };

         listener = new Listener() {

             @Override
             public void onConnected() {
                 Log.d(TAG, "listener onConnected");
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("listener onConnected");
                     }
                 });
             }

             @Override
             public void onDisconnected() {
                 Log.d(TAG, "listener onDisconnected");
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("listener onDisconnected");
                     }
                 });
             }

             @Override
             public void onPublish(final UTF8Buffer topic, Buffer msg, Runnable ack) {
                 final String body = msg.utf8().toString();
                 Log.d(TAG, "onPublish: " + body);
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         makeNotification(topic.toString(),body);
                         received.append("\nreceived : " + body);
                     }
                 });
             }

             @Override
             public void onFailure(Throwable value) {
                 Log.d(TAG, "listener failure");
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("listener failure");
                     }
                 });
             }
         };

         subscribeCallback = new Callback<byte[]>() {

             public void onSuccess(byte[] qoses) {
                 Log.d(TAG, "subscribe : success");

                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("subscribe " + edtTopic.getText().toString() + ": success");
                     }
                 });
             }
             public void onFailure(Throwable value) {
                 value.printStackTrace();
                 Log.d(TAG, "subscribe : failure");
                 received.post(new Runnable() {
                     @Override
                     public void run() {
                         received.setText("subscribe " + edtTopic.getText().toString() + ": failure");
                     }
                 });
                 System.exit(-);
             }
         };
         publishCallback = new Callback<Void>() {
             @Override
             public void onSuccess(Void value) {
                 Log.d(TAG, "onSuccess: ");
             }

             @Override
             public void onFailure(Throwable value) {
                 Log.d(TAG, "onFailure: ");
             }
         };
     }

     void connect(){
         callbackConnection.connect(connectCallback);
     }

     void disconnect(){
         callbackConnection.disconnect(disconnectCallback);
     }

     void subscribe(){

         String topicName = edtTopic.getText().toString().trim();

         Topic topics[] = new Topic[]{new Topic(topicName,QoS.AT_LEAST_ONCE)};

         callbackConnection.subscribe(topics,subscribeCallback);

     }

     void publish(){

         String data = edtMessage.getText().toString();

         String topicName = edtTopic.getText().toString().trim();

         callbackConnection.publish(topicName,data.getBytes(),QoS.AT_LEAST_ONCE,false,publishCallback);

     }

     void initMqtt(){

         mqtt = new MQTT();
         try {
             mqtt.setHost(host, port);
             mqtt.setUserName(user);
             mqtt.setPassword(password);
             mqtt.setKeepAlive(keepAlive);
             mqtt.getClientId();
             callbackConnection = mqtt.callbackConnection();
             callbackConnection.listener(listener);

         } catch (URISyntaxException e) {
             e.printStackTrace();
             Log.e(TAG,"-=-=-=-=-=-=------------====\n initMqtt exception : " + e.getMessage());
         }
     }

     @Override
     protected void onCreate(Bundle savedInstanceState) {
         super.onCreate(savedInstanceState);
         setContentView(R.layout.activity_main);

         received    = (TextView)    findViewById(R.id.txt_received);
         btnSubscribe= (Button)      findViewById(R.id.btn_subscribe);
         btnConnect  = (ToggleButton)findViewById(R.id.btn_connect);
         btnPublish  = (Button)      findViewById(R.id.btn_publish);
         edtServer   = (EditText)    findViewById(R.id.edt_server);
         edtTopic    = (EditText)    findViewById(R.id.edt_topic);
         edtMessage  = (EditText)    findViewById(R.id.edt_message);
         edtClientID = (EditText)    findViewById(R.id.edt_clientID);
         cbxPersist  = (CheckBox)    findViewById(R.id.cbx_persist);

         btnConnect  .setOnClickListener(this);
         btnConnect  .setOnCheckedChangeListener(this);
         cbxPersist  .setOnCheckedChangeListener(this);
         btnSubscribe.setOnClickListener(this);
         btnPublish  .setOnClickListener(this);

         initMqtt();

     }
     void makeNotification(final String title,final String content){

         NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(this);
         mBuilder.setSmallIcon(R.drawable.mail_3_small);//must
         mBuilder.setContentTitle(title);
         mBuilder.setContentText(content);
         // Creates an explicit intent for an Activity in your app
         Intent resultIntent = new Intent(this, MainActivity.class);

         // The stack builder object will contain an artificial back stack for the
         // started Activity.
         // This ensures that navigating backward from the Activity leads out of
         // your app to the Home screen.
         TaskStackBuilder stackBuilder = TaskStackBuilder.create(this);
         // Adds the back stack for the Intent (but not the Intent itself)
         stackBuilder.addParentStack(MainActivity.class);
         // Adds the Intent that starts the Activity to the top of the stack
         stackBuilder.addNextIntent(resultIntent);
         PendingIntent resultPendingIntent = stackBuilder.getPendingIntent(,PendingIntent.FLAG_UPDATE_CURRENT);
         mBuilder.setContentIntent(resultPendingIntent);
         NotificationManager mNotificationManager =
                 (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);

         // mNotificationId is a unique integer your app uses to identify the
         // notification. For example, to cancel the notification, you can pass its ID
         // number to NotificationManager.cancel().
         mNotificationManager.notify(R.string.app_name, mBuilder.build());
     }

     void blocking(){
         BlockingConnection connection = mqtt.blockingConnection();
         try {
             connection.connect();
             //publish
             connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

             //subscribe
             Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
             byte[] qoses = connection.subscribe(topics);

             //receive message
             Message message = connection.receive();
             System.out.println(message.getTopic());
             byte[] payload = message.getPayload();
             // process the message then:
             message.ack();

             //disconnect
             connection.disconnect();
         } catch (Exception e) {
             e.printStackTrace();
         }
     }

     @Override
     public void onClick(View view) {
         switch (view.getId()){
             case R.id.btn_publish   :   publish();      break;
             case R.id.btn_subscribe :   subscribe();    break;
         }

     }

     @Override
     public void onCheckedChanged(CompoundButton compoundButton, boolean b) {
         switch (compoundButton.getId()){
             case R.id.btn_connect:
                 if (!b){
                     connect();
                 }else{
                     disconnect();
                 }
                 break;
             case R.id.cbx_persist:
                 if (mqtt != null) {
                     mqtt.setClientId(edtClientID.getText().toString().trim());
                     mqtt.setCleanSession(!b);
                 }
                 break;
         }
     }

 }

3.4 完整下载地址

  https://git.oschina.net/xi/mqtt-client-demo.git

4.MQTT 常用方法介绍

setClientId

Use to set the client Id of the session. This is what an MQTT server uses to identify a session where setCleanSession(false); is being used.

The id must be 23 characters or less. Defaults to auto generated id (based on your socket address, port and timestamp).

每个客户端id不要相同
指定id后,才可以调用setCleanSession,持久保存订阅的会话,哪个客户端订阅了哪个主题就保存在某个会话中。
setCleanSession Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.

设置false时,服务端将不清除会话,这样就可以持久保存订阅关系。
setKeepAlive

Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client.

It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout.

设置保活时间,单位是秒,默认为tpc连接时间。
setUserName Sets the user name used to authenticate against the server.

设置服务端验证的用户名
setPassword Sets the password used to authenticate against the server.

设置验证用户的密码
setWillTopic

If set the server will publish the client's Will message to the specified topics if the client has an unexpected disconnection.

当客户端异常断开时,服务器按这里指定的主题发意愿消息。
setWillMessage The Will message to send. Defaults to a zero length message.

意愿消息
setWillQos Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.

意愿消息的QoS
setWillRetain Set to true if you want the Will to be published with the retain option.
setVersion Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.

设置MQTT协议版本

最新文章

  1. Eclipse断点调试方法
  2. 在WPF应用程序中利用IEditableObject接口实现可撤销编辑的对象
  3. [原][CSS3]会动的盒子机器人
  4. mysql事件调度器定时删除binlog
  5. STM32 常用GPIO操作函数记录
  6. poj 2063 Investment
  7. delete drop truncate
  8. android 内部存储相关知识点: getfilestreampath getDir 子文件夹
  9. TFS的使用
  10. appium的安装过程(图文界面)
  11. SGU 200 Cracking RSA (高斯消元)
  12. sql的强大功能(看一条sql解决的复杂业务)
  13. 一个大浪Java罢工(一个)安装JDK和环境变量配置
  14. HDU5835
  15. MySql Jar 包下载
  16. 【python 3】 函数 进阶
  17. k-means算法之见解(一)
  18. X的平方
  19. java 线程Thread 技术--线程创建源码解释
  20. WebAPI中发送字节数组

热门文章

  1. what is diff. b/w app state &amp; session state
  2. Java web 中的HttpServletRequest对象
  3. Vue 动态传值,Get传值
  4. 搭建linux虚拟机
  5. tcp连接过程中的三次握手和四次挥手
  6. python自带的排列组合函数
  7. hdu 6196 搜索+剪枝
  8. Qt 学习之路 2(47):视图选择
  9. I/O设备
  10. wepsocket 了解一下