創(chuàng)建連接
MainActivity
首先我們得需要mqtt3.jar包
mqtt3下載連接
package com.servermqtt;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.EditText;
import org.eclipse.paho.client.mqttv3.MqttException;
public class MainActivity extends AppCompatActivity {
private ServerMQTT mServerMQTT;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
final EditText msg= (EditText) findViewById(R.id.msg);
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
mServerMQTT.sendMessage(msg.getText().toString().trim()+"");
}
});
findViewById(R.id.create). setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
try {
mServerMQTT=new ServerMQTT();
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
}
ServerMQTT
package com.servermqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
*
* Title:Server
* Description: 服務(wù)器向多個客戶端推送主題,即不同客戶端可向服務(wù)器訂閱相同主題
* @author admin
*/
public class ServerMQTT {
//tcp://MQTT安裝的服務(wù)器地址:MQTT定義的端口號
public static final String HOST = "tcp://192.168.10.67:61613";
//定義一個主題
public static final String TOPIC = "topic11";
//定義MQTT的ID,可以在MQTT服務(wù)配置中指定
private static final String clientid = "client1";
private MqttClient client;
private MqttTopic topic11;
private String userName = "admin";
private String passWord = "password";
private MqttMessage message;
private ServerMQTT mServer;
/**
* 構(gòu)造函數(shù)
* @throws MqttException
*/
public ServerMQTT() throws MqttException {
// MemoryPersistence設(shè)置clientid的保存形式灾锯,默認為以內(nèi)存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
connect();
}
/**
* 用來連接服務(wù)器
*/
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 設(shè)置超時時間
options.setConnectionTimeout(10);
// 設(shè)置會話心跳時間
options.setKeepAliveInterval(20);
try {
client.setCallback(new PushCallback());
client.connect(options);
topic11 = client.getTopic(TOPIC);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
*
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("message is published completely! "
+ token.isComplete());
}
public void sendMessage(String msg){
try {
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
message.setPayload(msg.getBytes());
publish(topic11 , message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
PushCallback
package com.servermqtt; /**
*
* Description:
* @author admin
* 2017年2月10日下午18:04:07
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 發(fā)布消息的回調(diào)類
*
* 必須實現(xiàn)MqttCallback的接口并實現(xiàn)對應(yīng)的相關(guān)接口方法CallBack 類將實現(xiàn) MqttCallBack。
* 每個客戶機標(biāo)識都需要一個回調(diào)實例猎醇。在此示例中,構(gòu)造函數(shù)傳遞客戶機標(biāo)識以另存為實例數(shù)據(jù)努溃。
* 在回調(diào)中硫嘶,將它用來標(biāo)識已經(jīng)啟動了該回調(diào)的哪個實例。
* 必須在回調(diào)類中實現(xiàn)三個方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已經(jīng)預(yù)訂的發(fā)布梧税。
*
* public void connectionLost(Throwable cause)在斷開連接時調(diào)用沦疾。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已經(jīng)發(fā)布的 QoS 1 或 QoS 2 消息的傳遞令牌時調(diào)用。
* 由 MqttClient.connect 激活此回調(diào)第队。
*
*/
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 連接丟失后哮塞,一般在這里面進行重連
System.out.println("連接斷開,可以做重連");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執(zhí)行到這里面
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內(nèi)容 : " + new String(message.getPayload()));
}
}
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
>
<Button
android:text="創(chuàng)建"
android:id="@+id/create"
android:onClick="onClick"
android:layout_width="match_parent"
android:layout_height="50dp"/>
<Button
android:text="發(fā)送消息"
android:id="@+id/send"
android:onClick="onClick"
android:layout_width="match_parent"
android:layout_height="50dp"/>
<EditText
android:id="@+id/msg"
android:layout_width="match_parent"
android:layout_height="50dp"/>
</LinearLayout>