Sarama 源碼筆記 02:Consumer

[TOC]
consumer 的結(jié)構(gòu)體很簡單帝美,里面主要是 partitionConsumer 和 brokerConsumer 兩個表:

type consumer struct {
    conf            *Config
    children        map[string]map[int32]*partitionConsumer
    brokerConsumers map[*Broker]*brokerConsumer
    client          Client
    lock            sync.Mutex
}

從上篇筆記中記得 ConsumePartition() 是消費的入口,看代碼大約是初始化一個 partitionConsumer,并啟動 partitionConsumer 的 dispatcher 和 responseFeeder 兩個 goroutine誊稚,將 brokerConsumer 的引用交給 partitionConsumer顽冶。

注釋中提到除非調(diào)用 AsyncClose() 或者 Close() 方法杂彭,ConsumePartition() 啟動的消費過程會永不停止,遇到錯誤动羽,就不停重試,如果開啟 Consumer.Return.Errors渔期,則可以將收到的錯誤交給 Errors 這個 channel 允許用戶處理运吓。

brokerConsumer 是什么呢?當 Consumer 需要從 Broker 訂閱多個 Topic 時疯趟,會使用單獨的一個連接來消費數(shù)據(jù)拘哨,再將數(shù)據(jù)按 partition 分給不同的 partitionConsumer。Consumer信峻、partitionConsumer倦青、brokerConsumer 乃至 broker 之間大致上似乎是這樣的關(guān)系:

image.png

整理一下 partitionConsumer 與 brokerConsumer 之間的交互還是有點復(fù)雜的,大概是:

  1. partitionConsumer 通過 broker.input 這個 chan 加入 brokerConsumer 的訂閱盹舞;
  2. 加入訂閱后产镐,brokerConsumer 每次 fetchNewMessages 得到一批消息,會發(fā)給每個 partitionConsumer 的 feeder chan踢步;
  3. 每個 partitionConsumer 解析 feeder chan 的數(shù)據(jù)得到 對應(yīng)的消息列表癣亚,同時設(shè)置自身的 responseResult,設(shè)置 brokers.acks 這個 WaitGoup 執(zhí)行 Done()贾虽,表示處理完畢該批次逃糟;
  4. brokerConsumer 等待每個 partitonConsumer 處理完畢該批次,處理所有 partitionConsumer 的 responseResult 后循環(huán) fetchNewMessages蓬豁;
image.png

這里的交互還是比較 go 特色的绰咽,直覺上 java 原版客戶端該不會做這么麻煩的交互。

brokerConsumer 的 input chan 有點像服務(wù)端的 accept地粪,會有多個 partitionConsumer 動態(tài)加入取募、解除對 brokerConsumer 的訂閱。

brokerConsumer 會負責發(fā)起 FetchRequest 的主循環(huán)蟆技,每輪迭代拉取一批消息玩敏,發(fā)送給每個 partitionConsumer 再轉(zhuǎn)給用戶的 messages chan斗忌。通過 acks 這個 WaitGroup 來協(xié)調(diào)每個 partitionConsumer 對這一批次的消息的處理節(jié)奏。

如果用戶消費 messages chan 有超時旺聚,parseResponse 會返回 errTimeout织阳,brokerConsumer 會依據(jù) errTimeout 而暫停 fetchRequest

partitionConsumer

partitonConsumer 會啟動 dispatcher 和 responseFeeder 兩個 goroutine,其中 dispatcher goroutine 用于跟蹤 broker 的變化砰粹,偏元信息性質(zhì)的控制側(cè)唧躲,而 responseFeeder 用于跟蹤消息的到來,偏數(shù)據(jù)側(cè)碱璃。

func (child *partitionConsumer) dispatcher() {
    for range child.trigger {
        select {
        case <-child.dying:
            close(child.trigger)
        case <-time.After(child.computeBackoff()):
            if child.broker != nil {
                child.consumer.unrefBrokerConsumer(child.broker)
                child.broker = nil
            }

            Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
            if err := child.dispatch(); err != nil {
                child.sendError(err)
                child.trigger <- none{}
            }
        }
    }

    if child.broker != nil {
        child.consumer.unrefBrokerConsumer(child.broker)
    }
    child.consumer.removeChild(child)
    close(child.feeder)
}

// ...

  func (child *partitionConsumer) dispatch() error {
      if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
          return err
      }

      var leader *Broker
      var err error
      if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
          return err
      }

      child.broker = child.consumer.refBrokerConsumer(leader)

      child.broker.input <- child

      return nil
  }

