Smack源碼解析

  • [Smack]
  • [1 初始化]
    • [1.1 初始化啟動類]
  • [2 連接與登錄]
    • [2.1 創(chuàng)建連接類]
    • [2.2 連接]
    • [2.3 登錄]
  • [3 消息的發(fā)送]
  • [4 消息的接收]

本文分析Smack源碼版本為4.4.0-alpha6

Smack簡介

Smack是一個開源式散,易于使用的XMPP(jabber)客戶端類庫楷拳。
Smack是用Java編寫的開放源代碼柜砾,高度模塊化,易于使用的XMPP客戶端庫,用于Java SE兼容JVM和Android雨效。它是純Java庫微宝,可以嵌入到您的應(yīng)用程序中,以創(chuàng)建從完整的XMPP即時消息客戶端到簡單的XMPP集成(例如發(fā)送通知消息和支持狀態(tài)的設(shè)備)的任何內(nèi)容蝎宇。Smack和XMPP允許您在(M2M弟劲,IoT,…)之間以多種方式輕松交換數(shù)據(jù)姥芥,例如“發(fā)后不管”兔乞,“發(fā)布訂閱”。

Smack官方資料:

官網(wǎng)地址:https://www.igniterealtime.org/
Github:https://github.com/igniterealtime/Smack/tree/master
文檔:https://download.igniterealtime.org/smack/docs/latest/documentation/
Javadoc: http://www.igniterealtime.org/builds/smack/dailybuilds/javadoc/

Smack Modules:

Smack可以輕松地嵌入到任何現(xiàn)有的Java應(yīng)用程序中。該庫作為幾個模塊提供庸追,可為應(yīng)用程序所需的功能提供更大的靈活性:

  • smack-core-提供核心XMPP功能霍骄。包括XMPP RFC中的所有XMPP功能。
  • smack-im -提供RFC 6121(XMPP-IM)中定義的功能锚国,例如名冊腕巡。
  • smack-tcp-支持基于TCP的XMPP。包括您通常要使用的XMPPTCPConnection類
  • smack-extensions-支持XMPP標準基金會定義的許多擴展(XEP)血筑,包括多用戶聊天绘沉,文件傳輸,用戶搜索等豺总。這些擴展記錄在擴展手冊中车伞。
  • smack-experimental-支持XMPP標準基金會定義的實驗擴展(XEP)。這些擴展的API和功能應(yīng)視為不穩(wěn)定喻喳。
  • smack-legacy -支持XMPP標準基金會定義的舊版擴展(XEP)另玖。
  • smack-bosh-支持BOSH(XEP-0124)。此代碼應(yīng)視為Beta表伦。
  • smack-resolver-minidns-支持在MiniDNS的幫助下解析DNS SRV記錄谦去。不支持javax.naming API的平臺的理想選擇。還支持DNSSEC蹦哼。
  • smack-resolver-dnsjava -支持在dnsjava的幫助下解析DNS SRV記錄鳄哭。
  • smack-resolver-javax -支持使用javax名稱空間API解析DNS SRV記錄。
  • smack-debug-用于協(xié)議流量的增強型GUI調(diào)試器纲熏。在類路徑中找到并啟用調(diào)試后妆丘,它將自動使用它。
  1. 初始化

  2. 1初始化啟動類

1.1.1 SmackInitialization類

配置文件:

初始化是通過配置文件完成的局劲。在smack-core模塊org.jivesoftware.smack / smack-config.xml勺拣。默認情況下,Smack將把此文件打入jar中鱼填。此特定配置包含要加載的初始化程序類的列表药有。此初始化器列表中包含所有需要初始化的管理器類型類。

smack-config.xml

<?xml version="1.0"?>
<!-- Smack configuration file. -->
<smack>
    <!-- Classes that will be loaded when Smack starts -->
    <startupClasses>
        <className>org.jivesoftware.smack.initializer.VmArgInitializer</className>
        <className>org.jivesoftware.smack.ReconnectionManager</className>
    </startupClasses>

    <optionalStartupClasses>
        <className>org.jivesoftware.smack.util.dns.javax.JavaxResolver</className>
        <className>org.jivesoftware.smack.util.dns.minidns.MiniDnsResolver</className>
        <className>org.jivesoftware.smack.util.dns.dnsjava.DNSJavaResolver</className>
        <className>org.jivesoftware.smack.extensions.ExtensionsInitializer</className>
        <className>org.jivesoftware.smack.experimental.ExperimentalInitializer</className>
        <className>org.jivesoftware.smack.legacy.LegacyInitializer</className>
        <className>org.jivesoftware.smack.tcp.TCPInitializer</className>
        <className>org.jivesoftware.smack.sasl.javax.SASLJavaXSmackInitializer</className>
        <className>org.jivesoftware.smack.sasl.provided.SASLProvidedSmackInitializer</className>
        <className>org.jivesoftware.smack.android.AndroidSmackInitializer</className>
        <className>org.jivesoftware.smack.java7.Java7SmackInitializer</className>
        <className>org.jivesoftware.smack.im.SmackImInitializer</className>
        <className>org.jivesoftware.smackx.omemo.OmemoInitializer</className>
        <className>org.jivesoftware.smackx.ox.util.OpenPgpInitializer</className>
    </optionalStartupClasses>
</smack>

然而配置文件是哪里加載的呢?其實是在SmackInitialization類苹丸,他在靜態(tài)代碼塊中進行的加載塑猖,這里貼出關(guān)鍵代碼:

private static final String DEFAULT_CONFIG_FILE = "org.jivesoftware.smack/smack-config.xml";
//...
static {
    //....
    //....
    
    InputStream configFileStream;
    try {
        //1.在這里得到文件的輸入流
        configFileStream = FileUtils.getStreamForClasspathFile(DEFAULT_CONFIG_FILE, null);
    }
    catch (Exception e) {
        throw new IllegalStateException("Could not load Smack configuration file", e);
    }

    try {
        //2.
        processConfigFile(configFileStream, null);
    }
    catch (Exception e) {
        throw new IllegalStateException("Could not parse Smack configuration file", e);
    }

    // Add the Java7 compression handler first, since it's preferred
    SmackConfiguration.addCompressionHandler(new Java7ZlibInputOutputStream());
    
    //....
    //....
}

我們在看看第二步中processConfigFile做了什么操作。

public static void processConfigFile(InputStream cfgFileStream,
                Collection<Exception> exceptions) throws Exception {
    processConfigFile(cfgFileStream, exceptions, SmackInitialization.class.getClassLoader());
}
public static void processConfigFile(InputStream cfgFileStream,
                Collection<Exception> exceptions, ClassLoader classLoader) throws Exception {
    XmlPullParser parser = PacketParserUtils.getParserFor(cfgFileStream);
    XmlPullParser.Event eventType = parser.getEventType();
    do {
        if (eventType == XmlPullParser.Event.START_ELEMENT) {
            //2.
            if (parser.getName().equals("startupClasses")) {
                parseClassesToLoad(parser, false, exceptions, classLoader);
            }
            else if (parser.getName().equals("optionalStartupClasses")) {
                parseClassesToLoad(parser, true, exceptions, classLoader);
            }
        }
        eventType = parser.next();
    }
    //1.判斷是否是節(jié)點結(jié)束 循環(huán)解析
    while (eventType != XmlPullParser.Event.END_DOCUMENT);
    CloseableUtil.maybeClose(cfgFileStream, LOGGER);
}

可以看見它把流通過XmlPullParser解析出startupClasses谈跛、optionalStartupClasses兩個節(jié)點中的內(nèi)容。也就是我們之前 smack-config.xml中的節(jié)點塑陵。
并且都是調(diào)用parseClassesToLoad方法,區(qū)別只在于第二個參數(shù)的boolen不一樣感憾,其實這個boolen關(guān)系不大,在后面我們就知道了。

繼續(xù)看它的parseClassesToLoad方法做了什么:

