RabbitMQ系列(三):work queue

上篇講過簡單的hello消息,這篇我們將實現(xiàn)一個可以在多個Consumer上發(fā)送持久化消息的work queue媚值。

work queue又稱為task queue载弄,其主要作用是避免立即執(zhí)行資源密集型任務(wù)抢呆,而不必等待完成翰萨。相反的,我們可以安排后續(xù)完成的任務(wù)回挽。我們將任務(wù)封裝成消息没咙,并將其發(fā)送到隊列。在后臺運行的work進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)千劈,當(dāng)你運行多個worker時祭刚,他們之間還能分擔(dān)處理任務(wù)。

worker:相當(dāng)于一個Consumer

work queue在某些web應(yīng)用程序中特別有用,如在短時間HTTP請求窗口中無法處理的復(fù)雜任務(wù)涡驮。

work queue

在這個樣例中暗甥,由于不是真正的業(yè)務(wù)場景,所以不能模擬標(biāo)準(zhǔn)的復(fù)雜業(yè)務(wù)捉捅,所以我們用time.sleep函數(shù)來模擬時間的消耗撤防,我們統(tǒng)計出一個string中"."的個數(shù),用它來模式程序處理業(yè)務(wù)消耗的時間棒口,每個"."消耗1s寄月。如:“初級賽亞人...”,消耗3s

對于send.go无牵,稍作修改漾肮,以允許程序可以從命令行發(fā)送任意消息,該消息將被發(fā)送至work queue

receive.go也做部分修改茎毁,以實現(xiàn)模擬復(fù)雜任務(wù)的處理耗時操作

現(xiàn)在重新build兩個工程克懊,然后在命令端運行:

Round-robin dispatching

使用任務(wù)隊列的優(yōu)點之一是能夠輕松地并行工作。 如果我們正在建立一個積壓的工作七蜘,我們可以添加更多的worker程序谭溉,這樣可以很容易地擴(kuò)展。

如果我們同時運行兩個consumer從一個queue中獲取消息橡卤,它們會怎樣工作呢扮念?來看看它們的運行機制。

看下運行結(jié)果:

producer

Consumer1

Consumer2

Message acknowledgment

完成一個任務(wù)會消耗一定的時間蒜魄,考慮一個問題扔亥,如果一個consumer開始了一個長時任務(wù),在這個任務(wù)完成一部分還未完全完成時谈为,這個consumer就掛掉了,那這個消息就是沒處理完成踢关。在目前我們的代碼中伞鲫,rabbitMQ分發(fā)一個message到consumer中,就會立即標(biāo)記這個消息已完成并將它從queue中刪除掉签舞,在這種情況下秕脓,如果一個worker沒處理完一個message就掛掉了那么這個消息將會丟失。

但是我們并不想丟失任何消息儒搭,可能每一條都是非常重要的吠架;如果一個worker在處理一個message過程中掛掉了,那么我們更希望這個message會被分發(fā)到下一個worker中搂鲫,而不是因此丟失傍药。

為了保證任何的message永遠(yuǎn)不會丟失,rabbitMQ支持message acknowledgments。一個Ack(nowledgement)信號將由consumer返回給rabbitMQ拐辽,告訴它該消息已被接收并且完成處理拣挪,rabbitMQ收到這個ack之后將會從queue中刪除該message。

如果一個consumer掛掉(channel關(guān)閉俱诸,connection關(guān)閉菠劝,或者tcp連接斷開)導(dǎo)致未發(fā)送Ack,那么rabbitMQ會知道這個消息未被完全處理并且會重新發(fā)送它睁搭,如果這是正巧有其他consumer在線赶诊,rabbitMQ會迅速的將這個未處理的message重新分發(fā)給其他consumer,這樣就可以保證沒有message會丟失园骆。

