探索Rabbitmq的Java客戶端

原文

AMQPConnection

實(shí)例初始化

創(chuàng)建Connection時會通過FrameHandlerFacotry創(chuàng)建一個SocketFrameHandler遣蚀,SocketFrameHandler對Socket進(jìn)行了封裝。

public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
    {
        checkPreconditions();
        this.username = params.getUsername();
        this.password = params.getPassword();
        this._frameHandler = frameHandler;
        this._virtualHost = params.getVirtualHost();
        this._exceptionHandler = params.getExceptionHandler();

        this._clientProperties = new HashMap<String, Object>(params.getClientProperties());
        this.requestedFrameMax = params.getRequestedFrameMax();
        this.requestedChannelMax = params.getRequestedChannelMax();
        this.requestedHeartbeat = params.getRequestedHeartbeat();
        this.shutdownTimeout = params.getShutdownTimeout();
        this.saslConfig = params.getSaslConfig();
        this.executor = params.getExecutor();
        this.threadFactory = params.getThreadFactory();

        this._channelManager = null;

        this._brokerInitiatedShutdown = false;

        this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
    }

啟動連接

初始化WorkService和HeartBeatSender兢卵。

創(chuàng)建一個channel0的AMQChannel,這個channel不會被ChannelManager管理盐茎。

首先channel0會將一個BlockingRpcContinuation作為當(dāng)前未完成的Rpc請求袭蝗,用于接收handshake的響應(yīng)。

然后channel0會向socket中寫入一條只有header的消息作為handshake斋攀,header中包含了客戶端的版本號。

緊接著會啟動主循環(huán)線程梧田,主循環(huán)會通過SocketFrameHandler從socket接收字節(jié)流淳蔼。此時接收到的第一條數(shù)據(jù)是服務(wù)端響應(yīng)handshake返回的Connection.Start信息(包含服務(wù)端版本、機(jī)制裁眯、基礎(chǔ)信息)鹉梨。

主循環(huán)線程啟動后,主線程會阻塞地等待服務(wù)端的handshake響應(yīng)穿稳。拿到響應(yīng)之后會對服務(wù)器回傳的信息進(jìn)行比對存皂,然后發(fā)送Connection.StartOK的信息去服務(wù)端(這個請求也還是阻塞式的),等待服務(wù)端回傳Connection.Tune(包含最大channel數(shù)逢艘、最大frame長度和heartbeat間隔)旦袋。將這些信息與實(shí)例初始化是的設(shè)置信息進(jìn)行對比,初始化ChannelManager

緊接著發(fā)送Connection.TuneOk和Connection.Open消息去服務(wù)端它改,完成connection的建立疤孕。

Connection > MainLoop > readFrame

消息體

Frame是對AMQP消息的封裝:包含frame的type、channel號搔课、消息內(nèi)容

type|channelNumber|payloadSize|payload|frameEndMarker

Payload包含了消息類型胰柑、消息頭和消息主題

method|header|body

消息發(fā)送和接收

消息的發(fā)送和接收都要channel來完成。

創(chuàng)建Channel

通過Connection的ChannelManager來創(chuàng)建Channel爬泥,通過指定的ChannelNumber或者由分配器分配。創(chuàng)建好的Channel實(shí)例會放入ChannelManager的Map中崩瓤,key為ChannelNumber袍啡。由此可見Channel是Connection唯一的。

public ChannelN createChannel(AMQConnection connection);
public ChannelN createChannel(AMQConnection connection, int channelNumber);
private ChannelN addNewChannel(AMQConnection connection, int channelNumber);
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService);

Channel實(shí)例化之后會調(diào)用Channel.open方法却桶,發(fā)送Channel.Open去服務(wù)端(阻塞式)境输,等待服務(wù)端響應(yīng)Channel.OpenOk蔗牡。

消息發(fā)送

Channel.transmit 發(fā)送消息,調(diào)用AMQCommand.transmit完成發(fā)送嗅剖。

AMQCommand.transmit將消息封裝成Frame辩越,通過connection的SocketFrameHandler寫入OutpuStream。

消息接收

主循環(huán)線程在鏈接創(chuàng)建完成后會監(jiān)聽socket信粮,從InputStream中讀取二進(jìn)制流封裝成Frame黔攒。通過Frame中的ChannelNumber從ChannelManager中獲取對應(yīng)的Channel實(shí)例處理Frame。

while (_running) {
    Frame frame = _frameHandler.readFrame();
    if (frame != null) {
        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        cm.getChannel(frame.channel).handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}

Channel會使用已經(jīng)準(zhǔn)備好的AMQCommand處理Frame强缘,并未下一個Frame準(zhǔn)備新的AMQCommand。

public void handleFrame(Frame frame) throws IOException {
    AMQCommand command = _command;
    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        _command = new AMQCommand(); // prepare for the next one
        handleCompleteInboundCommand(command);
    }
}

AMQCommad會使用CommandAssembler依次從Frame的payload中檢出對應(yīng)的Method、Header和Body凌净。如果檢出了Body什荣,整個Frame會被檢出完成。如過未完成商虐,會進(jìn)入主循環(huán)再次處理直至完成觉阅。