private static void parseClassesToLoad(XmlPullParser parser, boolean optional,
                Collection<Exception> exceptions, ClassLoader classLoader)
                throws Exception {
    final String startName = parser.getName();
    XmlPullParser.Event eventType;
    outerloop: do {//1.循環(huán)解析出className
        eventType = parser.next();
        if (eventType == XmlPullParser.Event.START_ELEMENT && "className".equals(parser.getName())) {
            String classToLoad = parser.nextText();
            if (SmackConfiguration.isDisabledSmackClass(classToLoad)) {
                continue outerloop;
            }

            try {
                //2.方法中進行反射加載類 參數(shù)classToLoad為節(jié)點中className的內(nèi)容阻桅。
                loadSmackClass(classToLoad, optional, classLoader);
            } catch (Exception e) {
                // Don't throw the exception if an exceptions collection is given, instead
                // record it there. This is used for unit testing purposes.
                if (exceptions != null) {
                    exceptions.add(e);
                } else {
                    throw e;
                }
            }
        }
    }
    while (!(eventType == XmlPullParser.Event.END_ELEMENT && startName.equals(parser.getName())));
}

可以看到第一步循環(huán)出className節(jié)點的內(nèi)容凉倚,第二步把className內(nèi)容String傳給了loadSmackClass方法進行反射加載。

調(diào)用loadSmackClass方法:

    private static void loadSmackClass(String className, boolean optional, ClassLoader classLoader) throws Exception {
        Class<?> initClass;
        try {
            // Attempt to load and initialize the class so that all static initializer blocks of
            // class are executed
            
            //1.反射類的class
            initClass = Class.forName(className, true, classLoader);
        }
        catch (ClassNotFoundException cnfe) {
            Level logLevel;
            if (optional) {
                logLevel = Level.FINE;
            }
            else {
                logLevel = Level.WARNING;
            }
            LOGGER.log(logLevel, "A startup class '" + className + "' could not be loaded.");
            if (!optional) {
                throw cnfe;
            } else {
                return;
            }
        }
        //2.判斷傳入class類型是否是SmackInitializer類型或者其實現(xiàn)類
        if (SmackInitializer.class.isAssignableFrom(initClass)) {
            //3.創(chuàng)建實例對象
            SmackInitializer initializer = (SmackInitializer) initClass.getConstructor().newInstance();
            //4.調(diào)用接口initialize方法嫂沉,其實也就是調(diào)用實例的initialize方法
            List<Exception> exceptions = initializer.initialize();
            if (exceptions == null || exceptions.size() == 0) {
                LOGGER.log(Level.FINE, "Loaded SmackInitializer " + className);
            } else {
                for (Exception e : exceptions) {
                    LOGGER.log(Level.SEVERE, "Exception in loadSmackClass", e);
                }
            }
        } else {
            LOGGER.log(Level.FINE, "Loaded " + className);
        }
    }

第一步:反射類的class稽寒。
第二步:判斷傳入class類型是否是SmackInitializer類型或者其實現(xiàn)類
第三步:創(chuàng)建實例對象
第四步:調(diào)用接口initialize方法,其實也就是調(diào)用實例的initialize方法

我們現(xiàn)在看一下SmackInitializer類趟章,其實是一個接口杏糙。

package org.jivesoftware.smack.initializer;

import java.util.List;

import org.jivesoftware.smack.SmackConfiguration;

/**
 * Defines an initialization class that will be instantiated and invoked by the {@link SmackConfiguration} class during initialization.
 *
 * <p>
 * Any implementation of this class MUST have a default constructor.
 *
 * @author Robin Collier
 *
 */
public interface SmackInitializer {
    List<Exception> initialize();
}

現(xiàn)在可以看出第二步判斷里的邏輯為:SmackInitializer接口的實現(xiàn)類調(diào)用initialize()方法。
好蚓土,這時候我們在回過頭看看我們smack-config.xml中配置的內(nèi)容宏侍。

VmArgInitializer類

public class VmArgInitializer extends UrlInitializer {

    protected String getFilePath() {
        return System.getProperty("smack.provider.file");
    }

    @Override
    public List<Exception> initialize() {
        if (getFilePath() != null) {
            super.initialize();
        }
        return Collections.emptyList();
    }
}

父類UrlInitializer

public abstract class UrlInitializer implements SmackInitializer {
    //...
       @Override
    public List<Exception> initialize() {
        //...
        //...
    }
    //...
}

可以看見這個類是實現(xiàn)了SmackInitializer接口的。

在看看[smack-config.xml]中的第二個配置類
ReconnectionManager

public final class ReconnectionManager {
    //...
    //...
    static {
        XMPPConnectionRegistry.addConnectionCreationListener(new ConnectionCreationListener() {
            @Override
            public void connectionCreated(XMPPConnection connection) {
                if (connection instanceof AbstractXMPPConnection) {
                    ReconnectionManager.getInstanceFor((AbstractXMPPConnection) connection);
                }
            }
        });
    }
    //...
    //...
}

可以看到這個類并沒有實現(xiàn)SmackInitializer接口,只是類中有靜態(tài)代碼塊執(zhí)行而已蜀漆。

在看看[smack-config.xml]中的第三個配置類
JavaxResolver

public class JavaxResolver extends DNSResolver implements SmackInitializer {
    //...
    //...
    static {
        try {
            Hashtable<String, String> env = new Hashtable<>();
            env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
            dirContext = new InitialDirContext(env);
        } catch (NamingException e) {
            LOGGER.log(Level.SEVERE, "Could not construct InitialDirContext", e);
        }
    
        // Try to set this DNS resolver as primary one
        setup();
    }

    public static void setup() {
        DNSUtil.setDNSResolver(getInstance());
    }

    //...
    //...

    @Override
    public List<Exception> initialize() {
        setup();
        return null;
    }
    //...
    //...
}

這個類也實現(xiàn)了SmackInitializer 接口谅河。

然而實現(xiàn)類方法到底執(zhí)行了什么呢,這里我們舉個栗子确丢,其他的就不一一給大家分析了绷耍。
看好,就是這個栗子:SmackImInitializer

public class SmackImInitializer extends UrlInitializer {

    @Override
    protected String getProvidersUri() {
        return "classpath:org.jivesoftware.smack.im/smackim.providers";
    }

    @Override
    protected String getConfigUri() {
        return "classpath:org.jivesoftware.smack.im/smackim.xml";
    }

}

父類UrlInitializer

public abstract class UrlInitializer implements SmackInitializer {
    private static final Logger LOGGER = Logger.getLogger(UrlInitializer.class.getName());

    //1.接口的實現(xiàn)方法
    @Override
    public List<Exception> initialize() {
        InputStream is = null;
        final ClassLoader classLoader = this.getClass().getClassLoader();
        final List<Exception> exceptions = new LinkedList<Exception>();
        
        //2.getProvidersUri() 有些子類對其進行了方法重載鲜侥,所以具體獲取要看子類是否重載
        final String providerUriString = getProvidersUri();
        if (providerUriString != null) {
            try {
                final URI providerUri = URI.create(providerUriString);
                is = FileUtils.getStreamForUri(providerUri, classLoader);

                LOGGER.log(Level.FINE, "Loading providers for providerUri [" + providerUri + "]");
                //3.此方法中進行類加載到內(nèi)存操作
                ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
                ProviderManager.addLoader(pfl);
                exceptions.addAll(pfl.getLoadingExceptions());
            }
            catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error trying to load provider file " + providerUriString, e);
                exceptions.add(e);
            } finally {
                maybeClose(is);
            }
        }

        //4.getConfigUri() 有些子類對其進行了方法重載褂始,所以具體獲取要看子類是否重載
        final String configUriString = getConfigUri();
        if (configUriString != null) {
            try {
                final URI configUri = URI.create(configUriString);
                is = FileUtils.getStreamForUri(configUri, classLoader);
                //5.調(diào)用加載類操作
                SmackInitialization.processConfigFile(is, exceptions, classLoader);
            }
            catch (Exception e) {
                exceptions.add(e);
            } finally {
                maybeClose(is);
            }
        }
        return exceptions;
    }

    protected String getProvidersUri() {
        return null;
    }

    protected String getConfigUri() {
        return null;
    }

    private static void maybeClose(InputStream is) {
        CloseableUtil.maybeClose(is, LOGGER);
    }
}

