持久化
????當(dāng)我們?cè)诩合到y(tǒng)中,一臺(tái)機(jī)器向另一臺(tái)機(jī)器發(fā)送一段數(shù)據(jù),負(fù)責(zé)接收的機(jī)器在接收數(shù)據(jù)前突然宕機(jī),就會(huì)造成數(shù)據(jù)丟失無(wú)法恢復(fù)。Akka實(shí)現(xiàn)了對(duì)actor 持久化的方法來(lái)恢復(fù)數(shù)據(jù)转唉。
????????????Akka持久化使有狀態(tài)的actor能保留它的內(nèi)部狀態(tài),因此我們不會(huì)因?yàn)镴VM崩潰稳捆、監(jiān)管者引起或集群中遷移導(dǎo)致數(shù)據(jù)丟失無(wú)法恢復(fù)而尷尬赠法,Akka持久化可以幫助我們恢復(fù)actor。
? ? ? ? ? ? 持久化的什么乔夯?
? ? ? ? ? ? 持久化的是actor內(nèi)部狀態(tài)的變化期虾,并且這些變化只是附加到原有的存儲(chǔ)上。
? ??????????actor是如何進(jìn)行恢復(fù)的呢驯嘱?
? ??????????我們可以通過(guò)將保存的變化進(jìn)行重放镶苞,從而使它們可以重建其內(nèi)部狀態(tài)。當(dāng)重放的內(nèi)容龐大時(shí)會(huì)需要很多時(shí)間鞠评,因此Akka提供了快照功能將重放記錄分解從而減少恢復(fù)時(shí)間茂蚓。
????????????另外一個(gè)重點(diǎn)是Akka持久化也提供了“至少一次消息傳遞語(yǔ)義”的點(diǎn)對(duì)點(diǎn)通信來(lái)保證消息不丟失。
依賴
使用Akka持久化,要在你的項(xiàng)目中添加以下依賴:
libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.5.11"
Akka持久化還提供了一些內(nèi)置的持久化插件聋涨,包括基于內(nèi)存堆的日志晾浴、基于本地文件系統(tǒng)的快照存儲(chǔ)以及基于LevelDB的日志。
基于LevelDB的插件需要以下額外的依賴:
libraryDependencies += "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
體系結(jié)構(gòu)
1. PersistentActor(持久化actor):是一種特殊的帶有內(nèi)部狀態(tài)的Actor牍白,既可以執(zhí)行命令又能以事件來(lái)源模式來(lái)進(jìn)行內(nèi)部狀態(tài)持久化的脊凰。若因系統(tǒng)崩潰、人為終結(jié)等茂腥,系統(tǒng)在重啟后Actor通過(guò)之前持久化的信息可以恢復(fù)之前的狀態(tài)狸涌。
2.?AtLeastOnceDelivery(至少一次傳遞):消息傳遞的機(jī)制,意味著每條應(yīng)用了這種機(jī)制的消息潛在的存在多次投遞嘗試并保證至少會(huì)成功一次。就是說(shuō)這條消息可能會(huì)重復(fù)但是不會(huì)丟失最岗。
3. AsyncWriteJournal(異步存儲(chǔ)日志): 將發(fā)送給持久化Actor的消息序列異步存儲(chǔ)到日志中帕胆。日志為每條消息維護(hù)一個(gè)不斷增加的序列號(hào)。日志存儲(chǔ)的底層實(shí)現(xiàn)是可插拔的般渡。Akka的持久化擴(kuò)展自帶一個(gè)叫做"leveldb"懒豹,向本地文件系統(tǒng)寫入的日志插件。Akka社區(qū)里還有更多日志存儲(chǔ)插件提供驯用。
4. Snapshot store(快照存儲(chǔ)):對(duì)持久化Actor或持久化視圖的內(nèi)部狀態(tài)的快照進(jìn)行持久化脸秽。快照用于優(yōu)化回復(fù)的時(shí)間蝴乔”ⅲ快照存儲(chǔ)的底層是可插拔的。Akka持久化擴(kuò)展自帶一個(gè)向本地文件系統(tǒng)寫入的“本地”快照存儲(chǔ)插件淘这。Akka社區(qū)里還有更多快照存儲(chǔ)插件提供。
5.?Event sourcing(事件來(lái)源):把使?fàn)顟B(tài)產(chǎn)生變化的事件按發(fā)生時(shí)間順序持久化巩剖,而不是把當(dāng)前整個(gè)狀態(tài)存儲(chǔ)起來(lái)铝穷。在恢復(fù)狀態(tài)時(shí)把日志中這些事件按原來(lái)的時(shí)間順序重演一遍回到原來(lái)的狀態(tài)。
下面我們根據(jù)持久化的體系結(jié)構(gòu)來(lái)詳細(xì)地介紹
事件來(lái)源Event sourcing
事件來(lái)源背后的基本思想其實(shí)很簡(jiǎn)單佳魔,當(dāng)一個(gè)持久化actor接收到一個(gè)(非持久化)命令曙聂,首先它要驗(yàn)證(這個(gè)命令)是否可以運(yùn)用到當(dāng)前狀態(tài)。
如果命令驗(yàn)證成功鞠鲜,根據(jù)這個(gè)命令產(chǎn)生一個(gè)事件宁脊。在事件成功持久化之后,可以用來(lái)改變actor的狀態(tài)贤姆。
當(dāng)持久化actor需要恢復(fù)時(shí)榆苞,因?yàn)橹耙呀?jīng)驗(yàn)證過(guò)可以運(yùn)用到當(dāng)前狀態(tài),我們可以直接將持久化的事件進(jìn)行重放霞捡。
文章推薦:Events As First-Class Citizens
Akka持久化通過(guò)PersistentActor支持事件來(lái)源坐漏。actor可以使用persist方法持久化和處理事件。通過(guò)實(shí)現(xiàn)receiveRecover和receiveCommand定義PersistentActor的行為。下面的示例演示了這一點(diǎn)赊琳。
本例子中定義了Cmd和Evt兩種數(shù)據(jù)類型街夭,Cmd代表命令,Evt代表事件
receiveRecover方法是在恢復(fù)過(guò)程中處理事件和快照消息躏筏。
receiveCommand方法用來(lái)處理普通Actor的消息板丽。在上面的示例中,如果actor收到的是命令的話會(huì)調(diào)用persist方法
persist方法是異步的方式持久化事件趁尼。它有兩個(gè)參數(shù) 埃碱,一個(gè)是事件 ,另一個(gè)是事件處理程序(event handler)弱卡。
事件處理程序是將之前持久化過(guò)的事件進(jìn)行處理乃正,該事件在內(nèi)部作為獨(dú)立消息發(fā)送回持久化actor 來(lái)使事件處理程序執(zhí)行,來(lái)改變或關(guān)閉持久化actor的狀態(tài)婶博。持久化事件的發(fā)送者也是相應(yīng)命令的發(fā)送者瓮具,因此當(dāng)命令的發(fā)送者沒(méi)顯出時(shí)事件處理程序也可以回復(fù)。
當(dāng)使用persist方法來(lái)持久化事件時(shí)在調(diào)用和執(zhí)行相關(guān)事件處理程序的過(guò)程中凡人,要保證持久化actor不會(huì)收到下一步的命令名党,否則會(huì)受到影響。當(dāng)在某個(gè)命令的上下文中多次調(diào)用persist方法時(shí)挠轴。這個(gè)過(guò)程中收到的消息一直被暫存直到presist方法運(yùn)行結(jié)束传睹。
如果實(shí)例化事件失敗,onPersistFailure方法將被調(diào)用(默認(rèn)記錄為error)岸晦,并且actor將無(wú)條件地被停止欧啤。如果持久化事件在存儲(chǔ)之前被拒絕,比如事件發(fā)生連續(xù)錯(cuò)誤启上,onPersistRejected將被調(diào)用(默認(rèn)記錄為warning)并且actor繼續(xù)下一條消息
運(yùn)行該示例最簡(jiǎn)單的方法是下載Typesafe Activator邢隧,并打開(kāi)Akka Persistence Samples with Scala這個(gè)教程。它包含如何運(yùn)行PersistentActorExample的說(shuō)明冈在。這個(gè)例子的源代碼可以在?Akka Samples Repository?中找到倒慧。
??? ??標(biāo)識(shí)符
? ?一個(gè)持久化actor必須有個(gè)標(biāo)識(shí)符 這個(gè)標(biāo)識(shí)符必須用persistenceId?方法來(lái)定義。
override def persistenceId = "my-stable-persistence-id"
? ? ?恢復(fù)
PersistentActor在啟動(dòng)和重啟時(shí)通過(guò)重放之前持久化的日志消息來(lái)實(shí)現(xiàn)自動(dòng)恢復(fù)包券,如果在恢復(fù)過(guò)程中收到新的消息纫谅,會(huì)將新消息先存儲(chǔ)起來(lái)等恢復(fù)完成后,在收到新的消息溅固。
可以限制同一時(shí)間并發(fā)的恢復(fù)的數(shù)量付秕,來(lái)限制系統(tǒng)和后端的數(shù)據(jù)存儲(chǔ)不超載,如果超過(guò)限制actor將等待到其他恢復(fù)都完成后才開(kāi)始侍郭。配置方式:
akka.persistence.max-concurrent-recoveries = 50
? ? ? ? 自定義恢復(fù)
在應(yīng)用程序中有時(shí)也需要依照客戶具體要求來(lái)恢復(fù)盹牧,通過(guò)返回recovery?方法中的自定義Recovery對(duì)象來(lái)執(zhí)行自定義恢復(fù)俩垃。recovery?是PersistentActor的一個(gè)方法。
你可以使用SnapshotSelectionCriteria.None.?來(lái)跳過(guò)加載快照和重放所有事件汰寓。它用于將快照序列化格式變成互不相容的方式時(shí)口柳。不適宜用于事件被刪除的情況下。
override?def?recovery = Recovery(fromSnapshot =?SnapshotSelectionCriteria.None)
另一個(gè)可能的自定義恢復(fù)是設(shè)置重放的上界有滑,對(duì)debug很有幫助跃闹,使得actor僅在過(guò)去的某個(gè)點(diǎn)重放。
override def recovery = Recovery(toSequenceNr = 457L)
在PersistentActor的recovery?方法中返回Recovery.none()可以使恢復(fù)失效毛好。
override def recovery = Recovery.none
? ??????恢復(fù)狀態(tài)
持久化actor可以通過(guò)以下方法查詢它自己的恢復(fù)狀態(tài)
def recoveryRunning: Boolean
def recoveryFinished: Boolean
持久化actor在回復(fù)完成后會(huì)收到一個(gè)特殊的RecoveryCompleted?消息望艺。然后再執(zhí)行下一步操作
如果actor從日志中的恢復(fù)狀態(tài)有問(wèn)題,onRecoveryFailure?會(huì)被調(diào)用(記錄為error)并且actor將被停止肌访。
內(nèi)部暫存(stash)
持久化actor有一個(gè)私有的暫存用來(lái)緩存整個(gè)恢復(fù)過(guò)程中進(jìn)來(lái)的消息或者暫存persist\persistAll方法持久化的事件找默。內(nèi)部暫存通過(guò)掛鉤到unstashAll?與普通暫存協(xié)作
你應(yīng)該控制消息的產(chǎn)出不要超過(guò)持久化actor的處理能力,否則暫存消息的數(shù)量將無(wú)限增長(zhǎng)吼驶。所以我們要在mailbox配置中定義暫存的容量來(lái)保護(hù)暫存并防止發(fā)生OutOfMemoryError?
akka.actor.default-mailbox.stash-capacity=10000
注意惩激,如果你有很多持久化actor,要定義一個(gè)小的暫存容量蟹演,防止占用過(guò)多的內(nèi)存
持久化actor定義了三個(gè)策略來(lái)處理內(nèi)部暫存容量超出的故障风钻。默認(rèn)的溢出策略是ThrowOverflowExceptionStrategy,具體內(nèi)容是丟棄當(dāng)前的信息酒请,拋出StashOverflowException異常骡技,造成actor重啟。
你可以覆蓋internalStashOverflowStrategy?方法為了“獨(dú)特的”持久化actor來(lái)返回DiscardToDeadLetterStrategy?或者ReplyToStrategy?或者通過(guò)提供FQCN(Fully Qualified Class Name完全限定類名)來(lái)給所有的持久化actor來(lái)定義“默認(rèn)值”羞反。
在persistence?的配置中:
akka.persistence.internal-stash-overflow-strategy="akka.persistence.ThrowExceptionConfigurator"
DiscardToDeadLetterStrategy?策略也有一個(gè)打包好的配akka.persistence.DiscardConfigurator.
你也可以查詢默認(rèn)策略:
Persistence(context.system).defaultInternalStashOverflowStrategy?
放寬的局部一致性要求和高吞吐量的用例
如果面臨放寬的局部一致性要求和高吞吐量布朦,有時(shí)PersistentActor及其persist在處理大量涌入的命令時(shí)可能會(huì)不夠,有時(shí)你可能會(huì)放寬一致性要求——例如你會(huì)想要盡可能快速地處理命令昼窗,假設(shè)事件最終會(huì)持久化并在后臺(tái)恰當(dāng)處理是趴,并在需要時(shí)追溯性地回應(yīng)持久性故障。
persistAsync方法提供了一個(gè)工具膏秫,用于實(shí)現(xiàn)高吞吐量的持久化actor。當(dāng)日志仍在致力于持久化和執(zhí)行用戶事件回調(diào)代碼時(shí)做盅,它不會(huì)暫存傳入的命令缤削。
推遲操作,直到持久化處理程序已經(jīng)執(zhí)行
PersistentActor?提供了一個(gè)實(shí)用的方法deferAsync(延遲異步)吹榴,它工作起來(lái)類似于persistAsync但是不持久化傳遞過(guò)的事件亭敢,它將保留在內(nèi)存中,并在調(diào)用處理程序時(shí)使用图筹。建議將其用于讀取操作帅刀,以及在domain模型中沒(méi)有相應(yīng)事件的操作让腹。
請(qǐng)注意,sender()在處理程序回調(diào)中是安全的扣溺,將指向該命令的原始發(fā)送方骇窍,該命令將調(diào)用這個(gè)deferAsync處理程序。
持久化嵌套調(diào)用
可以在各自的回調(diào)塊中調(diào)用persistAsync?和persist锥余,它們將適當(dāng)?shù)乇A艟€程安全性(包括sender的正確值)和存儲(chǔ)保證腹纳。
在下面的示例中,將發(fā)出兩個(gè)持久調(diào)用驱犹,每個(gè)調(diào)用都在其回調(diào)中發(fā)出另一個(gè)持久化調(diào)用
向PersistentActor發(fā)送a和b命令嘲恍,執(zhí)行順序:
首先“outer層”的持久化調(diào)用先被發(fā)出并且它們的回調(diào)被應(yīng)用。一旦這個(gè)事件在日志(journal)中被確認(rèn)持久化雄驹,inner的回調(diào)將被調(diào)用佃牛。只有這些所有的處理程序成功調(diào)用,下一個(gè)命令才會(huì)被傳遞給持久化actor医舆。換句話說(shuō)俘侠,初始調(diào)用在外部層中的persist()所保證的傳入命令的存儲(chǔ)(stash)被擴(kuò)展,直到所有嵌套的持久化回調(diào)都被處理彬向。
故障
如果事件的持久化失敗兼贡,將會(huì)調(diào)用onPersistFailure(在默認(rèn)情況下記錄錯(cuò)誤),并且將無(wú)條件地停止actor娃胆。
因?yàn)槿罩臼遣豢捎玫谋橄#援?dāng)持久化失敗重啟是很困難的,最好是停止該角色里烦,在退出超時(shí)之后重新啟動(dòng)它凿蒜。akka.pattern.BackoffSupervisoractor支持這重啟
如果事件的持久化在存儲(chǔ)之前被拒絕,將會(huì)調(diào)用onpersistreject(在默認(rèn)情況下記錄一個(gè)警告)胁黑,并且actor?將繼續(xù)使用下一個(gè)消息
如果在啟動(dòng)actor時(shí)從日志中恢復(fù)actor?的狀態(tài)有問(wèn)題废封,就會(huì)調(diào)用onRecoveryFailure(在默認(rèn)情況下記錄錯(cuò)誤),并停止該actor丧蘸。
Atomic?寫入
通過(guò)使用persistAll或persistAllAsync方法來(lái)原子地存儲(chǔ)多個(gè)事件漂洋。這意味著傳遞給該方法的所有事件都被存儲(chǔ),如果出現(xiàn)錯(cuò)誤力喷,則不會(huì)存儲(chǔ)它們刽漂。
使用persistAll來(lái)只持久化事件的子集,因此一個(gè)持久化actor的恢復(fù)將永遠(yuǎn)不會(huì)被部分地完成弟孟。
一些日志在各自的事件中不支持原子寫入而且它們反對(duì)使用persistAll命令贝咙。也就是說(shuō)onPersistRejected是調(diào)用一個(gè)異常(類似?UnsupportedOperationException).
Batch 寫入
為了在使用persistAsync時(shí)優(yōu)化吞吐量,在將事件寫入日志之前拂募,一個(gè)持久的參與者內(nèi)部批量事件將被存儲(chǔ)在高負(fù)載下庭猩。批量大小是由日志往返期間發(fā)出的事件數(shù)量動(dòng)態(tài)決定的窟她。在將一個(gè)批次發(fā)送到日志后,在確認(rèn)收到之前的批次之前蔼水,不能再發(fā)送批次震糖。批處理寫入從來(lái)都不是基于時(shí)間的,這使得延遲至少是最小的徙缴。
消息刪除
可以將所有消息(由單個(gè)持久actor記錄的日志)刪除到指定的序列號(hào)试伙。持久化actor可以調(diào)用deleteMessages方法
deletemessage請(qǐng)求的結(jié)果向持久化actor發(fā)出信號(hào),如果刪除成功于样,則發(fā)出DeleteMessagesSuccess?如果發(fā)送失敗則發(fā)出DeleteMessagesFailure
即使所有的消息在deleteMessages調(diào)用后被刪除疏叨,消息刪除也不會(huì)影響日志的最高序列號(hào)
持久化狀態(tài)處理
在PersistentActor中重寫的故障處理程序的回調(diào)是顯式。這些處理程序的默認(rèn)實(shí)現(xiàn)發(fā)出一個(gè)日志消息(persist的error/恢復(fù)故障/其他warning)并記錄失敗的原因和導(dǎo)致失敗的消息穿剖。
對(duì)于決定性的故障(例如恢復(fù)或持久化事件失斣槁),在故障處理程序調(diào)用之后持久化actor將被停止糊余。這是因?yàn)槿绻讓拥娜罩景l(fā)送持久化故障信號(hào)秀又,它很可能要么完全失敗,要么超負(fù)荷贬芥,重新啟動(dòng)吐辙,并試圖再次堅(jiān)持這個(gè)事件,這不會(huì)幫助日志恢復(fù)蘸劈,而會(huì)引起?Thundering herd problem,?因?yàn)樵S多持久化actor將重新啟動(dòng)并嘗試再次持久化他們的事件昏苏。
使用BackoffSupervisor?(在?Failures中描述)它實(shí)現(xiàn)了一個(gè)指數(shù)回退策略,該策略允許日志在持久化actor的重新啟動(dòng)之間恢復(fù)更多的喘息空間威沫。
安全地關(guān)閉持久化actor
對(duì)于普通的actor來(lái)說(shuō)贤惯,可以接受使用特殊的PoisonPill?消息來(lái)向一個(gè)actor發(fā)出信號(hào),當(dāng)它接收到這個(gè)信息時(shí)棒掠,它應(yīng)該停止自己的動(dòng)作孵构,且actor自己無(wú)法阻止。
注意:在使用PersistentActor時(shí)烟很,(在調(diào)用持久化處理程序之前)actor在它處理其他需要放入暫存的消息之前颈墅,可能收到或處理PoisonPill?,造成actor提前關(guān)閉雾袱。
重放過(guò)濾器
當(dāng)多個(gè)寫入者(即多個(gè)持久化actor實(shí)例)用相同的序列號(hào)來(lái)記錄不同的消息恤筛,可能會(huì)有事件流被破壞。在這種情況下谜酒,您可以配置如何在恢復(fù)時(shí)過(guò)濾來(lái)自多個(gè)寫入者的重播消息叹俏。
在你的配置中akka.persistence.journal.xxx下妻枕。replay-filter部分(xxx是您的日志插件id)僻族,您可以從以下值中選擇回放過(guò)濾器模式:
????????repair-by-discard-old
????????fail
????????warn
????????off
快照
當(dāng)你使用actor時(shí)粘驰,你可能要注意有些actor可能會(huì)積累非常長(zhǎng)的事件日志,并經(jīng)歷較長(zhǎng)的恢復(fù)時(shí)間述么。正確的方法是分裂成一組較短的actor來(lái)大幅度減少恢復(fù)時(shí)間蝌数。
持久actor可以通過(guò)調(diào)用saveSnapshot方法來(lái)保存內(nèi)部狀態(tài)的快照。如果保存快照成功度秘,持久化actor將接收到SaveSnapshotSuccess消息顶伞,否則是SaveSnapshotFailure消息
如果沒(méi)有指定,他們默認(rèn)為SnapshotSelectionCriteria.Latest?(最新的快照)剑梳。若要禁用基于快照的恢復(fù)唆貌,應(yīng)用程序應(yīng)使用SnapshotSelectionCriteria.None。如果已保存的快照沒(méi)有匹配指定的SnapshotSelectionCriteria垢乙,恢復(fù)時(shí)將重播所有日志消息锨咙。
快照刪除
持久化actor可以通過(guò)調(diào)用deleteSnapshot方法來(lái)刪除單個(gè)快照,該方法使用快照的時(shí)間戳追逮。
如果大量刪除一個(gè)范圍內(nèi)的與SnapshotSelectionCriteria匹配的快照酪刀,持久化actor可以使用deleteSnapshots
快照狀態(tài)處理
保存和刪除快照也可以有成功或失敗,此信息通過(guò)如下表所示的狀態(tài)消息反饋給持久actor
如果失敗的消息actor沒(méi)有處理钮孵,每個(gè)傳入的失敗消息將記錄一個(gè)默認(rèn)的警告日志消息骂倘。在成功消息上沒(méi)有執(zhí)行默認(rèn)操作,但是您可以自由地處理它們巴席,例如历涝,為了刪除在內(nèi)存中快照,或者在失敗的情況下再次嘗試保存快照情妖。
至少一次傳遞
要發(fā)送至少一次發(fā)送語(yǔ)義到目的地的消息睬关,您可以在發(fā)送端對(duì)您的PersistentActor混合AtLeastOnceDelivery特性。當(dāng)它們?cè)诳膳渲贸瑫r(shí)內(nèi)未被確認(rèn)時(shí)毡证,它負(fù)責(zé)重新發(fā)送消息电爹。
發(fā)送actor的狀態(tài)包括未被接收方確認(rèn)的消息被發(fā)送的狀態(tài)必須是持久的,因此它可以在方送的actor或jvm崩潰時(shí)幸存下來(lái)料睛。AtLeastOnceDelivery?特性堅(jiān)持傳遞消息的意圖丐箩,并接收到確認(rèn)信息。
deliver方法用于將消息發(fā)送到目的地恤煞。當(dāng)目的地已回復(fù)一條確認(rèn)消息屎勘,調(diào)用confirmDelivery方法。
傳遞和確認(rèn)傳遞之間的關(guān)系
要將消息發(fā)送到目標(biāo)路徑居扒,請(qǐng)?jiān)诔志没l(fā)送消息的意圖之后使用deliver?方法概漱。
目的地的actor必須發(fā)送回一個(gè)確認(rèn)消息,當(dāng)發(fā)送方的actor收到確認(rèn)消息 應(yīng)該將消息傳送成功這個(gè)事實(shí)持久化喜喂,然后調(diào)用confirmDelivery方法
如果持久化actor當(dāng)前沒(méi)有恢復(fù)瓤摧,則deliver?方法將向目標(biāo)參與者發(fā)送消息竿裂。當(dāng)恢復(fù)時(shí),消息將被緩沖照弥,直到它們被確認(rèn)使用confirmDelivery.腻异。一旦恢復(fù)完成,在整個(gè)恢復(fù)過(guò)程中如果有未解決的消息沒(méi)有被確認(rèn)这揣,在發(fā)送任何其他消息之前悔常,持久參與者將重新發(fā)送這些消息。
傳遞要求一個(gè)deliveryIdToMessage?函數(shù)通過(guò)在消息中提供deliveryId?给赞,因此deliver?和confirmDelivery?創(chuàng)建關(guān)系是可能的机打。deliveryId?必須往返于傳遞。在接收到消息后片迅,目標(biāo)actor將把相同的deliveryId包裹在確認(rèn)消息中姐帚,并返回給發(fā)送方。發(fā)送方隨后將使用它調(diào)用confirmDelivery方法來(lái)完成傳遞程序障涯。
deliveryId是無(wú)間隙嚴(yán)格單調(diào)遞增序列號(hào)罐旗。所有目標(biāo)actor將使用相同的序列,即當(dāng)發(fā)送到多個(gè)目標(biāo)時(shí)會(huì)看到序列中的空白唯蝶。所以不可能使用自定義的deliveryId九秀。然而你可以在消息中發(fā)送自定義關(guān)聯(lián)標(biāo)識(shí)符到目的地。然后粘我,您必須保留內(nèi)部deliveryId(傳遞到deliveryIdToMessage函數(shù))和自定義關(guān)聯(lián)id之間的映射(傳遞到消息中)鼓蜒。您可以通過(guò)在Map(correlationId -> deliveryId)中存儲(chǔ)這樣的映射來(lái)實(shí)現(xiàn)這一點(diǎn),在您的消息的接收方以您的自定義相關(guān)id進(jìn)行回復(fù)之后征字,您可以從該映射中檢索到要傳遞的deliveryId都弹,并將其傳遞到confirmDelivery?方法中。
AtLeastOnceDelivery?特性有由未經(jīng)證實(shí)的消息和一個(gè)序列號(hào)組成的一個(gè)狀態(tài)匙姜,它并不存儲(chǔ)這個(gè)狀態(tài)本身畅厢。你必須持久化從你的PersistentActor調(diào)用deliver和confirmDelivery所對(duì)應(yīng)的事件,從而可以在PersistentActor的恢復(fù)階段調(diào)用相同的方法恢復(fù)狀態(tài)氮昧。有時(shí)框杜,這些事件可以從其他業(yè)務(wù)級(jí)別的事件中派生出來(lái),有時(shí)您必須創(chuàng)建單獨(dú)的事件袖肥。在恢復(fù)過(guò)程中deliver的調(diào)用不會(huì)發(fā)出消息咪辱,但如果沒(méi)有匹配的confirmDelivery執(zhí)行,它將稍后發(fā)送椎组。
getDeliverySnapshot和setDeliverySnapshot提供支持快照功能油狂。AtLeastOnceDeliverySnapshot包含完整的投遞狀態(tài),包括未經(jīng)確認(rèn)的消息。如果你需要一個(gè)自定義的快照保存actor其他部分的狀態(tài)专筷,你還必須包括AtLeastOnceDeliverySnapshot夹供,它使用protobuf序列化,即利用Akka的通用序列化機(jī)制仁堪。最簡(jiǎn)單的方法是將AtLeastOnceDeliverySnapshot中的字節(jié)作為blob包含在你自定義的快照中。
重新傳遞嘗試之間的間隔是由redeliverInterval方法定義的填渠,默認(rèn)值是由akka.persistence.at-least-once-delivery.redeliver-interval來(lái)配置弦聂,可以在實(shí)現(xiàn)類中重寫該方法來(lái)返回非默認(rèn)值。
在每次重新發(fā)送爆發(fā)時(shí)將發(fā)送的消息的最大數(shù)量由redeliveryBurstLimit方法定義(爆發(fā)的頻率是重發(fā)間隔的一半)氛什。如果有很多未確認(rèn)的消息(例如莺葫,如果目的地很長(zhǎng)一段時(shí)間都沒(méi)有),這有助于防止大量的消息同時(shí)發(fā)送枪眉。默認(rèn)值可以被akka.persistence.at-least-once-delivery.redelivery-burst-limit來(lái)配置捺檬,可以在實(shí)現(xiàn)類中重寫該方法來(lái)返回非默認(rèn)值。
在進(jìn)行n次嘗試之后贸铜,有一個(gè)AtLeastOnceDelivery.UnconfirmedWarning消息發(fā)送到self堡纬,重新發(fā)送將仍然繼續(xù),但是你可以選擇調(diào)用confirmDelivery?來(lái)取消重新發(fā)送蒿秦。傳遞的數(shù)量嘗試之前發(fā)出的警告是由warnAfterNumberOfUnconfirmedAttempts定義方法烤镐。默認(rèn)值可以被akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts可以在實(shí)現(xiàn)類中重寫該方法來(lái)返回非默認(rèn)值
AtLeastOnceDelivery?特性將消息保存在內(nèi)存中,直到確認(rèn)其成功交付棍鳖。actor能保留在內(nèi)存中的未經(jīng)確認(rèn)的消息的最大數(shù)目限制是由maxUnconfirmedMessages方法定義的炮叶。如果超過(guò)了此限制deliver方法將不會(huì)接受更多的消息,它將拋出AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException異常渡处。镜悉。可以用akka.persistence.at-least-once-delivery.max-unconfirmed-messages配置其默認(rèn)值医瘫÷乱蓿可以用實(shí)現(xiàn)類重寫該方法來(lái)返回非默認(rèn)值。
事件適配器
在長(zhǎng)期運(yùn)行的項(xiàng)目中醇份,使用事件源有時(shí)需要將數(shù)據(jù)模型完全從domain?模型中分離出來(lái)茫孔。
事件適配器在以下情況提供幫助:
Version Migrations版本遷移:
存在事件存儲(chǔ)在版本1應(yīng)該被“上拋”到版本2,這樣的過(guò)程需要實(shí)際代碼來(lái)實(shí)現(xiàn)被芳,不只是序列化層的變化缰贝。在這些場(chǎng)景下,toJournal?函數(shù)通常是特性函數(shù)畔濒,然而v1.Event=>v2.Event被formJournal實(shí)現(xiàn)剩晴,在fromJournal ?方法中執(zhí)行必要的映射。這種技術(shù)有時(shí)被稱為其他CQRS庫(kù)中的“向上轉(zhuǎn)換”。
Separating Domain and Data models?分離Domain?和Data模型
由于事件適配器赞弥,可以將domain模型與用于在日志中持久存儲(chǔ)數(shù)據(jù)的模型完全分離毅整,例如可能希望在domain模型中使用case類,但是绽左,將它們的協(xié)議緩沖區(qū)(或任何其他二進(jìn)制序列化格式)保留到日志中悼嫉。一個(gè)簡(jiǎn)單的toJournal:MyModel=>MyDataModel?和?fromJournal:MyDataModel=>MyModel適配器可以用來(lái)實(shí)現(xiàn)
Journal Specialized Data Types?日志專門的數(shù)據(jù)類型:
公開(kāi)底層日志中已知的數(shù)據(jù)類型,例如數(shù)據(jù)用JSON來(lái)存儲(chǔ)可以寫一個(gè)事件適配器toJournal:Any=>JSON拼窥。日志可以直接存儲(chǔ)json戏蔑,而不是將對(duì)象序列化為二進(jìn)制表示。
可以將多個(gè)適配器綁定到一個(gè)類以進(jìn)行恢復(fù)鲁纠,在這種情況下总棵,所有綁定適配器的fromJournal方法將應(yīng)用于給定的匹配事件(按照配置中的定義順序)。由于每個(gè)適配器可能從0返回到n個(gè)適應(yīng)事件(稱為EventSeq)每個(gè)適配器都可以對(duì)事件進(jìn)行調(diào)查改含,如果它確實(shí)需要調(diào)整情龄,它將返回相應(yīng)的事件(s)。在此修改過(guò)程中沒(méi)有任何可貢獻(xiàn)的其他適配器只返回EventSeq.empty捍壤。在重放過(guò)程中骤视,改編事件按次序傳遞給PersistentActor?。
注意
對(duì)于更高級(jí)的模式演化技術(shù)參考Persistence - Schema Evolution文檔
持久化有限狀態(tài)機(jī)(FSM)
PersistentFSM?以FSM的方式處理傳入消息鹃觉。它的內(nèi)部狀態(tài)是作為一系列變化被持久化的尚胞,后來(lái)被稱為domain事件。傳入消息帜慢、FSM狀態(tài)和轉(zhuǎn)換之間的關(guān)系笼裳,domain事件的持久性是由DSL定義的。
一個(gè)簡(jiǎn)單的例子
為了演示PersistentFSM特性的特性粱玲,請(qǐng)考慮一個(gè)代表Web store客戶的actor躬柬。我們的“WebStoreCustomerFSMActor”的合同是
AddItem?在客戶向購(gòu)物車中添加商品時(shí)發(fā)送。
Buy?當(dāng)客戶完成購(gòu)買時(shí)抽减。
Leave當(dāng)顧客離開(kāi)商店時(shí)沒(méi)有購(gòu)買任何東西允青。
GetCurrentCart?允許查詢客戶購(gòu)物車的當(dāng)前狀態(tài)。
顧客的幾種狀態(tài):
LookingAround客戶正在瀏覽網(wǎng)站卵沉,但沒(méi)有添加任何東西到購(gòu)物車
Shopping?客戶最近將商品添加到購(gòu)物車中颠锉。
Inactive?顧客購(gòu)物車?yán)镉猩唐罚罱鼪](méi)有添加任何東西史汗。
Paid?顧客已經(jīng)購(gòu)買了商品琼掠。
客戶的操作被“記錄”為連續(xù)的“domain事件”序列。為了恢復(fù)最新客戶的狀態(tài)停撞,這些事件在一個(gè)actor的開(kāi)始時(shí)重播瓷蛙。
使用snapshot-after進(jìn)行定期快照
如果你把reference.conf中的下列flag改編悼瓮,你能夠周期性的調(diào)用PersistentFSM?中的saveStateSnapshot()
akka.persistence.fsm.snapshot-after = 1000
這意味著在序列號(hào)達(dá)到1000的倍數(shù)后調(diào)用saveStateSnapshot()。
存儲(chǔ)插件
在Akka持久性擴(kuò)展中艰猬,用于日志和快照存儲(chǔ)的存儲(chǔ)備份是可插入的横堡。在Akka社區(qū)項(xiàng)目頁(yè)面上有一個(gè)持久性日志和快照存儲(chǔ)插件的目錄,請(qǐng)參閱社區(qū)插件冠桃。
當(dāng)一個(gè)持久化actor定義自己的一組插件時(shí)命贴,插件可以被“默認(rèn)”選擇為所有持久化actor,或者“單獨(dú)”
當(dāng)一個(gè)持久的actor不重寫journalPluginId和snapshotPluginId方法時(shí)食听,持久性擴(kuò)展將使用在reference.conf中配置的“默認(rèn)”日志和快照存儲(chǔ)插件胸蛛。
但是,這些條目是空的””碳蛋,并且需要在用戶application.conf中通過(guò)覆蓋來(lái)顯式的用戶配置。一個(gè)日志插件的例子省咨,把消息寫入LevelDB 肃弟,詳情請(qǐng)查看看?Local LevelDB journal.對(duì)于快照存儲(chǔ)插件的示例,該插件將快照寫入到本地文件系統(tǒng)的單個(gè)文件中零蓉,詳情請(qǐng)查看看?Local snapshot store.
應(yīng)用程序可以通過(guò)實(shí)現(xiàn)插件API并通過(guò)配置激活它們來(lái)提供自己的插件笤受。插件開(kāi)發(fā)需要以下導(dǎo)入:
急切地初始化持久化插件
默認(rèn)情況下,持久化插件根據(jù)自己的用處?kù)`活的啟動(dòng)敌蜂。急切地啟動(dòng)某個(gè)插件可能是有益的箩兽。為了做到這些,你應(yīng)該先添加akka.persistence.Persistence在akka.extensions關(guān)鍵字下章喉。在akka.persistence.journal.auto-start-journals和?akka.persistence.snapshot-store.auto-start-snapshot-stores下指定您希望自動(dòng)啟動(dòng)的插件的id汗贫。例如,如果您希望對(duì)leveldb日志插件和本地快照存儲(chǔ)插件進(jìn)行急切的初始化秸脱,您的配置應(yīng)該如下所示:
日志插件API
日志插件繼承AsyncWriteJournal
AsyncWriteJournal?是一個(gè)actor落包,實(shí)現(xiàn)方法
如果存儲(chǔ)后端API僅支持同步,阻塞寫入摊唇,方法實(shí)現(xiàn)如下咐蝇,
日志插件還必須實(shí)現(xiàn)在AsyncRecovery中定義的用于重放和序列號(hào)恢復(fù)的方法,實(shí)現(xiàn)方法
日志插件實(shí)例是一個(gè)actor巷查,因此與persist actor的請(qǐng)求相對(duì)應(yīng)的方法將按順序執(zhí)行有序。
它可以委托給異步庫(kù),或者委托給其他參與者來(lái)實(shí)現(xiàn)并行岛请。
日志插件類必須有一個(gè)帶有這些簽名的構(gòu)造函數(shù)旭寿。
????????·?一個(gè)com.typesafe.config.Config參數(shù)和一個(gè)配置路徑的String參數(shù)
????????· 一個(gè)com.typesafe.config.Config參數(shù)
? ? ? ? ·?沒(méi)有參數(shù)的構(gòu)造函數(shù)
actor系統(tǒng)配置的插件部分將在配置構(gòu)造函數(shù)參數(shù)中傳遞。插件配置路徑在String參數(shù)中傳遞崇败。
plugin-dispatcher是插件actor的分配器许师,如果沒(méi)有指定,則默認(rèn)為akka.persistence.dispatchers.default-plugin-dispatcher.
不要在系統(tǒng)默認(rèn)調(diào)度器上運(yùn)行日志任務(wù),因?yàn)檫@可能會(huì)導(dǎo)致其他任務(wù)的匱乏微渠。
快照存儲(chǔ)插件API
一個(gè)快照存儲(chǔ)插件必須擴(kuò)展SnapshotStore?actor并實(shí)現(xiàn)以下方法:一個(gè)快照存儲(chǔ)插件必須擴(kuò)展SnapshotStore actor并實(shí)現(xiàn)以下方法:實(shí)現(xiàn)方法
快照存儲(chǔ)實(shí)例是一個(gè)actor搭幻,因此與persist actor的請(qǐng)求相對(duì)應(yīng)的方法將按順序執(zhí)行。
它可以委托給異步庫(kù)逞盆,或者委托給其他參與者來(lái)實(shí)現(xiàn)并行檀蹋。
快照存儲(chǔ)插件類須有一個(gè)帶有這些簽名的構(gòu)造函數(shù)。
????????1.一個(gè)com.typesafe.config.Config參數(shù)和一個(gè)配置路徑的String參數(shù)
????????2.一個(gè)com.typesafe.config.Config參數(shù)
????????3.沒(méi)有參數(shù)的構(gòu)造函數(shù)
actor系統(tǒng)配置的插件部分將在配置構(gòu)造函數(shù)參數(shù)中傳遞云芦。插件配置路徑在String參數(shù)中傳遞俯逾。
plugin-dispatcher是插件actor的分配器,如果沒(méi)有指定舅逸,則默認(rèn)為akka.persistence.dispatchers.default-plugin-dispatcher.
不要在系統(tǒng)默認(rèn)調(diào)度器上運(yùn)行快照存儲(chǔ)任務(wù)桌肴,因?yàn)檫@可能會(huì)導(dǎo)致其他任務(wù)的匱乏。
TCK插件
為了幫助開(kāi)發(fā)人員構(gòu)建正確和高質(zhì)量的存儲(chǔ)插件琉历。我們提供了Technology Compatibility Kit(技術(shù)兼容性工具包)
TCK可以從Java和Scala項(xiàng)目中使用旷痕。要測(cè)試您的實(shí)現(xiàn)(獨(dú)立于語(yǔ)言)蹂析,您需要包括akka-persistence-tck依賴:
"com.typesafe.akka" %% "akka-persistence-tck" % "2.5.11" % "test"
要在測(cè)試套件中包含TCK測(cè)試翔始,只需擴(kuò)展所提供的JournalSpe
請(qǐng)注意斩箫,有些測(cè)試是可選的,并且通過(guò)覆蓋support…方法給TCK提供運(yùn)行測(cè)試所需的信息蝇恶。你可以使用布爾值或提供的CapabilityFlag.on?/?CapabilityFlag.off 值來(lái)實(shí)現(xiàn)這些方法
我們也提供了一個(gè)簡(jiǎn)單的基準(zhǔn)類JournalPerfSpec它包含了JournalSpec所擁有的所有測(cè)試拳魁,在打印性能統(tǒng)計(jì)數(shù)據(jù)時(shí),還可以在日志上執(zhí)行一些較長(zhǎng)的操作撮弧。雖然它不是為了提供一個(gè)合適的基準(zhǔn)測(cè)試環(huán)境潘懊,但它可以用來(lái)對(duì)您的日志在最典型的情況下的性能有一個(gè)粗略的感覺(jué)。
為了將SnapshotStore TCK測(cè)試包含在測(cè)試套件中贿衍,只需擴(kuò)展SnapshotStoreSpec:
如果你的插件需要一些設(shè)置情況下(啟動(dòng)一個(gè)模擬數(shù)據(jù)庫(kù)卦尊,刪除臨時(shí)文件等等),你可以重寫beforeAll和afterAll方法舌厨,并在測(cè)試的生命周期中加入:
我們強(qiáng)烈建議在你的測(cè)試套件包括這些規(guī)格岂却,因?yàn)樗鼈兒w了您可能已經(jīng)忘記在從頭編寫插件時(shí)要測(cè)試的范圍廣泛的案例。
預(yù)先包裝好的插件
本地levelDB日志
levelDB日志插件的配置入口:akka.persistence.journal.leveldb它將消息寫入一個(gè)本地的LevelDB實(shí)例裙椭。通過(guò)定義配置屬性啟用此插件:
基于LevelDB的插件還需要以下附加的依賴聲明:
"org.fusesource.leveldbjni" ??% "leveldbjni-all" ??% "1.8"
LevelDB文件的默認(rèn)位置是當(dāng)前工作目錄中一個(gè)名為journal的目錄躏哩。此位置可以由配置中指定的相對(duì)或絕對(duì)的路徑更改:
akka.persistence.journal.leveldb.dir = "target/journal"
用這個(gè)插件,每個(gè)actor系統(tǒng)可運(yùn)行其自己私有的LevelDB實(shí)例揉燃。
LevelDB的一個(gè)特性是扫尺,刪除操作不會(huì)移除日志中的消息,而是為每個(gè)刪除的消息添加一個(gè)“墓碑”炊汤。在大量使用日志的情況下正驻,特別是頻繁的刪除弊攘,這可能是一個(gè)問(wèn)題,因?yàn)橛脩艨赡馨l(fā)現(xiàn)自己在處理不斷增加的日志大小姑曙。為此襟交,LevelDB提供了一個(gè)特殊的journal compaction函數(shù),它通過(guò)以下配置公開(kāi)伤靠。
共享LevelDB日志
一個(gè)LevelDB實(shí)例還可以由多個(gè)actor系統(tǒng)(在相同或不同節(jié)點(diǎn)上)共享捣域。它,例如宴合,允許持久化actor進(jìn)行故障轉(zhuǎn)移到備份節(jié)點(diǎn)焕梅,并從備份節(jié)點(diǎn)繼續(xù)使用共享的日志實(shí)例。
通過(guò)實(shí)例化SharedLeveldbStore actor可以啟動(dòng)一個(gè)共享的LevelDB實(shí)例卦洽。
默認(rèn)情況下贞言,共享的實(shí)例將日志消息寫入到當(dāng)前的工作目錄中一個(gè)名為journal的本地目錄》У伲可以通過(guò)配置更改存儲(chǔ)位置:
akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
使用共享的LevelDB存儲(chǔ)的actor系統(tǒng)必須激活akka.persistence.journal.leveldb-shared插件该窗。
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
這個(gè)插件必須通過(guò)注入(遠(yuǎn)程)SharedLeveldbStore??actor引用來(lái)初始化。注入是通過(guò)以actor引用作為參數(shù)調(diào)用SharedLeveldbJournal.setStore方法完成的脂新。
內(nèi)部日志命令(由持久化actor發(fā)送的)會(huì)被緩沖直到注入完成挪捕。注入是冪等的粗梭,即只有第一次的注入被使用争便。
本地快照存儲(chǔ)區(qū)
本地快照存儲(chǔ)插件配置條目是akka.persistence.snapshot-store.local?它將快照文件寫入到本地文件系統(tǒng)中。通過(guò)定義配置屬性來(lái)啟用這個(gè)插件断医。
默認(rèn)的存儲(chǔ)位置是當(dāng)前工作目錄中一個(gè)名為snapshots?的目錄滞乙。這可以通過(guò)配置中指定的相對(duì)或絕對(duì)的路徑來(lái)更改
akka.persistence.snapshot-store.local.dir =?"target/snapshots"
默認(rèn)的存儲(chǔ)位置是當(dāng)前工作目錄中一個(gè)名為snapshots?的目錄。這可以通過(guò)配置中指定的相對(duì)或絕對(duì)的路徑來(lái)更改
持久性插件代理
持久性插件代理允許共享日志和快照存儲(chǔ)在多個(gè)actor系統(tǒng)(在相同或不同的節(jié)點(diǎn))鉴嗤。
例如斩启,這允許持久化actor將故障轉(zhuǎn)移到備份節(jié)點(diǎn),并繼續(xù)使用備份節(jié)點(diǎn)上的共享日志實(shí)例醉锅。代理的工作方式是將所有日志/快照存儲(chǔ)消息轉(zhuǎn)發(fā)到一個(gè)單獨(dú)的兔簇、共享的持久性插件實(shí)例,因此支持由代理插件支持的任何用例硬耍。
日志和快照存儲(chǔ)代理分別通過(guò)akka.persistence.journal.proxy和akka.persistence.snapshot-store.proxy配置來(lái)控制垄琐。
通過(guò)target-journal-plugin或target-snapshot-store-plugin來(lái)設(shè)置你想優(yōu)先使用的插件(例如akka.persistence.journal.leveldb)。start-target-journal和?start-target-snapshot-store關(guān)鍵字應(yīng)該在一個(gè)actor系統(tǒng)中設(shè)置成on——這是一個(gè)將實(shí)例化共享持久性插件的系統(tǒng)经柴。
接下來(lái)狸窘,需要告訴代理如何找到共享插件。這個(gè)可以通過(guò)設(shè)置?target-journal-address 和 target-snapshot-store-address?配置關(guān)鍵字來(lái)完成或者以編程方式調(diào)用PersistencePluginProxy.setTargetLocation方法
自定義序列化
快照序列化和Persistent消息的有效載荷是可以通過(guò)Akka序列化基礎(chǔ)架構(gòu)配置的坯认。例如翻擒,如果應(yīng)用程序想要序列化
????????????·有效載荷的MyPayload類型與自定義的MyPayloadSerializer
????????????·快照的類型MySnapshot與自定義的MySnapshotSerializer
在應(yīng)用程序配置中氓涣。如果未指定,則使用默認(rèn)的序列化程序陋气。
對(duì)于更高級(jí)的模式演化技術(shù)劳吠,請(qǐng)參考?Persistence - Schema Evolution文檔
測(cè)試
運(yùn)行測(cè)試時(shí)使用sbt的LevelDB默認(rèn)設(shè)置,請(qǐng)確保在你的sbt項(xiàng)目中設(shè)置fork := true恩伺,否則你將看到一個(gè)UnsatisfiedLinkError赴背。或者晶渠,你可以切換到一個(gè)LevelDB Java 端口凰荚,通過(guò)這樣的設(shè)置
akka.persistence.journal.leveldb.native = off
或
akka.persistence.journal.leveldb-shared.store.native = off
在Akka配置中。LevelDB 的Java端口僅用于測(cè)試目的褒脯。
還要注意便瑟,對(duì)于LevelDB Java端口,您需要以下依賴項(xiàng):
"org.iq80.leveldb" ???????????% "leveldb" ?????????% "0.9" ?????????% "test"
配置
持久化模塊有幾個(gè)配置屬性番川,請(qǐng)參考?reference configuration.
多個(gè)持久性插件配置
默認(rèn)情況下到涂,持久化actor將使用“默認(rèn)”的日志和快照存儲(chǔ)插件,這些插件配置在reference.conf配置資源中颁督,
當(dāng)持久參與者重寫了journalPluginId和snapshotPluginId方法時(shí)践啄,該actor將由這些特定的持久性插件而不是默認(rèn)值服務(wù):
請(qǐng)注意,journalPluginId和snapshotPluginId必須引用正確配置的引用沉御。帶有標(biāo)準(zhǔn)類屬性的conf插件條目以及特定于這些插件的設(shè)置屿讽,也就是: