- [Smack]
- [1 初始化]
- [1.1 初始化啟動類]
- [2 連接與登錄]
- [2.1 創(chuàng)建連接類]
- [2.2 連接]
- [2.3 登錄]
- [3 消息的發(fā)送]
- [4 消息的接收]
本文分析Smack源碼版本為4.4.0-alpha6
Smack簡介
Smack是一個開源式散,易于使用的XMPP(jabber)客戶端類庫楷拳。
Smack是用Java編寫的開放源代碼柜砾,高度模塊化,易于使用的XMPP客戶端庫,用于Java SE兼容JVM和Android雨效。它是純Java庫微宝,可以嵌入到您的應(yīng)用程序中,以創(chuàng)建從完整的XMPP即時消息客戶端到簡單的XMPP集成(例如發(fā)送通知消息和支持狀態(tài)的設(shè)備)的任何內(nèi)容蝎宇。Smack和XMPP允許您在(M2M弟劲,IoT,…)之間以多種方式輕松交換數(shù)據(jù)姥芥,例如“發(fā)后不管”兔乞,“發(fā)布訂閱”。
Smack官方資料:
官網(wǎng)地址:https://www.igniterealtime.org/
Github:https://github.com/igniterealtime/Smack/tree/master
文檔:https://download.igniterealtime.org/smack/docs/latest/documentation/
Javadoc: http://www.igniterealtime.org/builds/smack/dailybuilds/javadoc/
Smack Modules:
Smack可以輕松地嵌入到任何現(xiàn)有的Java應(yīng)用程序中。該庫作為幾個模塊提供庸追,可為應(yīng)用程序所需的功能提供更大的靈活性:
- smack-core-提供核心XMPP功能霍骄。包括XMPP RFC中的所有XMPP功能。
- smack-im -提供RFC 6121(XMPP-IM)中定義的功能锚国,例如名冊腕巡。
- smack-tcp-支持基于TCP的XMPP。包括您通常要使用的XMPPTCPConnection類
- smack-extensions-支持XMPP標準基金會定義的許多擴展(XEP)血筑,包括多用戶聊天绘沉,文件傳輸,用戶搜索等豺总。這些擴展記錄在擴展手冊中车伞。
- smack-experimental-支持XMPP標準基金會定義的實驗擴展(XEP)。這些擴展的API和功能應(yīng)視為不穩(wěn)定喻喳。
- smack-legacy -支持XMPP標準基金會定義的舊版擴展(XEP)另玖。
- smack-bosh-支持BOSH(XEP-0124)。此代碼應(yīng)視為Beta表伦。
- smack-resolver-minidns-支持在MiniDNS的幫助下解析DNS SRV記錄谦去。不支持javax.naming API的平臺的理想選擇。還支持DNSSEC蹦哼。
- smack-resolver-dnsjava -支持在dnsjava的幫助下解析DNS SRV記錄鳄哭。
- smack-resolver-javax -支持使用javax名稱空間API解析DNS SRV記錄。
- smack-debug-用于協(xié)議流量的增強型GUI調(diào)試器纲熏。在類路徑中找到并啟用調(diào)試后妆丘,它將自動使用它。
初始化
1初始化啟動類
1.1.1 SmackInitialization類
配置文件:
初始化是通過配置文件完成的局劲。在smack-core模塊org.jivesoftware.smack / smack-config.xml勺拣。默認情況下,Smack將把此文件打入jar中鱼填。此特定配置包含要加載的初始化程序類的列表药有。此初始化器列表中包含所有需要初始化的管理器類型類。
smack-config.xml
<?xml version="1.0"?>
<!-- Smack configuration file. -->
<smack>
<!-- Classes that will be loaded when Smack starts -->
<startupClasses>
<className>org.jivesoftware.smack.initializer.VmArgInitializer</className>
<className>org.jivesoftware.smack.ReconnectionManager</className>
</startupClasses>
<optionalStartupClasses>
<className>org.jivesoftware.smack.util.dns.javax.JavaxResolver</className>
<className>org.jivesoftware.smack.util.dns.minidns.MiniDnsResolver</className>
<className>org.jivesoftware.smack.util.dns.dnsjava.DNSJavaResolver</className>
<className>org.jivesoftware.smack.extensions.ExtensionsInitializer</className>
<className>org.jivesoftware.smack.experimental.ExperimentalInitializer</className>
<className>org.jivesoftware.smack.legacy.LegacyInitializer</className>
<className>org.jivesoftware.smack.tcp.TCPInitializer</className>
<className>org.jivesoftware.smack.sasl.javax.SASLJavaXSmackInitializer</className>
<className>org.jivesoftware.smack.sasl.provided.SASLProvidedSmackInitializer</className>
<className>org.jivesoftware.smack.android.AndroidSmackInitializer</className>
<className>org.jivesoftware.smack.java7.Java7SmackInitializer</className>
<className>org.jivesoftware.smack.im.SmackImInitializer</className>
<className>org.jivesoftware.smackx.omemo.OmemoInitializer</className>
<className>org.jivesoftware.smackx.ox.util.OpenPgpInitializer</className>
</optionalStartupClasses>
</smack>
然而配置文件是哪里加載的呢?其實是在SmackInitialization類苹丸,他在靜態(tài)代碼塊中進行的加載塑猖,這里貼出關(guān)鍵代碼:
private static final String DEFAULT_CONFIG_FILE = "org.jivesoftware.smack/smack-config.xml";
//...
static {
//....
//....
InputStream configFileStream;
try {
//1.在這里得到文件的輸入流
configFileStream = FileUtils.getStreamForClasspathFile(DEFAULT_CONFIG_FILE, null);
}
catch (Exception e) {
throw new IllegalStateException("Could not load Smack configuration file", e);
}
try {
//2.
processConfigFile(configFileStream, null);
}
catch (Exception e) {
throw new IllegalStateException("Could not parse Smack configuration file", e);
}
// Add the Java7 compression handler first, since it's preferred
SmackConfiguration.addCompressionHandler(new Java7ZlibInputOutputStream());
//....
//....
}
我們在看看第二步中processConfigFile做了什么操作。
public static void processConfigFile(InputStream cfgFileStream,
Collection<Exception> exceptions) throws Exception {
processConfigFile(cfgFileStream, exceptions, SmackInitialization.class.getClassLoader());
}
public static void processConfigFile(InputStream cfgFileStream,
Collection<Exception> exceptions, ClassLoader classLoader) throws Exception {
XmlPullParser parser = PacketParserUtils.getParserFor(cfgFileStream);
XmlPullParser.Event eventType = parser.getEventType();
do {
if (eventType == XmlPullParser.Event.START_ELEMENT) {
//2.
if (parser.getName().equals("startupClasses")) {
parseClassesToLoad(parser, false, exceptions, classLoader);
}
else if (parser.getName().equals("optionalStartupClasses")) {
parseClassesToLoad(parser, true, exceptions, classLoader);
}
}
eventType = parser.next();
}
//1.判斷是否是節(jié)點結(jié)束 循環(huán)解析
while (eventType != XmlPullParser.Event.END_DOCUMENT);
CloseableUtil.maybeClose(cfgFileStream, LOGGER);
}
可以看見它把流通過XmlPullParser解析出startupClasses谈跛、optionalStartupClasses兩個節(jié)點中的內(nèi)容。也就是我們之前 smack-config.xml中的節(jié)點塑陵。
并且都是調(diào)用parseClassesToLoad方法,區(qū)別只在于第二個參數(shù)的boolen不一樣感憾,其實這個boolen關(guān)系不大,在后面我們就知道了。
繼續(xù)看它的parseClassesToLoad方法做了什么:
private static void parseClassesToLoad(XmlPullParser parser, boolean optional,
Collection<Exception> exceptions, ClassLoader classLoader)
throws Exception {
final String startName = parser.getName();
XmlPullParser.Event eventType;
outerloop: do {//1.循環(huán)解析出className
eventType = parser.next();
if (eventType == XmlPullParser.Event.START_ELEMENT && "className".equals(parser.getName())) {
String classToLoad = parser.nextText();
if (SmackConfiguration.isDisabledSmackClass(classToLoad)) {
continue outerloop;
}
try {
//2.方法中進行反射加載類 參數(shù)classToLoad為節(jié)點中className的內(nèi)容阻桅。
loadSmackClass(classToLoad, optional, classLoader);
} catch (Exception e) {
// Don't throw the exception if an exceptions collection is given, instead
// record it there. This is used for unit testing purposes.
if (exceptions != null) {
exceptions.add(e);
} else {
throw e;
}
}
}
}
while (!(eventType == XmlPullParser.Event.END_ELEMENT && startName.equals(parser.getName())));
}
可以看到第一步循環(huán)出className節(jié)點的內(nèi)容凉倚,第二步把className內(nèi)容String傳給了loadSmackClass方法進行反射加載。
調(diào)用loadSmackClass方法:
private static void loadSmackClass(String className, boolean optional, ClassLoader classLoader) throws Exception {
Class<?> initClass;
try {
// Attempt to load and initialize the class so that all static initializer blocks of
// class are executed
//1.反射類的class
initClass = Class.forName(className, true, classLoader);
}
catch (ClassNotFoundException cnfe) {
Level logLevel;
if (optional) {
logLevel = Level.FINE;
}
else {
logLevel = Level.WARNING;
}
LOGGER.log(logLevel, "A startup class '" + className + "' could not be loaded.");
if (!optional) {
throw cnfe;
} else {
return;
}
}
//2.判斷傳入class類型是否是SmackInitializer類型或者其實現(xiàn)類
if (SmackInitializer.class.isAssignableFrom(initClass)) {
//3.創(chuàng)建實例對象
SmackInitializer initializer = (SmackInitializer) initClass.getConstructor().newInstance();
//4.調(diào)用接口initialize方法嫂沉,其實也就是調(diào)用實例的initialize方法
List<Exception> exceptions = initializer.initialize();
if (exceptions == null || exceptions.size() == 0) {
LOGGER.log(Level.FINE, "Loaded SmackInitializer " + className);
} else {
for (Exception e : exceptions) {
LOGGER.log(Level.SEVERE, "Exception in loadSmackClass", e);
}
}
} else {
LOGGER.log(Level.FINE, "Loaded " + className);
}
}
第一步:反射類的class稽寒。
第二步:判斷傳入class類型是否是SmackInitializer類型或者其實現(xiàn)類
第三步:創(chuàng)建實例對象
第四步:調(diào)用接口initialize方法,其實也就是調(diào)用實例的initialize方法
我們現(xiàn)在看一下SmackInitializer類趟章,其實是一個接口杏糙。
package org.jivesoftware.smack.initializer;
import java.util.List;
import org.jivesoftware.smack.SmackConfiguration;
/**
* Defines an initialization class that will be instantiated and invoked by the {@link SmackConfiguration} class during initialization.
*
* <p>
* Any implementation of this class MUST have a default constructor.
*
* @author Robin Collier
*
*/
public interface SmackInitializer {
List<Exception> initialize();
}
現(xiàn)在可以看出第二步判斷里的邏輯為:SmackInitializer接口的實現(xiàn)類調(diào)用initialize()方法。
好蚓土,這時候我們在回過頭看看我們smack-config.xml中配置的內(nèi)容宏侍。
VmArgInitializer類
public class VmArgInitializer extends UrlInitializer {
protected String getFilePath() {
return System.getProperty("smack.provider.file");
}
@Override
public List<Exception> initialize() {
if (getFilePath() != null) {
super.initialize();
}
return Collections.emptyList();
}
}
父類UrlInitializer
public abstract class UrlInitializer implements SmackInitializer {
//...
@Override
public List<Exception> initialize() {
//...
//...
}
//...
}
可以看見這個類是實現(xiàn)了SmackInitializer接口的。
在看看[smack-config.xml]中的第二個配置類
ReconnectionManager
public final class ReconnectionManager {
//...
//...
static {
XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
@Override
public void connectionCreated(XMPPConnection connection) {
if (connection instanceof AbstractXMPPConnection) {
ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection);
}
}
});
}
//...
//...
}
可以看到這個類并沒有實現(xiàn)SmackInitializer接口,只是類中有靜態(tài)代碼塊執(zhí)行而已蜀漆。
在看看[smack-config.xml]中的第三個配置類
JavaxResolver
public class JavaxResolver extends DNSResolver implements SmackInitializer {
//...
//...
static {
try {
Hashtable<String, String> env = new Hashtable<>();
env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
dirContext = new InitialDirContext(env);
} catch (NamingException e) {
LOGGER.log(Level.SEVERE, "Could not construct InitialDirContext", e);
}
// Try to set this DNS resolver as primary one
setup();
}
public static void setup() {
DNSUtil.setDNSResolver(getInstance());
}
//...
//...
@Override
public List<Exception> initialize() {
setup();
return null;
}
//...
//...
}
這個類也實現(xiàn)了SmackInitializer 接口谅河。
然而實現(xiàn)類方法到底執(zhí)行了什么呢,這里我們舉個栗子确丢,其他的就不一一給大家分析了绷耍。
看好,就是這個栗子:SmackImInitializer
public class SmackImInitializer extends UrlInitializer {
@Override
protected String getProvidersUri() {
return "classpath:org.jivesoftware.smack.im/smackim.providers";
}
@Override
protected String getConfigUri() {
return "classpath:org.jivesoftware.smack.im/smackim.xml";
}
}
父類UrlInitializer
public abstract class UrlInitializer implements SmackInitializer {
private static final Logger LOGGER = Logger.getLogger(UrlInitializer.class.getName());
//1.接口的實現(xiàn)方法
@Override
public List<Exception> initialize() {
InputStream is = null;
final ClassLoader classLoader = this.getClass().getClassLoader();
final List<Exception> exceptions = new LinkedList<Exception>();
//2.getProvidersUri() 有些子類對其進行了方法重載鲜侥,所以具體獲取要看子類是否重載
final String providerUriString = getProvidersUri();
if (providerUriString != null) {
try {
final URI providerUri = URI.create(providerUriString);
is = FileUtils.getStreamForUri(providerUri, classLoader);
LOGGER.log(Level.FINE, "Loading providers for providerUri [" + providerUri + "]");
//3.此方法中進行類加載到內(nèi)存操作
ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
ProviderManager.addLoader(pfl);
exceptions.addAll(pfl.getLoadingExceptions());
}
catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error trying to load provider file " + providerUriString, e);
exceptions.add(e);
} finally {
maybeClose(is);
}
}
//4.getConfigUri() 有些子類對其進行了方法重載褂始,所以具體獲取要看子類是否重載
final String configUriString = getConfigUri();
if (configUriString != null) {
try {
final URI configUri = URI.create(configUriString);
is = FileUtils.getStreamForUri(configUri, classLoader);
//5.調(diào)用加載類操作
SmackInitialization.processConfigFile(is, exceptions, classLoader);
}
catch (Exception e) {
exceptions.add(e);
} finally {
maybeClose(is);
}
}
return exceptions;
}
protected String getProvidersUri() {
return null;
}
protected String getConfigUri() {
return null;
}
private static void maybeClose(InputStream is) {
CloseableUtil.maybeClose(is, LOGGER);
}
}
第一步:SmackImInitializer實例會調(diào)用父類UrlInitializer的initialize方法。
第二步:getProvidersUri()方法,因為SmackImInitializer對其進行了重載剃毒,所以這里看SmackImInitializer中g(shù)etProvidersUri()的方法
protected String getProvidersUri() {
return "classpath:org.jivesoftware.smack.im/smackim.providers";
}
可以看見是一個String路徑病袄,這個路徑文件其實是在SmackImInitializer類所在的smack-im moudle下,我們看一下內(nèi)容:
smackim.providers
<?xml version="1.0"?>
<smackProviders>
<!-- RFC-6121: Extensible Messaging and Presence Protocol (XMPP): Instant Messaging and Presence -->
<iqProvider>
<elementName>query</elementName>
<namespace>jabber:iq:roster</namespace>
<className>org.jivesoftware.smack.roster.provider.RosterPacketReNameProvider</className>
</iqProvider>
<streamFeatureProvider>
<elementName>sub</elementName>
<namespace>urn:xmpp:features:pre-approval</namespace>
<className>org.jivesoftware.smack.roster.provider.SubscriptionPreApprovalStreamFeatureProvider</className>
</streamFeatureProvider>
<!-- XEP-0237: Roster Versioning -->
<streamFeatureProvider>
<elementName>ver</elementName>
<namespace>urn:xmpp:features:rosterver</namespace>
<className>org.jivesoftware.smack.roster.provider.RosterVerStreamFeatureProvider</className>
</streamFeatureProvider>
</smackProviders>
第三步:ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
把smackim.providers配置文件的輸入流傳入赘阀。
第四步:這里在看看ProviderFileLoader中做了什么益缠?
public class ProviderFileLoader implements ProviderLoader {
@SuppressWarnings("unchecked")
public ProviderFileLoader(InputStream providerStream, ClassLoader classLoader) {
//1.這里省略部分代碼
//其實就是把InputStream通過XmlPullParser拿到className節(jié)點信息.
//...
try {
//2.加載class
final Class<?> provider = classLoader.loadClass(className);
switch (typeName) {
case "iqProvider":
//3.節(jié)點為iqProvider 創(chuàng)建實例類并加入內(nèi)存iqProviders
if (IqProvider.class.isAssignableFrom(provider)) {
IqProvider<IQ> iqProvider = (IqProvider<IQ>) provider.getConstructor().newInstance();
iqProviders.add(new IQProviderInfo(elementName, namespace, iqProvider));
}
else {
exceptions.add(new IllegalArgumentException(className + " is not a IQProvider"));
}
break;
case "extensionProvider":
//...
//...
break;
case "streamFeatureProvider":
//4.節(jié)點為streamFeatureProvider創(chuàng)建實例類并加入內(nèi)存sfProviders
ExtensionElementProvider<ExtensionElement> streamFeatureProvider = (ExtensionElementProvider<ExtensionElement>) provider.getConstructor().newInstance();
sfProviders.add(new StreamFeatureProviderInfo(elementName,
namespace,
streamFeatureProvider));
break;
default:
LOGGER.warning("Unknown provider type: " + typeName);
}
}
catch (ClassNotFoundException cnfe) {
LOGGER.log(Level.SEVERE, "Could not find provider class", cnfe);
exceptions.add(cnfe);
}
catch (InstantiationException ie) {
LOGGER.log(Level.SEVERE, "Could not instanciate " + className, ie);
exceptions.add(ie);
}
//...
//...
}
}
1.把傳入的配置文件流InputStream通過XmlPullParser拿到className節(jié)點信息.
2.加載配置文件中的class。
3.通過switch判斷是iqProvider節(jié)點的加入到iqProviders中基公,是streamFeatureProvider節(jié)點的加入到sfProviders中幅慌。
其實smackim.providers文件跟 smack-config.xml文件一樣都是配置文件,只不過smackim.providers是屬于一個moudle中的配置文件轰豆,而smack-config.xml是核心smack-core Moudle中總得配置文件而已(及Smack配置文件)胰伍。
這里梳理一下:所有初始化都是在SmackInitialization的靜態(tài)代碼塊中執(zhí)行的。也就是說那些地方創(chuàng)建獲取調(diào)用了此類方法酸休,此類的靜態(tài)代碼塊就被會執(zhí)行骂租。
最后看一下那些地方調(diào)用了這個方法:
SmackConfiguration類中調(diào)用了他獲取版本的方法。
在看看那些調(diào)用了SmackConfiguration.getVersion();
可以看到很多類中都有調(diào)用斑司,并且關(guān)鍵的AbstractXMPPConnection連接類靜態(tài)代碼塊中也調(diào)用了此方法渗饮。
所以我們得出結(jié)論:只要你創(chuàng)建對象進行連接操作,框架內(nèi)部就會幫你初始化好配置。
1.1.2 SmackConfiguration類
這里截圖這個類的部分代碼
public final class SmackConfiguration {
//...
//...
public static boolean DEBUG = false;
private static SmackDebuggerFactory DEFAULT_DEBUGGER_FACTORY = ReflectionDebuggerFactory.INSTANCE;
/**
* The default parsing exception callback is {@link ExceptionThrowingCallback} which will
* throw an exception and therefore disconnect the active connection.
*/
private static ParsingExceptionCallback defaultCallback = new ExceptionThrowingCallbackWithHint();
private static HostnameVerifier defaultHostnameVerififer;
/**
* Returns the Smack version information, eg "1.3.0".
*
* @return the Smack version information.
*/
public static String getVersion() {
return SmackInitialization.SMACK_VERSION;
}
//...
//...
}
可以看到這個類是final 并且里面全都是靜態(tài)方法和靜態(tài)成員變量互站。其實這個類作用就是在你使用smack前可以進行靜態(tài)值得配置私蕾。比如:SmackConfiguration.DEBUG = true;
小結(jié):
其實Smack的初始化操作都是在smack-core moudle下SmackInitialization類的靜態(tài)代碼中完成的,并且配置文件在同一moudle下的smack-config.xml文件胡桃。
SmackConfiguration類getVersion()時踩叭,SmackInitialization的靜態(tài)代碼塊就會執(zhí)行。并且關(guān)鍵的AbstractXMPPConnection連接類靜態(tài)代碼塊中也調(diào)用了此方法翠胰,所以只要你創(chuàng)建對象進行連接操作容贝,框架內(nèi)部就會幫你初始化好配置。
連接與登錄
1創(chuàng)建連接類
第一步:創(chuàng)建連接配置
configuration = XMPPTCPConnectionConfiguration.builder()
.setXmppDomain(myibc_domain)
.setHostAddress(InetAddress.getByName(mXmppHost))
.setPort(mXmppPort)
.setSendPresence(false)
.setSecurityMode(ConnectionConfiguration.SecurityMode.required)
.setConnectTimeout(30 * 1000)
.setCompressionEnabled(false)
.setCustomSSLContext(SSLUtils.getSSLCAContext(""))
.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER)
.build();
這里我們用的是TCP的連接,所以我們使用XMPPTCPConnectionConfiguration來創(chuàng)建我們的配置信息亡容。
其實XMPPTCPConnectionConfiguration繼承了ConnectionConfiguration抽象類嗤疯。并且在構(gòu)造方法調(diào)用父類的構(gòu)造方法。
配置都保存在了ConnectionConfiguration中
public abstract class ConnectionConfiguration {
protected ConnectionConfiguration(Builder<?, ?> builder) {
//略...
authzid = builder.authzid;
username = builder.username;
password = builder.password;
callbackHandler = builder.callbackHandler;
//略...
}
}
第二步:創(chuàng)建用于連接服務(wù)器的核心類XMPPTCPConnection
//將連接配置對象當(dāng)作參數(shù)傳入給XMPPTCPConnection構(gòu)造函數(shù)
XMPPTCPConnection connection = new XMPPTCPConnection(configuration);
配置信息傳入到XMPPTCPConnection和父類AbstractXMPPConnection中各個保存了一份闺兢。
- 2 連接
用XMPPTCPConnection實例進行連接
//進行連接
connection.connect();
首先我們了解一下XMPPTCPConnection的繼承關(guān)系
connect()方法是在AbstractXMPPConnection抽象類中進行實現(xiàn)的茂缚。
public abstract class AbstractXMPPConnection implements XMPPConnection {
//...
public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
// 檢測是否已連接。
throwAlreadyConnectedExceptionIfAppropriate();
// 重置連接狀態(tài)
initState();
closingStreamReceived = false;
streamId = null;
try {
//1.關(guān)鍵屋谭,執(zhí)行連接的方法
connectInternal();
//判斷TLS是否認證過
if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
throw new SecurityRequiredByClientException();
}
} catch (SmackException | IOException | XMPPException | InterruptedException e) {
instantShutdown();
throw e;
}
// 回調(diào)已連接監(jiān)聽
connected = true;
callConnectionConnectedListener();
return this;
}
//...
//2.
protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
}
關(guān)鍵方法是調(diào)用connectInternal();此方法是AbstractXMPPConnection類的抽象方法脚囊,具體要查看他的實現(xiàn)類。
我們實例的實現(xiàn)類為:
XMPPTCPConnection
public class XMPPTCPConnection extends AbstractXMPPConnection {
@Override
protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
//1.連接
connectUsingConfiguration();
//2.連接成功后桐磁,通過流初始化writer Reader悔耘。
initConnection();
// TLS handled will be true either if TLS was established, or if it was not mandatory.
waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
// Wait with SASL auth until the SASL mechanisms have been received
waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
}
private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
//略...
SocketFactory socketFactory = config.getSocketFactory();
for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
Iterator<? extends InetAddress> inetAddresses;
String host = endpoint.getHost().toString();
UInt16 portUint16 = endpoint.getPort();
int port = portUint16.intValue();
if (proxyInfo == null) {//無代理
SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
final InetAddress inetAddress = inetAddresses.next();
final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
//1.這里進行異步的連接
socketFuture.connectAsync(inetSocketAddress, timeout);
try {
//2.獲取socket
socket = socketFuture.getOrThrow();
} catch (IOException e) {
RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
endpoint, inetAddress, e);
connectionExceptions.add(rce);
if (inetAddresses.hasNext()) {
continue innerloop;
} else {
break innerloop;
}
}
} else {
//略...
return;
}
}
// There are no more host addresses to try
// throw an exception and report all tried
// HostAddresses in the exception
throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
}
}
connectUsingConfiguration連接方法
可以看見通過用戶傳入的配置信息,獲取到SocketFactory對象,并調(diào)用connectAsync()進行異步連接我擂。
主要步驟為兩步:
1.異步的連接
socketFactory.connectAsync()
2.獲取socket
socket = socketFuture.getOrThrow();
socketFactory.connectAsync() :
public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {
private final Socket socket;
private final Object wasInterruptedLock = new Object();
private boolean wasInterrupted;
public SocketFuture(SocketFactory socketFactory) throws IOException {
socket = socketFactory.createSocket();
}
public void connectAsync(final SocketAddress socketAddress, final int timeout) {
AbstractXMPPConnection.asyncGo(new Runnable() {
@Override
public void run() {
try {
//在這里進行socket的連接操作
socket.connect(socketAddress, timeout);
}
catch (IOException e) {
setException(e);
return;
}
synchronized (wasInterruptedLock) {
if (wasInterrupted) {
closeSocket();
return;
}
}
setResult(socket);
}
});
}
}
到此連接是已經(jīng)建立了,在初始化寫入寫出流在看看我們之前XMPPTCPConnection類中connectInternal方法的第二步
initConnection()也是XMPPTCPConnection類中的方法衬以。
private void initConnection() throws IOException, InterruptedException {
compressionHandler = null;
//1.初始化reader和writer實例對象
initReaderAndWriter();
//2.初始化 啟動 寫入流 線程
packetWriter.init();
//3.啟動讀線程。startup()方法將阻塞校摩,直到從服務(wù)器獲取打開的流數(shù)據(jù)包
packetReader.init();
}
initReaderAndWriter()
private void initReaderAndWriter() throws IOException {
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
if (compressionHandler != null) {
is = compressionHandler.getInputStream(is);
os = compressionHandler.getOutputStream(os);
}
// OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
writer = new OutputStreamWriter(os, "UTF-8");
reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
// If debugging is enabled, we open a window and write out all network traffic.
initDebugger();
}
這里很明白看峻,就是通過socket獲取輸入輸出流,創(chuàng)建writer衙吩、reader互妓。
2.packetWriter.init();
PacketWriter對象是在XMPPTCPConnection類的成員變量中創(chuàng)建的
protected final PacketWriter packetWriter = new PacketWriter();
protected final PacketReader packetReader = new PacketReader();
protected class PacketWriter {
//隊列
private final ArrayBlockingQueueWithShutdown<Element> queue =
new ArrayBlockingQueueWithShutdown<>(QUEUE_SIZE, true);
//略...
void init() {
shutdownTimestamp = null;
if (unacknowledgedStanzas != null) {
// It's possible that there are new stanzas in the writer queue that
// came in while we were disconnected but resumable, drain those into
// the unacknowledged queue so that they get resent now
drainWriterQueueToUnacknowledgedStanzas();
}
queue.start();
running = true;
//1.開啟線程
Async.go(new Runnable() {
@Override
public void run() {
LOGGER.finer(threadName + " start");
try {
//2.寫入報文,報文是在queue隊列中獲取的
writePackets();
} finally {
LOGGER.finer(threadName + " exit");
running = false;
notifyWaitingThreads();
}
}
}, threadName);
}
//...
}
//1.開啟線程
//2.寫入報文坤塞,報文是在queue隊列中獲取的
這里就不貼出writePackets代碼了冯勉。
packetReader.init()
我們在看看之前packetReader.init()又做了些什么
可以看到它主要執(zhí)行的代碼為parsePackets() 方法。
通過XmlPullParser parser 解析器來讀取收到的內(nèi)容摹芙。
這里有一個問題XmlPullParser 哪里來的呢灼狰?我們現(xiàn)在看看他賦值的代碼。
在AbstractXMPPConnection類中
private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
sendStreamOpen();
resetParser();
}
private void resetParser() throws IOException {
try {
packetReader.parser = SmackXmlParser.newXmlParser(reader);
} catch (XmlPullParserException e) {
throw new IOException(e);
}
}
看到reader對象了還記得我們之前連接成功后initReaderAndWriter()方法的賦值嗎?
到這里我們整個連接過程就算完成了浮禾。
小結(jié):
1.連接首先會調(diào)用AbstractXMPPConnection的connect()方法伏嗜。
2.方法會調(diào)用實現(xiàn)類XMPPTCPConnection的connectInternal().
-
connectInternal方法做2步操作
- connectUsingConfiguration:
- 通過config創(chuàng)建SocketFactory并得到Socket坛悉。
- Socket進行異步連接。
-連接成功
-initConnection:
-初始化reader和writer實例對象
-初始化packetWriter 寫入也是在報文隊列中承绸,實際通過writer對象寫入。
-初始化packetReader挣轨,報文在隊列中有數(shù)據(jù)時則會讀取军熏。實際會通過reader讀取。 - connectUsingConfiguration:
- 3 登錄
mConnection.login(loginUserId, loginPassword, resource);
調(diào)用其實是抽象類AbstractXMPPConnection中的login()方法卷扮。
public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException,SmackException, IOException, InterruptedException {
if (!config.allowNullOrEmptyUsername) {
StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty");
}
throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?");
throwAlreadyLoggedInExceptionIfAppropriate();
usedUsername = username != null ? username.toString() : null;
usedPassword = password;
usedResource = resource;
loginInternal(usedUsername, usedPassword, usedResource);
}
繼續(xù)調(diào)用其實是AbstractXMPPConnection的一個抽象方法
protected abstract void loginInternal(String username, String password, Resourcepart resource)
throws XMPPException, SmackException, IOException, InterruptedException;
我們這里的實現(xiàn)類是XMPPTCPConnection
Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
SmackException, IOException, InterruptedException {
// 使用SASL進行身份驗證
SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
streamFeaturesAfterAuthenticationReceived = false;
//1.進行身份認證
authenticate(username, password, config.getAuthzid(), sslSession);
//在身份驗證之后等待流特性荡澎。
waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
//如果啟用了壓縮,則請求服務(wù)器使用流壓縮
maybeEnableCompression();
//略...
//綁定資源
bindResourceAndEstablishSession(resource);
//略...
//2.登錄成功后
afterSuccessfulLogin(false);
}
這個方法有2個地方需要注意
1.進行身份認證
2.登錄成功后的狀態(tài)發(fā)送與監(jiān)聽晤锹。
首先我們看看authenticate()方法做了些什么摩幔。可以看到這里是調(diào)用的方法
protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid,SSLSession sslSession) {
//1.認證
SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession);
//認證成功
afterSaslAuthenticationSuccess();
return saslMechanism;
}
調(diào)用的是 saslAuthentication對象authenticate方法鞭铆。
saslAuthentication其實是在創(chuàng)建連接對象時新建出來的
protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
saslAuthentication = new SASLAuthentication(this, configuration);
//略...
}
我們看看SASLAuthentication類的authenticate方法:
SASLMechanism authenticate(String username, String password, EntityBareJid authzid, SSLSession sslSession)
throws XMPPErrorException, SASLErrorException, IOException,
InterruptedException, SmackSaslException, NotConnectedException, NoResponseException {
//獲取相應(yīng)jid的SASL機制或衡。
final SASLMechanism mechanism = selectMechanism(authzid);
final CallbackHandler callbackHandler = configuration.getCallbackHandler();
final String host = connection.getHost();
final DomainBareJid xmppServiceDomain = connection.getXMPPServiceDomain();
synchronized (this) {
currentMechanism = mechanism;
//這里我們配置未傳入callbackHandler,所以走else
if (callbackHandler != null) {
currentMechanism.authenticate(host, xmppServiceDomain, callbackHandler, authzid, sslSession);
}
else {
//(重要),認證车遂。
currentMechanism.authenticate(username, host, xmppServiceDomain, password, authzid, sslSession);
}
final long deadline = System.currentTimeMillis() + connection.getReplyTimeout();
while (!mechanism.isFinished()) {
final long now = System.currentTimeMillis();
if (now >= deadline) break;
// Wait until SASL negotiation finishes
wait(deadline - now);
}
}
mechanism.throwExceptionIfRequired();
return mechanism;
}
在看看對應(yīng)SASLMechanism 類中的authenticate方法封断。
public final void authenticate(String username, String host, DomainBareJid serviceName, String password,
EntityBareJid authzid, SSLSession sslSession)
throws SmackSaslException, NotConnectedException, InterruptedException {
this.authenticationId = username;
this.host = host;
this.serviceName = serviceName;
this.password = password;
this.authorizationId = authzid;
this.sslSession = sslSession;
assert authorizationId == null || authzidSupported();
authenticateInternal();
//認證
authenticate();
}
private void authenticate() throws SmackSaslException, NotConnectedException, InterruptedException {
//1.通過配置選擇的加密方式加密用戶賬號,密碼
byte[] authenticationBytes = getAuthenticationText();
String authenticationText;
if (authenticationBytes != null && authenticationBytes.length > 0) {
//進行base64編碼
authenticationText = Base64.encodeToString(authenticationBytes);
} else {
authenticationText = "=";
}
//發(fā)送認證到服務(wù)器舶担。
connection.sendNonza(new AuthMechanism(getName(), authenticationText));
}
到這里認證走完了坡疼,然后之前在AbstractXMPPConnection類中authenticate方法中還有afterSaslAuthenticationSuccess方法進行執(zhí)行。
protected void afterSaslAuthenticationSuccess()
throws NotConnectedException, InterruptedException, SmackWrappedException {
sendStreamOpen();
}
protected void sendStreamOpen() throws NotConnectedException, InterruptedException {
CharSequence to = getXMPPServiceDomain();
CharSequence from = null;
CharSequence localpart = config.getUsername();
if (localpart != null) {
from = XmppStringUtils.completeJidFrom(localpart, to);
}
String id = getStreamId();
StreamOpen streamOpen = new StreamOpen(to, from, id, config.getXmlLang(), StreamOpen.StreamContentNamespace.client);
//發(fā)送 流打開報文衣陶。
sendNonza(streamOpen);
XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder();
xmlEnvironmentBuilder.with(streamOpen);
outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build();
}
可以看到這里發(fā)送了一個流報文柄瑰。
最后回到XMPPTCPConnection類中l(wèi)oginInternal方法,現(xiàn)在進行第二步剪况,認證成功后教沾。
Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
SmackException, IOException, InterruptedException {
// 使用SASL進行身份驗證
SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
streamFeaturesAfterAuthenticationReceived = false;
//1.進行身份認證
authenticate(username, password, config.getAuthzid(), sslSession);
//在身份驗證之后等待流特性。
waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
//如果啟用了壓縮拯欧,則請求服務(wù)器使用流壓縮
maybeEnableCompression();
//略...
//綁定資源
bindResourceAndEstablishSession(resource);
//略...
//2.登錄成功后
afterSuccessfulLogin(false);
}
查看afterSuccessfulLogin方法首先調(diào)用的是XMPPTCPConnection類中的方法然后調(diào)用AbstractXMPPConnection抽象類
afterSuccessfulLogin详囤。
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
// Reset the flag in case it was set
disconnectedButResumeable = false;
super.afterSuccessfulLogin(resumed);
}
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
if (!resumed) {
authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis();
}
this.authenticated = true;
if (debugger != null) {
debugger.userHasLogged(user);
}
//1.回調(diào)認證成功的監(jiān)聽。
callConnectionAuthenticatedListener(resumed);
//2.如果配置了出席狀態(tài),認證成功后發(fā)送available狀態(tài)
if (config.isSendPresence() && !resumed) {
Presence availablePresence = getStanzaFactory()
.buildPresenceStanza()
.ofType(Presence.Type.available)
.build();
sendStanza(availablePresence);
}
}
可以看見它主要就做了這2步操作:
1.回調(diào)認證成功的監(jiān)聽镐作。
2.如果配置了出席狀態(tài),認證成功后發(fā)送available狀態(tài)
protected void callConnectionAuthenticatedListener(boolean resumed) {
for (ConnectionListener listener : connectionListeners) {
try {
listener.authenticated(this, resumed);
} catch (Exception e) {
// Catch and print any exception so we can recover
// from a faulty listener and finish the shutdown process
LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e);
}
}
}
到這里整個登錄流程就全部走完了藏姐。
小結(jié):
其實整個認證過程首先會通過用戶的Jid得到SASLMechanism,認證主要是在SASLMechanism類authenticate中進行的该贾,它會通過getAuthenticationText()加密拼接你的用戶名密碼羔杨,這個方法是個抽象,加密的方式就是你之前設(shè)置的方式。然后發(fā)送認證的報文給服務(wù)器杨蛋。
這時候認證成功了兜材,然后: 1.發(fā)送了一個流打開的報文理澎。2.回調(diào)認證成功的Listener。3.如果配置了發(fā)送狀態(tài)曙寡,認證成功后還是發(fā)送一個出席狀態(tài)的報文糠爬。
<h1 id="3">3. 消息的發(fā)送</h1>
第一步:創(chuàng)建ChatManager
這里是通過smack-extensions Moudle中ChatManager進行聊天的管理。
chatManager = ChatManager.getInstanceFor(xmppConnection);
public final class ChatManager extends Manager {
private static final Map<XMPPConnection, ChatManager> INSTANCES = new WeakHashMap<>();
public static synchronized ChatManager getInstanceFor(XMPPConnection connection) {
ChatManager chatManager = INSTANCES.get(connection);
if (chatManager == null) {
chatManager = new ChatManager(connection);
INSTANCES.put(connection, chatManager);
}
return chatManager;
}
//略...
}
這里可以看見ChatManager 是繼承與Manager抽象類举庶。在通過單例獲取ChatManager對象的時候會緩存到靜態(tài)HashMap內(nèi)存中执隧,下次如果內(nèi)存中有就直接獲取。
這里貼出Manager類代碼户侥。
public abstract class Manager {
final WeakReference<XMPPConnection> weakConnection;
public Manager(XMPPConnection connection) {
Objects.requireNonNull(connection, "XMPPConnection must not be null");
weakConnection = new WeakReference<>(connection);
}
protected final XMPPConnection connection() {
return weakConnection.get();
}
protected final XMPPConnection getAuthenticatedConnectionOrThrow() throws NotLoggedInException {
XMPPConnection connection = connection();
if (!connection.isAuthenticated()) {
throw new NotLoggedInException();
}
return connection;
}
protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) {
return schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking);
}
protected static final ScheduledAction scheduleBlocking(Runnable runnable, long delay, TimeUnit unit) {
return schedule(runnable, delay, unit, ScheduledAction.Kind.Blocking);
}
protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
return AbstractXMPPConnection.SMACK_REACTOR.schedule(runnable, delay, unit, scheduledActionKind);
}
}
第二步:創(chuàng)建Chat對象
Chat chat = chatManager.chatWith(toUserJid);
private final Map<EntityBareJid, Chat> chats = new ConcurrentHashMap<>();
public Chat chatWith(EntityBareJid jid) {
Chat chat = chats.get(jid);
if (chat == null) {
synchronized (chats) {
// Double-checked locking.
chat = chats.get(jid);
if (chat != null) {
return chat;
}
chat = new Chat(connection(), jid);
chats.put(jid, chat);
}
}
return chat;
}
在ChatManager中通過chatWith方法得到Chat對象,如果內(nèi)存緩存中沒有就進行創(chuàng)建镀琉。
第二步:發(fā)送消息。
Chat chat = chatManager.chatWith(toUserJid);
Message msg = new Message();
msg.setType(Message.Type.chat);
chat.send(msg);
public void send(Message message) throws NotConnectedException, InterruptedException {
switch (message.getType()) {
case normal:
case chat:
break;
default:
throw new IllegalArgumentException("Message must be of type 'normal' or 'chat'");
}
Jid to = lockedResource;
if (to == null) {
to = jid;
}
message.setTo(to);
connection().sendStanza(message);
}
可以看見這里通過我們傳入的connection調(diào)用sendStanza發(fā)送消息蕊唐。
我們在看一下調(diào)用的sendStanza方法是在哪里執(zhí)行的屋摔。
其實sendStanza是XMPPConnection接口的方法,我們用到的XMPPTCPConnection中沒實現(xiàn),在抽象類AbstractXMPPConnection中進行了實現(xiàn)
Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
Objects.requireNonNull(stanza, "Stanza must not be null");
assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;
throwNotConnectedExceptionIfAppropriate();
switch (fromMode) {
case OMITTED:
stanza.setFrom((Jid) null);
break;
case USER:
stanza.setFrom(getUser());
break;
case UNCHANGED:
default:
break;
}
//設(shè)置攔截器
Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
//發(fā)送Stanza
sendStanzaInternal(stanzaAfterInterceptors);
}
可以看到這個方法主要做了2個重要事情:1.設(shè)置攔截器替梨。2.發(fā)送Stanza钓试。我們關(guān)注下sendStanzaInternal方法。
sendStanzaInternal是AbstractXMPPConnection類的一個抽象方法,我們用的是XMPPTCPConnection,所以具體實現(xiàn)查看XMPPTCPConnection中的sendStanzaInternal方法耙替。
protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException;
Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
//消息寫入
packetWriter.sendStreamElement(packet);
if (isSmEnabled()) {
for (StanzaFilter requestAckPredicate : requestAckPredicates) {
if (requestAckPredicate.accept(packet)) {
requestSmAcknowledgementInternal();
break;
}
}
}
}
看到packetWriter是不是似曾相識亚侠?是的,這個就是我們之前講解的連接成功后初始化在內(nèi)存的那個packetWriter俗扇。
我們看看packetWriter中的sendStreamElement做了什么操作硝烂。
private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
QUEUE_SIZE, true);
protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
try {
//存入隊列
queue.put(element);
}
catch (InterruptedException e) {
// put() may throw an InterruptedException for two reasons:
// 1. If the queue was shut down
// 2. If the thread was interrupted
// so we have to check which is the case
throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
// If the method above did not throw, then the sending thread was interrupted
throw e;
}
}
可以看見我們發(fā)送的報文存在了隊列中,最終寫入的方法是在writePackets里铜幽。writePackets方法中會從隊列中獲取packet滞谢,最終其實是PacketWriter類中writer進行了寫入操作。PacketWriter和writer在前面smack連接的時候已講解過除抛,這里不再細述狮杨,下面貼出writePackets代碼:
小結(jié):
消息發(fā)送我們是用了Smack擴展中的ChatManager進行的,在ChatManager中會保存Chat對象到忽,在我們發(fā)送消息時根據(jù)Jid來獲取到chat對象橄教,在消息發(fā)送時調(diào)用chat.send(),其實最終的實現(xiàn)是在我們的實現(xiàn)類XMPPTCPConnection sendStanzaInternal中喘漏,sendStanzaInternal會調(diào)用packetWriter的sendStreamElement方法护蝶,最終在PacketWriter中sendStreamElement方法會把報文寫入自己的內(nèi)存隊列中,在writePackets方法中取出進行發(fā)送翩迈,最后是PacketWriter類中的writer流進行的寫入操作持灰。
<h1 id="4">4. 消息的接收</h1>
第一步: 添加監(jiān)聽IncomingChatMessageListener
這里監(jiān)聽設(shè)置在smack-extensions Moudle中ChatManager中
chatManager.addIncomingListener(messageListener);
public interface IncomingChatMessageListener {
void newIncomingMessage(EntityBareJid from, Message message, Chat chat);
}
public final class ChatManager extends Manager {
private final Set<IncomingChatMessageListener> incomingListeners = new CopyOnWriteArraySet<>();
private final AsyncButOrdered<Chat> asyncButOrdered = new AsyncButOrdered<>();
private ChatManager(final XMPPConnection connection) {
super(connection);
//1.設(shè)置XMPPConnection 的監(jiān)聽
connection.addSyncStanzaListener(new StanzaListener() {
@Override
public void processStanza(Stanza stanza) {
final Message message = (Message) stanza;
if (!shouldAcceptMessage(message)) {
return;
}
final Jid from = message.getFrom();
final EntityFullJid fullFrom = from.asEntityFullJidOrThrow();
final EntityBareJid bareFrom = fullFrom.asEntityBareJid();
final Chat chat = chatWith(bareFrom);
chat.lockedResource = fullFrom;
//2.這里所有chat都是保存在asyncButOrdered中,監(jiān)聽是在隊列里排隊回調(diào)负饲。
asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
@Override
public void run() {
//3.回調(diào)incomingListeners
for (IncomingChatMessageListener listener : incomingListeners) {
listener.newIncomingMessage(bareFrom, message, chat);
}
}
});
}
}, INCOMING_MESSAGE_FILTER);
//略...
}
//...
}
我們首先分析里面第二步:
監(jiān)聽到時執(zhí)行的runnable堤魁。asyncButOrdered.performAsyncButOrdered喂链,asyncButOrdered源碼這里就不貼出來了。
每個chat對象回調(diào)監(jiān)聽時都會在AsyncButOrdered 類中存儲一個Map<K, Queue<Runnable>>妥泉,這里 K 為 chat椭微。
然后進行包裝存儲在成員變量 Map<K, Handler> threadActiveMap = new HashMap<>() 中。
這里的Handle 是AsyncButOrdered的一個內(nèi)部類盲链,然后執(zhí)行run方法赏表。
private class Handler implements Runnable {
private final Queue<Runnable> keyQueue;
private final K key;
Handler(Queue<Runnable> keyQueue, K key) {
this.keyQueue = keyQueue;
this.key = key;
}
@Override
public void run() {
//略。。碰纬。
runnable.run();
}
}
最后回到第三步:
asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
@Override
public void run() {
//3.回調(diào)incomingListeners
for (IncomingChatMessageListener listener : incomingListeners) {
listener.newIncomingMessage(bareFrom, message, chat);
}
}
});
這里不難看出為什么要這么做往衷,因為上層可能會有很多監(jiān)聽過來,回調(diào)的調(diào)用其實也消息同樣效拭,量大。并且會有很多不同的chat對象監(jiān)聽返回,smack還進行了區(qū)分保存不同的chat回調(diào)火架,并在回調(diào)中第三個參數(shù)中返回給調(diào)用者了。
并且這個回調(diào)是異步的忙菠,smack在內(nèi)部維護了一個線程池進行處理何鸡。
好,現(xiàn)在我們在回到第一步監(jiān)聽是如何在connection中回調(diào)的牛欢。
第一步:設(shè)置XMPPConnection 的監(jiān)聽
XMPPConnection
void addSyncStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter);
AbstractXMPPConnection
private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();
@Override
public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
if (packetListener == null) {
throw new NullPointerException("Packet listener is null.");
}
ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
synchronized (syncRecvListeners) {
//添加Listener
syncRecvListeners.put(packetListener, wrapper);
}
}
最終實現(xiàn)是在AbstractXMPPConnection類中實現(xiàn)的骡男。
可以看到保存的是一個ListenerWrapper包裝類,第一個參數(shù)是回調(diào)監(jiān)聽傍睹,第二個參數(shù)是我們傳入的監(jiān)聽過濾器隔盛。
現(xiàn)在我們需要看的是那些地方調(diào)用了成員變量syncRecvListeners中的回調(diào)。
我們這里檢索查看是在AbstractXMPPConnection類中invokeStanzaCollectorsAndNotifyRecvListeners方法中進行了調(diào)用
invokeStanzaCollectorsAndNotifyRecvListeners()
protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
listenersToNotify.clear();
//1.如果用戶設(shè)置過濾器,則通過過濾消息類型,返回上層監(jiān)聽
extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
//2.
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
@Override
public void run() {
Iterator<StanzaListener> it = listenersToNotify.iterator();
synchronized (syncRecvListeners) {
while (it.hasNext()) {
StanzaListener stanzaListener = it.next();
if (!syncRecvListeners.containsKey(stanzaListener)) {
it.remove();
}
}
}
for (StanzaListener listener : listenersToNotify) {
try {
listener.processStanza(packet);
} catch (NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
break;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
});
}
第一步:如果用戶設(shè)置過濾器,則通過過濾消息類型,返回上層監(jiān)聽
extractMatchingListeners
private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
Collection<StanzaListener> listenersToNotify) {
synchronized (listeners) {
for (ListenerWrapper listenerWrapper : listeners.values()) {
if (listenerWrapper.filterMatches(stanza)) {//過濾
//加入外部拾稳。
listenersToNotify.add(listenerWrapper.getListener());
}
}
}
}
private final StanzaFilter packetFilter;
public boolean filterMatches(Stanza packet) {
return packetFilter == null || packetFilter.accept(packet);
}
這里過濾調(diào)用的是包裝類傳入的那個過濾器吮炕。并通過accept(packet)接口進行過濾。好访得,我們現(xiàn)在在回過頭看看這個過濾器傳入的啥龙亲?
還記得我們之前在ChatManager addSyncStanzaListener() 方法的第二個參數(shù)嗎?
private static final StanzaFilter INCOMING_MESSAGE_FILTER = new AndFilter(
MESSAGE_FILTER,
FromTypeFilter.ENTITY_FULL_JID
);
注意:這個過濾器是一層一層的悍抑,可以層層過濾鳄炉,所以它的構(gòu)造過程也是一層一層的,接下來不要眨眼传趾。
public class AndFilter extends AbstractListFilter implements StanzaFilter {
public AndFilter() {
super();
}
public AndFilter(StanzaFilter... filters) {
super(filters);
}
@Override
public boolean accept(Stanza packet) {
for (StanzaFilter filter : filters) {
if (!filter.accept(packet)) {
return false;
}
}
return true;
}
}
可以看見這里傳入的構(gòu)造傳入的參數(shù)是一個可變參數(shù)迎膜,他的accept方法調(diào)用了StanzaFilter 過濾器的accept方法。
StanzaFilter 其實是一個接口accept是接口 方法浆兰,所以具體實現(xiàn)是在其實現(xiàn)類中
我們這里傳入了2過濾器磕仅,這里只拿出每個構(gòu)造的第一個作為講解珊豹。
private static final StanzaFilter MESSAGE_FILTER = new AndFilter(
MessageTypeFilter.NORMAL_OR_CHAT,
new OrFilter(MessageWithBodiesFilter.INSTANCE, new StanzaExtensionFilter(XHTMLExtension.ELEMENT, XHTMLExtension.NAMESPACE))
);
public static final StanzaFilter NORMAL_OR_CHAT = new OrFilter(NORMAL, CHAT);
public static final StanzaFilter NORMAL = new MessageTypeFilter(Type.normal);
最后定格在MessageTypeFilter中。
public final class MessageTypeFilter extends FlexibleStanzaTypeFilter<Message> {
private final Message.Type type;
private MessageTypeFilter(Message.Type type) {
super(Message.class);
this.type = type;
}
@Override
protected boolean acceptSpecific(Message message) {
return message.getType() == type;
}
@Override
public String toString() {
return getClass().getSimpleName() + ": type=" + type;
}
}
沒有看見accept方法伴哦店茶?客官,別急嘛劫恒!看看他的父類中是否有贩幻。
public abstract class FlexibleStanzaTypeFilter<S extends Stanza> implements StanzaFilter {
protected final Class<S> stanzaType;
public FlexibleStanzaTypeFilter(Class<S> packetType) {
this.stanzaType = Objects.requireNonNull(packetType, "Type must not be null");
}
@SuppressWarnings("unchecked")
public FlexibleStanzaTypeFilter() {
stanzaType = (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
@Override
@SuppressWarnings("unchecked")
public final boolean accept(Stanza packet) {
if (stanzaType.isInstance(packet)) {//1.驗證消息class是否匹配
return acceptSpecific((S) packet);//2.實現(xiàn)類自己的過濾
}
return false;
}
protected abstract boolean acceptSpecific(S packet);
@Override
public String toString() {
return getClass().getSimpleName() + ": " + stanzaType.toString();
}
}
果然在父類中看見了accept方法。
1.處:調(diào)用了stanzaType.isInstance(packet),這里可以看見stanzaType就是傳入的泛型類型两嘴。我們在MessageTypeFilter 這里傳的是super(Message.class); 所以這個過濾就是過濾到不是Message消息的報文丛楚。
2.處:進行2次過濾acceptSpecific()方法,這里他的子類實現(xiàn)是MessageTypeFilter憔辫。
可以看見條件是message.getType() == type趣些,所以消息Type也要與傳入的type進行匹配。
這里過濾器是如何工作的就講解完了贰您。
好了坏平,現(xiàn)在可以眨眼了。
回過頭我們繼續(xù)看過濾后的監(jiān)聽操作 [ invokeStanzaCollectorsAndNotifyRecvListeners]
protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
listenersToNotify.clear();
//1.如果用戶設(shè)置過濾器,則通過過濾消息類型,返回上層監(jiān)聽
extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
//2.
ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
@Override
public void run() {
Iterator<StanzaListener> it = listenersToNotify.iterator();
synchronized (syncRecvListeners) {
while (it.hasNext()) {
StanzaListener stanzaListener = it.next();
if (!syncRecvListeners.containsKey(stanzaListener)) {
it.remove();
}
}
}
//3.回調(diào)監(jiān)聽
for (StanzaListener listener : listenersToNotify) {
try {
listener.processStanza(packet);
} catch (NotConnectedException e) {
LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
break;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
}
}
}
});
}
extractMatchingListeners方法傳入的packet 通過過濾后得到符合條件的監(jiān)聽listenersToNotify 對他們進行回調(diào)锦亦。
2.處:ASYNC_BUT_ORDERED.performAsyncButOrdered
這里就不在做講解了舶替,跟之前ChatManager回調(diào)監(jiān)聽時處理是一樣的,只不過這里hashMap中存儲的key為XMPPConnection杠园。
3.處:進行回調(diào)的監(jiān)聽顾瞪。
好,現(xiàn)在監(jiān)聽回調(diào)的調(diào)用地方算是已經(jīng)找到了返劲,但是我們想繼續(xù)看看invokeStanzaCollectorsAndNotifyRecvListeners是在哪里調(diào)用過來的×崦粒現(xiàn)在就一步一步往上面推。
檢索發(fā)有2地方進行調(diào)用篮绿,這里我們的實現(xiàn)類是繼承AbstractXMPPConnection類的孵延,所以查看AbstractXMPPConnection類中調(diào)用invokeStanzaCollectorsAndNotifyRecvListeners的方法
protected void processStanza(final Stanza stanza) throws InterruptedException {
assert stanza != null;
final SmackDebugger debugger = this.debugger;
if (debugger != null) {
debugger.onIncomingStreamElement(stanza);
}
lastStanzaReceived = System.currentTimeMillis();
// 將傳入的數(shù)據(jù)包傳遞給偵聽器
invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
}
然后在向上檢索在AbstractXMPPConnection類中
protected void parseAndProcessStanza(XmlPullParser parser)
throws XmlPullParserException, IOException, InterruptedException {
ParserUtils.assertAtStartTag(parser);
int parserDepth = parser.getDepth();
Stanza stanza = null;
try {
stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment);
}
catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) {
CharSequence content = PacketParserUtils.parseContentDepth(parser,
parserDepth);
UnparseableStanza message = new UnparseableStanza(content, e);
ParsingExceptionCallback callback = getParsingExceptionCallback();
if (callback != null) {
callback.handleUnparsableStanza(message);
}
}
ParserUtils.assertAtEndTag(parser);
if (stanza != null) {
//1.調(diào)用
processStanza(stanza);
}
}
可以看到此方法把XmlPullParser解析的內(nèi)容封裝成stanza并調(diào)用了processStanza方法。
然后在向上面推亲配,看哪里調(diào)用的parseAndProcessStanza方法尘应。
檢索發(fā)現(xiàn)有多處調(diào)用,這里我們的實現(xiàn)是XMPPTCPConnection類吼虎。
還記得我們之前連接成功后初始化的XMPPTCPConnection的內(nèi)部類PacketReader嗎犬钢?
對的,在他的parsePackets方法中思灰,解析了報文的節(jié)點玷犹,然后進行了調(diào)用過程parseAndProcessStanza()
小結(jié):
其他監(jiān)聽其實也是一樣的,比如:群聊的監(jiān)聽洒疚。
當(dāng)消息過來時歹颓,都是通過用戶設(shè)置的過濾器過濾符合用戶需要的監(jiān)聽進行回調(diào)坯屿。他們在處理過濾時的方法都是在 invokeStanzaCollectorsAndNotifyRecvListeners方法中進行的處理
到此,本文smack簡單的解析就結(jié)束了巍扛,這里也是自己的一些小的見解领跛,可能會有不完善和不正確的地方,望大家覺得有不準確的地方撤奸,歡迎指出并一起探討吠昭。