第一步:SmackImInitializer實例會調(diào)用父類UrlInitializer的initialize方法。
第二步:getProvidersUri()方法,因為SmackImInitializer對其進行了重載剃毒,所以這里看SmackImInitializer中g(shù)etProvidersUri()的方法

 protected String getProvidersUri() {
        return "classpath:org.jivesoftware.smack.im/smackim.providers";
 }

可以看見是一個String路徑病袄,這個路徑文件其實是在SmackImInitializer類所在的smack-im moudle下,我們看一下內(nèi)容:
smackim.providers

<?xml version="1.0"?> 
<smackProviders>

    <!-- RFC-6121: Extensible Messaging and Presence Protocol (XMPP): Instant Messaging and Presence -->
    <iqProvider>
        <elementName>query</elementName>
        <namespace>jabber:iq:roster</namespace>
        <className>org.jivesoftware.smack.roster.provider.RosterPacketReNameProvider</className>
    </iqProvider>
    <streamFeatureProvider>
        <elementName>sub</elementName>
        <namespace>urn:xmpp:features:pre-approval</namespace>
        <className>org.jivesoftware.smack.roster.provider.SubscriptionPreApprovalStreamFeatureProvider</className>
    </streamFeatureProvider>

    <!-- XEP-0237: Roster Versioning -->
    <streamFeatureProvider>
        <elementName>ver</elementName>
        <namespace>urn:xmpp:features:rosterver</namespace>
        <className>org.jivesoftware.smack.roster.provider.RosterVerStreamFeatureProvider</className>
    </streamFeatureProvider>

</smackProviders>

第三步:ProviderFileLoader pfl = new ProviderFileLoader(is, classLoader);
把smackim.providers配置文件的輸入流傳入赘阀。

第四步:這里在看看ProviderFileLoader中做了什么益缠?

public class ProviderFileLoader implements ProviderLoader {

    @SuppressWarnings("unchecked")
    public ProviderFileLoader(InputStream providerStream, ClassLoader classLoader) {
        //1.這里省略部分代碼
        //其實就是把InputStream通過XmlPullParser拿到className節(jié)點信息.
        //...
        try {
            //2.加載class
            final Class<?> provider = classLoader.loadClass(className);
            switch (typeName) {
            case "iqProvider":
                //3.節(jié)點為iqProvider 創(chuàng)建實例類并加入內(nèi)存iqProviders
                if (IqProvider.class.isAssignableFrom(provider)) {
                    IqProvider<IQ> iqProvider = (IqProvider<IQ>) provider.getConstructor().newInstance();
                    iqProviders.add(new IQProviderInfo(elementName, namespace, iqProvider));
                }
                else {
                    exceptions.add(new IllegalArgumentException(className + " is not a IQProvider"));
                }
                break;
            case "extensionProvider":
                //...
                //...
                break;
            case "streamFeatureProvider":
                //4.節(jié)點為streamFeatureProvider創(chuàng)建實例類并加入內(nèi)存sfProviders
                ExtensionElementProvider<ExtensionElement> streamFeatureProvider = (ExtensionElementProvider<ExtensionElement>) provider.getConstructor().newInstance();
                sfProviders.add(new StreamFeatureProviderInfo(elementName,
                                namespace,
                                streamFeatureProvider));
                break;
            default:
                LOGGER.warning("Unknown provider type: " + typeName);
            }
        }
        catch (ClassNotFoundException cnfe) {
            LOGGER.log(Level.SEVERE, "Could not find provider class", cnfe);
            exceptions.add(cnfe);
        }
        catch (InstantiationException ie) {
            LOGGER.log(Level.SEVERE, "Could not instanciate " + className, ie);
            exceptions.add(ie);
        }
        
        //...
        //...
    }
    
}

1.把傳入的配置文件流InputStream通過XmlPullParser拿到className節(jié)點信息.
2.加載配置文件中的class。
3.通過switch判斷是iqProvider節(jié)點的加入到iqProviders中基公,是streamFeatureProvider節(jié)點的加入到sfProviders中幅慌。

其實smackim.providers文件跟 smack-config.xml文件一樣都是配置文件,只不過smackim.providers是屬于一個moudle中的配置文件轰豆,而smack-config.xml是核心smack-core Moudle中總得配置文件而已(及Smack配置文件)胰伍。

這里梳理一下:所有初始化都是在SmackInitialization的靜態(tài)代碼塊中執(zhí)行的。也就是說那些地方創(chuàng)建獲取調(diào)用了此類方法酸休,此類的靜態(tài)代碼塊就被會執(zhí)行骂租。

最后看一下那些地方調(diào)用了這個方法:
SmackConfiguration類中調(diào)用了他獲取版本的方法。


SmackConfiguration

在看看那些調(diào)用了SmackConfiguration.getVersion();


SmackConfigurationGetVersion

可以看到很多類中都有調(diào)用斑司,并且關(guān)鍵的AbstractXMPPConnection連接類靜態(tài)代碼塊中也調(diào)用了此方法渗饮。
所以我們得出結(jié)論:只要你創(chuàng)建對象進行連接操作,框架內(nèi)部就會幫你初始化好配置。

1.1.2 SmackConfiguration類

這里截圖這個類的部分代碼

public final class SmackConfiguration {
    //...
    //...

    public static boolean DEBUG = false;

    private static SmackDebuggerFactory DEFAULT_DEBUGGER_FACTORY = ReflectionDebuggerFactory.INSTANCE;

    /**
     * The default parsing exception callback is {@link ExceptionThrowingCallback} which will
     * throw an exception and therefore disconnect the active connection.
     */
    private static ParsingExceptionCallback defaultCallback = new ExceptionThrowingCallbackWithHint();

    private static HostnameVerifier defaultHostnameVerififer;

    /**
     * Returns the Smack version information, eg "1.3.0".
     *
     * @return the Smack version information.
     */
    public static String getVersion() {
        return SmackInitialization.SMACK_VERSION;
    }
    
    //...
    //...
}

可以看到這個類是final 并且里面全都是靜態(tài)方法和靜態(tài)成員變量互站。其實這個類作用就是在你使用smack前可以進行靜態(tài)值得配置私蕾。比如:SmackConfiguration.DEBUG = true;

小結(jié):
其實Smack的初始化操作都是在smack-core moudle下SmackInitialization類的靜態(tài)代碼中完成的,并且配置文件在同一moudle下的smack-config.xml文件胡桃。

SmackConfiguration類getVersion()時踩叭,SmackInitialization的靜態(tài)代碼塊就會執(zhí)行。并且關(guān)鍵的AbstractXMPPConnection連接類靜態(tài)代碼塊中也調(diào)用了此方法翠胰,所以只要你創(chuàng)建對象進行連接操作容贝,框架內(nèi)部就會幫你初始化好配置。

  1. 連接與登錄

  2. 1創(chuàng)建連接類
    第一步:創(chuàng)建連接配置

configuration = XMPPTCPConnectionConfiguration.builder()
            .setXmppDomain(myibc_domain)
            .setHostAddress(InetAddress.getByName(mXmppHost))
            .setPort(mXmppPort)
            .setSendPresence(false)
            .setSecurityMode(ConnectionConfiguration.SecurityMode.required)
            .setConnectTimeout(30 * 1000)
            .setCompressionEnabled(false)
            .setCustomSSLContext(SSLUtils.getSSLCAContext(""))
            .setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER)
            .build();

這里我們用的是TCP的連接,所以我們使用XMPPTCPConnectionConfiguration來創(chuàng)建我們的配置信息亡容。
其實XMPPTCPConnectionConfiguration繼承了ConnectionConfiguration抽象類嗤疯。并且在構(gòu)造方法調(diào)用父類的構(gòu)造方法。
配置都保存在了ConnectionConfiguration中