dispatcher 這個 goroutine 用于發(fā)現(xiàn) broker 的變化弄痹。它會偵聽 dispatcher.trigger 這個 channel 的通知,來發(fā)現(xiàn) Partition 的 Leader 變化嵌器。而 trigger 這個 channel 的更新來自 brokerConsumer 對象肛真。

最后 child.broker.input child 這一句,相當于使 partitionConsumer 加入 brokerConsumer 的訂閱爽航。

不過這里不是很明白 dispatcher 這個 goroutine 里為啥沒有對 partitionConsumer 對象上鎖蚓让。

func (child *partitionConsumer) responseFeeder() {
    var msgs []*ConsumerMessage
    expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
    firstAttempt := true

feederLoop:
    for response := range child.feeder {
        msgs, child.responseResult = child.parseResponse(response)

        if child.responseResult == nil {
            atomic.StoreInt32(&child.retries, 0)
        }

        for i, msg := range msgs {
        messageSelect:
            select {
            case <-child.dying:
                child.broker.acks.Done()
                continue feederLoop
            case child.messages <- msg:
                firstAttempt = true
            case <-expiryTicker.C:
                if !firstAttempt {
                    child.responseResult = errTimedOut
                    child.broker.acks.Done()
                remainingLoop:
                    for _, msg = range msgs[i:] {
                        select {
                        case child.messages <- msg:
                        case <-child.dying:
                            break remainingLoop
                        }
                    }
                    child.broker.input <- child
                    continue feederLoop
                } else {
                    // current message has not been sent, return to select
                    // statement
                    firstAttempt = false
                    goto messageSelect
                }
            }
        }

        child.broker.acks.Done()
    }

    expiryTicker.Stop()
    close(child.messages)
    close(child.errors)
}

child.feed 這個 channel 也是來自 brokerConsumer。大約是處理來自 brokerConsumer 的消息讥珍,轉(zhuǎn)發(fā)給 messages chan凭疮。

值得留意有一個配置項目 child.conf.Consumer.MaxProcessingTime,默認值為 100ms串述,看注釋它的意思是如果朝 messages chan 寫入超過 100ms 仍未成功执解,則停止再向 Broker 發(fā)送 fetch 請求。這一流程的實現(xiàn)有一些細節(jié):

  1. 通過 firstAttempt 變量判斷是否第一次超時纲酗,只有當?shù)诙纬瑫r才跑暫停 Broker 發(fā)送 fetch 請求的流程
  2. 設(shè)置 child.responseResult 為 errTimeout衰腌,以通知 brokerConsumer 暫停。
  3. 將當前批次的消息列表消化掉觅赊,如果中途 partitionConsumer 退出右蕊,則停止消化當前批次的消息;
  4. child.broker.input child吮螺,將 partitionConsumer 重新加入 brokerConsumer 的訂閱饶囚;

brokerConsumer

brokerConsumer 的結(jié)構(gòu)體如下:

type brokerConsumer struct {
      consumer         *consumer
      broker           *Broker
      input            chan *partitionConsumer
      newSubscriptions chan []*partitionConsumer
      subscriptions    map[*partitionConsumer]none
      wait             chan none
      acks             sync.WaitGroup
      refs             int
  }

初始化 brokerConsumer 時會產(chǎn)生兩個 goroutine:

bc.subscriptionManager:偵聽 input chan,獲取加入訂閱的 partitionConsumer鸠补;
bc.subscriptionConsumer:循環(huán)發(fā)起 FetchRequest萝风,向 broker 拉取一批消息,協(xié)調(diào)發(fā)給 partitionConsumer紫岩;
subscriptionConsumer 相當于 FetchRequest 的主循環(huán)流程:

//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
  func (bc *brokerConsumer) subscriptionConsumer() {
      <-bc.wait // wait for our first piece of work

      for newSubscriptions := range bc.newSubscriptions {
          bc.updateSubscriptions(newSubscriptions)

          if len(bc.subscriptions) == 0 {
              // We're about to be shut down or we're about to receive more subscriptions.
              // Either way, the signal just hasn't propagated to our goroutine yet.
              <-bc.wait
              continue
          }

          response, err := bc.fetchNewMessages()

          if err != nil {
              Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
              bc.abort(err)
              return
          }

          bc.acks.Add(len(bc.subscriptions))
          for child := range bc.subscriptions {
              child.feeder <- response
          }
          bc.acks.Wait()
          bc.handleResponses()
      }
  }

