The consumer struct is simple. It mainly holds two tables, one of partitionConsumers and one of brokerConsumers:

    type consumer struct {
        conf            *Config
        children        map[string]map[int32]*partitionConsumer // topic -> partition -> consumer
        brokerConsumers map[*Broker]*brokerConsumer
        client          Client
        lock            sync.Mutex
    }

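As noted in the previous post, ConsumePartition() is the entry point for consuming. Reading the code, it roughly initializes a partitionConsumer, starts that partitionConsumer's two goroutines (dispatcher and responseFeeder), and hands the partitionConsumer a reference to a brokerConsumer.
The comments say that unless AsyncClose() or Close() is called, the consuming process started by ConsumePartition() never stops: when it hits an error it simply keeps retrying. If Consumer.Return.Errors is enabled, the errors it receives are delivered on the Errors channel so the user can handle them.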
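So what is a brokerConsumer? When the Consumer needs to subscribe to multiple topics on one Broker, it uses a single dedicated connection to fetch the data and then hands the data out, by partition, to the individual partitionConsumers. Consumer, partitionConsumer, brokerConsumer, and broker appear to be related roughly as follows: [diagram not preserved]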
Once sorted out, the interaction between partitionConsumer and brokerConsumer is still somewhat complicated. Roughly:
- partitionConsumer joins the brokerConsumer's subscription through the broker.input chan;
- once subscribed, every batch of messages that brokerConsumer obtains from fetchNewMessages is sent to each partitionConsumer's feeder chan;
- each partitionConsumer parses the data from its feeder chan to get its own list of messages, sets its own responseResult, and calls Done() on the broker.acks WaitGroup to signal that it has finished the batch;
- brokerConsumer waits until every partitionConsumer has finished the batch, handles each partitionConsumer's responseResult, and then loops back to fetchNewMessages.
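This interaction has a distinctly Go flavor. My gut feeling is that the original Java client would not go through such an elaborate exchange.
The brokerConsumer's input chan works a bit like accept on a server: multiple partitionConsumers dynamically join and leave the brokerConsumer's subscription.
brokerConsumer runs the main loop that issues FetchRequests. Each iteration pulls one batch of messages and sends it to every partitionConsumer, which forwards it to the user's messages chan. The acks WaitGroup coordinates the pace at which the partitionConsumers work through each batch.
If the user drains the messages chan too slowly, responseFeeder sets responseResult to errTimedOut, and brokerConsumer reacts to errTimedOut by pausing fetches for that partitionConsumer.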
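The fetch, fan out, wait cycle described above can be sketched with the standard library alone. This is a minimal model, not Sarama's code: the `child` and `batch` types and the `runRounds` function are hypothetical names invented for the illustration, and a "fetch" is just a formatted string.

```go
package main

import (
	"fmt"
	"sync"
)

// batch stands in for a FetchResponse (hypothetical, not a Sarama type).
type batch []string

// child is a simplified stand-in for partitionConsumer.
type child struct {
	name   string
	feeder chan batch // written by the broker-side loop
	result error      // plays the role of responseResult
	seen   []string   // messages this child has processed
}

// feed mirrors the role of responseFeeder: process a batch, record a result, ack.
func (c *child) feed(acks *sync.WaitGroup) {
	for b := range c.feeder {
		c.seen = append(c.seen, b...)
		c.result = nil
		acks.Done() // tell the broker-side loop this batch is handled
	}
}

// runRounds mirrors the role of subscriptionConsumer: fetch, fan out, wait, repeat.
func runRounds(children []*child, rounds int) {
	var acks, done sync.WaitGroup
	for _, c := range children {
		done.Add(1)
		go func(c *child) { defer done.Done(); c.feed(&acks) }(c)
	}
	for r := 0; r < rounds; r++ {
		b := batch{fmt.Sprintf("msg-%d", r)} // pretend this came from the broker
		acks.Add(len(children))
		for _, c := range children {
			c.feeder <- b
		}
		acks.Wait() // no new fetch until every child has acked
	}
	for _, c := range children {
		close(c.feeder)
	}
	done.Wait()
}

func main() {
	cs := []*child{
		{name: "p0", feeder: make(chan batch)},
		{name: "p1", feeder: make(chan batch)},
	}
	runRounds(cs, 3)
	for _, c := range cs {
		fmt.Println(c.name, c.seen)
	}
}
```

The point of the WaitGroup is that the slowest child sets the pace of the whole loop, which is why the real code needs a timeout escape hatch.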
partitionConsumer
partitionConsumer starts two goroutines, dispatcher and responseFeeder. The dispatcher goroutine tracks broker changes and sits on the control side, dealing with metadata. responseFeeder tracks the arrival of messages and sits on the data side.

    func (child *partitionConsumer) dispatcher() {
        for range child.trigger {
            select {
            case <-child.dying:
                close(child.trigger) // ends the range loop
            case <-time.After(child.computeBackoff()):
                // drop the old broker before looking for a new leader
                if child.broker != nil {
                    child.consumer.unrefBrokerConsumer(child.broker)
                    child.broker = nil
                }
                Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
                if err := child.dispatch(); err != nil {
                    child.sendError(err)
                    child.trigger <- none{} // retry on the next iteration
                }
            }
        }
        if child.broker != nil {
            child.consumer.unrefBrokerConsumer(child.broker)
        }
        child.consumer.removeChild(child)
        close(child.feeder)
    }
    // ...
    func (child *partitionConsumer) dispatch() error {
        if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
            return err
        }
        var leader *Broker
        var err error
        if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
            return err
        }
        child.broker = child.consumer.refBrokerConsumer(leader)
        child.broker.input <- child // subscribe to the leader's brokerConsumer
        return nil
    }

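The dispatcher goroutine discovers broker changes. It listens for notifications on the child.trigger channel to learn that the partition's leader has changed. Writes to trigger come from the brokerConsumer object, and from dispatcher itself when dispatch() fails and has to be retried.
The final statement, child.broker.input <- child, effectively subscribes the partitionConsumer to the brokerConsumer.
What I do not quite understand here is why the dispatcher goroutine takes no lock on the partitionConsumer object.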
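To make the trigger, backoff, and self-retry shape of dispatcher() easier to experiment with, here is a standalone sketch. Everything in it is an assumption made for illustration: the `redispatcher` type, its `lookup` function (standing in for RefreshMetadata plus Leader), and the trigger channel being buffered with capacity 1 so the goroutine can re-trigger itself.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// redispatcher is a hypothetical, trimmed-down model of the control side of
// partitionConsumer; none of these names come from Sarama.
type redispatcher struct {
	trigger chan struct{}          // "the leader may have changed", buffered by 1
	dying   chan struct{}          // closed to request shutdown
	lookup  func() (string, error) // stand-in for RefreshMetadata + Leader
	leader  string
	errs    []error
}

// run wakes on trigger, backs off, looks up the leader, and re-triggers
// itself when the lookup fails.
func (d *redispatcher) run(backoff time.Duration) {
	for range d.trigger {
		select {
		case <-d.dying:
			close(d.trigger) // ends the range loop
		case <-time.After(backoff):
			leader, err := d.lookup()
			if err != nil {
				d.errs = append(d.errs, err)
				d.trigger <- struct{}{} // self-retry; relies on the free buffer slot
				continue
			}
			d.leader = leader
		}
	}
}

// demo fails the first lookup, succeeds on the second, then shuts down.
func demo() (string, int) {
	calls := 0
	found := make(chan struct{})
	d := &redispatcher{
		trigger: make(chan struct{}, 1),
		dying:   make(chan struct{}),
	}
	d.lookup = func() (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("metadata not ready")
		}
		if calls == 2 {
			close(found)
		}
		return "broker-2", nil
	}
	finished := make(chan struct{})
	go func() { d.run(5 * time.Millisecond); close(finished) }()

	d.trigger <- struct{}{} // kick off the first attempt
	<-found                 // the retry succeeded
	close(d.dying)
	d.trigger <- struct{}{} // wake the loop so it observes dying
	<-finished
	return d.leader, len(d.errs)
}

func main() {
	leader, failures := demo()
	fmt.Println(leader, failures)
}
```

In this sketch the `leader` and `errs` fields are touched only by the goroutine running `run` until it exits, which is one way a design like this can get by without a mutex. Whether Sarama relies on the same reasoning is not something the code shown here establishes.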

    func (child *partitionConsumer) responseFeeder() {
        var msgs []*ConsumerMessage
        expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
        firstAttempt := true
    feederLoop:
        for response := range child.feeder {
            msgs, child.responseResult = child.parseResponse(response)
            if child.responseResult == nil {
                atomic.StoreInt32(&child.retries, 0)
            }
            for i, msg := range msgs {
            messageSelect:
                select {
                case <-child.dying:
                    child.broker.acks.Done()
                    continue feederLoop
                case child.messages <- msg:
                    firstAttempt = true
                case <-expiryTicker.C:
                    if !firstAttempt {
                        // second tick while blocked: report the timeout and ack early
                        child.responseResult = errTimedOut
                        child.broker.acks.Done()
                    remainingLoop:
                        for _, msg = range msgs[i:] {
                            select {
                            case child.messages <- msg:
                            case <-child.dying:
                                break remainingLoop
                            }
                        }
                        child.broker.input <- child // resubscribe
                        continue feederLoop
                    } else {
                        // current message has not been sent, return to select
                        // statement
                        firstAttempt = false
                        goto messageSelect
                    }
                }
            }
            child.broker.acks.Done()
        }
        expiryTicker.Stop()
        close(child.messages)
        close(child.errors)
    }

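The child.feeder channel is also fed by brokerConsumer. responseFeeder roughly takes the responses coming from brokerConsumer and forwards the messages to the messages chan.
One configuration item worth noting is child.conf.Consumer.MaxProcessingTime, which defaults to 100ms. According to its comment, if a write to the messages chan has not succeeded after 100ms, no further fetch requests are sent to the Broker. The implementation has a few details:
- the firstAttempt variable records whether this is the first timeout; only the second timeout runs the flow that pauses fetch requests to the Broker;
- child.responseResult is set to errTimedOut to tell brokerConsumer to pause;
- the rest of the current batch is still drained into messages; if the partitionConsumer shuts down midway, draining stops;
- child.broker.input <- child resubscribes the partitionConsumer to the brokerConsumer.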
brokerConsumer
The brokerConsumer struct looks like this:

    type brokerConsumer struct {
        consumer         *consumer
        broker           *Broker
        input            chan *partitionConsumer   // partitionConsumers asking to subscribe
        newSubscriptions chan []*partitionConsumer // batches handed to the fetch loop
        subscriptions    map[*partitionConsumer]none
        wait             chan none
        acks             sync.WaitGroup
        refs             int
    }

Initializing a brokerConsumer starts two goroutines:
- bc.subscriptionManager: listens on the input chan to pick up partitionConsumers that join the subscription;
- bc.subscriptionConsumer: repeatedly issues FetchRequests, pulls a batch of messages from the broker, and coordinates handing it to the partitionConsumers.
subscriptionConsumer is essentially the main FetchRequest loop:

    // subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
    func (bc *brokerConsumer) subscriptionConsumer() {
        <-bc.wait // wait for our first piece of work
        for newSubscriptions := range bc.newSubscriptions {
            bc.updateSubscriptions(newSubscriptions)
            if len(bc.subscriptions) == 0 {
                // We're about to be shut down or we're about to receive more subscriptions.
                // Either way, the signal just hasn't propagated to our goroutine yet.
                <-bc.wait
                continue
            }
            response, err := bc.fetchNewMessages()
            if err != nil {
                Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
                bc.abort(err)
                return
            }
            bc.acks.Add(len(bc.subscriptions))
            for child := range bc.subscriptions {
                child.feeder <- response
            }
            bc.acks.Wait()
            bc.handleResponses()
        }
    }

subscriptionManager, in turn, manages incoming subscriptions and passes them to subscriptionConsumer through bc.newSubscriptions.

    // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
    // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
    // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
    // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
    // so the main goroutine can block waiting for work if it has none.
    func (bc *brokerConsumer) subscriptionManager() {
        var buffer []*partitionConsumer
        for {
            if len(buffer) > 0 {
                select {
                case event, ok := <-bc.input:
                    if !ok {
                        goto done
                    }
                    buffer = append(buffer, event)
                case bc.newSubscriptions <- buffer:
                    buffer = nil
                case bc.wait <- none{}:
                }
            } else {
                select {
                case event, ok := <-bc.input:
                    if !ok {
                        goto done
                    }
                    buffer = append(buffer, event)
                case bc.newSubscriptions <- nil:
                }
            }
        }
    done:
        close(bc.wait)
        if len(buffer) > 0 {
            bc.newSubscriptions <- buffer
        }
        close(bc.newSubscriptions)
    }

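The batching idea in subscriptionManager can be tried out in isolation. The sketch below is a simplified model rather than the library's code: `batcher` and `collect` are hypothetical names, subscriptions are plain strings, and the `wait` channel is left out entirely, so the worker is always offered either a real batch or nil.

```go
package main

import "fmt"

// batcher accepts items on in at any time and hands them to the worker in
// batches on out. When nothing is buffered it offers a nil batch, so the
// worker's receive never has to block waiting for news.
func batcher(in <-chan string, out chan<- []string) {
	var buffer []string
	for {
		select {
		case item, ok := <-in:
			if !ok {
				// input closed: flush whatever is left, then shut down
				if len(buffer) > 0 {
					out <- buffer
				}
				close(out)
				return
			}
			buffer = append(buffer, item)
		case out <- buffer: // buffer is nil when empty
			buffer = nil
		}
	}
}

// collect feeds items to a batcher while nobody reads out, then drains the
// batches. Nil batches contribute nothing to the result.
func collect(items []string) []string {
	in := make(chan string)
	out := make(chan []string)
	go batcher(in, out)
	for _, it := range items {
		in <- it // the worker is "busy", so these pile up in the buffer
	}
	close(in)
	var got []string
	for b := range out {
		got = append(got, b...)
	}
	return got
}

func main() {
	fmt.Println(collect([]string{"p0", "p1", "p2"}))
}
```

The design choice worth noticing is that the producer side never blocks on a busy worker: new subscribers are accepted immediately and wait in the buffer until the fetch loop comes back between network requests.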
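Summary
- consumer and broker are in something like a many-to-many relationship; when several partitions live on the same broker, a single brokerConsumer handles all interaction with it.
- partitionConsumer and brokerConsumer are therefore in a subscription relationship: partitionConsumer concentrates on subscribing itself and processing what it receives, while brokerConsumer drives the FetchRequest loop;
- brokerConsumer uses a WaitGroup to coordinate the pace and the results of multiple partitionConsumers.
- These goroutines seem to use hardly any locks, and I am not sure whether that leaves thread-safety issues.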