Message acknowledgments默認(rèn)是關(guān)閉的舔痪;打開ack消息確認(rèn)機制需要將它設(shè)置為false(//auto-ack選項),此后調(diào)用msg.Ack(false)遇伞,worker會發(fā)送正確的ack消息到rabbitMQ辙喂。


這樣就能保證在worker正在處理message時,即使使用ctrl+c鸠珠,message也不會被丟失巍耗。

注:忘記返回Ack是很常見的錯誤,但是后果確實很嚴(yán)重渐排;當(dāng)客戶端退出時消息將會被重新發(fā)送(可能看起來像隨機發(fā)送)炬太,但是rabbitMQ會消耗越來越多的內(nèi)存,因為它無法釋放unacked message驯耻。為了調(diào)試中這種錯誤亲族,你可以使用rabbitmqctl打印message_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

windows下用:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message durability

我們已經(jīng)知道如何保證消息在發(fā)送至consumer的處理過程不丟失,但是當(dāng)rabbitMQ server掛掉時可缚,消息還是會丟失霎迫。為了保證queue和message不會在RabbitMQ server掛掉時造成數(shù)據(jù)丟失,我們需要將queue和message持久化帘靡。

首先知给,確保queue是durable(持久化的)。

這樣就保證了在RabbitMQ重啟情況下queue不會丟失描姚,現(xiàn)在需要將message標(biāo)記為持久性的涩赢,在amqp.Publishing中使用amqp.Persistent選項

注:將消息標(biāo)記為amqp.Persistent并不能解決消息絕不會被丟失的情況。它只是告訴RabbitMQ需要將該消息存儲至硬盤轩勘,但是在存儲過程中有一段存儲時間筒扒,如果這段時間rabbitMQ Server發(fā)生故障,那么這個消息將會造成丟失绊寻,所以RabbitMQ不會為每個消息做fsync(2)——僅僅存入cache中花墩,并不會真正寫入硬盤悬秉。這個持久化策略可能并不夠完美,但是它足以滿足你的大多數(shù)需求观游。如果你需要足夠完美的持久化策略以保證消息絕不會被丟失搂捧,可以參考publisher confirms

Fair dispatch

你肯能注意到目前RabbitMQ的message分發(fā)策略(默認(rèn)round-robin)并不是我們所期望的懂缕。比如允跑,有兩個worker,當(dāng)所有的奇數(shù)號的message很重搪柑,偶數(shù)號的message很輕聋丝;在round-robin下分發(fā)消息時,一個worker會拿到所有奇數(shù)號的消息工碾,另一個拿到所有偶數(shù)號的消息弱睦,這時一個worker就會非常忙碌而另一個則會閑著無所事事。因為RabbitMQ不知道這些事渊额,任然不斷的給workers分發(fā)消息况木。

發(fā)生這種情況是因為當(dāng)一個消息進(jìn)入queue時,RabbitMQ只負(fù)責(zé)將它分發(fā)到consumer旬迹,而不去查看這個consumer的unacknowledged messages的數(shù)量火惊,而只是負(fù)責(zé)將第n個消息分發(fā)給第n(求余之后的n)個consumer。


為了解決這種情況我們需要將prefetch的值設(shè)置為1奔垦。告訴RabbitMQ在同一時間給一個worker分發(fā)的message不超過1個屹耐。換言之,在一個worker處理完一個message并發(fā)送ack之前椿猎,別再給他分發(fā)任何message惶岭。這樣,這個消息就會被分發(fā)到下一個worker進(jìn)行處理犯眠。

receive.go

注:如果所有的worker都很忙按灶,此時queue可能被填充滿,你需要注意的是增加更多的worker筐咧,或者其他的策略兆衅。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市嗜浮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌摩疑,老刑警劉巖危融,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異雷袋,居然都是意外死亡吉殃,警方通過查閱死者的電腦和手機辞居,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蛋勺,“玉大人瓦灶,你說我怎么就攤上這事”辏” “怎么了贼陶?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長巧娱。 經(jīng)常有香客問我碉怔,道長,這世上最難降的妖魔是什么禁添? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任撮胧,我火速辦了婚禮,結(jié)果婚禮上老翘,老公的妹妹穿的比我還像新娘芹啥。我一直安慰自己,他們只是感情好铺峭,可當(dāng)我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布墓怀。 她就那樣靜靜地躺著,像睡著了一般逛薇。 火紅的嫁衣襯著肌膚如雪捺疼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天永罚,我揣著相機與錄音啤呼,去河邊找鬼。 笑死呢袱,一個胖子當(dāng)著我的面吹牛官扣,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播羞福,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼惕蹄,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了治专?” 一聲冷哼從身側(cè)響起卖陵,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎张峰,沒想到半個月后泪蔫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡喘批,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年撩荣,在試婚紗的時候發(fā)現(xiàn)自己被綠了铣揉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡餐曹,死狀恐怖逛拱,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情台猴,我是刑警寧澤朽合,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站卿吐,受9級特大地震影響旁舰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜嗡官,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一箭窜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧衍腥,春花似錦磺樱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至尚骄,卻和暖如春块差,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背倔丈。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工憨闰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人需五。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓鹉动,卻偏偏與公主長得像,于是被迫代替她去往敵國和親宏邮。 傳聞我的和親對象是個殘疾皇子泽示,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化蜜氨、事務(wù)械筛、擁塞控...
    jiangmo閱讀 10,357評論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)飒炎,斷路器变姨,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單厌丑,比如只包含文本字符串定欧,也可以更復(fù)雜...
    lijun_m閱讀 1,343評論 0 1
  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評論 3 51
  • 此生,我把思念刻上你的名字怒竿,刻入骨髓砍鸠,愛上你是我此生的唯一,從此與我相依相伴耕驰,不離不棄爷辱;你的溫柔,你的體貼朦肘,沁滿了...
    凱羅閱讀 423評論 0 1