而 subscriptionManager 用于管理訂閱的加入规惰,并通過 bc.newSubscriptions 來通知給 subscriptionConsumer。

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
  // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
  // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
  // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
  // so the main goroutine can block waiting for work if it has none.
  func (bc *brokerConsumer) subscriptionManager() {
      var buffer []*partitionConsumer

      for {
          if len(buffer) > 0 {
              select {
              case event, ok := <-bc.input:
                  if !ok {
                      goto done
                  }
                  buffer = append(buffer, event)
              case bc.newSubscriptions <- buffer:
                  buffer = nil
              case bc.wait <- none{}:
              }
          } else {
              select {
              case event, ok := <-bc.input:
                  if !ok {
                      goto done
                  }
                  buffer = append(buffer, event)
              case bc.newSubscriptions <- nil:
              }
          }
      }

  done:
      close(bc.wait)
      if len(buffer) > 0 {
          bc.newSubscriptions <- buffer
      }
      close(bc.newSubscriptions)
  }

總結(jié)

  1. consumer 與 broker 算是個多對多的關(guān)系泉蝌,如果有多個 partition 位于一個 broker歇万,那么通過單個 brokerConsumer 與之統(tǒng)一交互揩晴。
  2. 因此 partitionConsumer 與 brokerConsumer 屬于一個訂閱的關(guān)系,partitonConsumer 關(guān)注的點是將自己加入訂閱并處理訂閱的內(nèi)容贪磺,由 brokerConsumer 驅(qū)動 FetchRequest 循環(huán)硫兰;
  3. brokerConsumer 使用一個 WaitGroup 來協(xié)調(diào)多個 partitionConsumer 的執(zhí)行節(jié)奏與結(jié)果。
  4. 里面這些 goroutine 好像都沒怎么上鎖寒锚,不是很明白會不會有線程安全問題瞄崇。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市壕曼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌等浊,老刑警劉巖腮郊,帶你破解...
    沈念sama閱讀 212,029評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異筹燕,居然都是意外死亡轧飞,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,395評論 3 385
  • 文/潘曉璐 我一進店門撒踪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來过咬,“玉大人,你說我怎么就攤上這事制妄〉Ы剩” “怎么了?”我有些...
    開封第一講書人閱讀 157,570評論 0 348
  • 文/不壞的土叔 我叫張陵耕捞,是天一觀的道長衔掸。 經(jīng)常有香客問我,道長俺抽,這世上最難降的妖魔是什么敞映? 我笑而不...
    開封第一講書人閱讀 56,535評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮磷斧,結(jié)果婚禮上振愿,老公的妹妹穿的比我還像新娘。我一直安慰自己弛饭,他們只是感情好冕末,可當我...
    茶點故事閱讀 65,650評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著侣颂,像睡著了一般栓霜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上横蜒,一...
    開封第一講書人閱讀 49,850評論 1 290
  • 那天胳蛮,我揣著相機與錄音销凑,去河邊找鬼。 笑死仅炊,一個胖子當著我的面吹牛斗幼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播抚垄,決...
    沈念sama閱讀 39,006評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼蜕窿,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了呆馁?” 一聲冷哼從身側(cè)響起桐经,我...
    開封第一講書人閱讀 37,747評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎浙滤,沒想到半個月后阴挣,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,207評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡纺腊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,536評論 2 327
  • 正文 我和宋清朗相戀三年畔咧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片揖膜。...
    茶點故事閱讀 38,683評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡誓沸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出壹粟,到底是詐尸還是另有隱情拜隧,我是刑警寧澤,帶...
    沈念sama閱讀 34,342評論 4 330
  • 正文 年R本政府宣布趁仙,位于F島的核電站虹蓄,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏幸撕。R本人自食惡果不足惜薇组,卻給世界環(huán)境...
    茶點故事閱讀 39,964評論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坐儿。 院中可真熱鬧律胀,春花似錦、人聲如沸貌矿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,772評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逛漫。三九已至黑低,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背克握。 一陣腳步聲響...
    開封第一講書人閱讀 32,004評論 1 266
  • 我被黑心中介騙來泰國打工蕾管, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人菩暗。 一個月前我還...
    沈念sama閱讀 46,401評論 2 360
  • 正文 我出身青樓掰曾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親停团。 傳聞我的和親對象是個殘疾皇子旷坦,可洞房花燭夜當晚...
    茶點故事閱讀 43,566評論 2 349

推薦閱讀更多精彩內(nèi)容