上一篇講到Producer的封裝思路称勋,比較的是AMQ和Kafka兩個(gè)比較流行的中間件耀鸦,這篇就不多啰嗦了捶枢,繼續(xù)沿著上一篇的思路稀拐,將AMQ和Kafka嘗試封裝成對(duì)客戶更簡(jiǎn)單更易用的接口火邓。
拐一句,我有想過(guò)為什么不直接對(duì)外暴露官方的接口钩蚊,也就是做通用化封裝的意義何在贡翘?在一個(gè)一兩個(gè)人的小項(xiàng)目里蹈矮,如果MQ只是用于系統(tǒng)間解耦砰逻,當(dāng)然沒(méi)必要封裝,直接拿來(lái)用即可泛鸟。但是如果一個(gè)公司有成千上百個(gè)項(xiàng)目蝠咆,大量項(xiàng)目都希望使用消息中間件,這時(shí)候最合適的方法就是對(duì)這些項(xiàng)目暴露一個(gè)簡(jiǎn)單易用的消息中間件的接口北滥,由專門的團(tuán)隊(duì)提供消息平臺(tái)的服務(wù)刚操,讓業(yè)務(wù)系統(tǒng)只需要簡(jiǎn)單的培訓(xùn)就可以使用消息服務(wù),簡(jiǎn)化公司內(nèi)重復(fù)的勞動(dòng)再芋。
說(shuō)回正題菊霜,仍然從AMQ和Kafka的官方接口開始,consumer考慮統(tǒng)一只使用拉燃檬辍(pull)的方式鉴逞。
AMQ的接收一條消息的流程為:
- 建立連接工廠 ConnectionFactory
ConnectionFactory cf = new ActiveMQConnectionFactory("admin","admin",url);
- 通過(guò)連接工廠建立連接并啟動(dòng) Connection
Connection conn = cf.createConnection();
conn.start
- 通過(guò)連接建立會(huì)話 Session
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
- 通過(guò)會(huì)話建立目的地 Destination
Destination dest = session.createQueue("test");
- 在會(huì)話中指定目的地建立消費(fèi)者Consumer
MessageConsumer consumer = session.createConsumer(dest)
- 使用Consumer收取一條消息
Message msg = consumer.receive(1000);
與生產(chǎn)者主要的區(qū)別就是從第五步以后的步驟,建立連接等方式都是一樣的司训。一般來(lái)說(shuō)构捡,使用pull方式需要在外面套上一層while(true)的循環(huán),連接保持長(zhǎng)連接的方式壳猜,不停的從服務(wù)器端拉取消息勾徽。
Kafka的消息消費(fèi)與一般消息中間件有所不同。
Kafka接受消息的流程為:
注:使用kafka 0.9版本后提供的新的consumer API
- 實(shí)例化一個(gè)Properties類
Properties props = new Properties();
- 往Properties中填入bootstrap服務(wù)器统扳,groupID等屬性
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- 將Properties作為入?yún)?shí)例化一個(gè)KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- 設(shè)置consumer訂閱的topic
consumer.subscribe(Arrays.asList("foo", "bar"));
- 接收一條消息
ConsumerRecords<String, String> records = consumer.poll(100);
簡(jiǎn)直感動(dòng)喘帚,當(dāng)初我以為要使用high level的consumer來(lái)設(shè)計(jì)了,要從流中獲取消息啊咒钟,都不知道怎么封裝了吹由。沒(méi)想到0.9版本以后簡(jiǎn)化了更易用的consumer版本,業(yè)界良心盯腌!
通用化封裝
閑話少說(shuō)溉知,沿襲上一篇的思路,直接開始:
- 實(shí)例化一個(gè)MQClient對(duì)象
MQClient mc = new MQClient("mqclient.properties");
對(duì)應(yīng)AMQ的1,2兩步级乍,Kafka的1舌劳,2兩步
在實(shí)例化時(shí),從mqclient.properties文件中讀取出所有的配置玫荣,并建立連接甚淡。
- 建立一個(gè)消費(fèi)者
MQConsumer consumer = client.getConsumer(destination);
對(duì)應(yīng)AMQ的3,4,5步,Kafka的第3,4步
建立消費(fèi)者時(shí)捅厂,需要指定目的地贯卦,因?yàn)楹苊黠@消費(fèi)者和目的地應(yīng)該是對(duì)應(yīng)好的。destination應(yīng)該支持組合的方式焙贷,不止是ActiveMQ支持組合目的地撵割,kafka也同樣支持。
- 獲取一條消息
MQMessage(s) msg = consumer.receive(time);
對(duì)應(yīng)AMQ的第6步辙芍,Kafka的第5步
AMQ和Kafka的API都支持傳入時(shí)間啡彬,表示持續(xù)消費(fèi)的意思。外面包上一層while()循環(huán)即可故硅。獲取到的MQMessage是一個(gè)我們自己寫的類庶灿,在這個(gè)類中,如果是AMQ吃衅,需要將消息的類型(TextMessage往踢,ObjectMessage)等標(biāo)注清楚,如果是Kafka的類徘层,需要注意的是獲取到的是一個(gè)ConsumerRecords的類峻呕,里面可能包含多條消息。
我們可以在這里選擇三種處理方式:
- 將AMQ也改為獲取到一組消息惑灵,MQMessages類中包含多個(gè)MQMessage山上。
- 限制Kafka每次只能獲取一條消息,繼續(xù)使用MQMessage類英支。
- MQMessage類中佩憾,為Kafka單獨(dú)處理,對(duì)于AMQ來(lái)說(shuō)干花,MQMessage獲取到的是一條消息妄帘,Kafka獲取到的是一組消息。
由于Kakfa的新API我還沒(méi)用過(guò)池凄,等到實(shí)際使用后我再來(lái)選擇具體方案抡驼。
結(jié)合上一篇Producer的封裝思路,消息中間件的面向客戶的API封裝思路基本理順了肿仑,生下來(lái)的就是各種細(xì)節(jié)致盟,需要仔細(xì)思考的有:
- 線程池碎税,連接池的使用
- 快速安裝broker
- 用戶的權(quán)限管理,topic和queue的安全訪問(wèn)控制
- 定制化需求的實(shí)現(xiàn)
- ……
原來(lái)還有一堆事等著我啊馏锡。雷蹂。。杯道。