public abstract class ConnectionConfiguration {
    protected ConnectionConfiguration(Builder<?, ?> builder) {
        //略...
        authzid = builder.authzid;
        username = builder.username;
        password = builder.password;
        callbackHandler = builder.callbackHandler;
        //略...
    }
}

第二步:創(chuàng)建用于連接服務(wù)器的核心類XMPPTCPConnection

//將連接配置對象當(dāng)作參數(shù)傳入給XMPPTCPConnection構(gòu)造函數(shù)
XMPPTCPConnection connection = new XMPPTCPConnection(configuration);

配置信息傳入到XMPPTCPConnection和父類AbstractXMPPConnection中各個保存了一份闺兢。

  1. 2 連接
    用XMPPTCPConnection實例進行連接
//進行連接
connection.connect();

首先我們了解一下XMPPTCPConnection的繼承關(guān)系


XMPPTCPConnectionExtends

connect()方法是在AbstractXMPPConnection抽象類中進行實現(xiàn)的茂缚。

public abstract class AbstractXMPPConnection implements XMPPConnection {
    //...
    
    public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
        // 檢測是否已連接。
        throwAlreadyConnectedExceptionIfAppropriate();

        // 重置連接狀態(tài)
        initState();
        closingStreamReceived = false;
        streamId = null;

        try {
            //1.關(guān)鍵屋谭,執(zhí)行連接的方法
            connectInternal();

            //判斷TLS是否認證過
            if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
                throw new SecurityRequiredByClientException();
            }
        } catch (SmackException | IOException | XMPPException | InterruptedException e) {
            instantShutdown();
            throw e;
        }

        // 回調(diào)已連接監(jiān)聽
        connected = true;
        callConnectionConnectedListener();

        return this;
    }
    
    //...

    //2.
    protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
}

關(guān)鍵方法是調(diào)用connectInternal();此方法是AbstractXMPPConnection類的抽象方法脚囊,具體要查看他的實現(xiàn)類。
我們實例的實現(xiàn)類為:
XMPPTCPConnection

public class XMPPTCPConnection extends AbstractXMPPConnection {

    @Override
    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
        //1.連接
        connectUsingConfiguration();

        //2.連接成功后桐磁,通過流初始化writer Reader悔耘。
        initConnection();

        // TLS handled will be true either if TLS was established, or if it was not mandatory.
        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");

        // Wait with SASL auth until the SASL mechanisms have been received
        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
    }


   private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
        //略...
        SocketFactory socketFactory = config.getSocketFactory();
        
        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
            Iterator<? extends InetAddress> inetAddresses;
            String host = endpoint.getHost().toString();
            UInt16 portUint16 = endpoint.getPort();
            int port = portUint16.intValue();
            if (proxyInfo == null) {//無代理
                SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
                final InetAddress inetAddress = inetAddresses.next();
                final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
                LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
                //1.這里進行異步的連接
                socketFuture.connectAsync(inetSocketAddress, timeout);
                try {
                    //2.獲取socket
                    socket = socketFuture.getOrThrow();
                } catch (IOException e) {
                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
                                    endpoint, inetAddress, e);
                    connectionExceptions.add(rce);
                    if (inetAddresses.hasNext()) {
                        continue innerloop;
                    } else {
                        break innerloop;
                    }
                }
            } else {
                //略...
                return;
            }
        }

        // There are no more host addresses to try
        // throw an exception and report all tried
        // HostAddresses in the exception
        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
    }
    
}

connectUsingConfiguration連接方法
可以看見通過用戶傳入的配置信息,獲取到SocketFactory對象,并調(diào)用connectAsync()進行異步連接我擂。

主要步驟為兩步:
1.異步的連接
socketFactory.connectAsync()
2.獲取socket
socket = socketFuture.getOrThrow();

socketFactory.connectAsync() :

   public static class SocketFuture extends InternalSmackFuture<Socket, IOException> {

    private final Socket socket;

    private final Object wasInterruptedLock = new Object();

    private boolean wasInterrupted;

    public SocketFuture(SocketFactory socketFactory) throws IOException {
        socket = socketFactory.createSocket();
    }
    
    public void connectAsync(final SocketAddress socketAddress, final int timeout) {
        AbstractXMPPConnection.asyncGo(new Runnable() {
            @Override
            public void run() {
                try {
                    //在這里進行socket的連接操作
                    socket.connect(socketAddress, timeout);
                }
                catch (IOException e) {
                    setException(e);
                    return;
                }
                synchronized (wasInterruptedLock) {
                    if (wasInterrupted) {
                        closeSocket();
                        return;
                    }
                }
                setResult(socket);
            }
        });
    }
}

到此連接是已經(jīng)建立了,在初始化寫入寫出流在看看我們之前XMPPTCPConnection類中connectInternal方法的第二步
initConnection()也是XMPPTCPConnection類中的方法衬以。

private void initConnection() throws IOException, InterruptedException {
    compressionHandler = null;

    //1.初始化reader和writer實例對象
    initReaderAndWriter();

    //2.初始化 啟動 寫入流 線程 
    packetWriter.init();
    //3.啟動讀線程。startup()方法將阻塞校摩,直到從服務(wù)器獲取打開的流數(shù)據(jù)包
    packetReader.init();
}

initReaderAndWriter()

private void initReaderAndWriter() throws IOException {
    InputStream is = socket.getInputStream();
    OutputStream os = socket.getOutputStream();
    if (compressionHandler != null) {
        is = compressionHandler.getInputStream(is);
        os = compressionHandler.getOutputStream(os);
    }
    // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
    writer = new OutputStreamWriter(os, "UTF-8");
    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));

    // If debugging is enabled, we open a window and write out all network traffic.
    initDebugger();
}

這里很明白看峻,就是通過socket獲取輸入輸出流,創(chuàng)建writer衙吩、reader互妓。

2.packetWriter.init();
PacketWriter對象是在XMPPTCPConnection類的成員變量中創(chuàng)建的

protected final PacketWriter packetWriter = new PacketWriter();

protected final PacketReader packetReader = new PacketReader();
protected class PacketWriter {
    //隊列
    private final ArrayBlockingQueueWithShutdown<Element> queue = 
            new ArrayBlockingQueueWithShutdown<>(QUEUE_SIZE, true);

    //略...
    void init() {
        shutdownTimestamp = null;

        if (unacknowledgedStanzas != null) {
            // It's possible that there are new stanzas in the writer queue that
            // came in while we were disconnected but resumable, drain those into
            // the unacknowledged queue so that they get resent now
            drainWriterQueueToUnacknowledgedStanzas();
        }

        queue.start();
        running = true;
        //1.開啟線程
        Async.go(new Runnable() {
            @Override
            public void run() {
                LOGGER.finer(threadName + " start");
                try {
                    //2.寫入報文,報文是在queue隊列中獲取的
                    writePackets();
                } finally {
                    LOGGER.finer(threadName + " exit");
                    running = false;
                    notifyWaitingThreads();
                }
            }
        }, threadName);
    }

    //...
    
}

//1.開啟線程
//2.寫入報文坤塞,報文是在queue隊列中獲取的
這里就不貼出writePackets代碼了冯勉。

packetReader.init()
我們在看看之前packetReader.init()又做了些什么


PacketReader

可以看到它主要執(zhí)行的代碼為parsePackets() 方法。


parsePackets

通過XmlPullParser parser 解析器來讀取收到的內(nèi)容摹芙。

這里有一個問題XmlPullParser 哪里來的呢灼狰?我們現(xiàn)在看看他賦值的代碼。
在AbstractXMPPConnection類中

private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
    sendStreamOpen();
    resetParser();
}
private void resetParser() throws IOException {
        try {
            packetReader.parser = SmackXmlParser.newXmlParser(reader);
        } catch (XmlPullParserException e) {
            throw new IOException(e);
        }
   }

看到reader對象了還記得我們之前連接成功后initReaderAndWriter()方法的賦值嗎?

到這里我們整個連接過程就算完成了浮禾。

