HBase 1.2.0源碼系列:HBase RPC 通信(上)

HBase 中 HMaster、HRegionServer 和 Client 之間的通信使用了兩個(gè)技術(shù)区赵,Google Protobuf RPC 和 Java NIO郁副。

主要代碼位置:

.
|—— hbase-client
|    |—— org.apache.hadoop.hbase.ipc
|    |—— org.apache.hadoop.hbase.client
|—— hbase-server
|    |—— org.apache.hadoop.hbase.ipc
|    |—— org.apache.hadoop.hbase.master
|    |—— org.apache.hadoop.hbase.regionserver
|—— hbase-protocal 
|    |—— org.apache.hadoop.hbase.protobuf.generated
...

HBase RPC

什么是 PRC

RPC(Remote Procedure Call)即遠(yuǎn)程過程調(diào)用讨盒。對于本地調(diào)用,定義好一個(gè)函數(shù)以后愈案,程序的其他部分通過調(diào)用該函數(shù)挺尾,就可以返回想要的結(jié)果。RPC 的區(qū)別就是函數(shù)定義和函數(shù)調(diào)用位于不同的機(jī)器(大部分情況)站绪,因?yàn)樯婕暗讲煌臋C(jī)器遭铺,所以 RPC 相比較本地函數(shù)調(diào)用多了通信部分。主要涉及到兩個(gè)角色:調(diào)用方(client)和函數(shù)定義實(shí)現(xiàn)(server),RPC 調(diào)用的流程如下面圖所示:

RPC

HBase Server 端主要類關(guān)系:

[圖片上傳失敗...(image-6f67b9-1560062040880)]

HMaster 繼承 HRegionServer魂挂,rpcService 提供 RPC Server 端實(shí)現(xiàn)(HRegionServier 中由 RSRpcService 實(shí)現(xiàn)甫题,HMaster 中由 MasterRpcService 實(shí)現(xiàn)),rpcServer 是具體的 RPC Server 實(shí)現(xiàn)(實(shí)現(xiàn) RpcServerInterface 接口)涂召,Listener 線程負(fù)責(zé)監(jiān)聽請求坠非,Resonder 線程負(fù)責(zé)發(fā)送請求結(jié)果。

HBase Client 端主要類關(guān)系:

HBase Client

HBase Client 訪問 HBase 需要先創(chuàng)建 HConnection果正,Connection 中的 rpcClient(RpcClient 接口炎码,RpcClientImpl 是實(shí)現(xiàn)類)表示 Rpc Client 端實(shí)現(xiàn),由 RpcClientFactory 創(chuàng)建秋泳。

RPC 初始化

在 HRegionServer 啟動類的源碼中潦闲,有以下代碼,分別初始化 RpcServer 和 RpcClient:

public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException, InterruptedException {
    // ...
    
    rpcServices = createRpcServices();
    
    // ...
}

private void initializeThreads() {
    // ...
    
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());    
    
    // ...
}

RPC Server

先來看以下 RPC Server 端的一些實(shí)現(xiàn)轮锥,從一些重要類的初始化開始

初始化

createRpcServices()

createRpcServices() 方法是初始化 RPC Server 端實(shí)現(xiàn)的入口:

class HRegionServer {
    protected RSRpcServices createRpcServices() throws IOException {
        return new RSRpcServices(this);
    }
}

class HMaseter implements HRegionServer {
    @Override
    protected RSRpcServices createRpcServices() throws IOException {
        return new MasterRpcServices(this);
    }
}

RSRpcServices#Constructor

MasterRpcServices 的構(gòu)造方法調(diào)用父類 RSRpcServices 的構(gòu)造方法:

public MasterRpcServices(HMaster m) throws IOException {
    super(m);
    // set HMaster
    master = m;
}
  
