Nsq是什么
A realtime distributed messaging platform
特征:
- 分布式的
NSQ提倡沒(méi)有單點(diǎn)故障的分布式和分散拓?fù)洌С秩蒎e(cuò)和高可用性始苇,并提供可靠的消息傳遞保證
- 可擴(kuò)展
NSQ水平擴(kuò)展砌烁,沒(méi)有任何集中的代理。內(nèi)置的發(fā)現(xiàn)功能簡(jiǎn)化了向集群添加節(jié)點(diǎn)的工作埂蕊。支持發(fā)布-訂閱和負(fù)載平衡的消息傳遞往弓。它也很快。
- 操作友好
NSQ易于配置和部署蓄氧,并附帶一個(gè)管理UI函似。二進(jìn)制文件沒(méi)有運(yùn)行時(shí)依賴(lài)性,我們?yōu)閘inux喉童、darwin撇寞、freebsd和windows以及官方Docker映像提供預(yù)編譯版本。
- 完整的
官方的Go和Python庫(kù)以及大多數(shù)主要語(yǔ)言的社區(qū)支持的庫(kù)都是可用的(參見(jiàn)客戶(hù)端庫(kù))堂氯。
羅列如下:
- 基于低延遲推送的消息傳遞(性能)
- 負(fù)載平衡和多播風(fēng)格的消息路由組合
- 在流(高吞吐量)和面向工作(低吞吐量)的工作負(fù)載方面表現(xiàn)出色
- 主要在內(nèi)存中(超出高水位標(biāo)記的消息透明地保存在磁盤(pán)上)
- 供使用者查找生產(chǎn)者的運(yùn)行時(shí)發(fā)現(xiàn)服務(wù)(nsqlookupd)
- 傳輸層安全性(TLS)
- 不可知的數(shù)據(jù)格式
- 很少的依賴(lài)項(xiàng)(易于部署)和合理蔑担,有界的默認(rèn)配置
- 支持任何語(yǔ)言的客戶(hù)端庫(kù)的簡(jiǎn)單TCP協(xié)議
- 統(tǒng)計(jì)信息,管理操作和生產(chǎn)者的HTTP接口(無(wú)需發(fā)布客戶(hù)端庫(kù))
- 與statsd集成以進(jìn)行實(shí)時(shí)檢測(cè)
- 強(qiáng)大的集群管理界面(nsqadmin)
保證
與任何分布式系統(tǒng)一樣咽白,要實(shí)現(xiàn)目標(biāo)啤握,就需要進(jìn)行明智的權(quán)衡。通過(guò)透明地了解這些折衷的現(xiàn)實(shí)晶框,我們希望對(duì)NSQ在生產(chǎn)中部署時(shí)的行為設(shè)定期望排抬。
- 消息不持久(默認(rèn)情況下)
盡管系統(tǒng)支持一個(gè)--mem-queue-size
選項(xiàng),設(shè)置之后授段,消息可以透明的保存到磁盤(pán)上蹲蒲,但是,它主要還是一個(gè)內(nèi)存中的消息傳遞平臺(tái)侵贵。
可以將--mem-queue-size
設(shè)置為0届搁,以確保所有傳入消息都持久保存到磁盤(pán)。 在這種情況下,如果節(jié)點(diǎn)發(fā)生故障卡睦,那么您的故障面就會(huì)減少
沒(méi)有內(nèi)置的復(fù)制宴胧。 但是,可以通過(guò)多種方式來(lái)管理此折衷方案么翰,例如部署拓?fù)浜鸵匀蒎e(cuò)方式主動(dòng)將主題從屬并將其持久化到磁盤(pán)的技術(shù)牺汤。
- 消息至少傳遞一次
與上面的內(nèi)容密切相關(guān),這假定給定的nsqd節(jié)點(diǎn)不會(huì)失敗浩嫌。
這意味著,由于多種原因补胚,可以多次傳遞消息(客戶(hù)端超時(shí)码耐,斷開(kāi)連接,重新排隊(duì)等)溶其。 客戶(hù)有責(zé)任執(zhí)行冪等操作或重復(fù)數(shù)據(jù)刪除骚腥。
- 收到的消息是無(wú)序的
您不能依靠傳遞給使用者的消息順序。
與消息傳遞語(yǔ)義類(lèi)似瓶逃,這是重新排隊(duì)束铭,內(nèi)存中存儲(chǔ)和磁盤(pán)存儲(chǔ)組合以及每個(gè)nsqd節(jié)點(diǎn)不共享任何事實(shí)的結(jié)果。
通過(guò)在您的使用者中引入一個(gè)等待時(shí)間窗口以接受消息并在處理之前對(duì)其進(jìn)行排序厢绝,可以輕松地實(shí)現(xiàn)松散的排序(即契沫,對(duì)于給定的使用者,其消息是有序的昔汉,而不是整個(gè)集群)懈万。 保留此不變變量,必須丟棄該窗口之外的消息)靶病。
- 消費(fèi)者最終會(huì)找到所有的主題生產(chǎn)者
發(fā)現(xiàn)服務(wù)(nsqlookupd)旨在最終保持一致会通。 nsqlookupd節(jié)點(diǎn)不協(xié)調(diào)維護(hù)狀態(tài)或回答查詢(xún)。
網(wǎng)絡(luò)分區(qū)不會(huì)影響可用性娄周,因?yàn)榉謪^(qū)的兩側(cè)仍然可以回答查詢(xún)涕侈。 部署拓?fù)鋵?duì)緩解這些類(lèi)型的問(wèn)題具有最重要的作用。
Nsq的組件
NSQ由3個(gè)守護(hù)程序組成:
- nsqd是接收煤辨,排隊(duì)并將消息傳遞到客戶(hù)端的守護(hù)程序裳涛。它可以獨(dú)立運(yùn)行,但通常在具有nsqlookupd實(shí)例的群集中配置(在這種情況下掷酗,它將宣布主題和發(fā)現(xiàn)通道)调违。它監(jiān)聽(tīng)兩個(gè)TCP端口,一個(gè)用于客戶(hù)機(jī)泻轰,另一個(gè)用于HTTP API技肩。它可以選擇監(jiān)聽(tīng)HTTPS的第三個(gè)端口。
- nsqlookupd是管理拓?fù)湫畔⒉⑻峁┳罱K一致的發(fā)現(xiàn)服務(wù)的守護(hù)程序。 客戶(hù)端查詢(xún)nsqlookupd以發(fā)現(xiàn)特定主題的nsqd生產(chǎn)者虚婿,并且nsqd節(jié)點(diǎn)廣播主題和頻道信息旋奢。有兩個(gè)接口:nsqd用于廣播的TCP接口和客戶(hù)機(jī)用于執(zhí)行發(fā)現(xiàn)和管理操作的HTTP接口。
- nsqadmin是一個(gè)Web UI然痊,用于實(shí)時(shí)內(nèi)省群集(并執(zhí)行各種管理任務(wù))至朗。
核心概念
Topics和Channels
主題和渠道是NSQ的核心原語(yǔ),最能說(shuō)明系統(tǒng)設(shè)計(jì)如何無(wú)縫轉(zhuǎn)換為Go的功能剧浸。go中的channesl是表示隊(duì)列的自然方式锹引,因此一個(gè)NSQ主題/通道,就像帶有緩存的go channel. 緩沖區(qū)的大小等于--mem-queue-size
配置參數(shù)唆香。
在線(xiàn)讀取數(shù)據(jù)后嫌变,將消息發(fā)布到主題的操作涉及:
- 消息結(jié)構(gòu)的實(shí)例化(以及消息正文[] byte的分配)
- 讀取鎖以獲取主題
- 讀取鎖以檢查發(fā)布能力
- 發(fā)送到帶緩沖的go-chan
為了讓消息從一個(gè)主題放到它的channel中不能依賴(lài)于典型的go channel接收原語(yǔ)。因?yàn)槎鄠€(gè)goroutines從一個(gè)go channel中接收消息會(huì)分發(fā)消息躬它,而期望的最終結(jié)果是將每個(gè)消息復(fù)制到每個(gè)通道(goroutine)腾啥。
相反,每個(gè)主題都維護(hù)3個(gè)主要的goroutine冯吓。 第一個(gè)稱(chēng)為router
倘待,負(fù)責(zé)從傳入的go-chan中讀取新發(fā)布的消息并將其存儲(chǔ)在隊(duì)列(內(nèi)存或磁盤(pán))中。
第二個(gè)稱(chēng)為messagePump
组贺,負(fù)責(zé)如上所述將消息復(fù)制并推送到通道凸舵。
第三個(gè)負(fù)責(zé)DiskQueue IO
,將在后面討論锣披。
通道稍微復(fù)雜一些贞间,但共有一個(gè)基本目標(biāo),即公開(kāi)單個(gè)輸入和單個(gè)輸出go-chan(以抽象出內(nèi)部消息可能在內(nèi)存或磁盤(pán)中的事實(shí)):
此外雹仿,每個(gè)通道維護(hù)2個(gè)按時(shí)間順序排列的優(yōu)先級(jí)隊(duì)列增热,這些隊(duì)列負(fù)責(zé)延遲和進(jìn)行中的消息超時(shí)(以及2個(gè)隨附的goroutines進(jìn)行監(jiān)視)。
通過(guò)管理每個(gè)通道的數(shù)據(jù)結(jié)構(gòu)胧辽,而不是依賴(lài)Go運(yùn)行時(shí)的全局計(jì)時(shí)器調(diào)度程序峻仇,可以改善并行化。
注意:在內(nèi)部邑商,Go運(yùn)行時(shí)使用單個(gè)優(yōu)先級(jí)隊(duì)列和goroutine來(lái)管理計(jì)時(shí)器摄咆。 這支持(但不限于)整個(gè)time
包。 通常人断,它不需要用戶(hù)級(jí)按時(shí)間順序排列的優(yōu)先級(jí)隊(duì)列吭从,但請(qǐng)務(wù)必記住,它是具有單個(gè)鎖的單個(gè)數(shù)據(jù)結(jié)構(gòu)恶迈,可能會(huì)影響GOMAXPROCS> 1
的性能涩金。 請(qǐng)參閱runtime/time.go
。
Backend和DiskQueue
NSQ的設(shè)計(jì)目標(biāo)之一是限制內(nèi)存中的消息數(shù)量。它通過(guò)DiskQueue
(主題或通道的第三個(gè)主goroutine)透明地將消息溢出寫(xiě)入磁盤(pán)來(lái)實(shí)現(xiàn)這一點(diǎn)步做。
因?yàn)閮?nèi)存隊(duì)列只是一個(gè)go channel副渴,首先,將消息路由到內(nèi)存很簡(jiǎn)單全度,如果可能煮剧,然后回寫(xiě)到磁盤(pán):
for msg := range c.incomingMsgChan {
select {
case c.memoryMsgChan <- msg:
default:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
// ... handle errors ...
}
}
}
利用Go的select
語(yǔ)句可以用幾行代碼來(lái)表達(dá)這個(gè)功能:上面的default
只在memoryMsgChan
滿(mǎn)的時(shí)候執(zhí)行。
Nsq還有短暫topic/channel到概念将鸵,他們?cè)谙⒁绯鰰r(shí)勉盅,直接丟棄他們(而不是寫(xiě)入到磁盤(pán))當(dāng)他們不再有客戶(hù)端訂閱的時(shí)候。這是go 接口的很好的一個(gè)使用案例咨堤,主題和渠道的struct成員聲明為Backend
接口菇篡,而不是具體類(lèi)型。普通主題和頻道使用DiskQueue
一喘,而臨時(shí)主題和頻道則在DummyBackendQueue
中使用存根,而DummyBackendQueue
實(shí)現(xiàn)了無(wú)操作Backend
嗜暴。
設(shè)計(jì)原理
簡(jiǎn)化配置和管理
單個(gè)nsqd實(shí)例旨在一次處理多個(gè)數(shù)據(jù)流凸克。 流稱(chēng)為“主題”,一個(gè)主題具有1個(gè)或多個(gè)“頻道”闷沥。 每個(gè)頻道都會(huì)收到一個(gè)主題的所有消息的副本萎战。 實(shí)際上,頻道映射到消耗主題的下游服務(wù)舆逃。
主題和頻道沒(méi)有事先配置蚂维。 通過(guò)發(fā)布到命名主題或訂閱命名主題上的頻道,可以在首次使用時(shí)創(chuàng)建主題路狮。 通過(guò)訂閱指定的頻道來(lái)在首次使用時(shí)創(chuàng)建頻道虫啥。
主題和頻道都相互獨(dú)立地緩沖數(shù)據(jù),從而防止緩慢的使用者造成其他頻道的積壓(在主題級(jí)別也是如此)奄妨。
一個(gè)通道通惩孔眩可以連接多個(gè)客戶(hù)端。 假設(shè)所有連接的客戶(hù)端都處于準(zhǔn)備接收消息的狀態(tài)砸抛,則每條消息都將傳遞給隨機(jī)客戶(hù)端评雌。 例如:
總而言之,消息是從主題->通道多播的(每個(gè)通道都接收該主題的所有消息的副本)直焙,但從通道->使用者均勻分發(fā)(每個(gè)消費(fèi)者都接收該通道的一部分消息)景东。
NSQ還包括一個(gè)輔助應(yīng)用程序nsqlookupd,該應(yīng)用程序提供了目錄服務(wù)奔誓,消費(fèi)者可以在其中查找nsqd實(shí)例的地址斤吐,這些地址提供了他們感興趣的訂閱主題。 在配置方面,這使消費(fèi)者與生產(chǎn)者分離(他們兩個(gè)都只需要知道在哪里聯(lián)系nsqlookupd的普通實(shí)例曲初,而彼此之間就不需要)体谒,從而降低了復(fù)雜性和維護(hù)。
在較低級(jí)別上臼婆,每個(gè)nsqd都具有到nsqlookupd的長(zhǎng)期TCP連接抒痒,并通過(guò)該連接定期推送其狀態(tài)。 此數(shù)據(jù)用于通知nsqlookupd將給消費(fèi)者哪些nsqd地址颁褂。 對(duì)于使用者,將公開(kāi)HTTP /lookup
端點(diǎn)以進(jìn)行輪詢(xún)颁独。
要引入主題的新的不同消費(fèi)者誓酒,只需啟動(dòng)一個(gè)NSQ客戶(hù)端,該客戶(hù)端使用nsqlookupd實(shí)例的地址配置。不需要更改配置就可以添加新的使用者或新的發(fā)布者,從而大大降低了開(kāi)銷(xiāo)和復(fù)雜性钧排。
注意:在將來(lái)的版本中形帮,啟發(fā)式nsqlookupd用于返回地址的方法可能基于深度合冀,連接的客戶(hù)端數(shù)或其他“智能”策略急侥。 當(dāng)前的實(shí)現(xiàn)僅僅是全部。 最終,目標(biāo)是確保讀取所有生產(chǎn)者的深度保持接近零百拓。
重要的是要注意琴锭,nsqd和nsqlookupd守護(hù)程序旨在獨(dú)立運(yùn)行,之間無(wú)需通信或協(xié)調(diào)衙传。
我們還認(rèn)為决帖,有一種查看,反思和管理匯總?cè)杭姆椒ū痛罚@一點(diǎn)非常重要地回。 我們構(gòu)建了nsqadmin來(lái)做到這一點(diǎn)。 它提供了一個(gè)Web UI腺阳,以瀏覽主題/通道/消費(fèi)者的層次結(jié)構(gòu)落君,并檢查每一層的深度和其他關(guān)鍵統(tǒng)計(jì)數(shù)據(jù)。 此外亭引,它還支持一些管理命令绎速,例如刪除和清空通道(這是一個(gè)有用的工具當(dāng)可以安全地丟棄通道中的消息以使深度回到0時(shí))。
直接升級(jí)路徑
這是我們的最高優(yōu)先事項(xiàng)之一焙蚓。 我們的生產(chǎn)系統(tǒng)全部基于我們現(xiàn)有的消息傳遞工具來(lái)處理大量流量纹冤,因此我們需要一種緩慢而有條不紊地升級(jí)基礎(chǔ)架構(gòu)的特定部分而幾乎沒(méi)有影響的方法。
首先购公,在消息生成器端萌京,我們構(gòu)建了nsqd來(lái)匹配simplequeue。 具體地說(shuō)宏浩,nsqd像simplequeue一樣知残,向POST二進(jìn)制數(shù)據(jù)公開(kāi)一個(gè)HTTP / put終結(jié)點(diǎn)(一個(gè)警告是該終結(jié)點(diǎn)采用一個(gè)附加的查詢(xún)參數(shù)來(lái)指定“主題”)。 想要切換為開(kāi)始發(fā)布到nsqd的服務(wù)僅需進(jìn)行少量代碼更改比庄。
其次求妹,我們?cè)赑ython和Go中都建立了與我們現(xiàn)有庫(kù)中習(xí)慣的功能和慣用語(yǔ)相匹配的庫(kù)。 通過(guò)將代碼更改限制為引導(dǎo)程序佳窑,這簡(jiǎn)化了消息使用方的過(guò)渡制恍。 所有業(yè)務(wù)邏輯都保持不變。
最后神凑,我們構(gòu)建了將新舊組件粘合在一起的實(shí)用程序净神。 這些都可以在存儲(chǔ)庫(kù)的examples目錄中找到:
- 向NSQ集群中的主題公開(kāi)類(lèi)似于HTTP的pubsub的接口
- 持久地將給定主題的所有消息寫(xiě)入文件
- 將主題中的所有消息執(zhí)行到(多個(gè))端點(diǎn)的HTTP請(qǐng)求
消除SPOT
NSQ被設(shè)計(jì)為以分布式方式使用。 nsqd客戶(hù)端(通過(guò)TCP)連接到提供指定主題的所有實(shí)例溉委。 沒(méi)有中間人鹃唯,沒(méi)有消息代理,也沒(méi)有SPOF:
這種拓?fù)浣Y(jié)構(gòu)消除了鏈接單個(gè)聚合訂閱源的需要薛躬。 相反俯渤,您直接從所有生產(chǎn)者那里消費(fèi)。 從技術(shù)上講型宝,哪個(gè)客戶(hù)端連接到哪個(gè)NSQ都無(wú)關(guān)緊要八匠,只要有足夠的客戶(hù)端連接到所有生產(chǎn)者并滿(mǎn)足消息量絮爷,就可以保證最終將全部處理。
對(duì)于nsqlookupd梨树,可通過(guò)運(yùn)行多個(gè)實(shí)例來(lái)實(shí)現(xiàn)高可用性坑夯。 它們不會(huì)直接相互通信,并且數(shù)據(jù)最終被認(rèn)為是一致的抡四。 使用者輪詢(xún)所有已配置的nsqlookupd實(shí)例柜蜈,并合并響應(yīng)。 過(guò)時(shí)的節(jié)點(diǎn)指巡,無(wú)法訪(fǎng)問(wèn)的節(jié)點(diǎn)或其他有故障的節(jié)點(diǎn)不會(huì)使系統(tǒng)癱瘓淑履。
消息傳遞保證
NSQ保證消息至少要傳遞一次,盡管可能會(huì)重復(fù)消息藻雪。 消費(fèi)者應(yīng)該對(duì)此有所期待秘噪,并進(jìn)行重復(fù)數(shù)據(jù)刪除或執(zhí)行冪等操作。
此保證作為協(xié)議的一部分而強(qiáng)制執(zhí)行勉耀,并且按以下方式工作(假定客戶(hù)端已成功連接并訂閱了主題):
- 客戶(hù)端表明他們已準(zhǔn)備好接收消息
- NSQ發(fā)送消息并臨時(shí)在本地存儲(chǔ)數(shù)據(jù)(在重新排隊(duì)或超時(shí)的情況下)
- 客戶(hù)端回復(fù)FIN或者REQ分別表示成功和失敗指煎,如果客戶(hù)端在配置的時(shí)間間隔內(nèi)不回復(fù)NSQ將超時(shí),并將消息自動(dòng)重新排隊(duì)便斥。)
這樣可以確保導(dǎo)致消息丟失的唯一極端情況是nsqd進(jìn)程的異常關(guān)閉至壤。 在這種情況下,內(nèi)存中的任何消息(或任何未刷新到磁盤(pán)的緩沖寫(xiě)入)都將丟失枢纠。
如果最重要的是防止消息丟失像街,那么即使是這種極端情況也可以緩解。 一種解決方案是啟動(dòng)冗余的nsqd副本實(shí)例(在單獨(dú)的主機(jī)上)晋渺,以接收消息的相同部分宅广。 因?yàn)槟褜⑹褂谜叨閮绲龋詫?duì)這些消息進(jìn)行兩次處理不會(huì)對(duì)下游產(chǎn)生影響些举,并且使系統(tǒng)能夠承受任何單節(jié)點(diǎn)故障而不會(huì)丟失消息。
結(jié)論是俭厚,NSQ提供了構(gòu)建塊來(lái)支持各種生產(chǎn)用例和可配置的持久性程度户魏。
有限的內(nèi)存占用
nsqd提供配置選項(xiàng)--mem-queue-size
,該選項(xiàng)將確定給定隊(duì)列在內(nèi)存中保留的消息數(shù)挪挤。 如果隊(duì)列的深度超過(guò)此閾值叼丑,則將消息透明地寫(xiě)入磁盤(pán)。 這會(huì)將給定nsqd
進(jìn)程的內(nèi)存占用空間限制為mem-queue-size * #_of_channels_and_topics
:
而且扛门,敏銳的觀(guān)察者可能已經(jīng)發(fā)現(xiàn)鸠信,通過(guò)將這個(gè)值設(shè)置為較低的值(如1或甚至0),這是獲得更高交付保證的一種方便方法论寨。
對(duì)于數(shù)據(jù)協(xié)議星立,我們做了一個(gè)關(guān)鍵的設(shè)計(jì)決策爽茴,通過(guò)將數(shù)據(jù)推送到客戶(hù)端而不是等待它被拉出,從而最大化性能和吞吐量绰垂。這個(gè)概念室奏,我們稱(chēng)之為RDY狀態(tài),本質(zhì)上是客戶(hù)端流控制的一種形式劲装。
當(dāng)客戶(hù)端連接到nsqd并訂閱頻道時(shí)胧沫,它處于RDY狀態(tài)0考榨。這意味著不會(huì)有任何消息發(fā)送到客戶(hù)端规丽。 當(dāng)客戶(hù)端準(zhǔn)備好接收消息時(shí),它會(huì)發(fā)送一條命令块攒,將其RDY狀態(tài)更新為大約#谦疾,它準(zhǔn)備處理南蹂,例如100。如果沒(méi)有其他命令餐蔬,則會(huì)在有可用消息時(shí)將100條消息推送到客戶(hù)端(每次減少 該客戶(hù)端的服務(wù)器端RDY計(jì)數(shù))碎紊。
客戶(hù)端庫(kù)被設(shè)計(jì)用來(lái)發(fā)送一個(gè)命令來(lái)更新RDY計(jì)數(shù),當(dāng)它達(dá)到可配置的最大動(dòng)態(tài)設(shè)置的25%時(shí)(適當(dāng)?shù)胤峙涞蕉鄠€(gè)nsqd實(shí)例的連接)樊诺。
這是一個(gè)重要的性能旋鈕仗考,因?yàn)槟承┫掠蜗到y(tǒng)能夠更輕松地批量處理消息,并受益于更高的max-in-flight
词爬。
值得注意的是秃嗜,由于它既基于緩沖又具有推送功能,并且能夠滿(mǎn)足對(duì)流(通道)的獨(dú)立副本的需求顿膨,因此我們生成了一個(gè)守護(hù)程序锅锨,其行為類(lèi)似于simplequeue和pubsub組合。 在簡(jiǎn)化我們傳統(tǒng)上會(huì)維護(hù)上面討論過(guò)的舊工具鏈的系統(tǒng)拓?fù)浞矫媪滴郑@很有用必搞。
Go
我們很早就做出了一項(xiàng)戰(zhàn)略決策,即在Go中構(gòu)建NSQ核心囊咏。 我們最近在一點(diǎn)點(diǎn)博客中介紹了我們對(duì)Go的使用恕洲,并暗示了這個(gè)項(xiàng)目-瀏覽該帖子以了解我們對(duì)語(yǔ)言的看法可能會(huì)有所幫助。
關(guān)于NSQ梅割,Go通道(不要與NSQ通道混淆)和該語(yǔ)言的內(nèi)置并發(fā)功能非常適合nsqd的內(nèi)部工作霜第。 我們利用緩沖的通道來(lái)管理內(nèi)存中的消息隊(duì)列,并將溢出無(wú)縫地寫(xiě)入磁盤(pán)户辞。
標(biāo)準(zhǔn)庫(kù)使編寫(xiě)網(wǎng)絡(luò)層和客戶(hù)端代碼變得容易泌类。 內(nèi)置的內(nèi)存和cpu性能分析掛鉤突出了優(yōu)化的機(jī)會(huì),并且只需很少的精力即可進(jìn)行集成底燎。 我們還發(fā)現(xiàn)隔離刃榨,測(cè)試使用接口的模擬類(lèi)型以及迭代構(gòu)建功能真的很容易弹砚。
簡(jiǎn)單部署
以下步驟將在本地計(jì)算機(jī)上運(yùn)行一個(gè)小型NSQ群集,并逐步完成將消息發(fā)布喇澡,使用和歸檔到磁盤(pán)的過(guò)程迅栅。
- 1.在一個(gè)shell中啟動(dòng)
nsqlookupd
$ nsqlookupd
- 2.在另一個(gè)shell中啟動(dòng)
nsqd
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
- 3.在另一個(gè)shell中啟動(dòng)
nsqadmin
:
$ nsqadmin --lookupd-http-address=127.0.0.1:4161
- 4.發(fā)布一個(gè)初始化消息(在集群中創(chuàng)建主題):
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
- 5.最后,在另一個(gè)shell中晴玖,啟動(dòng)
nsq_to_file
:
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
- 6.發(fā)布更多的消息到
nsqd
上:
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
- 7.驗(yàn)證
為了驗(yàn)證事情按照我們期望的方式運(yùn)行读存,在web瀏覽器中打開(kāi)http://127.0.0.1:4171/
來(lái)查詢(xún)nsqadmin UI,并查詢(xún)靜態(tài)數(shù)據(jù)呕屎,同時(shí)让簿,檢查/tmp
目錄下日志文件test.*.log
的內(nèi)容。
使用Docker部署NSQ
這一小結(jié)詳細(xì)介紹了如何在Docker容器中部署和運(yùn)行nsq二進(jìn)制文件
有一個(gè)包含所有NSQ二進(jìn)制文件的最小nsq映像秀睛。 通過(guò)在運(yùn)行Docker時(shí)將二進(jìn)制文件指定為命令可以運(yùn)行每個(gè)二進(jìn)制文件尔当。 基本格式為:
docker run nsqio/nsq /<command>
請(qǐng)注意命令前的/號(hào)。例如:
docker run nsqio/nsq /nsq_to_file
運(yùn)行nsqlookupd
docker pull nsqio/nsq
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
運(yùn)行nsqd
首先蹂安,獲取Docker主機(jī)的IP:
ifconfig | grep addr
其次椭迎,運(yùn)行nsqd容器:
docker pull nsqio/nsq
docker run --name nsqd -p 4150:4150 -p 4151:4151 \
nsqio/nsq /nsqd \
--broadcast-address=<host> \
--lookupd-tcp-address=<host>:<port>
將--lookupd-tcp-address
標(biāo)志設(shè)置為以前運(yùn)行nsqlookupd
的主機(jī)的IP和TCP端口,即dockerIP:4160
:
特別注意:在本地部署測(cè)試的時(shí)候田盈,這里不要用127.0.0.1
例如畜号,給定主機(jī)IP 172.17.42.1
:
docker run --name nsqd -p 4150:4150 -p 4151:4151 \
nsqio/nsq /nsqd \
--broadcast-address=172.17.42.1 \
--lookupd-tcp-address=172.17.42.1:4160
請(qǐng)注意,該端口使用端口4160
允瞧,這是我們啟動(dòng)nsqlookupd
容器時(shí)公開(kāi)的端口(它也是nsqlookupd
的默認(rèn)端口)简软。
如果要使用非默認(rèn)端口,請(qǐng)更改-p
參數(shù):
docker run --name nsqlookupd -p 5160:4160 -p 5161:4161 nsqio/nsq /nsqlookupd
這將使nsqlookupd
在端口5160
和5161
的Docker主機(jī)IP上可用述暂。
使用TLS
要對(duì)集裝箱化的NSQ二進(jìn)制文件使用TLS痹升,您需要包括證書(shū)文件、私鑰和根CA文件畦韭。Docker映像在/etc/ssl/certs/
上有一個(gè)可用于此目的的卷安裝疼蛾。將包含文件的主機(jī)目錄裝載到卷中,然后像往常一樣在命令行中指定文件:
docker run -p 4150:4150 -p 4151:4151 -p 4152:4152 -v /home/docker/certs:/etc/ssl/certs \
nsqio/nsq /nsqd \
--tls-root-ca-file=/etc/ssl/certs/certs.crt \
--tls-cert=/etc/ssl/certs/cert.pem \
--tls-key=/etc/ssl/certs/key.pem \
--tls-required=true \
--tls-client-auth-policy=require-verify
這將把證書(shū)從/home/docker/certs
加載到docker容器中艺配,以便在運(yùn)行時(shí)使用据过。
持久化NSQ數(shù)據(jù)
要將nsqd
數(shù)據(jù)存儲(chǔ)在主機(jī)磁盤(pán)上,請(qǐng)使用/data
卷作為數(shù)據(jù)目錄妒挎,這使您可以裝載到僅包含數(shù)據(jù)的Docker容器或裝載到主機(jī)目錄:
docker run nsqio/nsq /nsqd \
--data-path=/data
使用docker-compose
為了使用docker-compose
一起啟動(dòng)nsqd
,nsqlookupd
和nsqadmin
西饵,然后創(chuàng)建docker-compose.yml
酝掩。
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160"
- "4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150"
- "4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171"
要從與之前創(chuàng)建的docker-compose.yml
相同的目錄運(yùn)行以下命令。
docker-compose up -d
將創(chuàng)建一個(gè)專(zhuān)用網(wǎng)絡(luò)眷柔,并使用該專(zhuān)用網(wǎng)絡(luò)啟動(dòng)三個(gè)容器期虾。 在本地主機(jī)上原朝,每個(gè)容器將有一個(gè)隨機(jī)端口映射到docker-compose.yml
中公開(kāi)的端口。
查看正在運(yùn)行的容器狀態(tài)和映射的端口镶苞。
docker-compose ps
查看正在運(yùn)行的容器中的日志喳坠。
docker-compose logs
假設(shè)nsqlookupd將主機(jī)端口31001映射到容器端口4161,則可以使用curl執(zhí)行簡(jiǎn)單的ping茂蚓。
curl http://127.0.0.1:31001/ping
Go實(shí)例
消費(fèi)者
package main
import (
"time"
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
err := initConsumer("test", "count", "127.0.0.1:4161")
if err != nil {
log.Fatal("init Consumer error")
}
err = initConsumer("test","count2","127.0.0.1:4161")
if err != nil {
log.Fatal("init Consumer error")
}
select {
}
}
type nsqHandler struct {
nsqConsumer *nsq.Consumer
messagesReceived int
}
//處理消息
func (nh *nsqHandler)HandleMessage(msg *nsq.Message) error{
nh.messagesReceived++
fmt.Printf("receive ID:%s,addr:%s,message:%s",msg.ID, msg.NSQDAddress, string(msg.Body))
fmt.Println()
return nil
}
func initConsumer(topic, channel, addr string) error {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = 3*time.Second
c,err := nsq.NewConsumer(topic,channel,cfg)
if err != nil {
log.Println("init Consumer NewConsumer error:",err)
return err
}
handler := &nsqHandler{nsqConsumer:c}
c.AddHandler(handler)
err = c.ConnectToNSQLookupd(addr)
if err != nil {
log.Println("init Consumer ConnectToNSQLookupd error:",err)
return err
}
return nil
}
生產(chǎn)者實(shí)例
package main
import (
"fmt"
"log"
"bufio"
"os"
"github.com/nsqio/go-nsq"
)
func main() {
strIP1 := "127.0.0.1:4150"
strIP2 := "127.0.0.1:4152"
producer1,err := initProducer(strIP1)
if err != nil {
log.Fatal("init producer1 error:",err)
}
producer2,err := initProducer(strIP2)
if err != nil {
log.Fatal("init producer2 error:",err)
}
defer producer1.Stop()
defer producer2.Stop()
//讀取控制臺(tái)輸入
reader := bufio.NewReader(os.Stdin)
count := 0
for {
fmt.Print("please say:")
data, _, _ := reader.ReadLine()
command := string(data)
if command == "stop" {
fmt.Println("stop producer!")
return
}
if count % 2 == 0 {
err := producer1.public("test1",command)
if err != nil {
log.Fatal("producer1 public error:",err)
}
}else {
err := producer2.public("test2",command)
if err != nil {
log.Fatal("producer2 public error:",err)
}
}
count++
}
}
type nsqProducer struct {
*nsq.Producer
}
//初始化生產(chǎn)者
func initProducer(addr string) (*nsqProducer, error) {
fmt.Println("init producer address:",addr)
producer,err := nsq.NewProducer(addr,nsq.NewConfig())
if err != nil {
return nil,err
}
return &nsqProducer{producer},nil
}
//發(fā)布消息
func (np *nsqProducer)public(topic,message string) error {
err := np.Publish(topic,[]byte(message))
if err != nil {
log.Println("nsq public error:",err)
return err
}
return nil
}
消費(fèi)者消費(fèi)信息的時(shí)候