Fescar example解析 - TM發(fā)送邏輯

開篇

?這篇文章的目的主要是理清楚Fescar的TM發(fā)送部分的邏輯蓝牲,從時序圖和源碼兩個層面進行分析。

?文章中間會解答兩個自己閱讀代碼中遇到的困惑(估計大部分人看代碼的時候也會遇到這個困惑)泰讽,包括TmRpcClient的初始化過程配置加載過程例衍。

?文章的最后會附上GlobalAction相關(guān)Request的類關(guān)系圖,便于理解依賴關(guān)系已卸。

?

Fescar TM發(fā)送流程

TM Sender.jpg

說明:

  • 1.DefaultGlobalTransaction執(zhí)行begin/commit/rollback等調(diào)用DefaultTransactionManager佛玄。

  • 2.DefaultTransactionManager內(nèi)部調(diào)用syncCall()方法,進而調(diào)用TmRpcClient的sendMsgWithResponse()方法累澡。

  • 3.TmRpcClient調(diào)用父類AbstractRpcRemoting的sendAsyncRequest()方法構(gòu)建發(fā)送隊列梦抢。

  • 4.AbstractRpcRemotingClient的MergedSendRunnable線程消費發(fā)送隊列構(gòu)建MergedWarpMessage調(diào)用sendRequest發(fā)送。

  • 5.sendRequest()方法內(nèi)部調(diào)用writeAndFlush完成消息發(fā)送愧哟。



TmRcpClient

說明:

  • TmRpcClient的類依賴關(guān)系圖如上奥吩。

  • TmRpcClient繼承自AbstractRpcRemotingClient類。


Fescar TM發(fā)送源碼分析

public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    /**
     * Get transaction manager.
     *
     * @return the transaction manager
     */
    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) 
            throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

說明:

  • DefaultTransactionManager的beigin/commit/rollback方法內(nèi)部最終調(diào)用syncCall()方法蕊梧。

  • syncCall方法內(nèi)部執(zhí)行TmRpcClient.getInstance().sendMsgWithResponse(request)調(diào)用TmRpcClient方法霞赫。


public final class TmRpcClient extends AbstractRpcRemotingClient {
    @Override
    public Object sendMsgWithResponse(Object msg) throws TimeoutException {
        return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
    }

    @Override
    public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
        throws TimeoutException {
        return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
    }
}

說明:

  • TmRpcClient內(nèi)部執(zhí)行發(fā)送sendMsgWithResponse調(diào)用sendAsyncRequestWithResponse。
  • sendAsyncRequestWithResponse的實現(xiàn)在父類AbstractRpcRemoting當(dāng)中肥矢。


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
        TimeoutException {
        if (timeout <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        return sendAsyncRequest(address, channel, msg, timeout);
    }

    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
        throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
            return null;
        }

        // 構(gòu)建RpcMessage對象
        final RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(RpcMessage.getNextMessageId());
        rpcMessage.setAsync(false);
        rpcMessage.setHeartbeat(false);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);

        // 通過MessageFuture包裝實現(xiàn)超時
        final MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeout);
        futures.put(rpcMessage.getId(), messageFuture);

        // 測試代碼走的是這個分支
        if (address != null) {
            // 根據(jù)address進行hash放置到不同的Map當(dāng)中
            ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
            BlockingQueue<RpcMessage> basket = map.get(address);
            if (basket == null) {
                map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                basket = map.get(address);
            }
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: " + rpcMessage.getBody());
            }

            // 發(fā)送其實是另外一個線程單獨執(zhí)行發(fā)送操作的
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
        } else {
            ChannelFuture future;
            channelWriteableCheck(channel, msg);
            future = channel.writeAndFlush(rpcMessage);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(future.cause());
                        }
                        destroyChannel(future.channel());
                    }
                }
            });
        }

        // 通過Future實現(xiàn)限時超時機制
        if (timeout > 0) {
            try {
                return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException)exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
        } else {
            return null;
        }
    }
}

說明:

  • 構(gòu)建RpcMessage對象端衰,包裝Request。
  • 構(gòu)建MessageFuture對象甘改,包裝RpcMessage旅东,實現(xiàn)超時等待功能。
  • 通過basket進行分桶操作十艾,真正執(zhí)行發(fā)送的代碼在AbstractRpcRemotingClient類的MergedSendRunnable抵代。
  • Request的發(fā)送類似生成消費者模型,上述代碼只是生產(chǎn)者部分疟羹。


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {}
                }
                isSending = true;
                for (String address : basketMap.keySet()) {
                    BlockingQueue<RpcMessage> basket = basketMap.get(address);
                    if (basket.isEmpty()) { continue; }

                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = connect(address);
                    try {
                        sendRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable
                            && address != null) {
                            destroyChannel(address, sendChannel);
                        }
                        LOGGER.error("", "client merge call failed", e);
                    }
                }
                isSending = false;
            }
        }
}

說明:

  • MergedSendRunnable 負責(zé)消費待發(fā)送消息體并組裝成MergedWarpMessage對象主守。
  • sendRequest()方法內(nèi)部將MergedWarpMessage再次包裝成RpcMessage進行發(fā)送。


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected void sendRequest(Channel channel, Object msg) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);
        rpcMessage.setId(RpcMessage.getNextMessageId());
        if (msg instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
        }
        channelWriteableCheck(channel, msg);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        channel.writeAndFlush(rpcMessage);
    }
}

說明:

  • RpcMessage再次包裝MergeMessage進行發(fā)送榄融。