public RSRpcServices(HRegionServer rs) throws IOException {
    // set HRegionServer
    regionServer = rs;

    // 初始化 RpcSchedulerFactory
    // 反射 hbase.region.server.rpc.scheduler.factory.class 指定的類
    // 默認(rèn)使用 SimpleRpcSchedulerFactory
    RpcSchedulerFactory rpcSchedulerFactory = ...

    // ...
    
    // 優(yōu)先級函數(shù)(調(diào)度請求時(shí)使用)
    priority = createPriority();
    
    // 設(shè)置訪問時(shí)的重試次數(shù)
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    
    // 初始化 RpcServer
    try {
        rpcServer = new RpcServer(rs, name, getServices(),
            bindAddress,
            rs.conf,
            rpcSchedulerFactory.create(rs.conf, this, rs));
    } catch (BindException be) {
        // throw Execption
    }

    // 初始化配置信息
    // hbase.client.scanner.timeout.period  # Client 端 Scan Timeout 
    // hbase.server.scanner.max.result.size Scan    # Scan 獲取的最大條目數(shù) 
    // hbase.rpc.timeout    # RPC 處理請求 Timeout 時(shí)間
    // hbase.region.server.rpc.minimum.scan.time.limit.delta    #
    scannerLeaseTimeoutPeriod = rs.conf.getInt(...);
    maxScannerResultSize = rs.conf.getLong(...);
    rpcTimeout = rs.conf.getInt(...);
    minimumScanTimeLimitDelta = rs.conf.getLong(...);

}

RpcServer#Constructor

RpcServer 實(shí)現(xiàn)了 RpcServerInterface 接口矫钓,構(gòu)造函數(shù):

public RpcServer(final Server server, final String name,
      final List<BlockingServiceAndInterface> services,
      final InetSocketAddress bindAddress, Configuration conf,
      RpcScheduler scheduler)
      throws IOException {
    
    // 初始化屬性 ...

    // 設(shè)置 Listener
    listener = new Listener(name);

    // 設(shè)置 Responder
    responder = new Responder();

    // 設(shè)置 Scheduler(調(diào)用方通過 RpcSchedulerFactory 創(chuàng)建)
    this.scheduler = scheduler;
    this.scheduler.init(new RpcSchedulerContext(this));
}

主要處理角色

Rpc Server 監(jiān)控、讀取舍杜、請求基于 Reactor 模式新娜,主要流程如下圖:


image

Listener

Listener 負(fù)責(zé)監(jiān)聽請求,對于獲取到的請求既绩,交由 Reader 負(fù)責(zé)讀雀帕洹:

private class Listener extends Thread {

    private ServerSocketChannel acceptChannel = null; 
    private Selector selector = null; 
    private Reader[] readers = null;
    private ExecutorService readPool;

    public Listener(final String name) throws IOException {
        // ...

        // 創(chuàng)建非阻塞的 ServerSocketChannel
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);
        
        // 綁定 Socket 到 RpcServer#bingAddress
        bind(acceptChannel.socket(), bindAddress, backlogLength);
        
        // 創(chuàng)建 selector
        selector = Selector.open();

        // 初始化 Reader ThreadPool
        readers = new Reader[readThreads];
        readPool = Executors.newFixedThreadPool(readThreads,
            new ThreadFactoryBuilder()
                .setNameFormat(...)
                .setDaemon(true)
                .build());
        
        for (int i = 0; i < readThreads; ++i) {
            Reader reader = new Reader();
            readers[i] = reader;
            readPool.execute(reader);
        }

        // 注冊 selector
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
}

Reader

處理請求的邏輯在 Reader 中,生成 Call 對象交由 RpcSchedule 進(jìn)行分發(fā)

private class Reader implements Runnable {
    private final Selector readSelector;

    @Override
    public void run() {
        try {
            doRunLoop();
        } finally {
            // close readSelector
        }
    }

    private synchronized void doRunLoop() {
        while (running) {
            try {
                // 線程阻塞饲握,知道有請求到來
                readSelector.select();
                
                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid()) {
                        if (key.isReadable()) {
                            // 請求有效私杜,進(jìn)行處理
                            doRead(key);
                        }
                    }
                }
            } catch (Exception e) {
                // ...
            }
        }
    }
}

doRead() 方法在 Listener 中,由 Connection 對象處理救欧,生成 Call衰粹,并包裝為 CallRunner 交給 Scheduler

class Listener {
    void doRead(SelectionKey key) throws InterruptedException {
        Connection c = (Connection) key.attachment();
    
        try {
            count = c.readAndProcess();
        } catch (Exception e) {
            // ...
        }
    }
}

class Connection {
    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
        // ...
        
        // Dispatches an RPC request asynchronously
        if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
            
