消息中間件Consumer通用化封裝思路

上一篇講到Producer的封裝思路称勋,比較的是AMQ和Kafka兩個(gè)比較流行的中間件耀鸦,這篇就不多啰嗦了捶枢,繼續(xù)沿著上一篇的思路稀拐,將AMQ和Kafka嘗試封裝成對(duì)客戶更簡(jiǎn)單更易用的接口火邓。

拐一句,我有想過(guò)為什么不直接對(duì)外暴露官方的接口钩蚊,也就是做通用化封裝的意義何在贡翘?在一個(gè)一兩個(gè)人的小項(xiàng)目里蹈矮,如果MQ只是用于系統(tǒng)間解耦砰逻,當(dāng)然沒(méi)必要封裝,直接拿來(lái)用即可泛鸟。但是如果一個(gè)公司有成千上百個(gè)項(xiàng)目蝠咆,大量項(xiàng)目都希望使用消息中間件,這時(shí)候最合適的方法就是對(duì)這些項(xiàng)目暴露一個(gè)簡(jiǎn)單易用的消息中間件的接口北滥,由專門的團(tuán)隊(duì)提供消息平臺(tái)的服務(wù)刚操,讓業(yè)務(wù)系統(tǒng)只需要簡(jiǎn)單的培訓(xùn)就可以使用消息服務(wù),簡(jiǎn)化公司內(nèi)重復(fù)的勞動(dòng)再芋。

說(shuō)回正題菊霜,仍然從AMQ和Kafka的官方接口開始,consumer考慮統(tǒng)一只使用拉燃檬辍(pull)的方式鉴逞。

AMQ的接收一條消息的流程為:

  1. 建立連接工廠 ConnectionFactory
ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",url);
  1. 通過(guò)連接工廠建立連接并啟動(dòng) Connection
Connection conn = cf.createConnection();
conn.start
  1. 通過(guò)連接建立會(huì)話 Session
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
  1. 通過(guò)會(huì)話建立目的地 Destination
Destination dest = session.createQueue("test");
  1. 在會(huì)話中指定目的地建立消費(fèi)者Consumer
MessageConsumer consumer = session.createConsumer(dest)
  1. 使用Consumer收取一條消息
Message msg = consumer.receive(1000);

與生產(chǎn)者主要的區(qū)別就是從第五步以后的步驟,建立連接等方式都是一樣的司训。一般來(lái)說(shuō)构捡,使用pull方式需要在外面套上一層while(true)的循環(huán),連接保持長(zhǎng)連接的方式壳猜,不停的從服務(wù)器端拉取消息勾徽。

Kafka的消息消費(fèi)與一般消息中間件有所不同。

Kafka接受消息的流程為:
注:使用kafka 0.9版本后提供的新的consumer API

  1. 實(shí)例化一個(gè)Properties類
     Properties props = new Properties();
  1. 往Properties中填入bootstrap服務(wù)器统扳,groupID等屬性
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  1. 將Properties作為入?yún)?shí)例化一個(gè)KafkaConsumer
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 設(shè)置consumer訂閱的topic
     consumer.subscribe(Arrays.asList("foo", "bar"));
  1. 接收一條消息
     ConsumerRecords<String, String> records = consumer.poll(100);

簡(jiǎn)直感動(dòng)喘帚,當(dāng)初我以為要使用high level的consumer來(lái)設(shè)計(jì)了,要從流中獲取消息啊咒钟,都不知道怎么封裝了吹由。沒(méi)想到0.9版本以后簡(jiǎn)化了更易用的consumer版本,業(yè)界良心盯腌!

通用化封裝

閑話少說(shuō)溉知,沿襲上一篇的思路,直接開始:

  1. 實(shí)例化一個(gè)MQClient對(duì)象
MQClient mc = new MQClient("mqclient.properties");

對(duì)應(yīng)AMQ的1,2兩步级乍,Kafka的1舌劳,2兩步
在實(shí)例化時(shí),從mqclient.properties文件中讀取出所有的配置玫荣,并建立連接甚淡。

  1. 建立一個(gè)消費(fèi)者
MQConsumer consumer = client.getConsumer(destination);

對(duì)應(yīng)AMQ的3,4,5步,Kafka的第3,4步
建立消費(fèi)者時(shí)捅厂,需要指定目的地贯卦,因?yàn)楹苊黠@消費(fèi)者和目的地應(yīng)該是對(duì)應(yīng)好的。destination應(yīng)該支持組合的方式焙贷,不止是ActiveMQ支持組合目的地撵割,kafka也同樣支持。

  1. 獲取一條消息