TmRpcClient初始化

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {

    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
                                    FailureHandler failureHandlerHook) {
        setOrder(ORDER_NUM);
        setProxyTargetClass(true);
        this.applicationId = applicationId;
        this.txServiceGroup = txServiceGroup;
        this.mode = mode;
        this.failureHandlerHook = failureHandlerHook;
    }

    private void initClient() {

        TMClient.init(applicationId, txServiceGroup);
        if ((AT_MODE & mode) > 0) {
            RMClientAT.init(applicationId, txServiceGroup);
        }
    }

    public void afterPropertiesSet() {
        initClient();
    }
}

說明:

  • GlobalTransactionScanner的構(gòu)造函數(shù)執(zhí)行后執(zhí)行afterPropertiesSet并執(zhí)行initClient()操作参淫。
  • initClient()內(nèi)部執(zhí)行TMClient.init(applicationId, txServiceGroup)進行TMClient的初始化。


public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = TmRpcClient.getInstance(
                 applicationId, transactionServiceGroup);
        tmRpcClient.init();
    }
}

public final class TmRpcClient extends AbstractRpcRemotingClient {
    public void init() {
        if (initialized.compareAndSet(false, true)) {
            init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
        }
    }


    public void init(long healthCheckDelay, long healthCheckPeriod) {
        // 注意initVars()方法
        initVars();

        ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
           MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
           KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, 
           new LinkedBlockingQueue<Runnable>(),
           new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), 
                        MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    reconnect();
                } catch (Exception ignore) {
                    LOGGER.error(ignore.getMessage());
                }
            }
        }, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
    }


    private void initVars() {
        enableDegrade = CONFIG.getBoolean(
        ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
        super.init();
    }
}

說明:

  • 核心在于關(guān)注initVars()方法愧杯。


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public void init() {
        NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
        // 核心構(gòu)建發(fā)送的對象的連接池
        nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
        nettyClientKeyPool.setConfig(getNettyPoolConfig());
        serviceManager = new ServiceManagerStaticConfigImpl();
        super.init();
    }
}


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
    public void init() {
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

                for (MessageFuture future : futures.values()) {
                    if (future.isTimeout()) {
                        timeoutMessageFutures.add(future);
                    }
                }

                for (MessageFuture messageFuture : timeoutMessageFutures) {
                    futures.remove(messageFuture.getRequestMessage().getId());
                    messageFuture.setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                    }
                }
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }
}

說明:

  • AbstractRpcRemotingClient的init()方法核心構(gòu)建nettyClientKeyPool工廠涎才。
  • nettyClientKeyPool用于獲取連接TC的對象的工廠池痹屹。


配置加載分析

Config
public class FileConfiguration implements Configuration {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);

    private static final Config CONFIG = ConfigFactory.load();
}


package com.typesafe.config;
public final class ConfigFactory {
    private ConfigFactory() {
    }

    public static Config load() {
        return load(ConfigParseOptions.defaults());
    }
}

說明:

  • 配置加載使用了JAVA 配置管理庫 typesafe.config
  • 默認加載classpath下的application.conf,application.json和application.properties文件纷跛。通過ConfigFactory.load()加載。

Request的類關(guān)系圖

GlobalActionRequest.png


Fescar源碼分析連載

Fescar 源碼解析系列

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市批钠,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌梅掠,老刑警劉巖圆存,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異伴挚,居然都是意外死亡靶衍,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門茎芋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來颅眶,“玉大人,你說我怎么就攤上這事田弥√涡铮” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵偷厦,是天一觀的道長商叹。 經(jīng)常有香客問我,道長沪哺,這世上最難降的妖魔是什么沈自? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮辜妓,結(jié)果婚禮上枯途,老公的妹妹穿的比我還像新娘。我一直安慰自己籍滴,他們只是感情好酪夷,可當(dāng)我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著孽惰,像睡著了一般晚岭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上勋功,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天坦报,我揣著相機與錄音,去河邊找鬼狂鞋。 笑死片择,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的骚揍。 我是一名探鬼主播字管,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼啰挪,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嘲叔?” 一聲冷哼從身側(cè)響起亡呵,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎硫戈,沒想到半個月后锰什,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡掏愁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年歇由,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片果港。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖糊昙,靈堂內(nèi)的尸體忽然破棺而出辛掠,到底是詐尸還是另有隱情,我是刑警寧澤释牺,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布萝衩,位于F島的核電站,受9級特大地震影響没咙,放射性物質(zhì)發(fā)生泄漏猩谊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一祭刚、第九天 我趴在偏房一處隱蔽的房頂上張望牌捷。 院中可真熱鬧,春花似錦涡驮、人聲如沸暗甥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撤防。三九已至,卻和暖如春棒口,著一層夾襖步出監(jiān)牢的瞬間寄月,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工无牵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留漾肮,地道東北人。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓合敦,卻偏偏與公主長得像初橘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴謹 對...
    cosWriter閱讀 11,111評論 1 32
  • 【Android 庫 Glide】 引用 Android圖片加載框架最全解析(一)保檐,Glide的基本用法Andro...
    Rtia閱讀 5,454評論 0 22
  • 今天正式入職第一天耕蝉,接手前任同時留下來的工作。好在夜只,一切都很順手垒在,在此不得不感謝給我交接工作的曉君,認真細心耐心扔亥。...
    雪碧君阿歐尼醬閱讀 361評論 0 0
  • 愿有生之年 只訴溫暖 不言殤 傾心相遇 安暖相陪 彌尚風(fēng)格婚紗攝影2018全新預(yù)約表
    時光磨憶_4aa9閱讀 254評論 0 0
  • 最近興起的各種波浪柒瓣,挺嚇人儒搭。禁不住要問:世上像樣點的男人中,到底有沒有能管得住下半身的芙贫? 女人也不得了搂鲫。進了婚姻,...
    3e05348ffdd4閱讀 242評論 0 4