            // ...
            responder.doRespond(call);
        }
    }
}

Scheduler

Scheduler 是一個(gè)生產(chǎn)者消費(fèi)者模型,內(nèi)部有一個(gè)隊(duì)列緩存請求笆怠,另外有一些線程負(fù)責(zé)從隊(duì)列中拉取消息進(jìn)行分發(fā)

Scheduler 默認(rèn)實(shí)現(xiàn)為 SimpleRpcScheduler(HBase 提供的另一種實(shí)現(xiàn)為 FifoRpcScheduler)铝耻,包含三個(gè) RpcExecutor(callExecutor、priorityExecutor蹬刷、replicationExecutor)

public class SimpleRpcScheduler extends RpcScheduler {

    /**
     * callExecutor = RWQueueRpcExecutor or BalancedQueueRpcExecutor
     * priorityExecutor = BalancedQueueRpcExecutor or NULL
     * replicationExecutor = BalancedQueueRpcExecutor or NULL
     **/
    private final RpcExecutor callExecutor;
    private final RpcExecutor priorityExecutor;
    private final RpcExecutor replicationExecutor;
    
    // 分發(fā)請求
    @Override
    public boolean dispatch(CallRunner callTask) throws InterruptedException {
        
        // 選擇不同的 Executor 處理
        // 大部分基于的請求都是通過 callExecutor 來執(zhí)行
        if (priorityExecutor != null && level > highPriorityLevel) {
            return priorityExecutor.dispatch(callTask);
        } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
            return replicationExecutor.dispatch(callTask);
        } else {
            return callExecutor.dispatch(callTask);
        }
    }
}

RpcExecutor

阻塞隊(duì)列

RpcExecutor 的實(shí)現(xiàn)類 RWQueueRpcExecutor 使用阻塞隊(duì)列緩存消息(BalancedQueueRpcExecutor 實(shí)現(xiàn)類似):

class RWQueueRpcExecutor extends RpcExecutor {
    private final List<BlockingQueue<CallRunner>> queues;
    
    @Override
    public boolean dispatch(final CallRunner callTask) throws InterruptedException {
        // 進(jìn)入隊(duì)列
        return queues.get(queueIndex).offer(callTask);
    }
}
消費(fèi)線程

RpcExecutor 處理的具體邏輯在 consumerLoop() 方法中瓢捉,從阻塞隊(duì)列中取出 CallRunner 對象,并執(zhí)行:

public abstract class RpcExecutor {