public synchronized boolean handleFrame(Frame f) throws IOException
{
    switch (this.state) {
      case EXPECTING_METHOD:          consumeMethodFrame(f); break;
      case EXPECTING_CONTENT_HEADER:  consumeHeaderFrame(f); break;
      case EXPECTING_CONTENT_BODY:    consumeBodyFrame(f);   break;
      default:
          throw new AssertionError("Bad Command State " + this.state);
    }
    return isComplete();
}

Frame被檢出完后,會根據(jù)Method的類型進(jìn)入不同的異步處理流程秘车。

Method在channel打開和關(guān)閉的情況下會以下的可能:

Channel打開:Basic.Deliver, Basic.Return, Basic.Flow, Basic.Ack, Basic.Nack, Basic.RecoveryOk, Basic.Cancel

Channel關(guān)閉:Channel.CloseOk

生產(chǎn)和消費(fèi)

生產(chǎn)

調(diào)用Channel.basicPublish()方法典勇,指定exchange、routingKey等信息鲫尊,消息屬性痴柔、消息體。封裝成Baisc.Publish疫向,放入AMQCommand咳蔚,最后調(diào)用transmit方法完成發(fā)送。參考消息發(fā)送

public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }
    BasicProperties useProps = props;
    if (props == null) {
        useProps = MessageProperties.MINIMAL_BASIC;
    }
    transmit(new AMQCommand(new Basic.Publish.Builder()
                                .exchange(exchange)
                                .routingKey(routingKey)
                                .mandatory(mandatory)
                                .immediate(immediate)
                            .build(),
                            useProps, body));
}

消費(fèi)

創(chuàng)建QueueingConsumer實(shí)例搔驼,然后調(diào)用Channel.basicConsume方法谈火。

queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume("queue_name", queueingConsumer);
new Thread() {
    @Override
    public void run() {
        while (true) {
            try {
                final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                new Thread() {
                    @Override
                    public void run() {
                        try{
                            delivery.getEnvelope();//消息頭
                            delivery.getProperties();//消息屬性
                            delivery.getBody();//消息體
                        }finally{
                          //channel.basicAck();
                          //channel.basicNack()
                        }
                    }
                }.start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}.start();

QueueingConsumer實(shí)現(xiàn)了Consumer接口舌涨。

Channel.basicConsume方法會封裝Channel.Consume消息發(fā)送到服務(wù)端(阻塞式)糯耍,等待服務(wù)端的Channel.ConsumeOk響應(yīng)(包含了服務(wù)端為Consumer分配的ConsumerTag)。然后將QueueingConsumer放入Map中囊嘉,key為ConsumerTag温技。consumer是Channel唯一。

當(dāng)客戶端接收到消息扭粱,參考消息接收舵鳞。Basic.Deliver類型的消息(consumerTag、deliveryTag琢蛤、redelivered蜓堕、exchange抛虏、routingKey)會進(jìn)入消費(fèi)處理流程。Channel根據(jù)ConsumerTag從Map中獲取對應(yīng)的QueueConsumer實(shí)例套才,由Channel的ConsumerDispatcher通過Connection初始化的WorkService創(chuàng)建新的處理線程迂猴,調(diào)用QueueConsumer實(shí)例的handleDelivery方法。QueueConsumer將消息封裝成Delivery對象背伴,放入BlockingQueue中沸毁。

消費(fèi)線程等待新的Delivery(阻塞式),之后創(chuàng)建新的線程完成消息的處理挂据。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末以清,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子崎逃,更是在濱河造成了極大的恐慌掷倔,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件个绍,死亡現(xiàn)場離奇詭異勒葱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)巴柿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門凛虽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人广恢,你說我怎么就攤上這事凯旋。” “怎么了钉迷?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵至非,是天一觀的道長。 經(jīng)常有香客問我糠聪,道長荒椭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任舰蟆,我火速辦了婚禮趣惠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘身害。我一直安慰自己味悄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布塌鸯。 她就那樣靜靜地躺著傍菇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪界赔。 梳的紋絲不亂的頭發(fā)上丢习,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機(jī)與錄音淮悼,去河邊找鬼咐低。 笑死,一個胖子當(dāng)著我的面吹牛袜腥,可吹牛的內(nèi)容都是我干的见擦。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼羹令,長吁一口氣:“原來是場噩夢啊……” “哼鲤屡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起福侈,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤酒来,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后肪凛,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體堰汉,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年伟墙,在試婚紗的時候發(fā)現(xiàn)自己被綠了翘鸭。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡戳葵,死狀恐怖就乓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拱烁,我是刑警寧澤生蚁,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站邻梆,受9級特大地震影響守伸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜浦妄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一尼摹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧剂娄,春花似錦蠢涝、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至耳胎,卻和暖如春惯吕,著一層夾襖步出監(jiān)牢的瞬間惕它,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工废登, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淹魄,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓堡距,卻偏偏與公主長得像甲锡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子羽戒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理缤沦,服務(wù)發(fā)現(xiàn),斷路器易稠,智...
    卡卡羅2017閱讀 134,659評論 18 139
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html缸废,并沒有及時更新。 術(shù)語對...
    joyenlee閱讀 7,660評論 0 3
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器缩多。支持消息的持久化呆奕、事務(wù)、擁塞控...
    jiangmo閱讀 10,361評論 2 34
  • 什么叫消息隊(duì)列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)衬吆。消息可以非常簡單梁钾,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,348評論 0 1
  • 國家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說閱讀 10,980評論 6 13