小結(jié):
1.連接首先會調(diào)用AbstractXMPPConnection的connect()方法伏嗜。
2.方法會調(diào)用實現(xiàn)類XMPPTCPConnection的connectInternal().

  • connectInternal方法做2步操作

    • connectUsingConfiguration:
      • 通過config創(chuàng)建SocketFactory并得到Socket坛悉。
      • Socket進行異步連接。

    -連接成功
    -initConnection:
    -初始化reader和writer實例對象
    -初始化packetWriter 寫入也是在報文隊列中承绸,實際通過writer對象寫入。
    -初始化packetReader挣轨,報文在隊列中有數(shù)據(jù)時則會讀取军熏。實際會通過reader讀取。

  1. 3 登錄
mConnection.login(loginUserId, loginPassword, resource);

調(diào)用其實是抽象類AbstractXMPPConnection中的login()方法卷扮。

public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException,SmackException, IOException, InterruptedException {
    if (!config.allowNullOrEmptyUsername) {
        StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty");
    }
    throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?");
    throwAlreadyLoggedInExceptionIfAppropriate();
    usedUsername = username != null ? username.toString() : null;
    usedPassword = password;
    usedResource = resource;
    loginInternal(usedUsername, usedPassword, usedResource);
}

繼續(xù)調(diào)用其實是AbstractXMPPConnection的一個抽象方法

protected abstract void loginInternal(String username, String password, Resourcepart resource)
                throws XMPPException, SmackException, IOException, InterruptedException;

我們這里的實現(xiàn)類是XMPPTCPConnection

Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                SmackException, IOException, InterruptedException {
    // 使用SASL進行身份驗證
    SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

    streamFeaturesAfterAuthenticationReceived = false;
    //1.進行身份認證
    authenticate(username, password, config.getAuthzid(), sslSession);

    //在身份驗證之后等待流特性荡澎。
    waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
    
    //如果啟用了壓縮,則請求服務(wù)器使用流壓縮
    maybeEnableCompression();
    
    //略...
    
    //綁定資源
    bindResourceAndEstablishSession(resource);
    
    //略...
    
    //2.登錄成功后
    afterSuccessfulLogin(false);
}

這個方法有2個地方需要注意
1.進行身份認證
2.登錄成功后的狀態(tài)發(fā)送與監(jiān)聽晤锹。

首先我們看看authenticate()方法做了些什么摩幔。可以看到這里是調(diào)用的方法

protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid,SSLSession sslSession)  {
    //1.認證
    SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession);
    //認證成功
    afterSaslAuthenticationSuccess();
    return saslMechanism;
}

調(diào)用的是 saslAuthentication對象authenticate方法鞭铆。
saslAuthentication其實是在創(chuàng)建連接對象時新建出來的

protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
    saslAuthentication = new SASLAuthentication(this, configuration);   
    //略...
}

我們看看SASLAuthentication類的authenticate方法:

    SASLMechanism authenticate(String username, String password, EntityBareJid authzid, SSLSession sslSession)
                    throws XMPPErrorException, SASLErrorException, IOException,
                    InterruptedException, SmackSaslException, NotConnectedException, NoResponseException {

        //獲取相應(yīng)jid的SASL機制或衡。
        final SASLMechanism mechanism = selectMechanism(authzid);
        final CallbackHandler callbackHandler = configuration.getCallbackHandler();
        final String host = connection.getHost();
        final DomainBareJid xmppServiceDomain = connection.getXMPPServiceDomain();

        synchronized (this) {
            currentMechanism = mechanism;

            //這里我們配置未傳入callbackHandler,所以走else
            if (callbackHandler != null) {
                currentMechanism.authenticate(host, xmppServiceDomain, callbackHandler, authzid, sslSession);
            }
            else {
                //(重要),認證车遂。
                currentMechanism.authenticate(username, host, xmppServiceDomain, password, authzid, sslSession);
            }

            final long deadline = System.currentTimeMillis() + connection.getReplyTimeout();
            while (!mechanism.isFinished()) {
                final long now = System.currentTimeMillis();
                if (now >= deadline) break;
                // Wait until SASL negotiation finishes
                wait(deadline - now);
            }
        }

        mechanism.throwExceptionIfRequired();

        return mechanism;
    }

在看看對應(yīng)SASLMechanism 類中的authenticate方法封断。

public final void authenticate(String username, String host, DomainBareJid serviceName, String password,
                EntityBareJid authzid, SSLSession sslSession)
                throws SmackSaslException, NotConnectedException, InterruptedException {
    this.authenticationId = username;
    this.host = host;
    this.serviceName = serviceName;
    this.password = password;
    this.authorizationId = authzid;
    this.sslSession = sslSession;
    assert authorizationId == null || authzidSupported();
    authenticateInternal();
    //認證
    authenticate();
}

private void authenticate() throws SmackSaslException, NotConnectedException, InterruptedException {
    //1.通過配置選擇的加密方式加密用戶賬號,密碼
    byte[] authenticationBytes = getAuthenticationText();
    String authenticationText;
    if (authenticationBytes != null && authenticationBytes.length > 0) {
        //進行base64編碼
        authenticationText = Base64.encodeToString(authenticationBytes);
    } else {
        authenticationText = "=";
    }
    //發(fā)送認證到服務(wù)器舶担。
    connection.sendNonza(new AuthMechanism(getName(), authenticationText));
}

到這里認證走完了坡疼,然后之前在AbstractXMPPConnection類中authenticate方法中還有afterSaslAuthenticationSuccess方法進行執(zhí)行。

protected void afterSaslAuthenticationSuccess()
                 throws NotConnectedException, InterruptedException, SmackWrappedException {
     sendStreamOpen();
 }



protected void sendStreamOpen() throws NotConnectedException, InterruptedException {

    CharSequence to = getXMPPServiceDomain();
    CharSequence from = null;
    CharSequence localpart = config.getUsername();
    if (localpart != null) {
        from = XmppStringUtils.completeJidFrom(localpart, to);
    }
    String id = getStreamId();

    StreamOpen streamOpen = new StreamOpen(to, from, id, config.getXmlLang(), StreamOpen.StreamContentNamespace.client);
    //發(fā)送 流打開報文衣陶。
    sendNonza(streamOpen);

    XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder();
    xmlEnvironmentBuilder.with(streamOpen);
    outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build();
}

可以看到這里發(fā)送了一個流報文柄瑰。

最后回到XMPPTCPConnection類中l(wèi)oginInternal方法,現(xiàn)在進行第二步剪况,認證成功后教沾。

Override
protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                SmackException, IOException, InterruptedException {
    // 使用SASL進行身份驗證
    SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;

    streamFeaturesAfterAuthenticationReceived = false;
    //1.進行身份認證
    authenticate(username, password, config.getAuthzid(), sslSession);

    //在身份驗證之后等待流特性。
    waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
    
    //如果啟用了壓縮拯欧,則請求服務(wù)器使用流壓縮
    maybeEnableCompression();
    
    //略...
    
    //綁定資源
    bindResourceAndEstablishSession(resource);
    
    //略...
    
    //2.登錄成功后
    afterSuccessfulLogin(false);
}

查看afterSuccessfulLogin方法首先調(diào)用的是XMPPTCPConnection類中的方法然后調(diào)用AbstractXMPPConnection抽象類
afterSuccessfulLogin详囤。

protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
    // Reset the flag in case it was set
    disconnectedButResumeable = false;
    super.afterSuccessfulLogin(resumed);
}
protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
    if (!resumed) {
        authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis();
    }

    this.authenticated = true;


    if (debugger != null) {
        debugger.userHasLogged(user);
    }
    
    //1.回調(diào)認證成功的監(jiān)聽。
    callConnectionAuthenticatedListener(resumed);

    //2.如果配置了出席狀態(tài),認證成功后發(fā)送available狀態(tài)
    if (config.isSendPresence() && !resumed) {
        Presence availablePresence = getStanzaFactory()
                        .buildPresenceStanza()
                        .ofType(Presence.Type.available)
                        .build();
        sendStanza(availablePresence);
    }
}