    // 啟動多線程處理
    protected void startHandlers(final String nameSuffix, final int numHandlers,
        final List<BlockingQueue<CallRunner>> callQueues,
        final int qindex, final int qsize, final int port) {
        
        for (int i = 0; i < numHandlers; i++) {
            final int index = qindex + (i % qsize);
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    consumerLoop(callQueues.get(index));
                }
            });
            t.start();
        }
    }

    // 核心處理邏輯(省略部分代碼)
    protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
        try {
            while (running) {
                try {
                    // 如果 BlockingQueue 中沒有數(shù)據(jù)办成,會在此阻塞
                    CallRunner task = myQueue.take();
                    try {
                        // 執(zhí)行請求
                        task.run();
                    } catch (Throwable e) {
                        // throw or abort
                    } 
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
CallRunner & Call
image

CallRunner 和 Call 關(guān)鍵代碼如下:

class CallRunner {
    private Call call;

    public void run() {
        // ...        
        
        Pair<Message, CellScanner> resultPair = null;
        try {
            // make the call
            resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
                call.timestamp, this.status);
        } catch (Throwable e) {
            // ...
        } 
        
        // Call to respond
        call.sendResponseIfReady();
    }
}

class Call {
    public synchronized void sendResponseIfReady() throws IOException {
        // 結(jié)果返回給 Client
        this.responder.doRespond(this);
    }
}

調(diào)用執(zhí)行方法在 RpcServer#call() 方法中:

class RpcServer {

    @Override
    public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
        Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
        throws IOException {
    
    try {
        
        // call method
        // com.google.protobuf#BlockingService
        Message result = service.callBlockingMethod(md, controller, param);
      
        if (tooSlow || tooLarge) {
            // logging
        }
        return new Pair<Message, CellScanner>(result, controller.cellScanner());
    } catch (Throwable e) {
        // ...
    }
  }
}

Responder

Resonder 負(fù)責(zé)發(fā)送 RPC 請求結(jié)果給 Client泡态,Scheduler 調(diào)度請求后,執(zhí)行結(jié)果通過 doRespond() 加入到返回結(jié)果的相應(yīng)隊(duì)列里面

class Responder extends Thread {

    void doRespond(Call call) throws IOException {
        boolean added = false;
        
        // 如果已經(jīng)有一個(gè)正在進(jìn)行的寫入迂卢,不會等待某弦。
        // 這允許立即釋放處理程序以執(zhí)行其他任務(wù)桐汤。
        if (call.connection.responseQueue.isEmpty() && 
            call.connection.responseWriteLock.tryLock()) {
            
            try {
                if (call.connection.responseQueue.isEmpty()) {
                    
                    // 這里如果完成寫操作,直接返回
                    if (processResponse(call)) {
                        return; // we're done.
                    }
                    
                    call.connection.responseQueue.addFirst(call);
                    added = true;
                }
            } finally {
                call.connection.responseWriteLock.unlock();
            }
        }

        if (!added) {
            call.connection.responseQueue.addLast(call);
        }
        
        // Add a connection to the list that want to write,
        call.responder.registerForWrite(call.connection);
    }
}

如果在 doRespond() 中沒有完成寫操作刀崖,通過將 Call 對象的 connection 注冊到 selector惊科,由 Responder 中的線程進(jìn)行后續(xù)的操作。

protected class Responder extends Thread {
    private final Selector writeSelector;

    public void registerForWrite(Connection c) {
        if (writingCons.add(c)) {
            writeSelector.wakeup();
        }
    }
    
    private void doRunLoop() {
        while (running) {
            try {
                // 獲取要寫入的連接列表亮钦,并在 selector 中注冊
                registerWrites();
                
                // ...

                Set<SelectionKey> keys = writeSelector.selectedKeys();
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid() && key.isWritable()) {
                        // 異步寫
                        doAsyncWrite(key);
                    }
                }
            } catch (OutOfMemoryError e) {
                // return or sleep  
            }
        }
    }

    private void doAsyncWrite(SelectionKey key) throws IOException {
        Connection connection = (Connection) key.attachment();
        if (processAllResponses(connection)) {
            // ...
        }
    }

    private boolean processAllResponses(final Connection connection) throws IOException {

        // Only one writer on the channel for a connection at a time.
        connection.responseWriteLock.lock();
        try {
            for (int i = 0; i < 20; i++) {
                Call call = connection.responseQueue.pollFirst();
            
                if (!processResponse(call)) {
                    connection.responseQueue.addFirst(call);
                    return false;
                }
            }
        } finally {
            connection.responseWriteLock.unlock();
        }

        return connection.responseQueue.isEmpty();
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末馆截,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蜂莉,更是在濱河造成了極大的恐慌蜡娶,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件映穗,死亡現(xiàn)場離奇詭異窖张,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)蚁滋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門宿接,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人辕录,你說我怎么就攤上這事睦霎。” “怎么了走诞?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵副女,是天一觀的道長。 經(jīng)常有香客問我蚣旱,道長碑幅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任塞绿,我火速辦了婚禮沟涨,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘异吻。我一直安慰自己拷窜,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布涧黄。 她就那樣靜靜地躺著,像睡著了一般赋荆。 火紅的嫁衣襯著肌膚如雪笋妥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天窄潭,我揣著相機(jī)與錄音春宣,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛月帝,可吹牛的內(nèi)容都是我干的躏惋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼嚷辅,長吁一口氣:“原來是場噩夢啊……” “哼簿姨!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起簸搞,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤扁位,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后趁俊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體域仇,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年寺擂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了暇务。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡怔软,死狀恐怖垦细,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情爽雄,我是刑警寧澤蝠检,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站挚瘟,受9級特大地震影響叹谁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乘盖,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一焰檩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧订框,春花似錦析苫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至矛物,卻和暖如春茫死,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背履羞。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工峦萎, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留屡久,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓爱榔,卻偏偏與公主長得像涩馆,于是被迫代替她去往敵國和親双抽。 傳聞我的和親對象是個(gè)殘疾皇子付翁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容