Command Dispatching
使用顯示的命令調(diào)度機(jī)制具有許多優(yōu)點(diǎn)。首先坟乾,有一個(gè)明確描述客戶(hù)端意圖的對(duì)象啃沪。通過(guò)記錄命令,您可以存儲(chǔ)意圖和相關(guān)數(shù)據(jù)以備將來(lái)參考堪嫂。命令處理也可以很容易地通過(guò)Web服務(wù)將命令處理組件開(kāi)放給遠(yuǎn)程客戶(hù)端偎箫。測(cè)試也變得容易多了,你可以定義命令的開(kāi)始情境皆串,命令執(zhí)行(當(dāng))和預(yù)期的結(jié)果(然后)列出一些事件和命令(見(jiàn)測(cè)試)來(lái)定義測(cè)試腳本淹办。最后一個(gè)主要優(yōu)點(diǎn)是在同步和異步以及本地和分布式命令處理之間切換是非常容易的。
這并不意味著使用顯式命令對(duì)象進(jìn)行命令分發(fā)是唯一的方法恶复。 Axon的目標(biāo)不是規(guī)定一種特定的工作方式怜森,而是支持你按照自己的方式來(lái)做速挑,同時(shí)提供最佳實(shí)踐作為默認(rèn)行為。仍然可以使用可以調(diào)用的服務(wù)層來(lái)執(zhí)行命令副硅。該方法只需要啟動(dòng)一個(gè)工作單元(請(qǐng)參閱工作單元)姥宝,并在方法結(jié)束時(shí)執(zhí)行提交或回滾。
下一節(jié)將概述與使用Axon框架設(shè)置Command調(diào)度基礎(chǔ)架構(gòu)有關(guān)的任務(wù)恐疲。
命令路由
命令網(wǎng)關(guān)是一個(gè)方便的命令調(diào)度機(jī)制的接口腊满。雖然不需要使用網(wǎng)關(guān)來(lái)發(fā)送命令,但通常這是最簡(jiǎn)單的選擇培己。
有兩種使用Command Gateway的方法碳蛋。首先是使用由Axon提供的CommandGateway接口和DefaultCommandGateway實(shí)現(xiàn)。命令網(wǎng)關(guān)提供了許多方法漱凝,允許您發(fā)送命令并同步等待結(jié)果疮蹦,還有超時(shí)或異步方式。
另一種選擇也許是最靈活的茸炒。使用CommandGatewayFactory可以把幾乎任何接口變成一個(gè)命令網(wǎng)關(guān)愕乎。這允許你使用強(qiáng)類(lèi)型定義你的應(yīng)用程序的接口,并聲明自己的(checked)業(yè)務(wù)異常壁公。Axon將在運(yùn)行時(shí)自動(dòng)為該接口生成一個(gè)實(shí)現(xiàn)感论。
配置網(wǎng)關(guān)路由
您的自定義網(wǎng)關(guān)和Axon提供的自定義網(wǎng)關(guān)都需要配置對(duì)Command Bus的訪問(wèn)。另外紊册,Command Gateway可以配置一個(gè)RetryScheduler比肄,CommandDispatchInterceptors和CommandCallbacks。
當(dāng)命令執(zhí)行失敗時(shí)囊陡,RetryScheduler能夠進(jìn)行重試芳绩。 IntervalRetryScheduler是他的一個(gè)實(shí)現(xiàn),它將以設(shè)定的時(shí)間間隔重試給定的命令撞反,直到成功或重試次數(shù)達(dá)到最大值妥色。當(dāng)一個(gè)命令由于明確的非暫時(shí)的異常而失敗時(shí),根本不會(huì)執(zhí)行重試遏片。請(qǐng)注意嘹害,重試scheduler 程序僅在由于RuntimeException而導(dǎo)致命令失敗時(shí)才會(huì)調(diào)用。如果該異常是“業(yè)務(wù)異乘北悖”笔呀,則不會(huì)觸發(fā)重試。 RetryScheduler的典型用法是在分布式Command Bus上分發(fā)命令時(shí)髓需。如果某個(gè)節(jié)點(diǎn)發(fā)生故障许师,則重試調(diào)度程序?qū)?dǎo)致將命令分派給下一個(gè)能夠處理該命令的節(jié)點(diǎn)(請(qǐng)參閱分發(fā)命令總線)。
CommandDispatchInterceptors允許在將CommandMessages分發(fā)給Command Bus之前進(jìn)行修改。與在CommandBus上配置的CommandDispatchInterceptors相比枯跑,只有在通過(guò)此網(wǎng)關(guān)發(fā)送消息時(shí)惨驶,才會(huì)調(diào)用這些攔截器。攔截器可用于將元數(shù)據(jù)附加到命令或進(jìn)行驗(yàn)證敛助,例如。
為每個(gè)發(fā)送的命令調(diào)用CommandCallbacks屋确。這允許通過(guò)該網(wǎng)關(guān)發(fā)送的所有命令的一些通用行為纳击,而不管它們的類(lèi)型如何。
自定義網(wǎng)關(guān)路由
Axon允許自定義接口用作命令網(wǎng)關(guān)攻臀。在接口中聲明的每個(gè)方法的行為都是基于參數(shù)類(lèi)型焕数,返回類(lèi)型和聲明的異常。使用這個(gè)網(wǎng)關(guān)不僅方便刨啸,而且允許你在需要的地方模擬你的接口堡赔,使得測(cè)試變得更容易。
這些參數(shù)在CommandGateway的作用:
1.第一個(gè)參數(shù)是期望要發(fā)送的實(shí)際命令對(duì)象设联。
2.用@MetaDataValue注解的參數(shù)將其值分配給元數(shù)據(jù)字段善已,其中標(biāo)識(shí)符作為注解參數(shù)傳遞
3.MetaData類(lèi)型的參數(shù)將與CommandMessage上的元數(shù)據(jù)合并。如果后者參數(shù)定義的元數(shù)據(jù)相同离例,則將覆蓋較早參數(shù)的元數(shù)據(jù)换团。
4.CommandCallback類(lèi)型的參數(shù)將在處理命令后調(diào)用onSuccess或onFailure。您可能會(huì)傳入多個(gè)回調(diào)宫蛆,并可能與返回值組合在一起艘包。在這種情況下,回調(diào)的調(diào)用將始終與返回值(或異常)相匹配耀盗。
5.最后兩個(gè)參數(shù)可以是long(或int)和TimeUnit類(lèi)型想虎。在這種情況下這個(gè)方法至多會(huì)被阻塞為這些參數(shù)聲明的時(shí)間。該方法對(duì)超時(shí)作出什么響應(yīng)取決于方法中聲明的異常(見(jiàn)下文)叛拷。請(qǐng)注意舌厨,如果方法的其他屬性完全阻止了阻塞,超時(shí)將不會(huì)發(fā)生胡诗。
方法的聲明返回值也會(huì)影響其行為:
- 返回類(lèi)型為void將導(dǎo)致方法立即返回邓线,除非在方法上還有其他的指示要等待,比如超時(shí)或聲明的異常煌恢。
- Future骇陈,CompletionStage和CompletableFuture的返回類(lèi)型將使該方法立即返回。您可以使用方法返回的CompletableFuture實(shí)例來(lái)訪問(wèn)命令處理程序的結(jié)果瑰抵。方法中聲明的異常和超時(shí)將會(huì)被忽略你雌。
- 任何其他返回類(lèi)型將導(dǎo)致方法阻塞,直到有真實(shí)的結(jié)果。結(jié)果將轉(zhuǎn)換為返回類(lèi)型(如果類(lèi)型不匹配婿崭,則會(huì)導(dǎo)致ClassCastException)拨拓。
異常對(duì)其的影響:
- 如果命令處理程序(或攔截器)拋出異常,則將拋出任何已聲明的已檢查異常氓栈。如果受檢異常尚未聲明渣磷,它被包裹在一個(gè)CommandExecutionException中,這是一個(gè)RuntimeException授瘦。
- 發(fā)生超時(shí)時(shí)醋界,默認(rèn)行為是從方法返回一個(gè)null值。這可以通過(guò)聲明一個(gè)TimeoutException來(lái)改變提完。如果聲明了這個(gè)異常形纺,則拋出一個(gè)TimeoutException。
- 當(dāng)一個(gè)線程在等待結(jié)果時(shí)被中斷徒欣,默認(rèn)的行為是返回null逐样。在這種情況下,被中斷的標(biāo)志在線程上被重新設(shè)置打肝。如果在方法上聲明一個(gè)InterruptedException脂新,該行為會(huì)被改為拋出異常。當(dāng)拋出異常時(shí)闯睹,中斷標(biāo)志被移除戏羽,與java規(guī)范一致。
- 其他運(yùn)行時(shí)異陈コ裕可以在方法上聲明始花,但除了對(duì)API用戶(hù)進(jìn)行說(shuō)明之外,不會(huì)有任何影響孩锡。
最后酷宵,有可能使用的注解:
1.在參數(shù)部分指定的,用@MetaDataValue注解的參數(shù)的值作為元數(shù)據(jù)值添加躬窜。元數(shù)據(jù)項(xiàng)的key作為參數(shù)提供給注解浇垦。
2用@Timeout注解的方法最多會(huì)阻塞到指定的時(shí)間。如果方法聲明超時(shí)參數(shù)荣挨,這個(gè)注解將被忽略男韧。
3.用@Timeout注解的類(lèi),將導(dǎo)致這個(gè)類(lèi)中聲明的所有方法默垄,最多阻塞到指定的時(shí)間此虑。除非他們用自己的@Timeout注解或指定超時(shí)參數(shù)。
public interface MyGateway {
// fire and forget
void sendCommand(MyPayloadType command);
// method that attaches meta data and will wait for a result for 10 seconds
@Timeout(value = 10, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(MyPayloadType command,
@MetaDataValue("userId") String userId);
// alternative that throws exceptions on timeout
@Timeout(value = 20, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(MyPayloadType command)
throws TimeoutException, InterruptedException;
// this method will also wait, caller decides how long
void sendCommandAndWait(MyPayloadType command, long timeout, TimeUnit unit)
throws TimeoutException, InterruptedException;
}
// To configure a gateway:
CommandGatewayFactory factory = new CommandGatewayFactory(commandBus);// note that the commandBus can be obtained from the Configuration
object returned on configurer.initialize()
.
MyGateway myGateway = factory.createGateway(MyGateway.class);
命令總線
命令總線是將命令分發(fā)到它們各自的命令處理程序口锭。每個(gè)命令總是發(fā)送到一個(gè)命令處理程序朦前。如果分發(fā)的命令沒(méi)有找到可以處理的命令處理程序,則引發(fā)NoHandlerForCommandException異常。如果多個(gè)command handler訂閱了同一類(lèi)型的command韭寸,那么他們會(huì)以先后順序來(lái)覆蓋上一個(gè)春哨,也就是最后一個(gè)訂閱的會(huì)生效。
分發(fā)命令
CommandBus提供了兩種方法來(lái)將命令分發(fā)到各自的處理程序:dispatch(commandMessage恩伺,callback)和dispatch(commandMessage)赴背。第一個(gè)參數(shù)是包含實(shí)際分發(fā)的命令的消息∧洌可選的第二個(gè)參數(shù)是一個(gè)回調(diào)接口癞尚,允許在命令處理完成時(shí)通知須要回調(diào)的方法。這個(gè)回調(diào)函數(shù)有兩個(gè)方法:onSuccess()和onFailure()乱陡,分別在命令處理正常返回或拋出異常時(shí)調(diào)用。
調(diào)用組件可能不采取在分發(fā)命令的同一線程中調(diào)用回調(diào)函數(shù)仪壮。如果調(diào)用線程在繼續(xù)處理時(shí)會(huì)依賴(lài)于之前結(jié)果憨颠,則可以使用FutureCallback。它是Future的一個(gè)組合(在java.concurrent包中定義)和Axon的CommandCallback积锅∷或者,考慮使用命令網(wǎng)關(guān)缚陷。
如果應(yīng)用程序不直接對(duì)Command的結(jié)果感興趣适篙,則可以使用dispatch(commandMessage)方法。
SimpleCommandBus
SimpleCommandBus顧名思義就是最簡(jiǎn)單的實(shí)現(xiàn)箫爷。它可以直接處理調(diào)度它們的線程中的命令嚷节。處理命令后,修改后的聚合被保存虎锚,生成的事件將在同一個(gè)線程中發(fā)布硫痰。在大多數(shù)情況下,如Web應(yīng)用程序窜护,它和您的需要很匹配效斑。 SimpleCommandBus是配置API中默認(rèn)使用的實(shí)現(xiàn)。
像大多數(shù)CommandBus實(shí)現(xiàn)一樣柱徙,SimpleCommandBus也可以配置攔截器缓屠。在命令總線上分發(fā)命令時(shí),將調(diào)用CommandDispatchInterceptors护侮。在實(shí)際的命令處理程序方法之前調(diào)用CommandHandlerInterceptors敌完,允許您修改或阻止該命令。有關(guān)更多信息概行,請(qǐng)參閱命令攔截器蠢挡。
由于所有的命令處理都是在同一個(gè)線程中完成的,所以這個(gè)實(shí)現(xiàn)僅限于JVM內(nèi)。這個(gè)性能的表現(xiàn)是很不錯(cuò)的业踏,但不是最叼的禽炬。要跨越JVM邊界,或充分利用CPU周期勤家,請(qǐng)查看其他CommandBus實(shí)現(xiàn)腹尖。
AsynchronousCommandBus
顧名思義,AsynchronousCommandBus使用異步線程來(lái)執(zhí)行命令伐脖。它使用Executor分配新的線程來(lái)處理實(shí)際的業(yè)務(wù)邏輯.
默認(rèn)情況下热幔,AsynchronousCommandBus使用無(wú)限制的緩存線程池。這意味著一個(gè)命令分發(fā)時(shí)就會(huì)有一個(gè)線程被創(chuàng)建讼庇。完成處理命令的線程將繼續(xù)被用于處理新命令绎巨。如果60秒線程沒(méi)有處理命令,則會(huì)停止線程蠕啄。
另外场勤,可以給Executor實(shí)例配置不同的線程策略。
請(qǐng)注意歼跟,在停止應(yīng)用程序時(shí)和媳,應(yīng)關(guān)閉AsynchronousCommandBus,以確保任何等待的線程已正確關(guān)閉哈街。要關(guān)閉的話留瞳,請(qǐng)調(diào)用shutdown()方法。這也將關(guān)閉任何提供的Executor實(shí)例骚秦,如果它實(shí)現(xiàn)了ExecutorService接口的話她倘。
DisruptorCommandBus
SimpleCommandBus在性能方面有自己的特性,特別是你有性能調(diào)優(yōu)的經(jīng)歷的話就很容易理解骤竹。 事實(shí)上帝牡,SimpleCommandBus需要鎖來(lái)防止多個(gè)線程并發(fā)訪問(wèn)同一聚合,導(dǎo)致處理開(kāi)銷(xiāo)和鎖爭(zhēng)用蒙揣。
DisruptorCommandBus采取不同的方法來(lái)處理多線程靶溜。不是多個(gè)線程每個(gè)都執(zhí)行同樣的處理,而是有多個(gè)線程懒震,每個(gè)負(fù)責(zé)一件處理罩息。DisruptorCommandBus使用Disruptor(http://lmax-exchange.github.io/disruptor/)(一個(gè)用于并發(fā)編程的小型框架),通過(guò)采用不同的多線程方法來(lái)獲得更好的性能个扰。不是在調(diào)用線程中進(jìn)行處理瓷炮,而是將這些任務(wù)交給兩組線程,每個(gè)線程處理一部分處理递宅。第一組線程將執(zhí)行命令處理程序娘香,更改聚合的狀態(tài)苍狰。第二組將存儲(chǔ)事件并將其發(fā)布到事件存儲(chǔ)。
雖然DisruptorCommandBus優(yōu)于SimpleCommandBus 4倍(!)烘绽,但有一些限制:
- DisruptorCommandBus僅支持事件溯源的聚合淋昭。該命令總線也充當(dāng)Disruptor處理的集合的倉(cāng)儲(chǔ)。要獲取對(duì)存儲(chǔ)庫(kù)的引用安接,請(qǐng)使用createRepository(AggregateFactory)翔忽。
- 命令只能導(dǎo)致單個(gè)聚合實(shí)例中的狀態(tài)更改。
- 當(dāng)使用緩存時(shí)盏檐,它只允許給定的標(biāo)識(shí)符為單個(gè)聚合歇式。這意味著它是不可能有兩個(gè)具有相同的標(biāo)識(shí)符的不同類(lèi)型的聚合。
- 命令通常不會(huì)導(dǎo)致需要回滾工作單元的失敗胡野。發(fā)生回滾時(shí)材失,DisruptorCommandBus不能保證命令按照它們的分發(fā)順序進(jìn)行處理。此外硫豆,它需要重試一些其他命令豺憔,會(huì)導(dǎo)致不必要的計(jì)算。
- 在創(chuàng)建一個(gè)新的聚合實(shí)例時(shí)够庙,命令更新所創(chuàng)建實(shí)例可能并不完全按照所提供的順序進(jìn)行。一旦創(chuàng)建了聚合抄邀,所有命令將按照它們被分發(fā)順序執(zhí)行耘眨。為了確保順序,在創(chuàng)建命令上使用回調(diào)去等待正在創(chuàng)建的聚合境肾。它不應(yīng)該耗時(shí)超過(guò)幾毫秒剔难。
要構(gòu)建一個(gè)DisruptorCommandBus實(shí)例,您需要一個(gè)EventStore奥喻。該組件在“存儲(chǔ)庫(kù)和事件存儲(chǔ)”中進(jìn)行了說(shuō)明偶宫。
或者,你通過(guò)DisruptorConfiguration實(shí)例來(lái)配置優(yōu)化在特定環(huán)境下的性能:
- 緩沖區(qū)大谢防稹:用于注冊(cè)傳入命令的環(huán)形緩沖區(qū)上的插槽數(shù)量纯趋。更高的值可能會(huì)增加吞吐量,但也會(huì)導(dǎo)致更高的延遲冷离。其值必須始終是2的冪吵冒。默認(rèn)為4096。
- 生產(chǎn)者類(lèi)型:聲明這個(gè)實(shí)體是由多個(gè)或者單個(gè)線程生成西剥,默認(rèn)為多個(gè)
- 等待策略:處理器線程(假如有三個(gè)線程負(fù)責(zé)實(shí)際處理)需要使用的策略需要彼此等待痹栖。WaitStrategy取決于機(jī)器中可用的內(nèi)核數(shù)量,以及運(yùn)行的其他進(jìn)程的數(shù)量瞭空。如果你比較關(guān)心低延遲揪阿,并且DisruptorCommandBus可以為自己申請(qǐng)核心疗我,那么可以使用BusySpinWaitStrategy。如果想命令總線獲取更少的CPU資源并允許其他線程進(jìn)行處理南捂,可以使用YieldingWaitStrategy吴裤。最后,您可以使用SleepingWaitStrategy和BlockingWaitStrategy來(lái)允許其他進(jìn)程公平地競(jìng)爭(zhēng)CPU黑毅。如果命令總線不希望占用全部的時(shí)間來(lái)處理嚼摩,后者更合適。默認(rèn)策略為BlockingWaitStrategy矿瘦。
- Executor:我們可以通過(guò)設(shè)置Executor來(lái)修改DisruptorCommandBus所須的線程數(shù)枕面。DisruptorCommandBus使用的是Executor線程池來(lái)處理。這個(gè)Executor 必須能夠提供至少4個(gè)線程缚去。其中有3個(gè)是供DisruptorCommandBus組件使用的潮秘。另外的線程用于調(diào)用回調(diào)函數(shù),并在檢測(cè)到Aggregate的狀態(tài)不正確的情況下進(jìn)行重試易结。默認(rèn)實(shí)現(xiàn)是一個(gè)CachedThreadPool枕荞,它提供了一個(gè)名為“DisruptorCommandBus”的線程組的線程。
- 事務(wù)管理:他作為是確保事件的存儲(chǔ)和發(fā)布在一個(gè)事務(wù)中完成搞动。
- InvokerInterceptors:在Command Handler被調(diào)用的時(shí)候會(huì)調(diào)用(如果已定義)CommandHandlerInterceptor
- PublisherInterceptors:在存儲(chǔ)事件和發(fā)布事件的時(shí)候會(huì)調(diào)用(如果已定義)PublisherInterceptors
- RollbackConfiguration:表示Unit of Work什么樣的異常會(huì)被回滾躏精。默認(rèn)為unchecked的異常會(huì)被回滾。
- RescheduleCommandsOnCorruptState:聲明Command已經(jīng)被執(zhí)行鹦肿,但是對(duì)于聚合而言這次操作是不是成功(例如矗烛,因?yàn)楣ぷ鲉卧鸦貪L),須要進(jìn)行再次重新處理箩溃。如果值為false瞭吃,那么會(huì)調(diào)用onFailure()而不會(huì)進(jìn)行重新處理。如果值是true(默認(rèn)值)涣旨,那么該命令會(huì)被重新處理歪架。
- CoolingDownPeriod:設(shè)置等待的秒數(shù)主要是為了確保所有命令能夠被處理。在這個(gè)等待時(shí)間內(nèi)霹陡,他不會(huì)接收新的命令和蚪,但是現(xiàn)在正在處理的命令在必要的時(shí)候可以進(jìn)行重新安排處理。冷卻期主要是確保線程可用于重新安排處理命令和調(diào)用回調(diào)函數(shù)穆律,他的默認(rèn)值是1000(1秒)惠呼。
- Cache:設(shè)置緩存存儲(chǔ)從Event Store中恢復(fù)的聚合實(shí)例。緩存用disruptor來(lái)存儲(chǔ)那些不活躍的聚合實(shí)例
- InvokerThreadCount:分配給調(diào)用命令處理程序的線程數(shù)峦耘。最好的方法是先設(shè)置其值為機(jī)器內(nèi)核數(shù)量的一半剔蹋。
- PublisherThreadCount:用于發(fā)布事件的線程數(shù)。最好的方法是先設(shè)置其值為內(nèi)核數(shù)量的一半辅髓,如果在IO上花費(fèi)大量時(shí)間泣崩,可以適當(dāng)增加這個(gè)值少梁。
- SerializerThreadCount:用于預(yù)先序列化事件的線程數(shù)。默認(rèn)值為1矫付,但是如果沒(méi)有配置序列化器凯沪,就不管。
- Serializer:用于執(zhí)行預(yù)先序列化的序列化器买优。如果配置了序列化程序妨马,DisruptorCommandBus會(huì)將所有生成的事件包裝在SerializationAware消息中。payload和元數(shù)據(jù)的序列化會(huì)在事件發(fā)布到 Event Store之前處理杀赢。
命令攔截器
使用命令總線的優(yōu)點(diǎn)之一是能夠根據(jù)所有傳入命令進(jìn)行操作烘跺。例如日志記錄或認(rèn)證,無(wú)論是什么命令類(lèi)型脂崔,您都可能想要執(zhí)行此操作滤淳。這是使用攔截器完成的。
axon提供了不同類(lèi)型的攔截器:Dispatch 攔截器和處理器(Handler )攔截器砌左。Dispatch攔截器會(huì)在命令分發(fā)給命令處理程序之前調(diào)用脖咐。在那個(gè)時(shí)候,甚至還不能確定這個(gè)命令是否有匹配的Command Handler汇歹。Handler攔截器在調(diào)用Command Handler之前被調(diào)用屁擅。
消息分發(fā)攔截器
Message Dispatch Interceptors會(huì)在消息到達(dá)命令總線上時(shí)調(diào)用。他們可以把這些命令消息進(jìn)行修改产弹,如添加元數(shù)據(jù)煤蹭,或者拋出異常來(lái)阻止命令繼續(xù)傳遞。這些攔截器總是在調(diào)度Command的線程上調(diào)用取视。這些攔截器總是在分發(fā)命令的線程上被調(diào)用。
Structural validation
如果傳入的命令想要被正確處理卻沒(méi)有包含必要的信息常挚,則沒(méi)有意義作谭。事實(shí)上,一個(gè)不正確的命令應(yīng)該盡早被阻止奄毡,最好甚至在任何事務(wù)開(kāi)始之前折欠。因此,攔截器應(yīng)該檢查所有傳入的命令信息來(lái)判斷有效性吼过。這就是所謂的structural validation锐秦。
Axon框架支持基于JSR 303 Bean Validation的驗(yàn)證。您可以使用@NotEmpty和@Pattern之類(lèi)的注解來(lái)注解命令上的字段盗忱。您需要在類(lèi)路徑中包含JSR 303實(shí)現(xiàn)(如Hibernate-Validator)酱床。然后,在你的命令總線上配置一個(gè)BeanValidationInterceptor趟佃,它會(huì)自動(dòng)查找并配置你的驗(yàn)證器實(shí)現(xiàn)扇谣。雖然它使用合理的默認(rèn)值昧捷,但您可以根據(jù)自己的特定需求進(jìn)行微調(diào)。
建議:你想花盡可能少的資源在一個(gè)無(wú)效的命令上罐寨。所以這個(gè)攔截器一般都放在攔截器鏈的最前面靡挥。在某些情況下,你須要將日志記錄或Auditing 攔截器放置在前面鸯绿,而緊接著在后面添加一個(gè)驗(yàn)證攔截器跋破。
BeanValidationInterceptor也實(shí)現(xiàn)了MessageHandlerInterceptor,允許你配置它為Handler Interceptor瓶蝴。
消息處理攔截器
消息處理攔截器可以在命令處理之前和之后執(zhí)行操作毒返。攔截器甚至可以完全禁止命令處理,例如出于安全原因囊蓝。
攔截器必須實(shí)現(xiàn)MessageHandlerInterceptor接口饿悬。這個(gè)接口只有一個(gè)方法叫handle(),它有三個(gè)參數(shù):命令消息聚霜,當(dāng)前的UnitOfWork和一個(gè)InterceptorChain狡恬。 InterceptorChain是很多Interceptor串聯(lián)起來(lái)處理消息的chain。
與Dispatch 攔截器不同蝎宇,處理程序攔截器是在命令處理程序的上下文中調(diào)用的弟劲。這意味著他們可以將基于正在處理的消息的關(guān)聯(lián)數(shù)據(jù)附加到工作單元。這個(gè)關(guān)聯(lián)數(shù)據(jù)將被附加到在該工作單元的上下文中創(chuàng)建的消息上姥芥。
處理程序攔截器通常也用于管理處理命令的事務(wù)兔乞。為此,注冊(cè)一個(gè)TransactionManagingInterceptor凉唐,它又配置了一個(gè)TransactionManager來(lái)啟動(dòng)和提交(或回滾)實(shí)際的事務(wù)庸追。
分布式命令總線
前面介紹的CommandBus實(shí)現(xiàn)只允許在單個(gè)JVM中分發(fā)命令消息。有時(shí)台囱,你可以把Command Bus配置在多個(gè)JVM淡溯。在一個(gè)JVM的命令總線上分發(fā)的命令應(yīng)該無(wú)縫地傳送到另一個(gè)JVM中的 Command Handle,同時(shí)也能收到結(jié)果簿训。
這就是DistributedCommandBus的功能咱娶。與其他CommandBus實(shí)現(xiàn)不同,DistributedCommandBus根本不調(diào)用任何處理程序强品。它所做的只是在不同的JVM上的命令總線實(shí)現(xiàn)之間形成一個(gè)“橋梁”膘侮。每個(gè)JVM上的每個(gè)DistributedCommandBus實(shí)例稱(chēng)為“Segment”。
注意:盡管分布式命令總線本身是Axon Framework Core模塊的一部分的榛,但它需要的組件琼了,你可以在其中一個(gè)以axon-distributed-commandbus -* 的模塊中找到。如果你使用Maven夫晌,可以使用添加dependencies來(lái)獲取 ,他們的groupId和version與Core模塊的相同表伦。
DistributedCommandBus依賴(lài)于兩個(gè)組件:實(shí)現(xiàn)JVM之間的通信協(xié)議的CommandBusConnector和為每個(gè)傳入的命令選擇目的地的CommandRouter谦去。該路由器基于由路由策略計(jì)算的路由key來(lái)定義應(yīng)該給予分布式命令總線的哪個(gè)segment 。具有相同路由key的兩個(gè)命令將始終路由到相同的網(wǎng)段蹦哼,只要這些網(wǎng)段的數(shù)量和配置沒(méi)有變化鳄哭。通常情況下,目標(biāo)聚合的標(biāo)識(shí)被用作路由key纲熏。
axon提供了兩個(gè)RoutingStrategy實(shí)現(xiàn):MetaDataRoutingStrategy妆丘,它使用命令消息中的元數(shù)據(jù)屬性來(lái)查找路由key;以及AnnotationRoutingStrategy,它使用命令消息payload上的@TargetAggregateIdentifier注解來(lái)提取路由key局劲。顯然勺拣,你也可以提供你自己的實(shí)現(xiàn)。
默認(rèn)情況下鱼填,如果從命令消息中沒(méi)有找到key药有,則RoutingStrategy會(huì)拋出異常。通過(guò)在MetaDataRoutingStrategy或AnnotationRoutingStrategy的構(gòu)造函數(shù)中提供UnresolvedRoutingKeyPolicy苹丸,可以更改此行為愤惰。有三個(gè)可能的政策:
ERROR:這是默認(rèn)設(shè)置,當(dāng)路由key不可用時(shí)將引發(fā)異常
RANDOM_KEY:當(dāng)無(wú)法從命令消息解析路由key時(shí)赘理,將返回一個(gè)隨機(jī)值宦言。這實(shí)際上意味著這些命令將被路由到命令總線的一個(gè)隨機(jī)segment 。
STATIC_KEY:如果沒(méi)有找到路由Key商模,那么他將返回一個(gè)固定key.這實(shí)際上意味著只要segments 的配置沒(méi)有改變奠旺,所有這些命令將被路由到同一個(gè)segments 。
JGroupsConnector
JGroupsConnector使用JGroups作為底層的發(fā)現(xiàn)和調(diào)度機(jī)制(作為名稱(chēng)已經(jīng)提供)施流。對(duì)于本參考指南响疚,描述JGroups的功能集有點(diǎn)多,所以請(qǐng)參閱JGroups用戶(hù)指南了解更多詳細(xì)信息瞪醋。
由于JGroups同時(shí)處理節(jié)點(diǎn)的發(fā)現(xiàn)和它們之間的通信稽寒,所以JGroupsConnector既充當(dāng)CommandBusConnector又充當(dāng)CommandRouter。
注意:您可以在axon-distributed-commandbus-jgroups模塊中找到DistributedCommandBus的JGroups相關(guān)的組件趟章。
JGroupsConnector有四個(gè)必需的配置元素:
1.第一個(gè)是JChannel,它定義了JGroups協(xié)議棧慎王。通常蚓土,JChannel是通過(guò)引用JGroups配置文件構(gòu)建的。 JGroups提供了許多默認(rèn)配置赖淤,可以作為您自己配置的基礎(chǔ)蜀漆。請(qǐng)記住,IP組播通常不能在云服務(wù)中使用咱旱,例如Amazon确丢。 TCP Gossip通常在這種類(lèi)型的環(huán)境中是不錯(cuò)的選擇绷耍。
2.集群里的每個(gè)segment都應(yīng)該根據(jù)名稱(chēng)注冊(cè)到對(duì)應(yīng)的集群上。具有相同的集群名稱(chēng)的Segment最終會(huì)知道到彼此鲜侥,并在彼此間分發(fā)命令褂始。
3.“本地segment”是命令總線的實(shí)現(xiàn),它分發(fā)命令到本地JVM描函。這些命令可能已由其他JVM或本地實(shí)例分發(fā)崎苗。
4.最后,Serializer的作用是在命令消息在發(fā)送之前進(jìn)行序列化舀寓。
注意:當(dāng)使用緩存時(shí),當(dāng)ConsistentHash發(fā)生改變時(shí)應(yīng)該將其清除,以避免潛在的數(shù)據(jù)錯(cuò)誤(例如绊含,當(dāng)命令未指定@TargetAggregateVersion并且新成員快速加入并離開(kāi)JGroup時(shí)缤谎,修改聚合然而它還要緩存到其他地方)。
最終篡撵,JGroupsConnector需要實(shí)際連接判莉,以便將消息發(fā)送到其他段。為此酸休,請(qǐng)調(diào)用connect()方法骂租。
JChannel channel = new JChannel("path/to/channel/config.xml");
CommandBus localSegment = new SimpleCommandBus();
Serializer serializer = new XStreamSerializer();
JGroupsConnector connector = new JGroupsConnector(channel, "myCommandBus", localSegment, serializer);
DistributedCommandBus commandBus = new DistributedCommandBus(connector, connector);
// on one node:
commandBus.subscribe(CommandType.class.getName(), handler);
connector.connect();
// on another node, with more CPU:
commandBus.subscribe(CommandType.class.getName(), handler);
commandBus.subscribe(AnotherCommandType.class.getName(), handler2);
commandBus.updateLoadFactor(150); // defaults to 100
connector.connect();
// from now on, just deal with commandBus as if it is local...
注意:
請(qǐng)注意,不要求所有segments 都具有用于相同類(lèi)型的命令的命令處理程序斑司。您可以為不同的命令類(lèi)型使用不同的段渗饮。分布式命令總線將始終選擇一個(gè)節(jié)點(diǎn)來(lái)分派一個(gè)命令,以支持該特定類(lèi)型的命令宿刮。
如果你使用Spring互站,你可以考慮使用JGroupsConnectorFactoryBean。它在ApplicationContext啟動(dòng)時(shí)自動(dòng)連接Connector僵缺,并在ApplicationContext關(guān)閉時(shí)進(jìn)行斷開(kāi)連接胡桃。此外,它對(duì)測(cè)試環(huán)境使用合理的默認(rèn)值(但不應(yīng)將其視為生產(chǎn)就緒)磕潮,并對(duì)配置進(jìn)行自動(dòng)裝配翠胰。
Spring Cloud Connector
Spring Cloud Connector程序使用Spring Cloud描述的服務(wù)注冊(cè)和發(fā)現(xiàn)機(jī)制來(lái)分發(fā)命令總線。因此自脯,您可以自由選擇使用哪種Spring Cloud實(shí)現(xiàn)來(lái)分發(fā)您的命令之景。一個(gè)示例實(shí)現(xiàn)是Eureka Discovery / Eureka服務(wù)器組合。
注意:SpringCloudCommandRouter使用Spring Cloud 里的ServiceInstance來(lái)實(shí)現(xiàn).Metadata字段通知系統(tǒng)中所有節(jié)點(diǎn)的消息路由信息膏潮。因此锻狗,所選的Spring Cloud實(shí)現(xiàn)支持ServiceInstance.Metadata字段的使用是非常重要的。如果所需的Spring Cloud實(shí)現(xiàn)不支持修改ServiceInstance.Metadata(例如Consul),那么SpringCloudHttpBackupCommandRouter是一個(gè)可行的解決方案轻纪。有關(guān)SpringCloudHttpBackupCommandRouter的配置細(xì)節(jié)油额,請(qǐng)參閱本章末尾的內(nèi)容。
提供每個(gè)SpringCloud實(shí)現(xiàn)的描述將推動(dòng)本參考指南刻帚。因此潦嘶,我們參考他們各自的文件以獲得進(jìn)一步的信息。
Spring Cloud連接器裝置是一個(gè)SpringCloudCommandRouter和SpringHttpCommandBusConnector的組合我擂,分別填充CommandRouter的地點(diǎn)和 DistributedCommandBus的CommandBusConnector衬以。
注意:Spring Cloud Connector中的DistributedCommandBus相關(guān)組件可以在axon-distributed-commandbus-springcloud模塊中找到。
SpringCloudCommandRouter必須通過(guò)提供以下內(nèi)容來(lái)創(chuàng)建:
1.DiscoveryClient類(lèi)型的“發(fā)現(xiàn)客戶(hù)端”校摩。這可以通過(guò)使用@EnableDiscoveryClient注解您的Spring Boot應(yīng)用程序來(lái)提供看峻,該應(yīng)用程序?qū)⒃谀念?lèi)路徑中查找Spring Cloud實(shí)現(xiàn)。
- RoutingStrategy類(lèi)型的“路由策略”衙吩。axon核心模塊目前提供了幾個(gè)實(shí)現(xiàn)互妓,但也你也可以通用調(diào)用函數(shù)來(lái)實(shí)現(xiàn)。例如坤塞,如果您想根據(jù)“聚合標(biāo)識(shí)符”路由命令冯勉,則可以使用AnnotationRoutingStrategy并在payload上標(biāo)注使用@TargetAggregateIdentifier標(biāo)識(shí)聚合的字段。
SpringHttpCommandBusConnector創(chuàng)建所需要的三個(gè)參數(shù):
- CommandBus類(lèi)型的“本地命令總線”摹芙。這是將命令分發(fā)到本地JVM的命令總線上灼狰。這些命令可能已由其他JVM上的實(shí)例或本地實(shí)例發(fā)布。
- RestOperations對(duì)象是將執(zhí)行命令消息發(fā)布到另一個(gè)實(shí)例浮禾。
- 最后是Serializer類(lèi)型的“序列化器”交胚。他的作用是在命令消息發(fā)送到命令總線之前進(jìn)行序列化。
SpringCloudCommandRouter和SpringHttpCommandBusConnector都是用于創(chuàng)建DistributedCommandsBus的盈电。在Spring Java配置中蝴簇,他們看起來(lái)如下所示:
// Simple Spring Boot App providing the DiscoveryClient
bean@EnableDiscoveryClient@SpringBootApplicationpublic class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
// Example function providing a Spring Cloud Connector
@Bean
public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
return new SpringCloudCommandRouter(discoveryClient, new AnnotationRoutingStrategy());
}
@Bean
public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
RestOperations restOperations,
Serializer serializer) {
return new SpringHttpCommandBusConnector(localSegment, restOperations, serializer);
}
@Primary // to make sure this CommandBus implementation is used for autowiring
@Bean
public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
CommandBusConnector commandBusConnector) {
return new DistributedCommandBus(commandRouter, commandBusConnector);
}
}
// if you don't use Spring Boot Autoconfiguration, you will need to explicitly define the local segment:@Bean@Qualifier("localSegment")public CommandBus localSegment() {
return new SimpleCommandBus();
}
注意:請(qǐng)注意,不要求所有segments 都具有用于相同類(lèi)型的命令的命令處理程序匆帚。您可以為不同的命令類(lèi)型使用不同的segments 熬词。分布式命令總線將始終選擇一個(gè)節(jié)點(diǎn)來(lái)分發(fā)一個(gè)命令,以支持該指定類(lèi)型的命令吸重。
Spring Cloud Http Back Up Command Router
在內(nèi)部互拾,SpringCloudCommandRouter使用Spring Cloud ServiceInstance中包含的Metadata映射在axon分布式環(huán)境中傳遞路由消息信息。如果所需的Spring Cloud實(shí)現(xiàn)不允許修改ServiceInstance.Metadata字段(例如Consul)嚎幸,則可以選擇實(shí)例化SpringCloudHttpBackupCommandRouter而不是SpringCloudCommandRouter颜矿。
顧名思義,SpringCloudHttpBackupCommandRouter具有備份機(jī)制鞭铆,如果ServiceInstance.Metadata字段不包含預(yù)期的路由消息信息。該備份機(jī)制是提供可以從中檢索消息路由信息的HTTP端點(diǎn),并且通過(guò)同時(shí)添加功能來(lái)查詢(xún)集群中其他已知節(jié)點(diǎn)的該端點(diǎn)以檢索其消息路由信息车遂。因此封断,備份機(jī)制函數(shù)是一個(gè)Spring控制器,用于在可指定端點(diǎn)接收請(qǐng)求舶担,并使用RestTemplate向可指定端點(diǎn)上的其他節(jié)點(diǎn)發(fā)送請(qǐng)求坡疼。
要使用SpringCloudHttpBackupCommandRouter而不是SpringCloudCommandRouter,添加下面的Spring Java配置(它取代了我們前面例子中的SpringCloudCommandRouter方法):
@Configurationpublic class MyApplicationConfiguration {
@Bean
public CommandRouter springCloudHttpBackupCommandRouter(DiscoveryClient discoveryClient,
RestTemplate restTemplate,
@Value("${axon.distributed.spring-cloud.fallback-url}") String messageRoutingInformationEndpoint) {
return new SpringCloudHttpBackupCommandRouter(discoveryClient, new AnnotationRoutingStrategy(), restTemplate, messageRoutingInformationEndpoint);
}
}