快速了解組件-spring batch(2)之helloworld

tags: springbatch


1.引言

前面《數(shù)據(jù)批處理神器-Spring Batch(1)簡介及使用場景》已經(jīng)介紹了Spring Batch是一個輕量級翔悠,完善的批處理框架峡迷,它使用起來簡單,方便票编,比較適合有點編程基礎(chǔ)(特別是使用Spring及SpringBoot框架)的開發(fā)人員,針對業(yè)務(wù)編程咽白,只需要關(guān)心具體的業(yè)務(wù)實現(xiàn)即可闺魏,把流程以及流程的控制交給Spring Batch就好。常言道"talk is cheap, show me the code",下面我們就通過一個簡單的hello world蜕径,進(jìn)入Spring Batch的世界怪蔑,通過這個示例,可以快速了解開發(fā)批處理的流程和Spring Batch開發(fā)用到的組件丧荐,為后續(xù)的操作打下基礎(chǔ)。

2.開發(fā)環(huán)境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發(fā)IDE: IDEA
  • 構(gòu)建工具M(jìn)aven: 3.3.9
  • 日志組件logback:1.2.3
  • lombok:1.18.6

3.helloworld開發(fā)

3.1 helloworld說明

本helloworld實現(xiàn)一個非常簡單的功能喧枷,就是從數(shù)據(jù)組中讀取字符串虹统,把字符串轉(zhuǎn)為大寫,然后輸出到控制臺隧甚。如圖:

字符串讀寫

整個過程就是一個批重任務(wù)(Job)车荔,它只有一個步驟(Job Step),步驟里分為三個階段戚扳,讀數(shù)據(jù)(ItemReader)忧便、處理數(shù)據(jù)(ItemProcessor)、寫數(shù)據(jù)(ItemWriter)帽借。

3.2 開發(fā)流程

開發(fā)的主要代碼如下:

主要代碼

總體來說就是珠增,通過ReaderProcessor砍艾、Writer完成任務(wù)蒂教,結(jié)束后通過Listener進(jìn)行監(jiān)聽,整個任務(wù)通過配置(BatchConfig)進(jìn)行配置脆荷。

3.2.1 創(chuàng)建Spring Boot工程

直接使用Idea生成或在使用Spring Initializr生成即可凝垛,此處不詳細(xì)說明。也可以直接使用我的代碼示例蜓谋。當(dāng)前使用的Spring Boot版本是2.1.4.RELEASE

3.2.2 添加相關(guān)依賴

  • Spring Batch依賴
    在使用spring-boot-starter-parent的情況下梦皮,直接添加以下依賴即可:
<!-- 批處理框架-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

引用后,會引用兩個jar包桃焕,一個是spring-batch-infrastructure剑肯,一個是spring-batch-core,版本是4.1.2.RELEASE覆旭。分別對應(yīng)的是基礎(chǔ)框架層和核心層退子。

  • 添加內(nèi)存數(shù)據(jù)庫H2
    Spring Batch是需要數(shù)據(jù)庫來存儲任務(wù)的基本信息以及運行狀態(tài)的,本例中不需要操作數(shù)據(jù)庫邏輯型将,直接使用內(nèi)存數(shù)據(jù)庫H2即可寂祥。添加以下依賴:
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>
  • 添加測試及工具類依賴
    為了簡化開發(fā),使用lombok進(jìn)行處理七兜。使用Spring Boot進(jìn)行單元測試丸凭,添加依賴如下:
<!-- 工具包:lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.6</version>
</dependency>
<!-- 測試框架 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

3.2.3 開發(fā)讀數(shù)據(jù)組件ItemReader

添加完依賴后,就可以進(jìn)入業(yè)務(wù)邏輯編程了。按Spring Batch的批處理流程惜犀,讀數(shù)據(jù)ItemReader是第一步铛碑,當(dāng)前示例中,我們的任務(wù)是從數(shù)組中讀取數(shù)據(jù)虽界。ItemReader是一個接口汽烦,開發(fā)人員直接實現(xiàn)此接口即可。此接口定義了核心方法read()莉御,負(fù)責(zé)從給定的資源中讀取可用的數(shù)據(jù)撇吞。具體實現(xiàn)如下:

@Slf4j
public class StringReader implements ItemReader<String> {
    private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
    private int count = 0;
    @Override
    public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
        if(count < messages.length){
            String message = messages[count++];
            log.debug(LogConstants.LOG_TAG + "read data:"+message);
            return message;
        }else{
            log.debug(LogConstants.LOG_TAG + "read data end.");
            count = 0;
        }
        return null;
    }
}

說明:

  • (1)StringReader實現(xiàn)ItemReader接口;
  • (2)messages是數(shù)據(jù)源礁叔;
  • (3)count表示讀取數(shù)據(jù)的下標(biāo)牍颈,每讀一次,下標(biāo)自增琅关,讀取完后返回null表示結(jié)束煮岁。同時把count置為0,以方便下次讀取涣易。
  • (4)日志輸出使用的是logback画机,結(jié)合lombok的@Slf4j注解,直接可使用log進(jìn)行輸出新症,簡化操作色罚。