可以看見它主要就做了這2步操作:
1.回調(diào)認證成功的監(jiān)聽镐作。
2.如果配置了出席狀態(tài),認證成功后發(fā)送available狀態(tài)

  protected void callConnectionAuthenticatedListener(boolean resumed) {
       for (ConnectionListener listener : connectionListeners) {
           try {
               listener.authenticated(this, resumed);
           } catch (Exception e) {
               // Catch and print any exception so we can recover
               // from a faulty listener and finish the shutdown process
               LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e);
           }
       }
   }

到這里整個登錄流程就全部走完了藏姐。

小結(jié):
其實整個認證過程首先會通過用戶的Jid得到SASLMechanism,認證主要是在SASLMechanism類authenticate中進行的该贾,它會通過getAuthenticationText()加密拼接你的用戶名密碼羔杨,這個方法是個抽象,加密的方式就是你之前設(shè)置的方式。然后發(fā)送認證的報文給服務(wù)器杨蛋。

這時候認證成功了兜材,然后: 1.發(fā)送了一個流打開的報文理澎。2.回調(diào)認證成功的Listener。3.如果配置了發(fā)送狀態(tài)曙寡,認證成功后還是發(fā)送一個出席狀態(tài)的報文糠爬。

<h1 id="3">3. 消息的發(fā)送</h1>
第一步:創(chuàng)建ChatManager
這里是通過smack-extensions Moudle中ChatManager進行聊天的管理。

chatManager = ChatManager.getInstanceFor(xmppConnection);
public final class ChatManager extends Manager {
    private static final Map<XMPPConnection, ChatManager> INSTANCES = new WeakHashMap<>();

    public static synchronized ChatManager getInstanceFor(XMPPConnection connection) {
        ChatManager chatManager = INSTANCES.get(connection);
        if (chatManager == null) {
            chatManager = new ChatManager(connection);
            INSTANCES.put(connection, chatManager);
        }
        return chatManager;
    }
    
    //略...
}

這里可以看見ChatManager 是繼承與Manager抽象類举庶。在通過單例獲取ChatManager對象的時候會緩存到靜態(tài)HashMap內(nèi)存中执隧,下次如果內(nèi)存中有就直接獲取。

這里貼出Manager類代碼户侥。

public abstract class Manager {

    final WeakReference<XMPPConnection> weakConnection;

    public Manager(XMPPConnection connection) {
        Objects.requireNonNull(connection, "XMPPConnection must not be null");

        weakConnection = new WeakReference<>(connection);
    }

    protected final XMPPConnection connection() {
        return weakConnection.get();
    }

    protected final XMPPConnection getAuthenticatedConnectionOrThrow() throws NotLoggedInException {
        XMPPConnection connection = connection();
        if (!connection.isAuthenticated()) {
            throw new NotLoggedInException();
        }
        return connection;
    }

    protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) {
        return schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking);
    }

    protected static final ScheduledAction scheduleBlocking(Runnable runnable, long delay, TimeUnit unit) {
        return schedule(runnable, delay, unit, ScheduledAction.Kind.Blocking);
    }

    protected static final ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
        return AbstractXMPPConnection.SMACK_REACTOR.schedule(runnable, delay, unit, scheduledActionKind);
    }
}

第二步:創(chuàng)建Chat對象

Chat chat = chatManager.chatWith(toUserJid);

private final Map<EntityBareJid, Chat> chats = new ConcurrentHashMap<>();

public Chat chatWith(EntityBareJid jid) {
    Chat chat = chats.get(jid);
    if (chat == null) {
        synchronized (chats) {
            // Double-checked locking.
            chat = chats.get(jid);
            if (chat != null) {
                return chat;
            }
            chat = new Chat(connection(), jid);
            chats.put(jid, chat);
        }
    }
    return chat;
}

在ChatManager中通過chatWith方法得到Chat對象,如果內(nèi)存緩存中沒有就進行創(chuàng)建镀琉。

第二步:發(fā)送消息。

Chat chat = chatManager.chatWith(toUserJid);
Message msg = new Message();
msg.setType(Message.Type.chat);
chat.send(msg);
public void send(Message message) throws NotConnectedException, InterruptedException {
    switch (message.getType()) {
    case normal:
    case chat:
        break;
    default:
        throw new IllegalArgumentException("Message must be of type 'normal' or 'chat'");
    }

    Jid to = lockedResource;
    if (to == null) {
        to = jid;
    }
    message.setTo(to);

    connection().sendStanza(message);
}

可以看見這里通過我們傳入的connection調(diào)用sendStanza發(fā)送消息蕊唐。

我們在看一下調(diào)用的sendStanza方法是在哪里執(zhí)行的屋摔。
其實sendStanza是XMPPConnection接口的方法,我們用到的XMPPTCPConnection中沒實現(xiàn),在抽象類AbstractXMPPConnection中進行了實現(xiàn)

Override
public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
    Objects.requireNonNull(stanza, "Stanza must not be null");
    assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;

    throwNotConnectedExceptionIfAppropriate();
    switch (fromMode) {
    case OMITTED:
        stanza.setFrom((Jid) null);
        break;
    case USER:
        stanza.setFrom(getUser());
        break;
    case UNCHANGED:
    default:
        break;
    }
    
    //設(shè)置攔截器
    Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
    //發(fā)送Stanza
    sendStanzaInternal(stanzaAfterInterceptors);
}

可以看到這個方法主要做了2個重要事情:1.設(shè)置攔截器替梨。2.發(fā)送Stanza钓试。我們關(guān)注下sendStanzaInternal方法。

sendStanzaInternal是AbstractXMPPConnection類的一個抽象方法,我們用的是XMPPTCPConnection,所以具體實現(xiàn)查看XMPPTCPConnection中的sendStanzaInternal方法耙替。

protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException;
Override
protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
    //消息寫入
    packetWriter.sendStreamElement(packet);
    if (isSmEnabled()) {
        for (StanzaFilter requestAckPredicate : requestAckPredicates) {
            if (requestAckPredicate.accept(packet)) {
                requestSmAcknowledgementInternal();
                break;
            }
        }
    }
}

看到packetWriter是不是似曾相識亚侠?是的,這個就是我們之前講解的連接成功后初始化在內(nèi)存的那個packetWriter俗扇。

我們看看packetWriter中的sendStreamElement做了什么操作硝烂。

private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
                        QUEUE_SIZE, true);

protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
    try {
        //存入隊列
        queue.put(element);
    }
    catch (InterruptedException e) {
        // put() may throw an InterruptedException for two reasons:
        // 1. If the queue was shut down
        // 2. If the thread was interrupted
        // so we have to check which is the case
        throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
        // If the method above did not throw, then the sending thread was interrupted
        throw e;
    }
}

可以看見我們發(fā)送的報文存在了隊列中,最終寫入的方法是在writePackets里铜幽。writePackets方法中會從隊列中獲取packet滞谢,最終其實是PacketWriter類中writer進行了寫入操作。PacketWriter和writer在前面smack連接的時候已講解過除抛,這里不再細述狮杨,下面貼出writePackets代碼:


writePackets

小結(jié):
消息發(fā)送我們是用了Smack擴展中的ChatManager進行的,在ChatManager中會保存Chat對象到忽,在我們發(fā)送消息時根據(jù)Jid來獲取到chat對象橄教,在消息發(fā)送時調(diào)用chat.send(),其實最終的實現(xiàn)是在我們的實現(xiàn)類XMPPTCPConnection sendStanzaInternal中喘漏,sendStanzaInternal會調(diào)用packetWriter的sendStreamElement方法护蝶,最終在PacketWriter中sendStreamElement方法會把報文寫入自己的內(nèi)存隊列中,在writePackets方法中取出進行發(fā)送翩迈,最后是PacketWriter類中的writer流進行的寫入操作持灰。

<h1 id="4">4. 消息的接收</h1>

