上篇講過簡單的hello消息,這篇我們將實現(xiàn)一個可以在多個Consumer上發(fā)送持久化消息的work queue媚值。
work queue又稱為task queue载弄,其主要作用是避免立即執(zhí)行資源密集型任務(wù)抢呆,而不必等待完成翰萨。相反的,我們可以安排后續(xù)完成的任務(wù)回挽。我們將任務(wù)封裝成消息没咙,并將其發(fā)送到隊列。在后臺運行的work進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)千劈,當(dāng)你運行多個worker時祭刚,他們之間還能分擔(dān)處理任務(wù)。
worker:相當(dāng)于一個Consumer
work queue在某些web應(yīng)用程序中特別有用,如在短時間HTTP請求窗口中無法處理的復(fù)雜任務(wù)涡驮。
work queue
在這個樣例中暗甥,由于不是真正的業(yè)務(wù)場景,所以不能模擬標(biāo)準(zhǔn)的復(fù)雜業(yè)務(wù)捉捅,所以我們用time.sleep函數(shù)來模擬時間的消耗撤防,我們統(tǒng)計出一個string中"."的個數(shù),用它來模式程序處理業(yè)務(wù)消耗的時間棒口,每個"."消耗1s寄月。如:“初級賽亞人...”,消耗3s
對于send.go无牵,稍作修改漾肮,以允許程序可以從命令行發(fā)送任意消息,該消息將被發(fā)送至work queue
receive.go也做部分修改茎毁,以實現(xiàn)模擬復(fù)雜任務(wù)的處理耗時操作
現(xiàn)在重新build兩個工程克懊,然后在命令端運行:
Round-robin dispatching
使用任務(wù)隊列的優(yōu)點之一是能夠輕松地并行工作。 如果我們正在建立一個積壓的工作七蜘,我們可以添加更多的worker程序谭溉,這樣可以很容易地擴(kuò)展。
如果我們同時運行兩個consumer從一個queue中獲取消息橡卤,它們會怎樣工作呢扮念?來看看它們的運行機制。
看下運行結(jié)果:
producer
Consumer1
Consumer2
Message acknowledgment
完成一個任務(wù)會消耗一定的時間蒜魄,考慮一個問題扔亥,如果一個consumer開始了一個長時任務(wù),在這個任務(wù)完成一部分還未完全完成時谈为,這個consumer就掛掉了,那這個消息就是沒處理完成踢关。在目前我們的代碼中伞鲫,rabbitMQ分發(fā)一個message到consumer中,就會立即標(biāo)記這個消息已完成并將它從queue中刪除掉签舞,在這種情況下秕脓,如果一個worker沒處理完一個message就掛掉了那么這個消息將會丟失。
但是我們并不想丟失任何消息儒搭,可能每一條都是非常重要的吠架;如果一個worker在處理一個message過程中掛掉了,那么我們更希望這個message會被分發(fā)到下一個worker中搂鲫,而不是因此丟失傍药。
為了保證任何的message永遠(yuǎn)不會丟失,rabbitMQ支持message acknowledgments。一個Ack(nowledgement)信號將由consumer返回給rabbitMQ拐辽,告訴它該消息已被接收并且完成處理拣挪,rabbitMQ收到這個ack之后將會從queue中刪除該message。
如果一個consumer掛掉(channel關(guān)閉俱诸,connection關(guān)閉菠劝,或者tcp連接斷開)導(dǎo)致未發(fā)送Ack,那么rabbitMQ會知道這個消息未被完全處理并且會重新發(fā)送它睁搭,如果這是正巧有其他consumer在線赶诊,rabbitMQ會迅速的將這個未處理的message重新分發(fā)給其他consumer,這樣就可以保證沒有message會丟失园骆。
Message acknowledgments默認(rèn)是關(guān)閉的舔痪;打開ack消息確認(rèn)機制需要將它設(shè)置為false(//auto-ack選項),此后調(diào)用msg.Ack(false)遇伞,worker會發(fā)送正確的ack消息到rabbitMQ辙喂。
這樣就能保證在worker正在處理message時,即使使用ctrl+c鸠珠,message也不會被丟失巍耗。
注:忘記返回Ack是很常見的錯誤,但是后果確實很嚴(yán)重渐排;當(dāng)客戶端退出時消息將會被重新發(fā)送(可能看起來像隨機發(fā)送)炬太,但是rabbitMQ會消耗越來越多的內(nèi)存,因為它無法釋放unacked message驯耻。為了調(diào)試中這種錯誤亲族,你可以使用rabbitmqctl打印message_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows下用:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message durability
我們已經(jīng)知道如何保證消息在發(fā)送至consumer的處理過程不丟失,但是當(dāng)rabbitMQ server掛掉時可缚,消息還是會丟失霎迫。為了保證queue和message不會在RabbitMQ server掛掉時造成數(shù)據(jù)丟失,我們需要將queue和message持久化帘靡。
首先知给,確保queue是durable(持久化的)。
這樣就保證了在RabbitMQ重啟情況下queue不會丟失描姚,現(xiàn)在需要將message標(biāo)記為持久性的涩赢,在amqp.Publishing中使用amqp.Persistent選項
注:將消息標(biāo)記為amqp.Persistent并不能解決消息絕不會被丟失的情況。它只是告訴RabbitMQ需要將該消息存儲至硬盤轩勘,但是在存儲過程中有一段存儲時間筒扒,如果這段時間rabbitMQ Server發(fā)生故障,那么這個消息將會造成丟失绊寻,所以RabbitMQ不會為每個消息做fsync(2)——僅僅存入cache中花墩,并不會真正寫入硬盤悬秉。這個持久化策略可能并不夠完美,但是它足以滿足你的大多數(shù)需求观游。如果你需要足夠完美的持久化策略以保證消息絕不會被丟失搂捧,可以參考publisher confirms。
Fair dispatch
你肯能注意到目前RabbitMQ的message分發(fā)策略(默認(rèn)round-robin)并不是我們所期望的懂缕。比如允跑,有兩個worker,當(dāng)所有的奇數(shù)號的message很重搪柑,偶數(shù)號的message很輕聋丝;在round-robin下分發(fā)消息時,一個worker會拿到所有奇數(shù)號的消息工碾,另一個拿到所有偶數(shù)號的消息弱睦,這時一個worker就會非常忙碌而另一個則會閑著無所事事。因為RabbitMQ不知道這些事渊额,任然不斷的給workers分發(fā)消息况木。
發(fā)生這種情況是因為當(dāng)一個消息進(jìn)入queue時,RabbitMQ只負(fù)責(zé)將它分發(fā)到consumer旬迹,而不去查看這個consumer的unacknowledged messages的數(shù)量火惊,而只是負(fù)責(zé)將第n個消息分發(fā)給第n(求余之后的n)個consumer。
為了解決這種情況我們需要將prefetch的值設(shè)置為1奔垦。告訴RabbitMQ在同一時間給一個worker分發(fā)的message不超過1個屹耐。換言之,在一個worker處理完一個message并發(fā)送ack之前椿猎,別再給他分發(fā)任何message惶岭。這樣,這個消息就會被分發(fā)到下一個worker進(jìn)行處理犯眠。
receive.go
注:如果所有的worker都很忙按灶,此時queue可能被填充滿,你需要注意的是增加更多的worker筐咧,或者其他的策略兆衅。