3.2.4 開發(fā)處理數(shù)據(jù)組件ItemProcessor

讀取數(shù)據(jù)后,返回的數(shù)據(jù)會流到ItemProcessor進(jìn)行處理账劲。同樣戳护,ItemProcessor是一個接口,要實現(xiàn)自己的處理邏輯瀑焦,實現(xiàn)此接口即可腌且。當(dāng)然,如果沒有ItemProcessor榛瓮,讀到的數(shù)據(jù)直接就到ItemWriter流程也是可以的铺董。此處,Spring Batch有一個Chunk的概念禀晓,用于多次讀精续,直到chunk指定的數(shù)量后,再統(tǒng)一給到processor和writer粹懒,以提高效率重付。本示例對于ItemProcessor的實現(xiàn)很簡單,即把字符串轉(zhuǎn)為大寫凫乖。如下:

@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
    @Autowired
    private ConsoleService consoleService;
    @Override
    public String process(String data) {
        String dataProcessed = consoleService.convert2UpperCase(data);
        log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
        return dataProcessed;
    }
}

說明:

  • 實現(xiàn)ItemProcessor接口确垫,它有兩個泛型弓颈,分別是I和O,I是讀階段獲取的數(shù)據(jù)删掀,O是提交給寫階段的數(shù)據(jù)翔冀。
  • 使用ConsoleService服務(wù),對數(shù)據(jù)進(jìn)行大寫轉(zhuǎn)換披泪,里面的實現(xiàn)直接使用字符串的toUpperCase()方法

3.2.5 開發(fā)寫數(shù)據(jù)組件ItemWriter

數(shù)據(jù)處理完后纤子,會統(tǒng)一交給寫組件(ItemWriter)進(jìn)行寫入。ItemWriter也是一個接口款票,核心方法是write方法计福,參數(shù)是數(shù)組。要實現(xiàn)自己的邏輯徽职,實現(xiàn)此接口即可。本示例中佩厚,直接把數(shù)據(jù)輸出到日志中即可姆钉。如下:

@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
    @Override
    public void write(List<? extends String> list) {
        for (String msg :list) {
            log.debug(LogConstants.LOG_TAG + "write data: "+msg);
        }
    }
}

3.2.6 開發(fā)任務(wù)完成后的監(jiān)聽器JobExecutionListener

數(shù)據(jù)寫入到目標(biāo)后,任務(wù)即結(jié)束抄瓦,但有時候我們還需要在任務(wù)結(jié)束時去做一些其它工作潮瓶,如清理數(shù)據(jù),更新時間等钙姊,則需要在任務(wù)完成后進(jìn)行邏輯處理毯辅。Spring Batch對于任務(wù)或步驟開始和結(jié)束都會提供監(jiān)聽,以便于開發(fā)人員實現(xiàn)監(jiān)聽邏輯煞额。如通過繼承JobExecutionListenerSupport思恐,可以實現(xiàn)beforeJobafterJob的監(jiān)聽,以實現(xiàn)開始任務(wù)前和結(jié)束任務(wù)后的處理膊毁。當(dāng)前示例中胀莹,僅輸出任務(wù)完成的日志。如下:

@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED){
            log.debug("console batch job complete!");
        }
    }
}

3.2.7 配置完整任務(wù)

經(jīng)過上面的讀婚温、處理描焰、寫、任務(wù)完成后監(jiān)聽的操作栅螟,現(xiàn)在需要把它們組裝在一起荆秦,形成一個完成的任務(wù),使用Spring Boot力图,簡單的使用幾個配置即可完成任務(wù)的組裝步绸。任務(wù)及其相關(guān)組件的關(guān)系如下:

組件關(guān)系

創(chuàng)建配置文件ConsoleBatchConfig.java,具體代碼如下:

@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
                .end().build();
    }

    @Bean
    public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor
            ,ItemWriter consoleWriter, CommonStepListener commonStepListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName).listener(commonStepListener)
                .<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
                .writer(consoleWriter).build();
    }

    @Bean
    public ItemReader stringReader(){return new StringReader();}

    @Bean
    public ItemWriter consoleWriter(){return new ConsoleWriter();}

    @Bean
    public ItemProcessor convertProcessor(){return new ConvertProcessor();}

    @Bean
    public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}

說明:

  • 添加注解@Configuration及@EnableBatchProcessing吃媒,標(biāo)識為配置及啟用Spring Batch的配置(可以直接使用JobBuilderFactoryStepBuilderFactory分別用于創(chuàng)建Job和Step)靡努。
  • 創(chuàng)建ItemReader坪圾、ItemWriterItemProcessor惑朦、Listener對應(yīng)的Bean兽泄,以供Step及Job的注入。
  • 使用stepBuilderFactory創(chuàng)建作業(yè)Step漾月,其中chunk進(jìn)行面向塊的處理病梢,即多次讀取后再寫入,提高效率梁肿。當(dāng)前配置是3個為一個chunk蜓陌。
  • 使用jobBuilderFactory添加step,創(chuàng)建任務(wù)吩蔑。
  • 注意step和Job都需要有對應(yīng)的名稱(get方法確定)钮热,此處直接使用方法名作為Job和Step的名稱。

