接觸Mqtt最開始用的是SpingMVC 引入 Mqtt Client Jar包自己實(shí)現(xiàn)收發(fā)重連等操作,而現(xiàn)在則使用Spring Integration 進(jìn)行集成翻擒。其復(fù)雜度和代碼簡(jiǎn)潔度天差地別病蛉。所以特別寫下此文章算是對(duì)自己的一個(gè)記錄店茶。也希望觀看到此文章的人少走一點(diǎn)彎路歧焦,哪怕只有一步缺脉。
當(dāng)然有精力的話最好還是使用Client自己實(shí)現(xiàn)一遍對(duì)自己來說也是一種提高惦辛,生產(chǎn)環(huán)境劳秋,我建議使用Spring Integration 集成
關(guān)于Spring Intergration
介紹關(guān)于Spring Integration 的一些概念性的東西,如果只想知道怎么集成可跳過該部分
Spring Integration provides an extension of the Spring programming model to support the well known Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems through declarative adapters. Those adapters provide a higher level of abstraction over Spring’s support for remoting, messaging, and scheduling.
大致意思是
- Spring Integration 為許多著名的企業(yè)級(jí)集成方案提供了擴(kuò)展接口
- 它能夠在基于Sping的應(yīng)用中進(jìn)行輕量級(jí)消息傳遞
- 支持聲明式適配器與發(fā)布系統(tǒng)進(jìn)行集成(見Mqtt send消息的實(shí)現(xiàn))
- 這些適配器在 Spring對(duì)遠(yuǎn)程調(diào)用胖齐,消息傳遞和系統(tǒng)調(diào)度提供了更高層次的抽象
Spring Intergration核心組件
Message(消息)
Message是對(duì)消息的包裝玻淑,在Spring 系統(tǒng)中傳遞的任何消息都會(huì)被包裝為Message⊙交铮可以理解為是Spring Integration消息傳遞的基本單位
Message Channel(消息管道)
消息管道:Message 在Message Channel中進(jìn)行傳遞岁忘,生產(chǎn)者向管道中投遞消息,消費(fèi)者從管道中取出消息区匠。Spring Integration 支持兩種消息傳遞模型干像,point-to-point(點(diǎn)對(duì)點(diǎn)模型)帅腌,Publish-subscribe(發(fā)布訂閱模型)有多種管道類型。
本次使用點(diǎn)對(duì)點(diǎn)模型麻汰,消息管道類型DirectChannel
Message Endpoint(消息切入點(diǎn))
消息切入點(diǎn):消息在管道中流動(dòng)那必定會(huì)有某些流入或流出的點(diǎn)亦或是在某個(gè)位置(即特定函數(shù))需要對(duì)消息進(jìn)行處理速客,過濾,格式轉(zhuǎn)換等五鲫。這些點(diǎn)即為Message Endpoint(實(shí)際為某些處理函數(shù))溺职,例如消息發(fā)送,消息接收都是Message Endpoint位喂。
Message Transformer(消息轉(zhuǎn)化器)
消息轉(zhuǎn)化器:是將消息進(jìn)行特定轉(zhuǎn)換例如將一個(gè) Object 序列化為 Json 字符串
Message Filter
消息過濾器浪耘,過濾掉特定消息。例如在管道中發(fā)送的含username 和 age 屬性的 User 對(duì)象塑崖,如果當(dāng)前消息(一個(gè)User實(shí)例的包裝)的age < 18則將其過濾掉七冲,那么處在過濾器之后的消費(fèi)者將無法接收到 age < 18的User對(duì)象。當(dāng)然過濾條件不僅是消息負(fù)載的屬性规婆,也可以是消息本身的屬性澜躺。
Message Router(消息路由)
消息路由:向管道投遞消息時(shí)可由消息路由根據(jù)路由規(guī)則選擇投遞給那個(gè)管道
Splitter(分割器)
分割器:它從一個(gè)輸入管道接收一條消息并將其分割為多條消息,再將每條消息發(fā)送到不同的輸出管道上
Aggregator(聚合器)
聚合器:與分割器功能剛好相反
Service Activator
Service Activator: 它是一個(gè)用于將系統(tǒng)服務(wù)實(shí)例接入到消息系統(tǒng)的泛型切入點(diǎn)抒蚜,該切入點(diǎn)必須配置輸入管道掘鄙。其返回值可是消息類型也可以是一個(gè)消息處理器,當(dāng)返回值為消息類型時(shí)需要指定輸出管道嗡髓,即在該切入點(diǎn)對(duì)消息加工處理后再發(fā)送到指定的輸出管道操漠,如果返回值為消息處理器。那么消息交由消息處理器進(jìn)行處理饿这。下文中會(huì)為Mqtt消息出站配置Service Activator并且 返回值為消息處理器
Channel Adapter(管道適配器)
管道適配器:因?yàn)橥獠繀f(xié)議有無數(shù)種颅夺,消息適配器則用于連接不同協(xié)議的外部系統(tǒng)。從外部系統(tǒng)讀入數(shù)據(jù)并對(duì)數(shù)據(jù)進(jìn)行處理最終與Spring Integration 內(nèi)部的消息系統(tǒng)適配蛹稍。例如將要進(jìn)行Mqtt集成吧黄,那么就需要一個(gè)Mqtt的管道適配器,事實(shí)上也確實(shí)有一個(gè)唆姐,下文中將會(huì)看到拗慨。
開始集成
依賴管理工具使用Gardle
引入spring-integration-mqtt依賴
implementation "org.springframework.integration:spring-integration-mqtt:5.4.6"
創(chuàng)建Mqtt配置類
@Configuration
public class MqttConfig {
/**
* 以下屬性將在配置文件中讀取
**/
//mqtt Broker 地址
private String[] uris;
//連接用戶名
private String username;
//連接密碼
private String password;
//入站Client ID
private String inClientId;
//出站Client ID
private String outClientId;
//默認(rèn)訂閱主題
private String defaultTopic;
public void setUris(String[] uris) {
this.uris = uris;
}
public void setUsername(String username) {
this.username = username;
}
public void setPassword(String password) {
this.password = password;
}
public void setInClientId(String inClientId) {
this.inClientId = inClientId;
}
public void setOutClientId(String outClientId) {
this.outClientId = outClientId;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
}
這里需要注意為什么創(chuàng)建兩個(gè)client ID,Spring Integration 在集成的時(shí)候入站與出站消息處理并不使用同一個(gè)連接,所以如果clien ID相同奉芦,將會(huì)出現(xiàn)Mqtt反復(fù)重連現(xiàn)象赵抢,實(shí)為 mqtt 出入站連接交替踢對(duì)方下線。
修改配置文件 application.yml
mqtt:
uris: tcp://ip:port
username: user
password: pwd
in-client-id: ${random.value} # 隨機(jī)值声功,使出入站 client ID 不同
out-client-id: ${random.value}
default-topic: defaultTopic
在MqttConfig上使用注解 @ConfigurationProperties(prefix = "mqtt")
將配置文件中屬性注入到MqttConfig中,但別忘記在啟動(dòng)類上使用@EnableConfigurationProperties
啟用屬性注入烦却。
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
.....
}
創(chuàng)建三個(gè)管道
這三個(gè)管道分別用于處理入站消息,出站消息先巴,錯(cuò)誤消息
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
@Bean
public MessageChannel mqttInboundChannel(){
return new DirectChannel();
}
@Bean
public MessageChannel errorChannel(){
return new DirectChannel();
}
添加 Mqtt 客戶端工廠
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(uris);
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
添加 Mqtt 管道適配器
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
return new MqttPahoMessageDrivenChannelAdapter(inClientId,factory,defaultTopic);
}
添加消息生產(chǎn)者
@Bean
public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//入站投遞給入站管道
adapter.setOutputChannel(mqttInboundChannel());
adapter.setErrorChannel(errorChannel());
adapter.setQos(0);
return adapter;
}
添加出站處理器
出站處理器是一個(gè)Service Activator
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
MqttPahoMessageHandler handler =
new MqttPahoMessageHandler(outClientId,factory);
handler.setAsync(true);
handler.setConverter(new DefaultPahoMessageConverter());
handler.setDefaultTopic(defaultTopic);
return handler;
}
添加消息接收器
通過前文的配置其爵,當(dāng)Mqtt 訂閱主題產(chǎn)生消息時(shí)會(huì)通過 MessageProducer
(本例中是一個(gè)管道適配器)將消息投遞到入站管道中冒冬,所以當(dāng)需要接收并處理Mqtt消息時(shí)只需要從入站管道中取出消息即可。取出消息即可使用前文的 Endpoint
本例使用Service Activator
創(chuàng)建一個(gè)接收類摩渺,自定義任意類型简烤。重點(diǎn)是使用其內(nèi)部的方法,將其注冊(cè)為Endpoint
@Component
public class Receiver {
@Bean
//使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel摇幻,投遞到mqttInboundChannel管道中的消息會(huì)被該方法接收并執(zhí)行
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handleMessage() {
return message -> {
System.out.println(message.getPayload());
};
}
}
至此横侦,整個(gè)服務(wù)即可接收Mqtt broker的消息了。默認(rèn)接收的主題為配置文件中指定的 "defaultTopic"绰姻。
添加消息發(fā)送器
那么消息發(fā)送器無疑是向出站管道投遞消息即可枉侧。如何實(shí)現(xiàn),Spring Integration 提供了 @MessagingGateway注解狂芋,該注解提供一個(gè)defaultRequestChannel
屬性用于指定出站管道榨馁。如下
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
void sendToMqtt(String text);
void sendWithTopic(@Header(MqttHeaders.TOPIC) String topic, String text);
void sendWithTopicAndQos(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String text);
}
此接口內(nèi)函數(shù)參數(shù)將作為消息的負(fù)載被包裝成為消息并投遞到出站管道中。同時(shí)可以看到银酗,方法參數(shù)可以注解的方式自定義消息發(fā)送的 topic qos retain 等屬性。
至此集成完成徒像,用戶可調(diào)用MqttSender發(fā)布消息或是沖Receiver中接收并處理消息
自定義編解碼器
前文中添加消息生產(chǎn)者和添加出站處理器代碼中都可以看到setConverter(new DefaultPahoMessageConverter());
方法黍特,該方法用與對(duì)消息負(fù)載進(jìn)行編解碼。多數(shù)情況下我們的消息都是可編碼的锯蛀。我在消息傳遞過程中使用的是json編碼灭衷。下面我將以json編碼為例,展示如何自定義編解碼器
//協(xié)議對(duì)象
public class User {
private String username;
private String password;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
實(shí)現(xiàn)自己的編解碼器
代碼如下旁涤,詳見注釋
@Component
public class AlMingConverter implements MqttMessageConverter {
private final static Logger log = LoggerFactory.getLogger(AlMingConverter.class);
private int defaultQos = 0;
private boolean defaultRetain = false;
ObjectMapper om = new ObjectMapper();
//入站消息解碼
@Override
public Message<User> toMessage(String topic, MqttMessage mqttMessage) {
User protocol = null;
try {
protocol = om.readValue(mqttMessage.getPayload(), User.class);
} catch (IOException e) {
if (e instanceof JsonProcessingException) {
System.out.println();
log.error("Converter only support json string");
}
}
assert protocol != null;
MessageBuilder<User> messageBuilder = MessageBuilder
.withPayload(protocol);
//使用withPayload初始化的消息缺少頭信息翔曲,將原消息頭信息填充進(jìn)去
messageBuilder.setHeader(MqttHeaders.ID, mqttMessage.getId())
.setHeader(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos())
.setHeader(MqttHeaders.DUPLICATE, mqttMessage.isDuplicate())
.setHeader(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained());
if (topic != null) {
messageBuilder.setHeader(MqttHeaders.TOPIC, topic);
}
return messageBuilder.build();
}
//出站消息編碼
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
MqttMessage mqttMessage = new MqttMessage();
String msg = null;
try {
msg = om.writeValueAsString(message.getPayload());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
assert msg != null;
mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
//這里的 mqtt_qos ,和 mqtt_retained 由 MqttHeaders 此類得出不可以隨便取,如需其他屬性自行查找
Integer qos = (Integer) message.getHeaders().get("mqtt_qos");
mqttMessage.setQos(qos == null ? defaultQos : qos);
Boolean retained = (Boolean) message.getHeaders().get("mqtt_retained");
mqttMessage.setRetained(retained == null ? defaultRetain : retained);
return mqttMessage;
}
//此方法直接拿默認(rèn)編碼器的來用的劈愚,照抄即可
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
Assert.isInstanceOf(MqttMessage.class, payload,
() -> "This converter can only convert an 'MqttMessage'; received: " + payload.getClass().getName());
return this.toMessage(null, (MqttMessage) payload);
}
public void setDefaultQos(int defaultQos) {
this.defaultQos = defaultQos;
}
public void setDefaultRetain(boolean defaultRetain) {
this.defaultRetain = defaultRetain;
}
}
修改MqttConfig
將自定義編解碼器注入到MqttConfig中瞳遍。
private final AlMingConverter alMingConverter;
@Autowired
public MqttConfig(AlMingConverter alMingConverter) {
this.alMingConverter = alMingConverter;
}
將原來所有DefaultPahoMessageConverter
實(shí)例更換為alMingConverter
...
adapter.setConverter(alMingConverter);
...
handler.setConverter(alMingConverter);
修改MqttSender
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
void sendToMqtt(User user);
void sendWithTopic(@Header(MqttHeaders.TOPIC) String topic, User user);
void sendWithTopicAndQos(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, User user);
}
Receiver修改
此時(shí)Reveiver的消息負(fù)載雖然編譯時(shí)為 Object ,但運(yùn)行時(shí)為User可安全的進(jìn)行 Object 到 User 的強(qiáng)轉(zhuǎn)然后進(jìn)行后續(xù)操作
@Component
public class Receiver {
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handleMessage() {
return message -> {
User user = (User) message.getPayload();
System.out.println(user.getUsername()+":"+user.getPassword());
};
}
}
動(dòng)態(tài)修改訂閱主題
前文中發(fā)布訂閱的主題都是默認(rèn)主題,即在配置文件中指定的主題菌羽。生產(chǎn)環(huán)境肯定不止使用一個(gè)主題掠械。那么就需要運(yùn)行時(shí)動(dòng)態(tài)修改主題。Spring Integration也確實(shí)為此提供了支持注祖。上文中向Bean容器中注冊(cè)的管道適配器(MqttPahoMessageDrivenChannelAdapter)提供了addTopic
removeTopic
等方法可用于運(yùn)行時(shí)修改主題猾蒂。所以可以創(chuàng)建一個(gè)MqttService專門用于添加和刪除主題。
但注意只有Spring Integration 4.1以上版本可以這么使用
@Service
public class MqttServiceImpl implements MqttService { //MqttService是自己定義的是晨,僅包含如下方法均已重寫
MqttPahoMessageDrivenChannelAdapter adapter;
@Autowired
public MqttServiceImpl(MqttPahoMessageDrivenChannelAdapter adapter) {
this.adapter = adapter;
}
@Override
public void addTopic(String topic) {
String[] topics = adapter.getTopic();
if(!Arrays.asList(topics).contains(topic)){
adapter.addTopic(topic,0);
}
}
@Override
public void removeTopic(String topic) {
adapter.removeTopic(topic);
}
}
增加或刪除主題時(shí)肚菠,注入該服務(wù)調(diào)用addTopic,removeTopic即可
End&附錄
最終MqttConfig完整代碼
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
/**
* 以下屬性將在配置文件中讀取
**/
//mqtt Broker 地址
private String[] uris;
//連接用戶名
private String username;
//連接密碼
private String password;
//入站Client ID
private String inClientId;
//出站Client ID
private String outClientId;
//默認(rèn)訂閱主題
private String defaultTopic;
private final AlMingConverter alMingConverter;
@Autowired
public MqttConfig(AlMingConverter alMingConverter) {
this.alMingConverter = alMingConverter;
}
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(uris);
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
return new MqttPahoMessageDrivenChannelAdapter(inClientId,factory,defaultTopic);
}
public void setUris(String[] uris) {
this.uris = uris;
}
@Bean
public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
adapter.setCompletionTimeout(5000);
adapter.setConverter(alMingConverter);
//入站投遞的通道
adapter.setOutputChannel(mqttInboundChannel());
adapter.setErrorChannel(errorChannel());
adapter.setQos(0);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
MqttPahoMessageHandler handler =
new MqttPahoMessageHandler(outClientId,factory);
handler.setAsync(true);
handler.setConverter(alMingConverter);
handler.setDefaultTopic(defaultTopic);
return handler;
}
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
@Bean
public MessageChannel mqttInboundChannel(){
return new DirectChannel();
}
@Bean
public MessageChannel errorChannel(){
return new DirectChannel();
}
public void setUsername(String username) {
this.username = username;
}
public void setPassword(String password) {
this.password = password;
}
public void setInClientId(String inClientId) {
this.inClientId = inClientId;
}
public void setOutClientId(String outClientId) {
this.outClientId = outClientId;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
}
最后
大幻夢(mèng)森羅萬象狂氣斷罪眼\ (???) /