MQMessage(s) msg = consumer.receive(time);

對(duì)應(yīng)AMQ的第6步辙芍,Kafka的第5步
AMQ和Kafka的API都支持傳入時(shí)間啡彬,表示持續(xù)消費(fèi)的意思。外面包上一層while()循環(huán)即可故硅。獲取到的MQMessage是一個(gè)我們自己寫的類庶灿,在這個(gè)類中,如果是AMQ吃衅,需要將消息的類型(TextMessage往踢,ObjectMessage)等標(biāo)注清楚,如果是Kafka的類徘层,需要注意的是獲取到的是一個(gè)ConsumerRecords的類峻呕,里面可能包含多條消息。

我們可以在這里選擇三種處理方式:

  • 將AMQ也改為獲取到一組消息惑灵,MQMessages類中包含多個(gè)MQMessage山上。
  • 限制Kafka每次只能獲取一條消息,繼續(xù)使用MQMessage類英支。
  • MQMessage類中佩憾,為Kafka單獨(dú)處理,對(duì)于AMQ來(lái)說(shuō)干花,MQMessage獲取到的是一條消息妄帘,Kafka獲取到的是一組消息。

由于Kakfa的新API我還沒(méi)用過(guò)池凄,等到實(shí)際使用后我再來(lái)選擇具體方案抡驼。

結(jié)合上一篇Producer的封裝思路,消息中間件的面向客戶的API封裝思路基本理順了肿仑,生下來(lái)的就是各種細(xì)節(jié)致盟,需要仔細(xì)思考的有:

  1. 線程池碎税,連接池的使用
  2. 快速安裝broker
  3. 用戶的權(quán)限管理,topic和queue的安全訪問(wèn)控制
  4. 定制化需求的實(shí)現(xiàn)
  5. ……

原來(lái)還有一堆事等著我啊馏锡。雷蹂。。杯道。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末匪煌,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子党巾,更是在濱河造成了極大的恐慌萎庭,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件齿拂,死亡現(xiàn)場(chǎng)離奇詭異驳规,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)创肥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門达舒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)值朋,“玉大人叹侄,你說(shuō)我怎么就攤上這事∽虻牵” “怎么了趾代?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)丰辣。 經(jīng)常有香客問(wèn)我撒强,道長(zhǎng),這世上最難降的妖魔是什么笙什? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任飘哨,我火速辦了婚禮,結(jié)果婚禮上琐凭,老公的妹妹穿的比我還像新娘芽隆。我一直安慰自己,他們只是感情好统屈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布胚吁。 她就那樣靜靜地躺著,像睡著了一般愁憔。 火紅的嫁衣襯著肌膚如雪腕扶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天吨掌,我揣著相機(jī)與錄音半抱,去河邊找鬼脓恕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛窿侈,可吹牛的內(nèi)容都是我干的进肯。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼棉磨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼江掩!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起乘瓤,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤环形,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后衙傀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抬吟,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年统抬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了火本。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡聪建,死狀恐怖钙畔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情金麸,我是刑警寧澤擎析,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站挥下,受9級(jí)特大地震影響揍魂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜棚瘟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一现斋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧偎蘸,春花似錦庄蹋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至振乏,卻和暖如春蔗包,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背慧邮。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工调限, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舟陆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓耻矮,卻偏偏與公主長(zhǎng)得像秦躯,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子裆装,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 消息中間件就我目前接觸過(guò)的主要有ActiveMQ踱承,Kafka,RabbitMQ哨免,IBM MQ茎活,RocketMQ。目...
    MisterCH閱讀 2,053評(píng)論 1 4
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理琢唾,服務(wù)發(fā)現(xiàn)载荔,斷路器,智...
    卡卡羅2017閱讀 134,651評(píng)論 18 139
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語(yǔ)閱讀 10,827評(píng)論 4 54
  • 一采桃、基本概念 介紹 Kafka是一個(gè)分布式的懒熙、可分區(qū)的、可復(fù)制的消息系統(tǒng)普办。它提供了普通消息系統(tǒng)的功能工扎,但具有自己獨(dú)...
    ITsupuerlady閱讀 1,629評(píng)論 0 9
  • kafka的定義:是一個(gè)分布式消息系統(tǒng),由LinkedIn使用Scala編寫泌豆,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,317評(píng)論 1 15