命令總線是將命令分發(fā)給各自的命令Handler的機制。每個命令總是被精確發(fā)送到一個命令Handler台囱。如果沒有可用的命令Handler為已分發(fā)的命令淡溯,將會拋出NoHandlerForCommandException異常。對同一命令類型訂閱多個命令Handler將導(dǎo)致訂閱互相取代簿训。在這種情況下咱娶,最后一個訂閱獲勝。
分發(fā)命令
CommandBus提供了兩個方法去分發(fā)命令到它們各自的Handler:dispatch(commandMessage,callback)和dispatch(commandMessage)强品。第一個參數(shù)是一個包含要發(fā)送的實際命令的消息膘侮。第二個可選的參數(shù)接收一個回調(diào),允許在命令處理完成時通知分發(fā)組件的榛。這個回調(diào)有兩個方法:onSuccess()和onFailure()琼了,分別會在命令處理返回后被調(diào)用,或者當(dāng)它拋出一個異常時調(diào)用夫晌。
調(diào)用組件可能不采取在分發(fā)命令的同一線程中調(diào)用回調(diào)雕薪。如果調(diào)用線程在繼續(xù)之前依賴于結(jié)果,你可以使用FutureCallback晓淀。這是一個Future(在java.concurrent包中定義)和Axon的CommandCallback的組合所袁。或者凶掰,考慮使用命令網(wǎng)關(guān)燥爷。
如果一個應(yīng)用程序不直接對命令的結(jié)果感興趣,可以使用dispatch(commandMessage) 方法锄俄。
SimpleCommandBus
SimpleCommandBus,顧名思義勺拣,最簡單的實現(xiàn)奶赠。它在分發(fā)它們的線程中簡單的處理命令。命令處理后药有,修改后的聚合在同一線程被保存和發(fā)布生成的事件毅戈。在大多數(shù)情況下苹丸,如web應(yīng)用程序,該實現(xiàn)將符合你的需求苇经。配置API中SimpleCommandBus是默認(rèn)使用的實現(xiàn)赘理。
像大多數(shù)CommandBus實現(xiàn)一樣,SimpleCommandBus允許攔截器進(jìn)行配置扇单。在命令總線上分發(fā)一個命令后調(diào)用CommandDispatchInterceptors商模。在實際的命令handler 方法之前調(diào)用CommandHandlerInterceptors蜘澜,允許你修改或阻塞命令。有關(guān)更多信息鄙信,請參考命令處理器攔截器。
因為所有命令處理都在同一線程中完成装诡,這個實現(xiàn)僅限于JVM的邊界。這個實現(xiàn)的性能是很好的鸦采,但不超凡∈衿幔跨JVM邊界咱旱,或使你的CPU cycles發(fā)揮最大的功效,看看其他CommandBus實現(xiàn)吐限。
AsynchronousCommandBus
顧名思義,AsynchronousCommandBus實現(xiàn)從分發(fā)它們的線程異步執(zhí)行命令诸典。它使用一個Executor在不同的線程來執(zhí)行實際的處理邏輯。
默認(rèn)情況下狐粱,AsynchronousCommandBus使用一個unbounded緩存的線程池。這意味著分發(fā)一個命令時會創(chuàng)建線程肌蜻。完成處理命令的線程將被重新用于新命令。如果60秒線程沒有處理命令蒋搜,則會停止線程判莉。
或者带迟,Executor實例可以提供不同的線程策略配置项郊。
注意锰镀,應(yīng)用程序停止時應(yīng)該關(guān)閉AsynchronousCommandBus宿刮,以確保任何等待線程正確關(guān)閉。關(guān)閉胡桃,調(diào)用shutdown()方法磕潮。這也將關(guān)閉任何Executor實例,如果它實現(xiàn)ExecutorService接口的話自脯。
DisruptorCommandBus
SimpleCommandBus具有合理的性能特性之景,特別是當(dāng)你經(jīng)歷了性能調(diào)優(yōu)技巧膏潮。事實上,SimpleCommandBus需要鎖來防止多個線程并發(fā)訪問同一聚合焕参,導(dǎo)致處理開銷和鎖爭用。
DisruptorCommandBus采用不同的方法進(jìn)行多線程處理叠纷。不是多個線程每個都執(zhí)行同樣的處理,而是有多個線程涩嚣,每個負(fù)責(zé)一件處理。DisruptorCommandBus使用Disruptor航厚,一個小的并發(fā)編程框架,通過不同的方法對多線程進(jìn)行處理來實現(xiàn)更好的性能幔睬。任務(wù)不是在調(diào)用線程中進(jìn)行處理,而是將任務(wù)移交給兩組線程進(jìn)行處理溪窒,每組線程負(fù)責(zé)一部分處理。第一組的線程將執(zhí)行命令handler澈蚌,更改一個聚合的狀態(tài)。第二組將存儲并將事件發(fā)布到事件存儲浮禾。
雖然DisruptorCommandBus輕易優(yōu)于SimpleCommandBus 4倍(!),但有一些限制:
- DisruptorCommandBus僅支持事件溯源聚合盈电。這個命令總線充當(dāng)由Disruptor處理聚合的存儲庫。獲取一個存儲庫的引用匆帚,使用createRepository(AggregateFactory)。
- 一個命令只能導(dǎo)致一個聚合實例狀態(tài)變化吸重。
- 當(dāng)使用緩存時,它只允許給定的標(biāo)識符為單個聚合嚎幸。這意味著它是不可能有兩個具有相同的標(biāo)識符的不同類型的聚合。
命令一般不會引發(fā)需要回滾工作單元的故障嫉晶。當(dāng)發(fā)生回滾時,DisruptorCommandBus不能保證命令按照它們被分發(fā)的順序進(jìn)行處理田篇。此外,它需要重試其他命令斯辰,從而造成不必要的計算。 - 在創(chuàng)建一個新的聚合實例時彬呻,命令更新所創(chuàng)建實例可能并不完全按照所提供的順序進(jìn)行。一旦創(chuàng)建了聚合闸氮,所有命令將按照它們被分發(fā)順序執(zhí)行。為了確保順序蒲跨,在創(chuàng)建命令上使用回調(diào)去等待正在創(chuàng)建的聚合。它不應(yīng)該耗時超過幾毫秒或悲。
構(gòu)建一個DisruptorCommandBus實例堪唐,你需要一個EventStore。該組件在Repositories and Event Stores中有解釋淮菠。
或者,你可以提供一個DisruptorConfiguration實例合陵,它允許你調(diào)整配置優(yōu)化你的特定環(huán)境下的性能:
- Buffer size:在ringBuffer上注冊傳入命令的槽數(shù)。更高的值可能會增加吞吐量,但也導(dǎo)致更高的延遲拥知。必須是2的次方數(shù)碎赢,默認(rèn)為4096。
- ProducerType: 表示條目是由單線程或多線程生成的肮塞。默認(rèn)為多線程户侥。
- WaitStrategy:當(dāng)處理器線程(三個線程負(fù)責(zé)的實際處理)需要等待對方時使用的策略蕊唐。最好的WaitStrategy取決于機器上可用的處理器數(shù)量,和正在運行的其他進(jìn)程的數(shù)量替梨。如果低延遲是至關(guān)重要的,DisruptorCommandBus可以自己認(rèn)領(lǐng)內(nèi)核副瀑,你可以使用BusySpinWaitStrategy。為了使命令總線索取更少的CPU并且允許其他線程處理糠睡,使用YieldingWaitStrategy疚颊。最后,你可以使用SleepingWaitStrategy和BlockingWaitStrategy允許其他進(jìn)程共享CPU均抽。如果命令總線不需要進(jìn)行專職處理,則后者是合適的油挥。默認(rèn)為BlockingWaitStrategy。
- Executor:設(shè)置Executor為DisruptorCommandBus提供線程攘乒。這個Executor必須能夠提供至少4個線程惋鹅。其中的3個線程负饲,由DisruptorCommandBus的處理組件認(rèn)領(lǐng)返十。額外的線程用于調(diào)用回調(diào)函數(shù)椭微,并計劃重試以防檢測到錯誤的聚合狀態(tài)蝇率。默認(rèn)是CachedThreadPool提供線程從一個稱為“DisruptorCommandBus”的線程組中排拷。
- TransactionManager:定義了事務(wù)管理器监氢,應(yīng)該確保存儲和事件發(fā)布以事務(wù)的方式執(zhí)行浪腐。
- InvokerInterceptors:定義了在調(diào)用處理中使用的CommandHandlerInterceptors议街。這個處理調(diào)用實際的命令處理器方法璧榄。
- PublisherInterceptors:定義了在發(fā)布處理中使用的CommandHandlerInterceptors骨杂。這個發(fā)布處理存儲和發(fā)布生成的事件腊脱。
- RollbackConfiguration:定義工作單元應(yīng)該回滾的異常。默認(rèn)配置為回滾未經(jīng)檢查的異常悍抑。
- RescheduleCommandsOnCorruptState:指示已經(jīng)執(zhí)行過命令但損壞的聚合(如:因為一個工作單元是回滾)是否應(yīng)該重新計劃拂盯。如果為假记靡,回調(diào)的onFailure()方法將被調(diào)用摸吠。如果為的(默認(rèn)),命令將被重新計劃寸痢。
- CoolingDownPeriod:設(shè)置等待的秒數(shù),以確保所有命令被處理道逗。在冷卻期間滓窍,不接受新命令巩那,但是現(xiàn)有的命令仍然處理锦亦,并在必要時重新計劃令境。冷卻期間確保線程可供重新安排命令和調(diào)用回調(diào)之用舔庶。默認(rèn)為1000(1秒)。
緩存:設(shè)置緩存存儲從Event Store中恢復(fù)的聚合實例瞧甩。緩存用disruptor存儲不活躍的聚合實例肚逸。 - InvokerThreadCount:給命令處理器的調(diào)用分配線程的數(shù)量。一個好的起始點是機器內(nèi)核數(shù)量的一半膝晾。
- PublisherThreadCount:用于發(fā)布事件的線程數(shù)量务冕。一個好的起始點是一半的內(nèi)核數(shù)量,如果IO上花費大量的時間臊旭,可以增加。
- SerializerThreadCount:使用pre-serialize事件的線程數(shù)量箩退。默認(rèn)為1,但如果沒有配置序列化器將被忽略乏德。
- Serializer:用于執(zhí)行pre-serialization的序列化器喊括。當(dāng)配置序列化器時,DisruptorCommandBus將包裝所有生成的事件在一個SerializationAware消息上郑什。附加有效負(fù)載和元數(shù)據(jù)的序列化形式蒲肋,在發(fā)布到事件存儲之前。