spring batch簡介
Spring Batch架構(gòu)介紹
- Spring Batch核心概念介紹
- chunk 處理流程
spring batch簡介
spring batch是spring提供的一個(gè)數(shù)據(jù)處理框架黄刚。企業(yè)域中的許多應(yīng)用程序需要批量處理才能在關(guān)鍵任務(wù)環(huán)境中執(zhí)行業(yè)務(wù)操作鲸匿。這些業(yè)務(wù)運(yùn)營包括:
- 自動(dòng)化缝龄、復(fù)雜地處理大量信息畦浓,無需用戶交互即可最高效地處理這些信息。這些操作通常包括基于時(shí)間的事件(例如月末計(jì)算今膊、通知或通信)审编。
- 定期應(yīng)用在非常大的數(shù)據(jù)集上重復(fù)處理的復(fù)雜業(yè)務(wù)規(guī)則(例如拗胜,保險(xiǎn)福利確定或費(fèi)率調(diào)整)
- 將從內(nèi)部和外部系統(tǒng)接收的信息集成到記錄系統(tǒng)中,這些信息通常需要以事務(wù)方式進(jìn)行格式化、驗(yàn)證和處理柑潦。對(duì)于企業(yè)來說享言,批處理每天用于處理數(shù)十億筆交易
Spring Batch是一個(gè)輕量級(jí)、全面的批處理框架渗鬼,旨在開發(fā)對(duì)企業(yè)系統(tǒng)日常操作至關(guān)重要的健壯批處理應(yīng)用程序览露。Spring批處理基于人們所期望的Spring框架的特性(生產(chǎn)力、基于POJO的開發(fā)方法和總體易用性)譬胎,同時(shí)使開發(fā)人員能夠在必要時(shí)輕松訪問和利用更高級(jí)的企業(yè)服務(wù)差牛。Spring Batch不是一個(gè)調(diào)度框架。在商業(yè)和開源領(lǐng)域都有許多優(yōu)秀的企業(yè)調(diào)度器(如Quartz堰乔、Tivoli偏化、Control-M等)。它旨在與調(diào)度器一起工作镐侯,而不是替換調(diào)度器侦讨。
Spring Batch提供了在處理大量記錄時(shí)必不可少的可重用功能,包括日志/跟蹤苟翻、事務(wù)管理韵卤、作業(yè)處理統(tǒng)計(jì)、作業(yè)重啟崇猫、跳過和資源管理沈条。它還提供了更高級(jí)的技術(shù)服務(wù)和功能,通過優(yōu)化和分區(qū)技術(shù)實(shí)現(xiàn)了超大容量和高性能的批處理作業(yè)诅炉。Spring Batch既可以用于簡單的用例(如將文件讀入數(shù)據(jù)庫或運(yùn)行存儲(chǔ)過程)蜡歹,也可以用于復(fù)雜的高容量用例(如在數(shù)據(jù)庫之間移動(dòng)大量數(shù)據(jù)、轉(zhuǎn)換數(shù)據(jù)等)汞扎。大容量批處理作業(yè)可以以高度可擴(kuò)展的方式利用該框架來處理大量信息季稳。
Spring Batch架構(gòu)介紹
Spring Batch的設(shè)計(jì)考慮到了可擴(kuò)展性和不同的最終用戶群體擅这。下圖顯示了支持最終用戶開發(fā)人員的可擴(kuò)展性和易用性的分層體系結(jié)構(gòu)澈魄。
此分層體系結(jié)構(gòu)突出了三個(gè)主要的高層組件:應(yīng)用程序、核心和基礎(chǔ)架構(gòu)仲翎。該應(yīng)用程序包含所有批處理作業(yè)和開發(fā)人員使用Spring batch編寫的自定義代碼痹扇。批處理核心包含啟動(dòng)和控制批處理作業(yè)所需的核心運(yùn)行時(shí)類。它包括JobLauncher溯香、Job和Step的實(shí)現(xiàn)鲫构。應(yīng)用程序和核心都構(gòu)建在公共基礎(chǔ)架構(gòu)之上。此基礎(chǔ)結(jié)構(gòu)包含公共讀寫器和服務(wù)(如RetryTemplate)玫坛,應(yīng)用程序開發(fā)人員(讀寫器结笨,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的庫)都使用這些服務(wù)。
一般批量原則和指南
構(gòu)建批處理解決方案時(shí)炕吸,應(yīng)考慮以下關(guān)鍵原則伐憾、指導(dǎo)原則和一般注意事項(xiàng)。
- 請(qǐng)記住赫模,批處理體系結(jié)構(gòu)通常會(huì)影響在線體系結(jié)構(gòu)树肃,反之亦然。設(shè)計(jì)時(shí)考慮到架構(gòu)和環(huán)境瀑罗,盡可能使用公共構(gòu)建塊胸嘴。
- 盡可能簡化,避免在單批處理應(yīng)用程序中構(gòu)建復(fù)雜的邏輯結(jié)構(gòu)
- 時(shí)數(shù)據(jù)的處理和存儲(chǔ)在物理上緊密相連(換句話說斩祭,將數(shù)據(jù)保存在處理的位置)
- 最小化系統(tǒng)資源使用劣像,尤其是I/O。在內(nèi)存中執(zhí)行盡可能多的操作摧玫。
- 檢查應(yīng)用程序I/O(分析SQL語句)驾讲,以確保避免不必要的物理I/O。尤其需要查找以下四個(gè)常見缺陷:
- 當(dāng)數(shù)據(jù)可以讀取一次并緩存或保存在工作存儲(chǔ)器中時(shí)席赂,為每個(gè)事務(wù)讀取數(shù)據(jù)吮铭。
- 重新讀取同一事務(wù)中較早讀取數(shù)據(jù)的事務(wù)的數(shù)據(jù)
- 導(dǎo)致不必要的表或索引掃描
- 未在SQL語句的WHERE子句中指定鍵值。
- 不要在批處理運(yùn)行中執(zhí)行兩次操作颅停。例如谓晌,如果出于報(bào)告目的需要數(shù)據(jù)摘要,則應(yīng)(如果可能)在最初處理數(shù)據(jù)時(shí)增加存儲(chǔ)的總計(jì)癞揉,以便報(bào)告應(yīng)用程序不必重新處理相同的數(shù)據(jù)
- 在批處理應(yīng)用程序開始時(shí)分配足夠的內(nèi)存纸肉,以避免在處理過程中進(jìn)行耗時(shí)的重新分配。
- 始終假設(shè)數(shù)據(jù)完整性最差喊熟。插入足夠的檢查和記錄驗(yàn)證以保持?jǐn)?shù)據(jù)完整性柏肪。
- 盡可能實(shí)現(xiàn)內(nèi)部驗(yàn)證的校驗(yàn)和。例如芥牌,平面文件應(yīng)該有一個(gè)拖車記錄烦味,告訴文件中記錄的總數(shù)和關(guān)鍵字段的聚合。
- 在具有真實(shí)數(shù)據(jù)量的生產(chǎn)環(huán)境中盡早計(jì)劃和執(zhí)行壓力測(cè)試壁拉。
- 在大型批處理系統(tǒng)中谬俄,備份可能很有挑戰(zhàn)性,特別是如果系統(tǒng)24-7天與在線并行運(yùn)行弃理。數(shù)據(jù)庫備份通常在聯(lián)機(jī)設(shè)計(jì)中得到很好的處理溃论,但文件備份也應(yīng)被視為同樣重要。如果系統(tǒng)依賴于平面文件痘昌,則文件備份程序不僅應(yīng)到位并記錄在案钥勋,而且還應(yīng)定期測(cè)試炬转。
批處理策略
為了幫助設(shè)計(jì)和實(shí)現(xiàn)批處理系統(tǒng),應(yīng)以示例結(jié)構(gòu)圖和代碼外殼的形式向設(shè)計(jì)者和程序員提供基本的批處理應(yīng)用程序構(gòu)建塊和模式算灸。開始設(shè)計(jì)批處理作業(yè)時(shí)返吻,應(yīng)將業(yè)務(wù)邏輯分解為一系列步驟,這些步驟可以使用以下標(biāo)準(zhǔn)構(gòu)建塊來實(shí)現(xiàn):
- 轉(zhuǎn)換應(yīng)用程序:對(duì)于外部系統(tǒng)提供或生成的每種類型的文件乎婿,必須創(chuàng)建轉(zhuǎn)換應(yīng)用程序,以將提供的交易記錄轉(zhuǎn)換為處理所需的標(biāo)準(zhǔn)格式森逮。這種類型的批處理應(yīng)用程序可以部分或全部由翻譯實(shí)用程序模塊組成(請(qǐng)參見基本批處理服務(wù))疑俭。
- 驗(yàn)證應(yīng)用程序:驗(yàn)證應(yīng)用程序確保所有輸入/輸出記錄正確且一致懒浮。驗(yàn)證通常基于文件頭和尾部吓蘑、校驗(yàn)和和驗(yàn)證算法以及記錄級(jí)交叉檢查琳猫。
- 提取應(yīng)用程序:從數(shù)據(jù)庫或輸入文件中讀取一組記錄、根據(jù)預(yù)定義規(guī)則選擇記錄并將記錄寫入輸出文件的應(yīng)用程序私痹。
- 提取/更新應(yīng)用程序:從數(shù)據(jù)庫或輸入文件中讀取記錄脐嫂,并根據(jù)每個(gè)輸入記錄中的數(shù)據(jù)對(duì)數(shù)據(jù)庫或輸出文件進(jìn)行更改的應(yīng)用程序。
- 處理和更新應(yīng)用程序:對(duì)提取或驗(yàn)證應(yīng)用程序中的輸入事務(wù)執(zhí)行處理的應(yīng)用程序紊遵。處理通常涉及讀取數(shù)據(jù)庫以獲取處理所需的數(shù)據(jù)账千,可能會(huì)更新數(shù)據(jù)庫并創(chuàng)建用于輸出處理的記錄。
- 輸出/格式化應(yīng)用程序:讀取輸入文件暗膜、根據(jù)標(biāo)準(zhǔn)格式重新構(gòu)造此記錄中的數(shù)據(jù)并生成輸出文件以便打印或傳輸?shù)狡渌绦蚧蛳到y(tǒng)的應(yīng)用程序蕊爵。
此外,應(yīng)該為無法使用前面提到的構(gòu)建塊構(gòu)建的業(yè)務(wù)邏輯提供基本的應(yīng)用程序外殼桦山。
除了主要構(gòu)建塊之外攒射,每個(gè)應(yīng)用程序還可以使用一個(gè)或多個(gè)標(biāo)準(zhǔn)實(shí)用程序步驟,例如:
- 排序:一種讀取輸入文件并生成輸出文件的程序恒水,其中記錄已根據(jù)記錄中的排序鍵字段重新排序会放。排序通常由標(biāo)準(zhǔn)系統(tǒng)實(shí)用程序執(zhí)行。
- 拆分:一種程序钉凌,它讀取單個(gè)輸入文件咧最,并根據(jù)字段值將每條記錄寫入多個(gè)輸出文件中的一個(gè)。拆分可以由參數(shù)驅(qū)動(dòng)的標(biāo)準(zhǔn)系統(tǒng)實(shí)用程序定制或執(zhí)行御雕。
- 合并:一種程序矢沿,從多個(gè)輸入文件中讀取記錄,并用輸入文件中的組合數(shù)據(jù)生成一個(gè)輸出文件酸纲。合并可以由參數(shù)驅(qū)動(dòng)的標(biāo)準(zhǔn)系統(tǒng)實(shí)用程序定制或執(zhí)行捣鲸。
批處理應(yīng)用程序還可以按其輸入源進(jìn)行分類:
- 數(shù)據(jù)庫驅(qū)動(dòng)的應(yīng)用程序由從數(shù)據(jù)庫檢索的行或值驅(qū)動(dòng)。
- 文件驅(qū)動(dòng)的應(yīng)用程序由從文件檢索的記錄或值驅(qū)動(dòng)闽坡。
- 消息驅(qū)動(dòng)的應(yīng)用程序由從消息隊(duì)列檢索的消息驅(qū)動(dòng)栽惶。
任何批處理系統(tǒng)的基礎(chǔ)都是處理策略愁溜。影響策略選擇的因素包括:估計(jì)的批處理系統(tǒng)容量、與在線系統(tǒng)或其他批處理系統(tǒng)的并發(fā)性外厂、可用的批處理窗口冕象。(請(qǐng)注意,隨著越來越多的企業(yè)希望全天候運(yùn)營汁蝶,清晰的批處理窗口正在消失)渐扮。
批處理的典型處理選項(xiàng)有(按實(shí)現(xiàn)復(fù)雜性的增加順序):
- 脫機(jī)模式下批處理窗口期間的正常處理。
- 并行批處理或在線處理掖棉。
- 同時(shí)并行處理多個(gè)不同的批處理運(yùn)行或作業(yè)墓律。
- 分區(qū)(同時(shí)處理同一作業(yè)的多個(gè)實(shí)例)。
- 上述選項(xiàng)的組合啊片。
商業(yè)調(diào)度器可能支持部分或全部這些選項(xiàng)只锻。
后續(xù)將更詳細(xì)地討論這些處理選項(xiàng)。需要注意的是紫谷,根據(jù)經(jīng)驗(yàn)齐饮,批處理采用的提交和鎖定策略取決于執(zhí)行的處理類型,在線鎖定策略也應(yīng)使用相同的原則笤昨。因此祖驱,在設(shè)計(jì)總體體系結(jié)構(gòu)時(shí),批處理體系結(jié)構(gòu)不能只是事后諸葛亮瞒窒。
鎖定策略可以是僅使用普通數(shù)據(jù)庫鎖捺僻,或者在體系結(jié)構(gòu)中實(shí)現(xiàn)額外的自定義鎖定服務(wù)。鎖定服務(wù)將跟蹤數(shù)據(jù)庫鎖定(例如崇裁,通過在專用db表中存儲(chǔ)必要的信息)匕坯,并向請(qǐng)求db操作的應(yīng)用程序授予或拒絕權(quán)限。此體系結(jié)構(gòu)還可以實(shí)現(xiàn)重試邏輯拔稳,以避免在出現(xiàn)鎖定情況時(shí)中止批處理作業(yè)葛峻。
1、在單獨(dú)的批處理窗口中運(yùn)行的簡單批處理在批處理窗口中的正常處理巴比,在線用戶或其他批處理不需要更新數(shù)據(jù)术奖,并發(fā)性不是問題,可以在批處理運(yùn)行結(jié)束時(shí)進(jìn)行一次提交轻绞。
在大多數(shù)情況下采记,更穩(wěn)健的方法更合適。請(qǐng)記住政勃,批處理系統(tǒng)在復(fù)雜性和處理的數(shù)據(jù)量方面都有隨時(shí)間增長的趨勢(shì)唧龄。如果沒有鎖定策略,并且系統(tǒng)仍然依賴于單個(gè)提交點(diǎn)稼病,那么修改批處理程序可能會(huì)很痛苦选侨。因此掖鱼,即使是最簡單的批處理系統(tǒng)然走,也要考慮重新啟動(dòng)恢復(fù)選項(xiàng)的提交邏輯需要援制,以及本節(jié)后面描述的更復(fù)雜情況的相關(guān)信息。
2芍瑞、并發(fā)批處理或在線處理批處理應(yīng)用程序處理可由在線用戶同時(shí)更新的數(shù)據(jù)時(shí)晨仑,不應(yīng)鎖定在線用戶可能需要的任何數(shù)據(jù)(數(shù)據(jù)庫或文件中的數(shù)據(jù))超過幾秒鐘。此外拆檬,更新應(yīng)該在每幾次事務(wù)結(jié)束時(shí)提交到數(shù)據(jù)庫洪己。這可以最大限度地減少其他進(jìn)程無法使用的數(shù)據(jù)部分以及數(shù)據(jù)不可用所花費(fèi)的時(shí)間。
另一種最小化物理鎖定的方法是使用樂觀鎖定模式或悲觀鎖定模式實(shí)現(xiàn)邏輯行級(jí)鎖定竟贯。
樂觀鎖定假設(shè)記錄爭用的可能性很低答捕。它通常意味著在批處理和在線處理同時(shí)使用的每個(gè)數(shù)據(jù)庫表中插入一個(gè)時(shí)間戳列。當(dāng)應(yīng)用程序獲取一行進(jìn)行處理時(shí)屑那,它還獲取時(shí)間戳拱镐。當(dāng)應(yīng)用程序嘗試更新已處理的行時(shí),更新將使用WHERE子句中的原始時(shí)間戳持际。如果時(shí)間戳匹配沃琅,則更新數(shù)據(jù)和時(shí)間戳。如果時(shí)間戳不匹配蜘欲,則表示另一個(gè)應(yīng)用程序已在提取和更新嘗試之間更新了同一行益眉。因此,無法執(zhí)行更新姥份。
悲觀鎖定是任何一種鎖定策略郭脂,它假設(shè)記錄爭用的可能性很高,因此需要在檢索時(shí)獲取物理或邏輯鎖澈歉。一種悲觀邏輯鎖定使用數(shù)據(jù)庫表中的專用鎖列展鸡。當(dāng)應(yīng)用程序檢索要更新的行時(shí),它會(huì)在lock列中設(shè)置一個(gè)標(biāo)志闷祥。使用該標(biāo)志后娱颊,其他嘗試從邏輯上檢索同一行的應(yīng)用程序?qū)⑹ TO(shè)置標(biāo)志的應(yīng)用程序更新行時(shí)凯砍,也會(huì)清除標(biāo)志箱硕,使其他應(yīng)用程序能夠檢索該行。請(qǐng)注意悟衩,在初始提取和標(biāo)志設(shè)置之間剧罩,也必須保持?jǐn)?shù)據(jù)的完整性,例如使用db鎖(如SELECT for UPDATE)座泳。還要注意的是惠昔,這種方法與物理鎖定具有相同的缺點(diǎn)幕与,不同的是,如果用戶在鎖定記錄的同時(shí)去吃午飯镇防,那么構(gòu)建一種超時(shí)機(jī)制來釋放鎖會(huì)更容易管理啦鸣。
這些模式不一定適合批處理,但可以用于并行批處理和在線處理(例如来氧,在數(shù)據(jù)庫不支持行級(jí)鎖定的情況下)诫给。一般來說,樂觀鎖定更適合于在線應(yīng)用程序啦扬,而悲觀鎖定更適合于批處理應(yīng)用程序中狂。每當(dāng)使用邏輯鎖定時(shí),所有訪問受邏輯鎖定保護(hù)的數(shù)據(jù)實(shí)體的應(yīng)用程序都必須使用相同的方案扑毡。
請(qǐng)注意胃榕,這兩種解決方案都只解決鎖定單個(gè)記錄的問題。通常瞄摊,我們可能需要鎖定一組邏輯相關(guān)的記錄勋又。對(duì)于物理鎖,您必須非常小心地管理這些鎖泉褐,以避免潛在的死鎖赐写。對(duì)于邏輯鎖,通常最好構(gòu)建一個(gè)邏輯鎖管理器膜赃,該管理器能夠理解要保護(hù)的邏輯記錄組挺邀,并且能夠確保鎖是一致的和非死鎖的。這個(gè)邏輯鎖管理器通常使用自己的表來管理鎖跳座、爭用報(bào)告端铛、超時(shí)機(jī)制和其他問題。
3疲眷、并行處理并行處理允許多個(gè)批處理運(yùn)行或作業(yè)并行運(yùn)行禾蚕,以最大限度地減少批處理所用的總時(shí)間。只要作業(yè)不共享相同的文件狂丝、db表或索引空間换淆,這就不是問題。如果這樣做几颜,則應(yīng)使用分區(qū)數(shù)據(jù)實(shí)現(xiàn)此服務(wù)倍试。另一種選擇是通過使用控制表構(gòu)建一個(gè)體系結(jié)構(gòu)模塊來維護(hù)相互依賴性〉翱蓿控制表應(yīng)包含每個(gè)共享資源的一行县习,以及應(yīng)用程序是否正在使用該資源。然后,批處理體系結(jié)構(gòu)或并行作業(yè)中的應(yīng)用程序?qū)脑摫碇袡z索信息躁愿,以確定它是否可以訪問所需的資源叛本。
如果數(shù)據(jù)訪問沒有問題,那么可以通過使用額外的線程進(jìn)行并行處理來實(shí)現(xiàn)并行處理彤钟。在大型機(jī)環(huán)境中来候,傳統(tǒng)上使用并行作業(yè)類,以確保所有進(jìn)程都有足夠的CPU時(shí)間样勃。無論如何吠勘,解決方案必須足夠健壯性芬,以確保所有正在運(yùn)行的進(jìn)程的時(shí)間片峡眶。
并行處理中的其他關(guān)鍵問題包括負(fù)載平衡和通用系統(tǒng)資源(如文件、數(shù)據(jù)庫緩沖池等)的可用性植锉。還要注意辫樱,控制表本身很容易成為關(guān)鍵資源。
4俊庇、分區(qū)使用分區(qū)的分區(qū)允許多個(gè)版本的大批量應(yīng)用程序同時(shí)運(yùn)行狮暑。這樣做的目的是減少處理長批量作業(yè)所需的運(yùn)行時(shí)間』员ィ可以成功分區(qū)的進(jìn)程是那些可以拆分輸入文件和/或?qū)χ鲾?shù)據(jù)庫表進(jìn)行分區(qū)以允許應(yīng)用程序針對(duì)不同數(shù)據(jù)集運(yùn)行的進(jìn)程搬男。
此外,分區(qū)的進(jìn)程必須設(shè)計(jì)為僅處理其分配的數(shù)據(jù)集彭沼。分區(qū)體系結(jié)構(gòu)必須與數(shù)據(jù)庫設(shè)計(jì)和數(shù)據(jù)庫分區(qū)策略密切相關(guān)缔逛。注意,數(shù)據(jù)庫分區(qū)并不一定意味著數(shù)據(jù)庫的物理分區(qū)姓惑,盡管在大多數(shù)情況下這是可取的褐奴。下圖說明了分區(qū)方法:
圖2: 分區(qū)進(jìn)程
該體系結(jié)構(gòu)應(yīng)該足夠靈活,以允許動(dòng)態(tài)配置分區(qū)的數(shù)量于毙。應(yīng)考慮自動(dòng)和用戶控制的配置敦冬。自動(dòng)配置可能基于輸入文件大小和輸入記錄數(shù)等參數(shù)键思。
4.1 分區(qū)方法必須根據(jù)具體情況選擇分區(qū)方法梦染。下面的列表描述了一些可能的分區(qū)方法:
1、記錄集的固定和均勻分解
這涉及到將輸入記錄集分成偶數(shù)個(gè)部分(例如尉尾,10個(gè)介蛉,其中每個(gè)部分正好占整個(gè)記錄集的十分之一)萌庆。然后,批處理/提取應(yīng)用程序的一個(gè)實(shí)例處理每個(gè)部分甘耿。
為了使用這種方法踊兜,需要進(jìn)行預(yù)處理來拆分記錄集。此拆分的結(jié)果將是一個(gè)下限和上限放置編號(hào)佳恬,可以將其用作批處理/提取應(yīng)用程序的輸入捏境,以便將其處理限制為僅處理其部分于游。
預(yù)處理可能會(huì)帶來很大的開銷,因?yàn)樗仨氂?jì)算并確定記錄集每個(gè)部分的邊界垫言。
2贰剥、按鍵列拆分
這涉及到按鍵列(如位置代碼)分解輸入記錄集,并將每個(gè)鍵的數(shù)據(jù)分配給批處理實(shí)例筷频。為了實(shí)現(xiàn)這一點(diǎn)蚌成,列值可以是:
通過分區(qū)表分配給批處理實(shí)例(本節(jié)稍后將介紹)。
通過部分值(例如0000-0999凛捏、1000-1999等)分配給批次實(shí)例担忧。
在選項(xiàng)1下,添加新值意味著手動(dòng)重新配置批處理/提取坯癣,以確保將新值添加到特定實(shí)例瓶盛。
在選項(xiàng)2下,這確保通過批處理作業(yè)的實(shí)例覆蓋所有值示罗。但是惩猫,一個(gè)實(shí)例處理的值的數(shù)量取決于列值的分布(0000-0999范圍內(nèi)可能有大量位置,1000-1999范圍內(nèi)可能很少)蚜点。在此選項(xiàng)下轧房,數(shù)據(jù)范圍的設(shè)計(jì)應(yīng)考慮分區(qū)。
在這兩種選擇下绍绘,無法實(shí)現(xiàn)記錄到批實(shí)例的最佳均勻分布奶镶。使用的批處理實(shí)例數(shù)沒有動(dòng)態(tài)配置。
3脯倒、按視圖拆分
這種方法基本上由一個(gè)鍵列分解实辑,但在數(shù)據(jù)庫級(jí)別。它涉及到將記錄集分解為視圖藻丢。批處理應(yīng)用程序的每個(gè)實(shí)例在處理過程中都使用這些視圖剪撬。分解是通過對(duì)數(shù)據(jù)分組來完成的。
使用此選項(xiàng)悠反,必須將批處理應(yīng)用程序的每個(gè)實(shí)例配置為命中特定視圖(而不是主表)残黑。此外,隨著新數(shù)據(jù)值的添加斋否,必須將這組新數(shù)據(jù)包含到視圖中梨水。沒有動(dòng)態(tài)配置功能,因?yàn)閷?shí)例數(shù)量的變化會(huì)導(dǎo)致視圖的變化茵臭。
4疫诽、增加處理指標(biāo)
這涉及到在輸入表中添加一個(gè)新列,作為指示器。作為預(yù)處理步驟奇徒,所有指標(biāo)都標(biāo)記為未處理雏亚。在批處理應(yīng)用程序的記錄獲取階段,在將記錄標(biāo)記為未處理的情況下讀取記錄摩钙,一旦讀劝盏汀(帶鎖),記錄將標(biāo)記為正在處理胖笛。該記錄完成后网持,指示器將更新為“完成”或“錯(cuò)誤”。批處理應(yīng)用程序的許多實(shí)例可以在不進(jìn)行更改的情況下啟動(dòng)长踊,因?yàn)楦郊恿写_保記錄只處理一次功舀。“完成后之斯,指標(biāo)標(biāo)記為完成”的一兩句話)
使用此選項(xiàng)日杈,表上的I/O將動(dòng)態(tài)增加。在更新批處理應(yīng)用程序的情況下佑刷,這種影響會(huì)減少,因?yàn)闊o論如何都必須進(jìn)行寫入酿炸。
5瘫絮、將表格提取到平面文件
這涉及到將表提取到文件中。然后填硕,可以將該文件拆分為多個(gè)段麦萤,并將其用作批處理實(shí)例的輸入。
使用此選項(xiàng)扁眯,將表提取到文件中并將其拆分的額外開銷可能會(huì)抵消多重分區(qū)的影響壮莹。可以通過更改文件拆分腳本來實(shí)現(xiàn)動(dòng)態(tài)配置姻檀。
6命满、哈希列的使用
此方案涉及在用于檢索驅(qū)動(dòng)程序記錄的數(shù)據(jù)庫表中添加哈希列(鍵/索引)。此哈希列有一個(gè)指示器绣版,用于確定批處理應(yīng)用程序的哪個(gè)實(shí)例處理此特定行胶台。例如,如果有三個(gè)批處理實(shí)例要啟動(dòng)杂抽,則“A”的指示器標(biāo)記實(shí)例1處理的行诈唬,“B”的指示器標(biāo)記實(shí)例2處理的行,“C”的指示器標(biāo)記實(shí)例3處理的行缩麸。
然后铸磅,用于檢索記錄的過程將有一個(gè)額外的WHERE子句來選擇由特定指示符標(biāo)記的所有行。此表中的插入將涉及添加標(biāo)記字段,該字段將默認(rèn)為其中一個(gè)實(shí)例(如“A”)阅仔。
將使用一個(gè)簡單的批處理應(yīng)用程序來更新指標(biāo)济竹,例如在不同實(shí)例之間重新分配負(fù)載。當(dāng)添加了足夠多的新行時(shí)霎槐,可以運(yùn)行該批處理(在批處理窗口中除外的任何時(shí)間)送浊,以將新行重新分發(fā)給其他實(shí)例。
批處理應(yīng)用程序的其他實(shí)例只需要按照前面段落中的描述運(yùn)行批處理應(yīng)用程序丘跌,以重新分配指示器以使用新數(shù)量的實(shí)例袭景。
4.2 數(shù)據(jù)庫和應(yīng)用程序設(shè)計(jì)原則
支持使用鍵列方法對(duì)分區(qū)數(shù)據(jù)庫表運(yùn)行的多分區(qū)應(yīng)用程序的體系結(jié)構(gòu)應(yīng)包括一個(gè)用于存儲(chǔ)分區(qū)參數(shù)的中央分區(qū)存儲(chǔ)庫。這提供了靈活性并確保了可維護(hù)性闭树。存儲(chǔ)庫通常由一個(gè)表組成耸棒,稱為分區(qū)表。
存儲(chǔ)在分區(qū)表中的信息是靜態(tài)的报辱,通常應(yīng)由DBA維護(hù)与殃。該表應(yīng)包含多分區(qū)應(yīng)用程序每個(gè)分區(qū)的一行信息。該表應(yīng)包含以下列:程序ID代碼碍现、分區(qū)號(hào)(分區(qū)的邏輯ID)幅疼、該分區(qū)的db key列的低值和該分區(qū)的db key列的高值。
在程序啟動(dòng)時(shí)昼接,應(yīng)將程序id和分區(qū)號(hào)從體系結(jié)構(gòu)傳遞給應(yīng)用程序(特別是從控制處理微線程)爽篷。如果使用鍵列方法,則這些變量用于讀取分區(qū)表慢睡,以確定應(yīng)用程序要處理的數(shù)據(jù)范圍逐工。此外,在整個(gè)處理過程中漂辐,必須使用分區(qū)號(hào)來:
- 添加到輸出文件/數(shù)據(jù)庫更新泪喊,以便合并進(jìn)程正常工作。
- 將正常處理報(bào)告給批處理日志髓涯,并將任何錯(cuò)誤報(bào)告給架構(gòu)錯(cuò)誤處理程序袒啼。
4.3最小化死鎖
當(dāng)應(yīng)用程序并行運(yùn)行或分區(qū)時(shí),可能會(huì)發(fā)生數(shù)據(jù)庫資源爭用和死鎖复凳。作為數(shù)據(jù)庫設(shè)計(jì)的一部分瘤泪,數(shù)據(jù)庫設(shè)計(jì)團(tuán)隊(duì)必須盡可能消除潛在的爭用情況,這一點(diǎn)至關(guān)重要育八。
此外对途,開發(fā)人員必須確保在設(shè)計(jì)數(shù)據(jù)庫索引表時(shí)考慮到死鎖預(yù)防和性能。
死鎖或熱點(diǎn)通常出現(xiàn)在管理或體系結(jié)構(gòu)表中髓棋,如日志表实檀、控制表和鎖表惶洲。這些問題的影響也應(yīng)考慮在內(nèi)。現(xiàn)實(shí)的壓力測(cè)試對(duì)于確定體系結(jié)構(gòu)中可能的瓶頸至關(guān)重要膳犹。
為了最大限度地減少?zèng)_突對(duì)數(shù)據(jù)的影響恬吕,體系結(jié)構(gòu)應(yīng)該提供服務(wù),例如連接到數(shù)據(jù)庫或遇到死鎖時(shí)的等待和重試間隔须床。這意味著一種內(nèi)置機(jī)制铐料,可以對(duì)某些數(shù)據(jù)庫返回代碼做出反應(yīng),而不是立即發(fā)出錯(cuò)誤豺旬,而是等待預(yù)定的時(shí)間并重試數(shù)據(jù)庫操作
4.4參數(shù)傳遞與驗(yàn)證
分區(qū)體系結(jié)構(gòu)應(yīng)該對(duì)應(yīng)用程序開發(fā)人員相對(duì)透明钠惩。體系結(jié)構(gòu)應(yīng)執(zhí)行與以分區(qū)模式運(yùn)行應(yīng)用程序相關(guān)的所有任務(wù),包括:
- 在應(yīng)用程序啟動(dòng)之前檢索分區(qū)參數(shù)族阅。
- 在應(yīng)用程序啟動(dòng)之前驗(yàn)證分區(qū)參數(shù)篓跛。
- 在啟動(dòng)時(shí)將參數(shù)傳遞給應(yīng)用程序。
驗(yàn)證應(yīng)包括檢查坦刀,以確保:
- 應(yīng)用程序有足夠的分區(qū)來覆蓋整個(gè)數(shù)據(jù)范圍愧沟。
- 分區(qū)之間沒有間隙。
如果數(shù)據(jù)庫已分區(qū)鲤遥,則可能需要進(jìn)行一些額外的驗(yàn)證沐寺,以確保單個(gè)分區(qū)不跨數(shù)據(jù)庫分區(qū)。
此外渴频,架構(gòu)還應(yīng)考慮分區(qū)的整合芽丹。關(guān)鍵問題包括:
- 在進(jìn)入下一個(gè)作業(yè)步驟之前,必須完成所有分區(qū)嗎卜朗?
- 如果其中一個(gè)分區(qū)中止,會(huì)發(fā)生什么情況
典型過程
一個(gè)典型的批處理應(yīng)用程序大致如下:
- 從數(shù)據(jù)庫咕村,文件或隊(duì)列中讀取大量記錄场钉。
- 以某種方式處理數(shù)據(jù)。
- 以修改之后的形式寫回?cái)?shù)據(jù)懈涛。
其對(duì)應(yīng)的示意圖如下:
spring batch的一個(gè)總體的架構(gòu)如下:
Figure 2.1: Batch Stereotypes
上圖突出顯示了組成Spring Batch領(lǐng)域語言的關(guān)鍵概念逛万。一個(gè)作業(yè)有一到多個(gè)步驟,每個(gè)步驟正好有一個(gè)ItemReader批钠、一個(gè)ItemProcessor和一個(gè)ItemWriter宇植。需要啟動(dòng)作業(yè)(使用JobLauncher),并且需要存儲(chǔ)有關(guān)當(dāng)前正在運(yùn)行的流程的元數(shù)據(jù)(在JobRepository中)
Spring Batch核心概念介紹
下面是一些概念是Spring batch框架中的核心概念埋心。
什么是Job
Job和Step是spring batch執(zhí)行批處理任務(wù)最為核心的兩個(gè)概念指郁。
其中Job是一個(gè)封裝整個(gè)批處理過程的一個(gè)概念。Job在spring batch的體系當(dāng)中只是一個(gè)最頂層的一個(gè)抽象概念拷呆,體現(xiàn)在代碼當(dāng)中則它只是一個(gè)最上層的接口闲坎,其代碼如下:
/**
* Batch domain object representing a job. Job is an explicit abstraction
* representing the configuration of a job specified by a developer. It should
* be noted that restart policy is applied to the job as a whole and not to a
* step.
*/
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
在Job這個(gè)接口當(dāng)中定義了五個(gè)方法疫粥,它的實(shí)現(xiàn)類主要有兩種類型的job,一個(gè)是simplejob腰懂,另一個(gè)是flowjob梗逮。在spring batch當(dāng)中,job是最頂層的抽象绣溜,除job之外我們還有JobInstance以及JobExecution這兩個(gè)更加底層的抽象慷彤。
一個(gè)job是我們運(yùn)行的基本單位,它內(nèi)部由step組成怖喻。job本質(zhì)上可以看成step的一個(gè)容器底哗。一個(gè)job可以按照指定的邏輯順序組合step,并提供了我們給所有step設(shè)置相同屬性的方法罢防,例如一些事件監(jiān)聽艘虎,跳過策略。
Spring Batch以SimpleJob類的形式提供了Job接口的默認(rèn)簡單實(shí)現(xiàn)咒吐,它在Job之上創(chuàng)建了一些標(biāo)準(zhǔn)功能野建。一個(gè)使用java config的例子代碼如下:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
這個(gè)配置的意思是:首先給這個(gè)job起了一個(gè)名字叫footballJob,接著指定了這個(gè)job的三個(gè)step恬叹,他們分別由方法候生,playerLoad,gameLoad, playerSummarization實(shí)現(xiàn)。
什么是JobInstance
我們?cè)谏衔囊呀?jīng)提到了JobInstance绽昼,他是Job的更加底層的一個(gè)抽象唯鸭,他的定義如下:
public interface JobInstance {
/**
* Get unique id for this JobInstance.
* @return instance id
*/
public long getInstanceId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
}
他的方法很簡單,一個(gè)是返回Job的id硅确,另一個(gè)是返回Job的名字目溉。
JobInstance指的是job運(yùn)行當(dāng)中,作業(yè)執(zhí)行過程當(dāng)中的概念菱农。Instance本就是實(shí)例的意思缭付。
比如說現(xiàn)在有一個(gè)批處理的job,它的功能是在一天結(jié)束時(shí)執(zhí)行行一次循未。我們假定這個(gè)批處理job的名字為'EndOfDay'陷猫。在這個(gè)情況下,那么每天就會(huì)有一個(gè)邏輯意義上的JobInstance, 而我們必須記錄job的每次運(yùn)行的情況的妖。
JobParameters
在上文當(dāng)中我們提到了绣檬,同一個(gè)job每天運(yùn)行一次的話,那么每天都有一個(gè)jobIntsance嫂粟,但他們的job定義都是一樣的娇未,那么我們?cè)趺磥韰^(qū)別一個(gè)job的不同jobinstance了。不妨先做個(gè)猜想赋元,雖然jobinstance的job定義一樣忘蟹,但是他們有的東西就不一樣飒房,例如運(yùn)行時(shí)間。
spring batch中提供的用來標(biāo)識(shí)一個(gè)jobinstance的東西是:JobParameters媚值。JobParameters對(duì)象包含一組用于啟動(dòng)批處理作業(yè)的參數(shù)狠毯,它可以在運(yùn)行期間用于識(shí)別或甚至用作參考數(shù)據(jù)。我們假設(shè)的運(yùn)行時(shí)間褥芒,就可以作為一個(gè)JobParameters嚼松。
例如, 我們前面的'EndOfDay'的job現(xiàn)在已經(jīng)有了兩個(gè)實(shí)例,一個(gè)產(chǎn)生于1月1日锰扶,另一個(gè)產(chǎn)生于1月2日献酗,那么我們就可以定義兩個(gè)JobParameter對(duì)象:一個(gè)的參數(shù)是01-01, 另一個(gè)的參數(shù)是01-02。因此坷牛,識(shí)別一個(gè)JobInstance的方法可以定義為:
JobExecution
JobExecution指的是單次嘗試運(yùn)行一個(gè)我們定義好的Job的代碼層面的概念罕偎。job的一次執(zhí)行可能以失敗也可能成功京闰。只有當(dāng)執(zhí)行成功完成時(shí)颜及,給定的與執(zhí)行相對(duì)應(yīng)的JobInstance才也被視為完成。
JobExecution的接口定義如下:
public interface JobExecution {
/**
* Get unique id for this JobExecution.
* @return execution id
*/
public long getExecutionId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
/**
* Get batch status of this execution.
* @return batch status value.
*/
public BatchStatus getBatchStatus();
/**
* Get time execution entered STARTED status.
* @return date (time)
*/
public Date getStartTime();
/**
* Get time execution entered end status: COMPLETED, STOPPED, FAILED
* @return date (time)
*/
public Date getEndTime();
/**
* Get execution exit status.
* @return exit status.
*/
public String getExitStatus();
/**
* Get time execution was created.
* @return date (time)
*/
public Date getCreateTime();
/**
* Get time execution was last updated updated.
* @return date (time)
*/
public Date getLastUpdatedTime();
/**
* Get job parameters for this execution.
* @return job parameters
*/
public Properties getJobParameters();
}
每一個(gè)方法的注釋已經(jīng)解釋的很清楚蹂楣,這里不再多做解釋俏站。只提一下BatchStatus,JobExecution當(dāng)中提供了一個(gè)方法getBatchStatus用于獲取一個(gè)job某一次特地執(zhí)行的一個(gè)狀態(tài)痊土。BatchStatus是一個(gè)代表job狀態(tài)的枚舉類肄扎,其定義如下:
public enum BatchStatus {STARTING, STARTED, STOPPING,
STOPPED, FAILED, COMPLETED, ABANDONED }
“Job”定義了什么是作業(yè)以及如何執(zhí)行作業(yè),“JobInstance”是一個(gè)純粹的組織對(duì)象赁酝,用于將執(zhí)行分組在一起犯祠,主要是為了實(shí)現(xiàn)正確的重啟語義。然而酌呆,“JobExecution”是運(yùn)行期間實(shí)際發(fā)生情況的主要存儲(chǔ)機(jī)制雷则,它包含更多必須控制和持久化的屬性,如下表所示:
Property | Definition |
---|---|
Status | A BatchStatus object that indicates the status of the execution. While running, it is BatchStatus#STARTED . If it fails, it is BatchStatus#FAILED . If it finishes successfully, it is BatchStatus#COMPLETED
|
startTime | A java.util.Date representing the current system time when the execution was started. This field is empty if the job has yet to start. |
endTime | A java.util.Date representing the current system time when the execution finished, regardless of whether or not it was successful. The field is empty if the job has yet to finish. |
exitStatus | The ExitStatus , indicating the result of the run. It is most important, because it contains an exit code that is returned to the caller. See chapter 5 for more details. The field is empty if the job has yet to finish. |
createTime | A java.util.Date representing the current system time when the JobExecution was first persisted. The job may not have been started yet (and thus has no start time), but it always has a createTime, which is required by the framework for managing job level ExecutionContexts . |
lastUpdated | A java.util.Date representing the last time a JobExecution was persisted. This field is empty if the job has yet to start. |
executionContext | The "property bag" containing any user data that needs to be persisted between executions. |
failureExceptions | The list of exceptions encountered during the execution of a Job . These can be useful if more than one exception is encountered during the failure of a Job . |
These properties are important because they are persisted and can be used to completely determine the status of an execution. For example, if the EndOfDay job for 01-01 is executed at 9:00 PM and fails at 9:30, the following entries are made in the batch metadata tables:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2017-01-01 | TRUE |
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
Column names may have been abbreviated or removed for the sake of clarity and formatting. | |
---|---|
Now that the job has failed, assume that it took the entire night for the problem to be determined, so that the 'batch window' is now closed. Further assuming that the window starts at 9:00 PM, the job is kicked off again for 01-01, starting where it left off and completing successfully at 9:30. Because it is now the next day, the 01-02 job must be run as well, and it is kicked off just afterwards at 9:31 and completes in its normal one hour time at 10:30. There is no requirement that one JobInstance
be kicked off after another, unless there is potential for the two jobs to attempt to access the same data, causing issues with locking at the database level. It is entirely up to the scheduler to determine when a Job
should be run. Since they are separate JobInstances
, Spring Batch makes no attempt to stop them from being run concurrently. (Attempting to run the same JobInstance
while another is already running results in a JobExecutionAlreadyRunningException
being thrown). There should now be an extra entry in both the JobInstance
and JobParameters
tables and two extra entries in the JobExecution
table, as shown in the following tables:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
2 | EndOfDayJob |
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2017-01-01 00:00:00 | TRUE |
2 | DATE | schedule.Date | 2017-01-01 00:00:00 | TRUE |
3 | DATE | schedule.Date | 2017-01-02 00:00:00 | TRUE |
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
2 | 1 | 2017-01-02 21:00 | 2017-01-02 21:30 | COMPLETED |
3 | 2 | 2017-01-02 21:31 | 2017-01-02 22:29 | COMPLETED |
Step
每一個(gè)Step對(duì)象都封裝了批處理作業(yè)的一個(gè)獨(dú)立的階段肪笋。事實(shí)上,每一個(gè)Job本質(zhì)上都是由一個(gè)或多個(gè)步驟組成度迂。每一個(gè)step包含定義和控制實(shí)際批處理所需的所有信息藤乙。任何特定的內(nèi)容都由編寫Job的開發(fā)人員自行決定。一個(gè)step可以非常簡單也可以非常復(fù)雜惭墓。例如坛梁,一個(gè)step的功能是將文件中的數(shù)據(jù)加載到數(shù)據(jù)庫中,那么基于現(xiàn)在spring batch的支持則幾乎不需要寫代碼腊凶。更復(fù)雜的step可能具有復(fù)雜的業(yè)務(wù)邏輯划咐,這些邏輯作為處理的一部分拴念。與Job一樣,Step具有與JobExecution類似的StepExecution褐缠,如下圖所示:
StepExecution
StepExecution表示一次執(zhí)行Step, 每次運(yùn)行一個(gè)Step時(shí)都會(huì)創(chuàng)建一個(gè)新的StepExecution政鼠,類似于JobExecution。但是队魏,某個(gè)步驟可能由于其之前的步驟失敗而無法執(zhí)行公般。且僅當(dāng)Step實(shí)際啟動(dòng)時(shí)才會(huì)創(chuàng)建StepExecution。
一次step執(zhí)行的實(shí)例由StepExecution類的對(duì)象表示胡桨。每個(gè)StepExecution都包含對(duì)其相應(yīng)步驟的引用以及JobExecution和事務(wù)相關(guān)的數(shù)據(jù)官帘,例如提交和回滾計(jì)數(shù)以及開始和結(jié)束時(shí)間。此外昧谊,每個(gè)步驟執(zhí)行都包含一個(gè)ExecutionContext刽虹,其中包含開發(fā)人員需要在批處理運(yùn)行中保留的任何數(shù)據(jù)
StepExecution
:
Property | Definition |
---|---|
Status | A BatchStatus object that indicates the status of the execution. While running, the status is BatchStatus.STARTED . If it fails, the status is BatchStatus.FAILED . If it finishes successfully, the status is BatchStatus.COMPLETED . |
startTime | A java.util.Date representing the current system time when the execution was started. This field is empty if the step has yet to start. |
endTime | A java.util.Date representing the current system time when the execution finished, regardless of whether or not it was successful. This field is empty if the step has yet to exit. |
exitStatus | The ExitStatus indicating the result of the execution. It is most important, because it contains an exit code that is returned to the caller. See chapter 5 for more details. This field is empty if the job has yet to exit. |
executionContext | The "property bag" containing any user data that needs to be persisted between executions. |
readCount | The number of items that have been successfully read. |
writeCount | The number of items that have been successfully written. |
commitCount | The number of transactions that have been committed for this execution. |
rollbackCount | The number of times the business transaction controlled by the Step has been rolled back. |
readSkipCount | The number of times read has failed, resulting in a skipped item. |
processSkipCount | The number of times process has failed, resulting in a skipped item. |
filterCount | The number of items that have been 'filtered' by the ItemProcessor . |
writeSkipCount | The number of times write has failed, resulting in a skipped item. |
ExecutionContext
ExecutionContext即每一個(gè)StepExecution
的執(zhí)行環(huán)境。它包含一系列的鍵值對(duì)呢诬。我們可以用如下代碼獲取ExecutionContext
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
JobRepository
JobRepository是一個(gè)用于將上述job涌哲,step等概念進(jìn)行持久化的一個(gè)類。它同時(shí)給Job和Step以及下文會(huì)提到的JobLauncher實(shí)現(xiàn)提供CRUD操作馅巷。首次啟動(dòng)Job時(shí)膛虫,將從repository中獲取JobExecution,并且在執(zhí)行批處理的過程中钓猬,StepExecution和JobExecution將被存儲(chǔ)到repository當(dāng)中稍刀。
@EnableBatchProcessing注解可以為JobRepository提供自動(dòng)配置。
JobLauncher
JobLauncher這個(gè)接口的功能非常簡單敞曹,它是用于啟動(dòng)指定了JobParameters的Job账月,為什么這里要強(qiáng)調(diào)指定了JobParameter,原因其實(shí)我們?cè)谇懊嬉呀?jīng)提到了澳迫,jobparameter和job一起才能組成一次job的執(zhí)行局齿。下面是代碼實(shí)例:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
上面run方法實(shí)現(xiàn)的功能是根據(jù)傳入的job以及jobparamaters從JobRepository獲取一個(gè)JobExecution并執(zhí)行Job。
Item Reader
ItemReader是一個(gè)讀數(shù)據(jù)的抽象橄登,它的功能是為每一個(gè)Step提供數(shù)據(jù)輸入抓歼。當(dāng)ItemReader以及讀完所有數(shù)據(jù)時(shí),它會(huì)返回null來告訴后續(xù)操作數(shù)據(jù)已經(jīng)讀完拢锹。Spring Batch為ItemReader提供了非常多的有用的實(shí)現(xiàn)類谣妻,比如JdbcPagingItemReader,
JdbcCursorItemReader等等卒稳。
ItemReader支持的讀入的數(shù)據(jù)源也是非常豐富的蹋半,包括各種類型的數(shù)據(jù)庫,文件充坑,數(shù)據(jù)流减江,等等染突。幾乎涵蓋了我們的所有場景。
下面是一個(gè)JdbcPagingItemReader 的例子代碼:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
下面是一個(gè)JdbcCursorItemReader的例子代碼:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
Item Writer
既然ItemReader是讀數(shù)據(jù)的一個(gè)抽象辈灼,那么ItemWriter自然就是一個(gè)寫數(shù)據(jù)的抽象份企,它是為每一個(gè)step提供數(shù)據(jù)寫出的功能。寫的單位是可以配置的茵休,我們可以一次寫一條數(shù)據(jù)薪棒,也可以一次寫一個(gè)chunk的數(shù)據(jù),關(guān)于chunk下文會(huì)有專門的介紹榕莺。ItemWriter對(duì)于讀入的數(shù)據(jù)是不能做任何操作的俐芯。
Spring Batch為ItemWriter也提供了非常多的有用的實(shí)現(xiàn)類,當(dāng)然我們也可以去實(shí)現(xiàn)自己的writer功能钉鸯。
Item Processor
ItemProcessor對(duì)項(xiàng)目的業(yè)務(wù)邏輯處理的一個(gè)抽象, 當(dāng)ItemReader讀取到一條記錄之后吧史,ItemWriter還未寫入這條記錄之前,I我們可以借助temProcessor提供一個(gè)處理業(yè)務(wù)邏輯的功能唠雕,并對(duì)數(shù)據(jù)進(jìn)行相應(yīng)操作贸营。如果我們?cè)贗temProcessor發(fā)現(xiàn)一條數(shù)據(jù)不應(yīng)該被寫入,可以通過返回null來表示岩睁。ItemProcessor和ItemReader以及ItemWriter可以非常好的結(jié)合在一起工作钞脂,他們之間的數(shù)據(jù)傳輸也非常方便。我們直接使用即可捕儒。
chunk
spring batch提供了讓我們按照chunk處理數(shù)據(jù)的能力冰啃,一個(gè)chunk的示意圖如下:
它的意思就和圖示的一樣,由于我們一次batch的任務(wù)可能會(huì)有很多的數(shù)據(jù)讀寫操作刘莹,因此一條一條的處理并向數(shù)據(jù)庫提交的話效率不會(huì)很高阎毅,因此spring batch提供了chunk這個(gè)概念,我們可以設(shè)定一個(gè)chunk size点弯,spring batch 將一條一條處理數(shù)據(jù)扇调,但不提交到數(shù)據(jù)庫,只有當(dāng)處理的數(shù)據(jù)數(shù)量達(dá)到chunk size設(shè)定的值得時(shí)候抢肛,才一起去commit.
java的實(shí)例定義代碼如下:
/**
* Note the JobRepository is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return this.jobBuilderFactory.get("sampleJob")
.repository(jobRepository)
.start(sampleStep)
.build();
}
/**
* Note the TransactionManager is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
return this.stepBuilderFactory.get("sampleStep")
.transactionManager(transactionManager)
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
在上面這個(gè)step里面狼钮,chunk size被設(shè)為了10,當(dāng)ItemReader讀的數(shù)據(jù)數(shù)量達(dá)到10的時(shí)候捡絮,這一批次的數(shù)據(jù)就一起被傳到itemWriter燃领,同時(shí)transaction被提交。
skip策略和失敗處理
一個(gè)batch的job的step锦援,可能會(huì)處理非常大數(shù)量的數(shù)據(jù),難免會(huì)遇到出錯(cuò)的情況剥悟,出錯(cuò)的情況雖出現(xiàn)的概率較小灵寺,但是我們不得不考慮這些情況曼库,因?yàn)槲覀冏鰯?shù)據(jù)遷移最重要的是要保證數(shù)據(jù)的最終一致性。spring batch當(dāng)然也考慮到了這種情況略板,并且為我們提供了相關(guān)的技術(shù)支持毁枯,請(qǐng)看如下bean的配置:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
@Bean
public Step step2() {
return this.stepBuilderFactory.get("step2")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
我們需要留意這三個(gè)方法,分別是skipLimit(),skip(),noSkip(),
skipLimit方法的意思是我們可以設(shè)定一個(gè)我們?cè)试S的這個(gè)step可以跳過的異常數(shù)量叮称,假如我們?cè)O(shè)定為10种玛,則當(dāng)這個(gè)step運(yùn)行時(shí)愈诚,只要出現(xiàn)的異常數(shù)目不超過10印蔬,整個(gè)step都不會(huì)fail钮惠。注意浙于,若不設(shè)定skipLimit衰琐,則其默認(rèn)值是0.
skip方法我們可以指定我們可以跳過的異常瘾敢,因?yàn)橛行┊惓5某霈F(xiàn)偷崩,我們是可以忽略的脱吱。
noSkip方法的意思則是指出現(xiàn)這個(gè)異常我們不想跳過谴古,也就是從skip的所以exception當(dāng)中排除這個(gè)exception质涛,從上面的例子來說,也就是跳過所有除FileNotFoundException的exception掰担。那么對(duì)于這個(gè)step來說汇陆,F(xiàn)ileNotFoundException就是一個(gè)fatal的exception,拋出這個(gè)exception的時(shí)候step就會(huì)直接fail
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布带饱!