1 定義
- 一個(gè)用于構(gòu)建消息驅(qū)動(dòng)的微服務(wù)的框架
用人話說就是 : 致力于簡化MQ通信的框架
2 編程模型
◆ Destination Binder (目標(biāo)綁定器)
- 與消息中間件通信的組件
◆ Destination Bindings (目標(biāo)綁定)
Binding是連接應(yīng)用程序跟消息中間件的橋梁,用于消息的消費(fèi)和生產(chǎn)属百,由binder創(chuàng)建
◆ Message(消息)
可見該編程模型異常強(qiáng)大,短短幾行代碼,就實(shí)現(xiàn)了消息的對接和處理
input/output就是微服務(wù)接收和發(fā)出消息
下面開始對內(nèi)容中心編碼
3 編寫生產(chǎn)者
-
添加依賴
-
在啟動(dòng)類添加注解
-
寫配置
4 編寫消費(fèi)者
編碼用戶中心
-
添加依賴
-
啟動(dòng)類上添加注解
-
寫配置
5 自定義接口
5.1 發(fā)送消息
-
新建mysource接口
-
啟動(dòng)類注解
-
寫配置,注意要和接口中的名字一致
-
測試代碼
注意,由于mybatis會(huì)掃描啟動(dòng)類注解上scan注解所限制路徑下的所有接口,所以一旦有接口未被xml mapper,即拋異常,所以編碼時(shí)必須將掃描注解范圍限定死在mapper包下!
5.2 消費(fèi)消息
用戶中心編碼
-
寫接口
-
添注解
-
加配置
透過現(xiàn)象看本質(zhì)
當(dāng)我們定義好Source/Sink接口后,在啟動(dòng)類使用EnableBinding指定了接口后,就會(huì)使用IOC創(chuàng)建對應(yīng)名字的代理類,所以配置文件中也必須同名
消息過濾
監(jiān)控
記得多看端點(diǎn)哦!
output/input其實(shí)就是一個(gè)channel
排錯(cuò)依據(jù)的重要端點(diǎn)
- /actuator/bindings
- /actuator/channels
- /actuator/health
異常處理
整合RocketMQ實(shí)現(xiàn)分布式事務(wù)
Stream本身并未考慮分布式事務(wù)問題,都是RocketMQ的能力
重構(gòu)生產(chǎn)者
對內(nèi)容中心一頓操作:刪除不必要代碼
-
自定義的MySource接口,因?yàn)镾pring內(nèi)置的就已經(jīng)滿足我們的需求了
- 接著別忘了刪除啟動(dòng)類中對他的引用
- 刪除TestController中對應(yīng)測試代碼
-
清理yml中Spring消息編程模型整合RocketMQ的部分
-
myoutput刪除
-
修正如下
代碼重構(gòu)
改造ShareService
-
即改造以下代碼(直接刪除)
-
添加Source
-
開始使用source發(fā)送消息,但是send只能直接發(fā)送消息(或者帶有超時(shí))
而我們之前使用rocketmqtemplate傳遞參數(shù)時(shí)可以帶個(gè)arg
那現(xiàn)在我們該怎么傳arg呢???
記得前面埋下的伏筆,header也是很有用處的!
我們可以將要傳的參數(shù)放入header中,如下:
rocketmqtemplate功成身退,我們可以使用stream編程模型完全替代了
改造AddBonusTransactionListener
-
現(xiàn)在這里的arg是null了
-
需要從header中獲取arg了(有坑,后面再說),在這里打個(gè)斷點(diǎn)
-
完善配置(IDEA無法識(shí)別,但確實(shí)會(huì)生效),實(shí)現(xiàn)事務(wù)功能
-
注意上面的group名稱要與下一致
- 啟動(dòng)內(nèi)容中心
-
發(fā)送請求
-
發(fā)現(xiàn)dto其實(shí)是字符串,并不是DTO對象
-
所以繼續(xù)修正代碼
-
這樣就是正常的對象了
因?yàn)閺膆eader中獲取的都是字符串哦!切記!
重構(gòu)消費(fèi)者
對用戶中心刪除不必要代碼,與內(nèi)容中心類似,不再詳述
-
刪除
- 將
MyTestStreamConsumer
改為AddBonusStreamConsumer