spring batch的java config實踐

背景

在后臺服務(wù)開發(fā)中, 經(jīng)常要用到多線程技術(shù)進(jìn)行加速執(zhí)行, 每家公司都有內(nèi)部多線程的框架, 這些框架不是文檔不規(guī)范, 就是只能適用特定場景.
基于這些原因, spring batch帶來了更易用, 性能更好的解決方案.

基本概念

JobRepository

job倉庫, 提供了JobLauncher, Job, Setp的CRUD實現(xiàn)

JobLauncher

job的啟動器, 可以傳入job所需參數(shù)

Job

一個任務(wù)概念, 可以包含多個step, 且對step的執(zhí)行順序進(jìn)行編排

Step

具體步驟, 基本包含reader, writer, reader后可選processor, 或者使用tesklet

下面用一個圖來說明他們之間的關(guān)系

spring-batch-reference-model.png

概念還是挺簡單的, 就是框架有點(diǎn)復(fù)雜, 用起來坑不少

實踐代碼

我這里使用java config形式使用spring batch, 需要額外注意的是, 所有帶有@Bean的方法名不要重復(fù)

  1. build.gradle
buildscript {
    ext {
        springBootVersion = '1.5.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'war'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
}

這里引用了spring boot starter batch, 只是為了解決jar包依賴問題, 實際使用時沒有使用spring boot.

創(chuàng)建任務(wù)使用的obj, TestObj.java

public class TestObj {
    private String id;
    private int index;
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getIndex() {
        return index;
    }

    public void setIndex(int index) {
        this.index = index;
    }
    
}

主邏輯BatchConfiguration.java

@EnableBatchProcessing
public class BatchConfiguration {
    
    Object lock = new Object();
    
    Logger logger = Logger.getRootLogger();
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.afterPropertiesSet();

        return stepBuilderFactory.get("step1")
            .<TestObj, TestObj> chunk(10)
            .reader(new ItemReader<TestObj>() {
                private List<TestObj> list = null;
    
                @Override
                public synchronized TestObj read() throws Exception {
                    if (list == null) {
                        list = new ArrayList<TestObj>();
                        for (int i = 0; i < 10000; i++) {
                            TestObj obj = new TestObj();
                            obj.setId(UUID.randomUUID().toString());
                            obj.setIndex(i);
                            list.add(obj);
                        }
                        System.out.println("----------------"+list.size());
                    }
                    if (!this.list.isEmpty()) {
                        TestObj t = this.list.remove(0);
                        logger.info("step1==========read data:" + t.getIndex()));
                        return t;
                    }
                    return null;
                }
            })
            .processor(new ItemProcessor<TestObj, TestObj>() {
                public TestObj process(TestObj item) {
                    logger.debug("step1==============process: " + item.getIndex());
                    return item;
                }
            })
            .writer(new ItemWriter<TestObj>() {
                @Override
                public void write(List<? extends TestObj> items) throws Exception {
                    logger.debug("step1=============write batch start: " + items.size());
                    for (TestObj item : items) {
                        logger.debug("step1=============write: " + item.getIndex());
                    }
                    logger.info("step1=============write batch end: " + items.size());
                }
            })
            .taskExecutor(taskExecutor)
            .build();
    }
    
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
                logger.debug("step2========================Tasklet");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Job job1(Step step1, Step step2) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).next(step2).build();
    }

}

最后是啟動類Main.java

public class Main {