第一步: 添加監(jiān)聽IncomingChatMessageListener
這里監(jiān)聽設(shè)置在smack-extensions Moudle中ChatManager中

chatManager.addIncomingListener(messageListener);
public interface IncomingChatMessageListener {

    void newIncomingMessage(EntityBareJid from, Message message, Chat chat);

}
public final class ChatManager extends Manager {

    private final Set<IncomingChatMessageListener> incomingListeners = new CopyOnWriteArraySet<>();

    private final AsyncButOrdered<Chat> asyncButOrdered = new AsyncButOrdered<>();

    private ChatManager(final XMPPConnection connection) {
        super(connection);
        
        //1.設(shè)置XMPPConnection 的監(jiān)聽
        connection.addSyncStanzaListener(new StanzaListener() {
            @Override
            public void processStanza(Stanza stanza) {
                final Message message = (Message) stanza;
                if (!shouldAcceptMessage(message)) {
                    return;
                }

                final Jid from = message.getFrom();
                final EntityFullJid fullFrom = from.asEntityFullJidOrThrow();
                final EntityBareJid bareFrom = fullFrom.asEntityBareJid();
                final Chat chat = chatWith(bareFrom);
                chat.lockedResource = fullFrom;
                
                //2.這里所有chat都是保存在asyncButOrdered中,監(jiān)聽是在隊列里排隊回調(diào)负饲。
                asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
                    @Override
                    public void run() {
                        //3.回調(diào)incomingListeners
                        for (IncomingChatMessageListener listener : incomingListeners) {
                            listener.newIncomingMessage(bareFrom, message, chat);
                        }
                    }
                });

            }
        }, INCOMING_MESSAGE_FILTER);
        
        //略...
    }
    
    //...
}

我們首先分析里面第二步:
監(jiān)聽到時執(zhí)行的runnable堤魁。asyncButOrdered.performAsyncButOrdered喂链,asyncButOrdered源碼這里就不貼出來了。

每個chat對象回調(diào)監(jiān)聽時都會在AsyncButOrdered 類中存儲一個Map<K, Queue<Runnable>>妥泉,這里 K 為 chat椭微。
然后進行包裝存儲在成員變量 Map<K, Handler> threadActiveMap = new HashMap<>() 中。
這里的Handle 是AsyncButOrdered的一個內(nèi)部類盲链,然后執(zhí)行run方法赏表。

private class Handler implements Runnable {
        private final Queue<Runnable> keyQueue;
        private final K key;
        
        Handler(Queue<Runnable> keyQueue, K key) {
            this.keyQueue = keyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            //略。。碰纬。
            runnable.run();
        }
}

最后回到第三步:

asyncButOrdered.performAsyncButOrdered(chat, new Runnable() {
    @Override
    public void run() {
        //3.回調(diào)incomingListeners
        for (IncomingChatMessageListener listener : incomingListeners) {
            listener.newIncomingMessage(bareFrom, message, chat);
        }
    }
});

這里不難看出為什么要這么做往衷,因為上層可能會有很多監(jiān)聽過來,回調(diào)的調(diào)用其實也消息同樣效拭,量大。并且會有很多不同的chat對象監(jiān)聽返回,smack還進行了區(qū)分保存不同的chat回調(diào)火架,并在回調(diào)中第三個參數(shù)中返回給調(diào)用者了。
并且這個回調(diào)是異步的忙菠,smack在內(nèi)部維護了一個線程池進行處理何鸡。

好,現(xiàn)在我們在回到第一步監(jiān)聽是如何在connection中回調(diào)的牛欢。

第一步:設(shè)置XMPPConnection 的監(jiān)聽
XMPPConnection

void addSyncStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter);

AbstractXMPPConnection

private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();

@Override
public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
    if (packetListener == null) {
        throw new NullPointerException("Packet listener is null.");
    }
    ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
    synchronized (syncRecvListeners) {
        //添加Listener
        syncRecvListeners.put(packetListener, wrapper);
    }
}

最終實現(xiàn)是在AbstractXMPPConnection類中實現(xiàn)的骡男。
可以看到保存的是一個ListenerWrapper包裝類,第一個參數(shù)是回調(diào)監(jiān)聽傍睹,第二個參數(shù)是我們傳入的監(jiān)聽過濾器隔盛。
現(xiàn)在我們需要看的是那些地方調(diào)用了成員變量syncRecvListeners中的回調(diào)。

我們這里檢索查看是在AbstractXMPPConnection類中invokeStanzaCollectorsAndNotifyRecvListeners方法中進行了調(diào)用

invokeStanzaCollectorsAndNotifyRecvListeners()

protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {

    listenersToNotify.clear();
    //1.如果用戶設(shè)置過濾器,則通過過濾消息類型,返回上層監(jiān)聽
    extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);

    //2.
    ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
        @Override
        public void run() {

            Iterator<StanzaListener> it = listenersToNotify.iterator();
            synchronized (syncRecvListeners) {
                while (it.hasNext()) {
                    StanzaListener stanzaListener = it.next();
                    if (!syncRecvListeners.containsKey(stanzaListener)) {
                        it.remove();
                    }
                }
            }
            for (StanzaListener listener : listenersToNotify) {
                try {
                    listener.processStanza(packet);
                } catch (NotConnectedException e) {
                    LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
                    break;
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
                }
            }
        }
    });

}

第一步:如果用戶設(shè)置過濾器,則通過過濾消息類型,返回上層監(jiān)聽
extractMatchingListeners

private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
                Collection<StanzaListener> listenersToNotify) {
    synchronized (listeners) {
        for (ListenerWrapper listenerWrapper : listeners.values()) {
            if (listenerWrapper.filterMatches(stanza)) {//過濾
                //加入外部拾稳。
                listenersToNotify.add(listenerWrapper.getListener());
            }
        }
    }
}
private final StanzaFilter packetFilter;

public boolean filterMatches(Stanza packet) {
       return packetFilter == null || packetFilter.accept(packet);
}

這里過濾調(diào)用的是包裝類傳入的那個過濾器吮炕。并通過accept(packet)接口進行過濾。好访得,我們現(xiàn)在在回過頭看看這個過濾器傳入的啥龙亲?
還記得我們之前在ChatManager addSyncStanzaListener() 方法的第二個參數(shù)嗎?

private static final StanzaFilter INCOMING_MESSAGE_FILTER = new AndFilter(
                    MESSAGE_FILTER,
                    FromTypeFilter.ENTITY_FULL_JID
                    );

注意:這個過濾器是一層一層的悍抑,可以層層過濾鳄炉,所以它的構(gòu)造過程也是一層一層的,接下來不要眨眼传趾。

public class AndFilter extends AbstractListFilter implements StanzaFilter {


    public AndFilter() {
        super();
    }


    public AndFilter(StanzaFilter... filters) {
        super(filters);
    }

    @Override
    public boolean accept(Stanza packet) {
        for (StanzaFilter filter : filters) {
            if (!filter.accept(packet)) {
                return false;
            }
        }
        return true;
    }

}

可以看見這里傳入的構(gòu)造傳入的參數(shù)是一個可變參數(shù)迎膜,他的accept方法調(diào)用了StanzaFilter 過濾器的accept方法。
StanzaFilter 其實是一個接口accept是接口 方法浆兰,所以具體實現(xiàn)是在其實現(xiàn)類中

我們這里傳入了2過濾器磕仅,這里只拿出每個構(gòu)造的第一個作為講解珊豹。

    private static final StanzaFilter MESSAGE_FILTER = new AndFilter(
                    MessageTypeFilter.NORMAL_OR_CHAT,
                    new OrFilter(MessageWithBodiesFilter.INSTANCE, new StanzaExtensionFilter(XHTMLExtension.ELEMENT, XHTMLExtension.NAMESPACE))
                    );

public static final StanzaFilter NORMAL_OR_CHAT = new OrFilter(NORMAL, CHAT);
public static final StanzaFilter NORMAL = new MessageTypeFilter(Type.normal);

最后定格在MessageTypeFilter中。

