前言
MQTT是輕量級基于代理的發(fā)布/訂閱的消息傳輸協(xié)議谓罗,設(shè)計(jì)思想是開放、簡單季二、輕量檩咱、易于實(shí)現(xiàn)。這些特點(diǎn)使它適用于受限環(huán)境胯舷。關(guān)于MQTT的詳細(xì)介紹推薦這篇文章推薦文章
Apache apollo是一個(gè)代理服務(wù)器刻蚯,其是在ActiveMQ基礎(chǔ)上發(fā)展而來的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多種協(xié)議桑嘶。運(yùn)行apollo需要先配置JAVA_HOME
Apollo下載地址
paho是eclipse提供一個(gè)訪問MQTT服務(wù)器的一種開源客戶端庫炊汹,其中提供7種不同平臺的客戶端類庫,今天咱們以java客戶端為例子演示
paho下載地址
apollo使用的時(shí)候需要 配置JAVA_HOME配置成功之后逃顶,命令行指定到bin目錄下讨便,然后輸入apollo create mybroker(圖1)這時(shí)候會(huì)在bin目錄下生成一個(gè)mybroker文件夾(圖2)在mybroker中進(jìn)入bin目錄(圖3)命令行 cd mybroker/bin(圖4)然后輸入apollo-broker run(圖5)顯示成功后打開瀏覽器輸入 http://127.0.0.1:61680 這時(shí)候要是顯示(圖6)界面就證明成功了充甚。用戶名和密碼可以通過myorker/etc 下的users.properties文件中找到默認(rèn)用戶名和密碼是 admin和password.
paho下載完成后進(jìn)入文件夾啟動(dòng)paho.exe(圖7)點(diǎn)擊+號創(chuàng)建服務(wù)器連接(圖8)。服務(wù)器地址在之前創(chuàng)建的mybroker/etc文件夾中打開apollo.xml找到圖9中那一行改成自己本機(jī)的局域網(wǎng)IP地址霸褒。完成后在圖8中服務(wù)器地址中輸入相同的地址和端口伴找,這時(shí)候點(diǎn)擊連接會(huì)提示“連接失敗-錯(cuò)誤的用戶名和密碼”。在選項(xiàng)中需要開啟登錄輸入之前apollo配置的用戶名和密碼(圖10)重新點(diǎn)擊連接這時(shí)候連接狀態(tài)會(huì)顯示已連接(圖11)
下面是android客戶端配置
- module的build.gradle中找到dependencies 然后添加
compile'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'
compile'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
后Sync Project with Gradle Files
- AndroidManifest.xml中添加
<service android:name="org.eclipse.paho.android.service.MqttService"/>
- 初始化MQTT
public class MQTTService extends Service {
public static final String TAG = MQTTService.class.getSimpleName();
private static MqttAndroidClient client;
private MqttConnectOptions conOpt;
private String host = "tcp://192.168.100.41:61613";
private String userName = "admin";
private String passWord = "password";
private static String myTopic = "test";
private String clientId = "clientId";
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}
public static void publish(String msg){
Integer qos = 0;
Boolean retained = false;
try {
client.publish(myTopic, msg.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
private void init() {
// 服務(wù)器地址(協(xié)議+地址+端口號)
client = new MqttAndroidClient(this, host, clientId);
// 設(shè)置MQTT監(jiān)聽并且接受消息
client.setCallback(mqttCallback);
conOpt = new MqttConnectOptions();
// 清除緩存
conOpt.setCleanSession(true);
// 設(shè)置超時(shí)時(shí)間废菱,單位:秒
conOpt.setConnectionTimeout(10);
// 心跳包發(fā)送間隔技矮,單位:秒
conOpt.setKeepAliveInterval(20);
// 用戶名
conOpt.setUserName(userName);
// 密碼
conOpt.setPassword(passWord.toCharArray());
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + clientId + "\"}";
Integer qos = 0;
Boolean retained = false;
client.getClientId();
if ((!message.equals("")) || (!myTopic.equals(""))) {
try {
//setWill方法,如果項(xiàng)目中需要知道客戶端是否掉線可以調(diào)用該方法昙啄。設(shè)置最終端口的通知消息
conOpt.setWill(myTopic, message.getBytes(), qos.intValue(), retained.booleanValue());
} catch (Exception e) {
doConnect = false;
iMqttActionListener.onFailure(null, e);
}
}
if (doConnect) {
doClientConnection();
}
}
@Override
public void onDestroy() {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
/** 連接MQTT服務(wù)器 */
private void doClientConnection() {
if (!client.isConnected() && isConnectIsNomarl()) {
try {
client.connect(conOpt, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
// MQTT是否連接成功
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "連接成功 ");
try {
// 訂閱myTopic話題
client.subscribe(myTopic,1);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
// 連接失敗穆役,重連
Log.i(TAG, "連接失敗 ");
}
};
// MQTT監(jiān)聽并且接受消息
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String str1 = new String(message.getPayload());
MQTTMessage msg = new MQTTMessage();
msg.setMessage(str1);
EventBus.getDefault().post(msg);
String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
Log.i(TAG, "messageArrived:" + str1);
Log.i(TAG, str2);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
}
@Override
public void connectionLost(Throwable arg0) {
// 失去連接,重連
Toast.makeText(MQTTService.this, " 失去連接梳凛,重連", Toast.LENGTH_SHORT).show();
}
};
/** 判斷網(wǎng)絡(luò)是否連接 */
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.i(TAG, "MQTT當(dāng)前網(wǎng)絡(luò)名稱:" + name);
return true;
} else {
Log.i(TAG, "MQTT 沒有可用網(wǎng)絡(luò)");
return false;
}
}
@Override
public IBinder onBind(Intent intent) {
return null;
}
}
最后通過Paho發(fā)布消息 客戶端就能收到推送過來的內(nèi)容了耿币!