3.2.8 測試批處理

經(jīng)過上面的步驟烛芬,已經(jīng)完成Job的開發(fā)隧期,測試則可使用兩種方式,一個是編寫Controller赘娄,以接口調(diào)用的方式運行job仆潮,一種編寫單元測試。

  • Job的運行
    通過JobLauncherrun方法來運行任務(wù)遣臼,run方法參數(shù)分別是JobjobParameters性置,即已配置的Job及job運行的參數(shù)。每個任務(wù)的區(qū)分是通過任務(wù)名(jobName)和任務(wù)參數(shù)(jobParameters)作為區(qū)別的揍堰,即如果jobNamejobParameters相同鹏浅,Spring Batch會認(rèn)為是同一任務(wù),若任務(wù)已運行成功屏歹,同一任務(wù)不會再運行篡石。因此,一般來說西采,不同的任務(wù)凰萨,我們的jobParameters可以直接以時間作為參數(shù),以便于區(qū)別械馆。生成jobParameters胖眷。代碼如下:
JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
  • 編寫單元測試
    編寫ConsoleJobTest,加載job霹崎,運行測試珊搀,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job consoleJob;

    public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //構(gòu)建參數(shù)
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
        //執(zhí)行任務(wù)
        JobExecution run = jobLauncher.run(consoleJob, jobParameters);
        ExitStatus exitStatus = run.getExitStatus();
        log.debug(exitStatus.toString());
    }
}

說明:引入SpringBootTest注解時,需要把Spring Batch任務(wù)也引入進(jìn)來尾菇。

  • 執(zhí)行結(jié)果輸出
    執(zhí)行結(jié)果如下圖所示:

    執(zhí)行結(jié)果

    從輸出可知境析,由于設(shè)置的chunk是3囚枪,讀取3個數(shù)據(jù)后,就統(tǒng)一給ItemProcessor進(jìn)行大寫轉(zhuǎn)換處理劳淆,然后統(tǒng)一交給ItemWriter進(jìn)行寫入链沼。執(zhí)行完成后,Job的exitCode表示任務(wù)執(zhí)行的狀態(tài)沛鸵,如果正常則為COMPLETED括勺,失敗則為FAILED

4.總結(jié)

經(jīng)過以上的操作步驟曲掰,即可完成批處理操作疾捍。關(guān)于任務(wù)的狀態(tài),流程的步驟(讀栏妖、處理乱豆、寫)均交給Spring Batch來完成,開發(fā)人員所做的工作是根據(jù)自己的業(yè)務(wù)邏輯編寫具體的讀數(shù)據(jù)吊趾、處理數(shù)據(jù)和寫數(shù)據(jù)即可宛裕。希望通過本文,大家可以對Spring Batch的組件有清晰的了解趾徽。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市翰守,隨后出現(xiàn)的幾起案子孵奶,更是在濱河造成了極大的恐慌,老刑警劉巖蜡峰,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件了袁,死亡現(xiàn)場離奇詭異,居然都是意外死亡湿颅,警方通過查閱死者的電腦和手機(jī)载绿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來油航,“玉大人崭庸,你說我怎么就攤上這事∫昵簦” “怎么了怕享?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長镰踏。 經(jīng)常有香客問我函筋,道長,這世上最難降的妖魔是什么奠伪? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任跌帐,我火速辦了婚禮首懈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘谨敛。我一直安慰自己究履,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布佣盒。 她就那樣靜靜地躺著挎袜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪肥惭。 梳的紋絲不亂的頭發(fā)上盯仪,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機(jī)與錄音蜜葱,去河邊找鬼全景。 笑死,一個胖子當(dāng)著我的面吹牛牵囤,可吹牛的內(nèi)容都是我干的爸黄。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼揭鳞,長吁一口氣:“原來是場噩夢啊……” “哼炕贵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起野崇,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤称开,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后乓梨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鳖轰,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年扶镀,在試婚紗的時候發(fā)現(xiàn)自己被綠了蕴侣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡臭觉,死狀恐怖昆雀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蝠筑,我是刑警寧澤忆肾,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站菱肖,受9級特大地震影響客冈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜稳强,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一场仲、第九天 我趴在偏房一處隱蔽的房頂上張望和悦。 院中可真熱鬧,春花似錦渠缕、人聲如沸鸽素。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽馍忽。三九已至,卻和暖如春燕差,著一層夾襖步出監(jiān)牢的瞬間遭笋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工徒探, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留瓦呼,地道東北人。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓测暗,卻偏偏與公主長得像央串,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子碗啄,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容