public final class MessageTypeFilter extends FlexibleStanzaTypeFilter<Message> {

    private final Message.Type type;

    private MessageTypeFilter(Message.Type type) {
        super(Message.class);
        this.type = type;
    }

    @Override
    protected boolean acceptSpecific(Message message) {
        return message.getType() == type;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + ": type=" + type;
    }
}

沒有看見accept方法伴哦店茶?客官,別急嘛劫恒!看看他的父類中是否有贩幻。

public abstract class FlexibleStanzaTypeFilter<S extends Stanza> implements StanzaFilter {

    protected final Class<S> stanzaType;

    public FlexibleStanzaTypeFilter(Class<S> packetType) {
        this.stanzaType = Objects.requireNonNull(packetType, "Type must not be null");
    }

    @SuppressWarnings("unchecked")
    public FlexibleStanzaTypeFilter() {
        stanzaType = (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    @Override
    @SuppressWarnings("unchecked")
    public final boolean accept(Stanza packet) {
        if (stanzaType.isInstance(packet)) {//1.驗證消息class是否匹配
            return acceptSpecific((S) packet);//2.實現(xiàn)類自己的過濾
        }
        return false;
    }

    protected abstract boolean acceptSpecific(S packet);

    @Override
    public String toString() {
        return getClass().getSimpleName() + ": " + stanzaType.toString();
    }
}

果然在父類中看見了accept方法。
1.處:調(diào)用了stanzaType.isInstance(packet),這里可以看見stanzaType就是傳入的泛型類型两嘴。我們在MessageTypeFilter 這里傳的是super(Message.class); 所以這個過濾就是過濾到不是Message消息的報文丛楚。
2.處:進行2次過濾acceptSpecific()方法,這里他的子類實現(xiàn)是MessageTypeFilter憔辫。
可以看見條件是message.getType() == type趣些,所以消息Type也要與傳入的type進行匹配。

這里過濾器是如何工作的就講解完了贰您。
好了坏平,現(xiàn)在可以眨眼了。

回過頭我們繼續(xù)看過濾后的監(jiān)聽操作 [ invokeStanzaCollectorsAndNotifyRecvListeners]

protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {

    listenersToNotify.clear();
    //1.如果用戶設(shè)置過濾器,則通過過濾消息類型,返回上層監(jiān)聽
    extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);

    //2.
    ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
        @Override
        public void run() {

            Iterator<StanzaListener> it = listenersToNotify.iterator();
            synchronized (syncRecvListeners) {
                while (it.hasNext()) {
                    StanzaListener stanzaListener = it.next();
                    if (!syncRecvListeners.containsKey(stanzaListener)) {
                        it.remove();
                    }
                }
            }
            //3.回調(diào)監(jiān)聽
            for (StanzaListener listener : listenersToNotify) {
                try {
                    listener.processStanza(packet);
                } catch (NotConnectedException e) {
                    LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
                    break;
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
                }
            }
        }
    });

}

extractMatchingListeners方法傳入的packet 通過過濾后得到符合條件的監(jiān)聽listenersToNotify 對他們進行回調(diào)锦亦。

2.處:ASYNC_BUT_ORDERED.performAsyncButOrdered
這里就不在做講解了舶替,跟之前ChatManager回調(diào)監(jiān)聽時處理是一樣的,只不過這里hashMap中存儲的key為XMPPConnection杠园。
3.處:進行回調(diào)的監(jiān)聽顾瞪。

好,現(xiàn)在監(jiān)聽回調(diào)的調(diào)用地方算是已經(jīng)找到了返劲,但是我們想繼續(xù)看看invokeStanzaCollectorsAndNotifyRecvListeners是在哪里調(diào)用過來的×崦粒現(xiàn)在就一步一步往上面推。

檢索發(fā)有2地方進行調(diào)用篮绿,這里我們的實現(xiàn)類是繼承AbstractXMPPConnection類的孵延,所以查看AbstractXMPPConnection類中調(diào)用invokeStanzaCollectorsAndNotifyRecvListeners的方法

protected void processStanza(final Stanza stanza) throws InterruptedException {
    assert stanza != null;

    final SmackDebugger debugger = this.debugger;
    if (debugger != null) {
        debugger.onIncomingStreamElement(stanza);
    }

    lastStanzaReceived = System.currentTimeMillis();
    // 將傳入的數(shù)據(jù)包傳遞給偵聽器
    invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
}

然后在向上檢索在AbstractXMPPConnection類中

protected void parseAndProcessStanza(XmlPullParser parser)
                throws XmlPullParserException, IOException, InterruptedException {
    ParserUtils.assertAtStartTag(parser);
    int parserDepth = parser.getDepth();
    Stanza stanza = null;
    try {
        stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment);
    }
    catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) {
        CharSequence content = PacketParserUtils.parseContentDepth(parser,
                        parserDepth);
        UnparseableStanza message = new UnparseableStanza(content, e);
        ParsingExceptionCallback callback = getParsingExceptionCallback();
        if (callback != null) {
            callback.handleUnparsableStanza(message);
        }
    }
    ParserUtils.assertAtEndTag(parser);
    if (stanza != null) {
        //1.調(diào)用
        processStanza(stanza);
    }
}

可以看到此方法把XmlPullParser解析的內(nèi)容封裝成stanza并調(diào)用了processStanza方法。

然后在向上面推亲配,看哪里調(diào)用的parseAndProcessStanza方法尘应。
檢索發(fā)現(xiàn)有多處調(diào)用,這里我們的實現(xiàn)是XMPPTCPConnection類吼虎。


parsePackets

還記得我們之前連接成功后初始化的XMPPTCPConnection的內(nèi)部類PacketReader嗎犬钢?
對的,在他的parsePackets方法中思灰,解析了報文的節(jié)點玷犹,然后進行了調(diào)用過程parseAndProcessStanza()

小結(jié):
其他監(jiān)聽其實也是一樣的,比如:群聊的監(jiān)聽洒疚。
當(dāng)消息過來時歹颓,都是通過用戶設(shè)置的過濾器過濾符合用戶需要的監(jiān)聽進行回調(diào)坯屿。他們在處理過濾時的方法都是在 invokeStanzaCollectorsAndNotifyRecvListeners方法中進行的處理

到此,本文smack簡單的解析就結(jié)束了巍扛,這里也是自己的一些小的見解领跛,可能會有不完善和不正確的地方,望大家覺得有不準確的地方撤奸,歡迎指出并一起探討吠昭。

郵箱:soarsy@163.com

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市胧瓜,隨后出現(xiàn)的幾起案子矢棚,更是在濱河造成了極大的恐慌,老刑警劉巖府喳,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件幻妓,死亡現(xiàn)場離奇詭異,居然都是意外死亡劫拢,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門强胰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舱沧,“玉大人,你說我怎么就攤上這事偶洋∈炖簦” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵玄窝,是天一觀的道長牵寺。 經(jīng)常有香客問我,道長恩脂,這世上最難降的妖魔是什么帽氓? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮俩块,結(jié)果婚禮上黎休,老公的妹妹穿的比我還像新娘。我一直安慰自己玉凯,他們只是感情好势腮,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著漫仆,像睡著了一般捎拯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上盲厌,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天署照,我揣著相機與錄音祸泪,去河邊找鬼。 笑死藤树,一個胖子當(dāng)著我的面吹牛浴滴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播岁钓,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼升略,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了屡限?” 一聲冷哼從身側(cè)響起品嚣,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎钧大,沒想到半個月后翰撑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡啊央,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年眶诈,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓜饥。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡逝撬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出乓土,到底是詐尸還是另有隱情宪潮,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布趣苏,位于F島的核電站狡相,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏食磕。R本人自食惡果不足惜尽棕,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望彬伦。 院中可真熱鬧萄金,春花似錦、人聲如沸媚朦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽询张。三九已至孙乖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背唯袄。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工弯屈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人恋拷。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓资厉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蔬顾。 傳聞我的和親對象是個殘疾皇子宴偿,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容