    public static void main(String[] args) {
        Logger logger = Logger.getRootLogger();
        
        logger.setLevel(Level.INFO);
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        ctx.register(BatchConfiguration.class);
        ctx.refresh();
        JobLauncher  jobLauncher = ctx.getBean(JobLauncher.class);
        Job job = (Job)ctx.getBean("job1");
        try {
            jobLauncher.run(job, new JobParameters());
            logger.debug("------job1 finished");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中需要注意以下幾點(diǎn)

  1. @EnableBatchProcessing 會默認(rèn)給出一些基礎(chǔ)配置
  • JobRepository - bean name "jobRepository"
  • JobLauncher - bean name "jobLauncher"
  • JobRegistry - bean name "jobRegistry"
  • PlatformTransactionManager - bean name "transactionManager"
  • JobBuilderFactory - bean name "jobBuilders"
    StepBuilderFactory - bean name "stepBuilders"
  1. 此處使用了多線程進(jìn)行執(zhí)行任務(wù), 其中taskExecutor.setAllowCoreThreadTimeOut(true);表示當(dāng)沒有任務(wù)時(默認(rèn)為60s), 線程池中線程會自動銷毀
  2. 自定義的ItemReader實現(xiàn)類中的read()方法需要加synchronized(多線程環(huán)境一定要加), 在官方文檔上有提過一嘴, 如果不是使用多線程可以不加, 在官方很多默認(rèn)實現(xiàn)中, 有一些是線程安全的, 有一些則不是, 如果非線程安全, 使用時都需要加上synchronized關(guān)鍵字
  3. 如果read()方法, 返回null, 則整個任務(wù)結(jié)束.
  4. chunk(10)表示當(dāng)每次傳入write的list的個數(shù)為10時, write執(zhí)行一次, 為主要的調(diào)優(yōu)方法
  5. 實際使用中, processor可以去掉

一些說明

spring batch本身有很多功能以及高級特性(比如監(jiān)聽, 任務(wù)流, spring batch admin), 本文中不做展開, 這里只針對最常用情況給出一個可用版本, 在我實際使用過程中, 發(fā)現(xiàn)大多數(shù)文章的例子基本都無法使用或者是使用xml或者不能單獨(dú)執(zhí)行.

很多時候還是要多看官方文檔, 只是官方文檔有點(diǎn)太平鋪直敘了

參考

spring batch官方文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末熄云,一起剝皮案震驚了整個濱河市幢妄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌领猾,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件骇扇,死亡現(xiàn)場離奇詭異摔竿,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)少孝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門继低,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人稍走,你說我怎么就攤上這事袁翁〔竦祝” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵粱胜,是天一觀的道長柄驻。 經(jīng)常有香客問我,道長年柠,這世上最難降的妖魔是什么凿歼? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮冗恨,結(jié)果婚禮上答憔,老公的妹妹穿的比我還像新娘。我一直安慰自己掀抹,他們只是感情好虐拓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著傲武,像睡著了一般蓉驹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上揪利,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天态兴,我揣著相機(jī)與錄音,去河邊找鬼疟位。 笑死瞻润,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的甜刻。 我是一名探鬼主播绍撞,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼得院!你這毒婦竟也來了傻铣?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤祥绞,失蹤者是張志新(化名)和其女友劉穎非洲,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蜕径,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡怪蔑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了丧荐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡喧枷,死狀恐怖虹统,靈堂內(nèi)的尸體忽然破棺而出弓坞,到底是詐尸還是另有隱情,我是刑警寧澤车荔,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布渡冻,位于F島的核電站,受9級特大地震影響忧便,放射性物質(zhì)發(fā)生泄漏族吻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一珠增、第九天 我趴在偏房一處隱蔽的房頂上張望超歌。 院中可真熱鬧,春花似錦蒂教、人聲如沸巍举。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至梦皮,卻和暖如春炭分,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背剑肯。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工捧毛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人退子。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓岖妄,卻偏偏與公主長得像,于是被迫代替她去往敵國和親寂祥。 傳聞我的和親對象是個殘疾皇子荐虐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)福扬,斷路器铛碑,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,810評論 6 342
  • 在大型企業(yè)中虽界,由于業(yè)務(wù)復(fù)雜莉御、數(shù)據(jù)量大、數(shù)據(jù)格式不同迄薄、數(shù)據(jù)交互格式繁雜讥蔽,并非所有的操作都能通過交互界面進(jìn)行處理。而有...
    無敵西瓜黃博文閱讀 3,211評論 1 39
  • 天澄氣佳,景和物休金抡,木隕微風(fēng)梗肝,鱗騰細(xì)流巫击。宇清禽舉坝锰,云水澹憂。叢篁發(fā)響确垫,柔柯拂頭翔冀。祝融銷退纤子,時涉素秋跌捆。銜璧日之映照,...
    商夷君閱讀 230評論 0 0
  • 二月的開始是從爬山開啟的,又一次挑戰(zhàn)了華山陶冷,這次的經(jīng)歷讓我對自己的體力又有了新的認(rèn)識埂伦,自己一直在進(jìn)步沾谜,同樣的也...
    Jackjia閱讀 137評論 0 0