微服務(wù)實戰(zhàn)(二):落地微服務(wù)架構(gòu)到直銷系統(tǒng)(構(gòu)建消息總線框架接口)
從上一篇文章大家可以看出,實現(xiàn)一個自己的消息總線框架是非常重要的內(nèi)容住拭,消息總線可以將界限上下文之間進行解耦坦冠,也可以為大并發(fā)訪問提供必要的支持。
消息總線的作用:
1.界限上下文解耦:在DDD第一波文章中,當更新了訂單信息后耍休,我們通過調(diào)用經(jīng)銷商界限上下文的領(lǐng)域模型和倉儲辱匿,進行了經(jīng)銷商信息的更新键痛,這造成了耦合。通過一個消息總線匾七,可以在訂單界限上下文的WebApi服務(wù)(來源微服務(wù)-生產(chǎn)者)更新了訂單信息后絮短,發(fā)布一個事件消息到消息總線的某個隊列中,經(jīng)銷商界限上下文的WebApi服務(wù)(消費者)訂閱這個事件消息昨忆,然后交給自己的Handler進行消息處理丁频,更新自己的經(jīng)銷商信息。這樣就實現(xiàn)了訂單界限上下文與經(jīng)銷商界限上下文解耦邑贴。
2.大并發(fā)支持:可以通過消息總線進一步提升下單的性能席里。我們可以將用戶下單的操作直接交給一個下單命令WebApi接收,下單命令WebApi接收到命令后拢驾,直接丟給一個消息總線的隊列奖磁,然后立即給前端返回下單結(jié)果掠哥。這樣用戶就不用等待后續(xù)的復雜訂單業(yè)務(wù)邏輯奖唯,加快速度。后續(xù)訂單的一系列處理交給消息的Handler進行后續(xù)的處理與消息的進一步投遞趁舀。
消息總線設(shè)計重點:
1.定義消息(事件)的接口:所有需要投遞與處理的消息嵌洼,都從這個消息接口繼承案疲,因為需要約束消息中必須包含的內(nèi)容,比如消息的ID麻养、消息產(chǎn)生的時間等褐啡。
publicinterface IEvent
? ? {
? ? ? ? Guid Id { get;set; }
? ? ? ? DateTime CreateDate { get;set; }
? ? }
2.定義消息(事件)處理器接口:當消息投遞到消息總線隊列中后,一定有消費者WebApi接收并處理這個消息鳖昌,具體的處理方法邏輯在訂閱方處理器中實現(xiàn)备畦,這里先需要定義處理器的接口低飒,便于在消息總線框架中使用。
publicinterface IEventHandler
? ? {
? ? ? ? Task HandleAsync(TEvent @event)where TEvent : IEvent;
? ? }
從上面代碼可以看出懂盐,消息(事件)處理器處理的類型就是從IEvent接口繼承的消息類褥赊。
3.定義消息(事件)與消息(事件)處理器關(guān)聯(lián)接口:一種類型的消息被投遞后,一定要在訂閱方找到這種消息的處理器進行處理莉恼,所以一定要定義二者的關(guān)聯(lián)接口拌喉,這樣才能將消息與消息處理器對應(yīng)起來,才能實現(xiàn)消息被訂閱后的處理俐银。
publicinterface IEventHandlerExecutionContext
? ? {
? ? ? ? voidRegisterEventHandler()where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? ? ? boolIsRegisterEventHandler()where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? ? ? Task HandleAsync(TEvent @event)where TEvent : IEvent;
? ? }
RegisterEventHandler方法就是建立消息與消息處理器的關(guān)聯(lián)尿背,這個方法其實是在訂閱方使用,訂閱方告訴消息總線捶惜,什么樣的消息應(yīng)該交給我的哪個處理器進行處理田藐。
IsRegisterEventHandler方法是判斷消息與處理器之間是否已經(jīng)存在關(guān)聯(lián)。
HandleAsync方法是通過查找到消息對應(yīng)的處理器后吱七,然后調(diào)用處理器自己的Handle方法進行消息的處理.
4.定義消息發(fā)布汽久、訂閱與消息總線接口:消息總線至少要支持兩個功能,一個是生產(chǎn)者能夠發(fā)布消息到我的消息總線陪捷,另一個是訂閱方需要能夠從我這個消息總線訂閱消息回窘。
publicinterface IEventPublisher
? ? {
? ? ? ? voidPublish(TEvent @event)where TEvent : IEvent;
? ? }
從上面代碼可以看出诺擅,生產(chǎn)者發(fā)布的消息仍然要從IEvent繼承的類型市袖。
publicinterface IEventSubscriber
? ? {
? ? ? ? voidSubscribe()where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? }
上面代碼是訂閱方用于從消息總線訂閱消息,從代碼中可以看出烁涌,它的最終的實現(xiàn)其實就是建立消息與處理器之間的關(guān)聯(lián)苍碟。
publicinterface IEventBus:IEventPublisher,IEventSubscriber
? ? {
? ? }
消息(事件)總線從兩個接口繼承下來,同時支持消息的發(fā)布與消息的訂閱撮执。
5.實現(xiàn)事件基類:上面已經(jīng)訂閱了消息(事件)的接口微峰,這里來實現(xiàn)事件的基類,其實就是實現(xiàn)消息ID與產(chǎn)生的時間:
publicclass BaseEvent : IEvent
? ? {
? ? ? ? publicGuid Id {get;set; }
? ? ? ? publicDateTime CreateDate {get;set; }
? ? ? ? public BaseEvent()
? ? ? ? {
? ? ? ? ? ? this.Id = Guid.NewGuid();
? ? ? ? ? ? this.CreateDate = DateTime.Now;
? ? ? ? }
? ? }
6.實現(xiàn)消息總線基類:消息總線底層的依賴可以是各種消息代理產(chǎn)品抒钱,比如RabbitMq蜓肆、Kafaka或第三方云平臺提供的消息代理產(chǎn)品,通常我們要封裝這些消息代理產(chǎn)品谋币。在封裝之前仗扬,我們需要定義頂層的消息總線基類實現(xiàn),主要的目的是未來依賴于它的具體實現(xiàn)可替換蕾额,另外也將消息與消息處理器的關(guān)聯(lián)接口傳遞進來早芭,便于訂閱方使用。
publicabstractclass BaseEventBus : IEventBus
? ? {
? ? ? ? protectedreadonly IEventHandlerExecutionContext eventHandlerExecutionContext;
? ? ? ? protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
? ? ? ? {
? ? ? ? ? ? this.eventHandlerExecutionContext = eventHandlerExecutionContext;
? ? ? ? }
? ? ? ? publicabstractvoidPublish(TEvent @event)
? ? ? ? ? ? where TEvent : IEvent;
? ? ? ? publicabstractvoidSubscribe()
? ? ? ? ? ? where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler;
? ? }
7.實現(xiàn)消息與處理器關(guān)聯(lián):消息必須與處理器關(guān)聯(lián)诅蝶,訂閱方收到特定類型的消息后退个,才知道交給哪個處理器處理募壕。
publicclass EventHandlerExecutionContext : IEventHandlerExecutionContext
? ? {
? ? ? ? privatereadonly IServiceCollection registry;
? ? ? ? privatereadonly IServiceProvider serviceprovider;
? ? ? ? privateDictionary> registrations =newDictionary>();
? ? ? ? publicEventHandlerExecutionContext(IServiceCollection registry,Func
? ? ? ? ? ? IServiceProvider> serviceProviderFactory =null)
? ? ? ? {
? ? ? ? ? ? this.registry = registry;
? ? ? ? ? ? this.serviceprovider =this.registry.BuildServiceProvider();
? ? ? ? }
? ? ? //查找消息關(guān)聯(lián)的處理器,然后調(diào)用處理器的處理方法publicasyncTask HandleAsync(TEvent @event)where TEvent : IEvent
? ? ? ? {
? ? ? ? ? ? vareventtype = @event.GetType();
? ? ? ? ? ? if(registrations.TryGetValue(eventtype,outList handlertypes) && handlertypes.Count >0)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? using(varchildscope =this.serviceprovider.CreateScope())
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? foreach(varhandlertypein handlertypes)
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? varhandler = Activator.CreateInstance(handlertype)as IEventHandler;
? ? ? ? ? ? ? ? ? ? ? ? await handler.HandleAsync(@event);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? //判斷消息與處理器之間是否有關(guān)聯(lián)publicboolIsRegisterEventHandler()
? ? ? ? ? ? where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler
? ? ? ? {
? ? ? ? ? ? if(registrations.TryGetValue(typeof(TEvent),outList handlertypelist))
? ? ? ? ? ? {
? ? ? ? ? ? ? ? returnhandlertypelist !=null&& handlertypelist.Contains(typeof(IEventHandler));
? ? ? ? ? ? }
? ? ? ? ? ? returnfalse;
? ? ? ? }
? ? ? //將消息與處理器關(guān)聯(lián)起來语盈,可以在內(nèi)存中建立關(guān)聯(lián)舱馅,也可以建立在數(shù)據(jù)庫單獨表中publicvoidRegisterEventHandler()
? ? ? ? ? ? where TEvent : IEvent
? ? ? ? ? ? where TEventHandler : IEventHandler
? ? ? ? {
? ? ? ? ? ? Utils.DictionaryRegister(typeof(TEvent),typeof(TEventHandler), registrations);
? ? ? ? }
? ? }
上面我們基本上就將消息總線的架子搭建起來了,也實現(xiàn)了基本的功能刀荒,下一章我們基于它來實現(xiàn)RabbitMq的消息總線习柠。
QQ討論群:309287205?
微服務(wù)實戰(zhàn)視頻請關(guān)注微信公眾號:MSSHCJ