首先到官網(wǎng)下載zip并啟動
下載地址 https://www.emqx.cn/downloads
本次使用的版本是4.3
0淹父、先將mqtt的匿名訪問關(guān)閉
## Value: true | false
allow_anonymous = false
1、首先修改配置文件篷牌,配置pgsql數(shù)據(jù)庫的相關(guān)信息嘁酿,路徑/emqx/etc/plugins/emqx_auth_pgsql.conf
## 服務(wù)器地址
auth.pgsql.server = 127.0.0.1:5432
## 連接池大小
auth.pgsql.pool = 8
auth.pgsql.username = root
auth.pgsql.password = public
auth.pgsql.database = mqtt
auth.pgsql.encoding = utf8
2、在數(shù)據(jù)庫中創(chuàng)建兩張認(rèn)證表詳見官方文檔
CREATE TABLE mqtt_user (
id SERIAL primary key,
is_superuser boolean,
username character varying(100),
password character varying(100),
salt character varying(40)
)
-- 客戶端信息
INSERT INTO mqtt_user (username, password, salt, is_superuser)
VALUES
('emqx', 'efa1f375d76194fa51a3556a97e641e61685f914d446979da50a551a4333ffd7', NULL, false);
CREATE TABLE mqtt_acl (
id SERIAL primary key,
allow integer,
ipaddr character varying(60),
username character varying(100),
clientid character varying(100),
access integer,
topic character varying(100)
)
-- 所有用戶不可以訂閱系統(tǒng)主題
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, '$all', NULL, 1, '$SYS/#');
-- 允許 10.59.1.100 上的客戶端訂閱系統(tǒng)主題
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, '10.59.1.100', NULL, NULL, 1, '$SYS/#');
-- 禁止客戶端訂閱 /smarthome/+/temperature 主題
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, NULL, NULL, 1, '/smarthome/+/temperature');
-- 允許客戶端訂閱包含自身 Client ID 的 /smarthome/${clientid}/temperature 主題
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, NULL, NULL, 1, '/smarthome/%c/temperature');
詳見官方doc
https://docs.emqx.cn/broker/v4.3/advanced/acl-postgres.html
配置完畢后在mqtt的Dashboard中啟用pgsql的ACL
image.png
如果報錯說明數(shù)據(jù)庫配置錯誤袱结,請自行檢查
下面我們寫一個java來測試一下效果
MQTTSSLConsumer
package com.test.emq;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ScheduledExecutorService;
public class MQTTSSLConsumer {
public static final String HOST = "tcp://192.168.203.130:1883";
public static final String TOPIC1 = "taikang/rulee";
private static final String clientid = "client01";
private MqttClient client;
private MqttConnectOptions options;
private String userName = "emqx"; //非必須
private String passWord = "public1"; //非必須
@SuppressWarnings("unused")
private ScheduledExecutorService scheduler;
private void start() {
try {
// host為主機名淫茵,clientid即連接MQTT的客戶端ID摧冀,一般以唯一標(biāo)識符表示菜拓,MemoryPersistence設(shè)置clientid的保存形式瓣窄,默認(rèn)為以內(nèi)存保存
client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的連接設(shè)置
options = new MqttConnectOptions();
// 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
options.setCleanSession(false);
// 設(shè)置連接的用戶名
options.setUserName(userName);
// 設(shè)置連接的密碼
options.setPassword(passWord.toCharArray());
// 設(shè)置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線纳鼎,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
// 設(shè)置重連機制
options.setAutomaticReconnect(true);
// 設(shè)置回調(diào)
//client.setCallback(new PushCallback());
MqttTopic topic = client.getTopic(TOPIC1);
//setWill方法康栈,如果項目中需要知道客戶端是否掉線可以調(diào)用該方法。設(shè)置最終端口的通知消息
client.connect(options);
//訂閱消息
int[] Qos = {1};
String[] topic1 = {TOPIC1};
client.subscribe(topic1, Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws MqttException {
System.setProperty("javax.net.debug", "ssl,handshake");
MQTTSSLConsumer client = new MQTTSSLConsumer();
client.start();
}
}
測試結(jié)論
在關(guān)閉匿名請求的情況下
如果客戶端輸入錯誤的用戶名和密碼會報錯:無權(quán)連接(5)
如果認(rèn)證通過喷橙,但是在ACL規(guī)則中認(rèn)證失敗會報錯:MqttException (128)
補充 pom
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>