Elastic+Spring-batch+Mybatis整合實(shí)現(xiàn)

背景

因?yàn)橛衅髽I(yè)級(jí)批處理需求胎围,具體到應(yīng)用場(chǎng)景就是在三方支付系統(tǒng)中欲低,日切后要進(jìn)行清分結(jié)算的跑批處理。所以使用到成熟的spring batch韭畸,依托其強(qiáng)大且靈活的批量處理功能宇智,再加上elastic任務(wù)調(diào)度整合來(lái)實(shí)現(xiàn)清分結(jié)算業(yè)務(wù)流程。本文章就是分別說(shuō)明了elastic和spring-batch + mybatis的常規(guī)使用和兩者的整合胰丁,僅供大家參考随橘。。

項(xiàng)目整體結(jié)構(gòu)

就先把elastic和spring-batch+mybatis整合的項(xiàng)目工程結(jié)構(gòu)列出來(lái)锦庸,該工程實(shí)現(xiàn)包括:

  • Elastic多種作業(yè)類型的開發(fā)和使用
  • 通過(guò)Elastic調(diào)度spring-batch作業(yè)机蔗,將數(shù)據(jù)庫(kù)中存儲(chǔ)行業(yè)信息記錄按照id的奇偶分別查詢出來(lái),寫入兩個(gè)不同的文件中甘萧。(也就是通過(guò)elastic的分片作業(yè)開發(fā)實(shí)現(xiàn))
  • Elastic-console管理控制臺(tái)的界面手動(dòng)觸發(fā)操作


    image

Elastic-Lite-Job

Elastic是一個(gè)分布式任務(wù)調(diào)度框架萝嘁,可以利用zookeeper作為調(diào)度中心,通過(guò)管理控制臺(tái)對(duì)任務(wù)進(jìn)行手動(dòng)關(guān)閉扬卷、觸發(fā)牙言、重啟,并支持并行調(diào)度和任務(wù)分片怪得,對(duì)spring支持也友好咱枉。具體參考官方文檔卑硫。

  • 整體架構(gòu)圖(圖片來(lái)源官網(wǎng))
    image



  • 配置依賴
    不多說(shuō),直接參考官網(wǎng)依賴maven進(jìn)行配置:
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <latest.release.version>2.1.5</latest.release.version>
    </properties>

        <!-- 添加elastic-job -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${latest.release.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${latest.release.version}</version>
        </dependency>
        <!-- end -->

在pom.xml配置即可庞钢。

  • 作業(yè)開發(fā)

配置了elastic的兩種作業(yè)類型:簡(jiǎn)單作業(yè)流式處理作業(yè)
SimpleJob對(duì)應(yīng)的實(shí)現(xiàn)類:

@Component
public class SimpleJobA implements SimpleJob{

    @Override
    public void execute(ShardingContext context) {
        System.out.println(String.format("------SimpleJobA: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
    }

}

數(shù)據(jù)流作業(yè)實(shí)現(xiàn)類:DataFlowJobEven和DataFlowJobOdd分別查詢數(shù)據(jù)庫(kù)行業(yè)表Industry的id奇偶記錄并輸出:

@Component
public class DataFlowJobEven implements DataflowJob<Industry>{

    @Autowired
    private IndustryService industryService;
    
    @Resource
    private IndustryDAO industry;
    
    @Override
    public List<Industry> fetchData(ShardingContext context) {
        System.out.println(String.format("------DataFlowJobEven: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
        List<Industry> retList = industry.queryListEven();
        return retList;
    }

    @Override
    public void processData(ShardingContext context, List<Industry> list) {
        System.out.println("Even count: " + list.size());
        for(Industry in : list) {
            System.out.println(in.getId() + "--" +in.getName() +"--" + in.getEnname());
        }
         
    }

}


@Component
public class DataFlowJobOdd implements DataflowJob<Industry>{

    @Resource
    private IndustryService industryService;
    
    @Resource
    private IndustryDAO industry;
    
    @Override
    public List<Industry> fetchData(ShardingContext context) {
        System.out.println(String.format("------DataFlowJobOdd: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
        System.out.println(industry);
        List<Industry> retList = industry.queryListOdd();
        return retList;
    }

    @Override
    public void processData(ShardingContext context, List<Industry> list) {
        System.out.println("Odd count: " + list.size());
        for(Industry in : list) {
            System.out.println(in.getId() +" --" + in.getName() +"--" + in.getEnname());
        }
        
    }

}

其中作業(yè)流的會(huì)先調(diào)用fetchData方法查詢滿足條件的數(shù)據(jù)因谎,再傳入processData方法中進(jìn)行批量處理基括。

  • 配置作業(yè)

關(guān)于配置文件中的注冊(cè)中心zookeeper需要自行安裝,單機(jī)安裝也挺簡(jiǎn)單的财岔,在此就不在說(shuō)明风皿。

文件名:elastic-job.xml。配置可以參考:elastic-job配置手冊(cè)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd                         
                        ">
    <!-- 這里配置的zookeeper與elastic console有關(guān) -->                   
    <reg:zookeeper server-lists="192.168.7.21:3181" id="goPayCenter"
        namespace="payment-ls" base-sleep-time-milliseconds="1000"
        max-retries="3" />

    <!-- 作業(yè)配置A -->
    <job:simple id="simpleJobA" sharding-total-count="2"
        cron="5 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobA"
        sharding-item-parameters="0=A,1=B" overwrite="true" description="清分批量處理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 

    
    <!-- 流數(shù)據(jù)作業(yè) -->
    <job:dataflow sharding-total-count="1" cron="2 * * * * ?"
        registry-center-ref="goPayCenter" id="dataFlowJobEven" class="newgo.job.DataFlowJobEven" streaming-process="false" overwrite="true" description="查詢偶數(shù)id數(shù)據(jù)"/>
    <job:dataflow sharding-total-count="1" cron="2 * * * * ?"
        registry-center-ref="goPayCenter" id="dataFlowJobOdd" class="newgo.job.DataFlowJobOdd" streaming-process="false" overwrite="true" description="查詢奇數(shù)id數(shù)據(jù)"/>

</beans>

通過(guò)xml節(jié)點(diǎn)就能指定配置簡(jiǎn)單作業(yè)和數(shù)據(jù)流作業(yè)匠璧。

  • 控制臺(tái)調(diào)度
    控制臺(tái)需要的運(yùn)行腳本:elastic-job-lite-console 密碼: ygfj

    在windows中桐款,運(yùn)行bat腳本,就會(huì)運(yùn)行一個(gè)web項(xiàng)目夷恍,端口默認(rèn)8899魔眨,本地打開地址:http://localhost:8899即可進(jìn)入控制臺(tái)。默認(rèn)是以root登錄

    選擇Global Settings --》 Registry center,點(diǎn)擊Add按鈕酿雪,添加zookeeper注冊(cè)中心遏暴,添加參數(shù)要與elastic-job.xml中配置zookeeper的信息:

    Name -> id,address:server-list,namespace:namespace
    <reg:zookeeper server-lists="192.168.7.21:3181" id="goPayCenter"
        namespace="payment-ls" base-sleep-time-milliseconds="1000"
        max-retries="3" />

點(diǎn)擊submit即可看到記錄指黎,點(diǎn)擊connect連接朋凉,即可看到左邊列表Job operation上有所有狀態(tài)的job作業(yè)信息,如下圖:

image

可以看到醋安,可以對(duì)所有任務(wù)進(jìn)行調(diào)度杂彭,手動(dòng)觸發(fā)多種操作。

  • 任務(wù)異常監(jiān)聽處理

可以定義自定義監(jiān)聽器吓揪,配置在指定的任務(wù)job上亲怠,當(dāng)該job拋出異常的時(shí)候,就會(huì)進(jìn)入該攔截器進(jìn)行處理柠辞。配置自定義攔截器MyJobExceptionHandler:

public class MyJobExceptionHandler implements JobExceptionHandler{

    @Override
    public void handleException(String jobName, Throwable cause) {
        //System.out.println("異常方法: " + cause.getStackTrace()[0].getMethodName());
        System.out.println(String.format("任務(wù)[%s]調(diào)度異常", jobName) + ",異常類型: " + cause.toString());
    }

}

結(jié)合job配置:在節(jié)點(diǎn)有配置屬性赁炎,設(shè)置即可。

<job:simple id="simpleJobA" sharding-total-count="2"
        cron="5 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobA"
        sharding-item-parameters="0=A,1=B" overwrite="true" description="清分批量處理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 

Elastic整合SpringBathc

Elastic主要在任務(wù)調(diào)度钾腺、治理方面有優(yōu)勢(shì)徙垫,具體的作業(yè)操作則是使用spring batch。其中作業(yè)讀取數(shù)據(jù)可以從文件放棒、也可以通過(guò)mybatis讀取數(shù)據(jù)庫(kù)來(lái)處理數(shù)據(jù)姻报。

其中,elastic作業(yè)配置到elastic-job.xml中间螟,spring batch配置文件配置在spring-batch.xml中吴旋。

Spring Bath:是一個(gè)輕量級(jí)的损肛、完善的批處理框架,旨在幫助企業(yè)建立健壯、高效的批處理應(yīng)用荣瑟。更多參考官網(wǎng)

主要關(guān)心的是批處理任務(wù)中包含的主體對(duì)象治拿,每個(gè)對(duì)象負(fù)責(zé)一個(gè)任務(wù)處理中不同的過(guò)程,如下圖:(圖片來(lái)自官網(wǎng))

image


對(duì)于spring batch中每種不同主體作用笆焰,主要關(guān)注的是job劫谅,對(duì)應(yīng)一個(gè)任務(wù)單元。該job就可以映射成elastic中的job嚷掠,兩者都是表示一個(gè)任務(wù)處理捏检,那么就可以將spring batch的job整合到elastic的job中,即可實(shí)現(xiàn)elastic對(duì)這些job的調(diào)用處理不皆。

從上圖中也可以知道一個(gè)job可以對(duì)應(yīng)多個(gè)step,每個(gè)step是由三大組件構(gòu)成贯城,也是也就對(duì)應(yīng)著IPO(輸入-處理-輸出)的抽象。我們?cè)趯?duì)數(shù)據(jù)的批量處理過(guò)程中霹娄,就可以通過(guò)自定義類能犯,實(shí)現(xiàn)這些接口來(lái)實(shí)現(xiàn)不同的業(yè)務(wù)需求。

主要的就是ItemReader犬耻、ItemProcessor悲雳、ItemWriter三者的實(shí)現(xiàn)。我們開發(fā)主要關(guān)注的也就是數(shù)據(jù)源數(shù)據(jù)是如何處理香追,最后如何輸出的合瓢。

  • 讀取文件數(shù)據(jù)(Flat文件格式)
    Flat類型文件是一種簡(jiǎn)單文本格式文件類型,通常經(jīng)過(guò)分隔符分割透典、或者定長(zhǎng)字段來(lái)描述數(shù)據(jù)格式晴楔。

    本項(xiàng)目的源數(shù)據(jù)是batch-data-source.csv文件,文件內(nèi)容:
1,PENDING
2,PENDING
3,PENDING
4,PENDING
5,PENDING
6,PENDING
7,PENDING
8,PENDING
9,PENDING
10,PENDING

需求就是通過(guò)job讀取該文件內(nèi)容峭咒,進(jìn)行處理税弃,將PENDING設(shè)置成SENT,寫入到新的文件batch-data-target.csv凑队。

    • LineMapper

Linemapper作用就是將文件中每一行的數(shù)據(jù)则果,轉(zhuǎn)換成POJO來(lái)批量處理。也就是文件內(nèi)容映射到對(duì)象處理器漩氨。

public class HelloLineMapper implements LineMapper<DeviceCommand>{

    private static final String FILE_SPLIT_SIGN = ",";
    @Override
    public DeviceCommand mapLine(String line, int lineNumber) throws Exception {
        String[] args = line.split(FILE_SPLIT_SIGN);
        DeviceCommand dc = new DeviceCommand();
        dc.setId(args[0]);
        dc.setStatus(args[1]);
        return dc;
    }

}

其中西壮,文件映射成的bean屬性定義和文件內(nèi)容一一對(duì)應(yīng)的:

public class DeviceCommand {

    private String id;
    private String status;
    //getter setter..
}

該linemapper會(huì)在ItemReader使用的時(shí)候,通過(guò)set設(shè)置到FlatFileItemReader屬性中進(jìn)行使用叫惊。

    • ItemProcessor

當(dāng)ItemReader讀取到數(shù)據(jù)后款青,就會(huì)通過(guò)ItemProcessor對(duì)數(shù)據(jù)進(jìn)行處理,很有些Java8的stream流式編程的意思霍狰,前置的輸出就是下一個(gè)階段的輸入抡草,當(dāng)然兩者的操作對(duì)象類型必然是一致的饰及。

public class HelloItemProcessor implements ItemProcessor<DeviceCommand,DeviceCommand>{

    @Override
    public DeviceCommand process(DeviceCommand deviceCommand) throws Exception {
        System.out.println("send command to device, id=" + deviceCommand.getId());
         deviceCommand.setStatus("SENT");
        return deviceCommand;
    }

}

處理目地就是就將數(shù)據(jù)源的PENGDING更新成SENT再返回命令對(duì)象。同理康震,可以將處理后的對(duì)象作為輸入燎含,傳給ItemWriter處理。

    • LineAggregator

對(duì)于FlatItem來(lái)說(shuō)腿短,LineAggregator作用就是制定將處理器處理后的數(shù)據(jù)如何處理屏箍,按照什么樣的形式將每個(gè)對(duì)象寫入文件的每一行。該數(shù)據(jù)輸出規(guī)則也會(huì)通過(guò)屬性設(shè)置到FlatFileItemReader中答姥。

public class HelloLineAggregator implements LineAggregator<DeviceCommand> {

    @Override
    public String aggregate(DeviceCommand deviceCommand) {
        StringBuffer sb = new StringBuffer();
        sb.append(deviceCommand.getId());
        sb.append("|");
        sb.append(deviceCommand.getStatus());
        return sb.toString();
    }

}

按照|分隔符將內(nèi)容輸出到文件中每一行铣除。

    • Job作業(yè)開發(fā)

elastic-job.xml中配置了作業(yè)B,如下谚咬,在其作業(yè)實(shí)現(xiàn)類中鹦付,就是使用了spring-batch的job定義實(shí)現(xiàn):包括使用上面定義的ItemReader、ItemProcessor择卦、ItemWriter敲长。

    <!-- 作業(yè)配置B -->
    <job:simple id="simpleJobB" sharding-total-count="2"
        cron="10 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobB"
        sharding-item-parameters="0=test1,1=test2" overwrite="true"
        description="結(jié)算批量處理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 

spring-batch可以使用編程式開發(fā),也可以使用xml配置秉继,在此是直接在代碼里面構(gòu)建job,step祈噪。作業(yè)B中結(jié)合batch的具體實(shí)現(xiàn)如下:

@Component
public class SimpleJobB implements SimpleJob{
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private PlatformTransactionManager transactionManager;

    @Override
    public void execute(ShardingContext context) {
        System.out.println(String.format("------SimpleJobB: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
        
           ClassPathResource resource = new ClassPathResource("batch-data-source.csv");
           FileSystemResource resource2 = new FileSystemResource("E:\\work\\newgo\\src\\main\\resources\\batch-data-target.csv");
           //通過(guò)classPathResource不可寫入文件數(shù)據(jù)?
          // ClassPathResource resource2 = new ClassPathResource("batch-data-target.csv");
          
           //1.0 獲取任務(wù)啟動(dòng)器
           
           //2.0 創(chuàng)建reader(Resource + 文件讀取映射器)
           FlatFileItemReader<DeviceCommand> flatFileItemReader = new FlatFileItemReader<>();
           flatFileItemReader.setLineMapper(new HelloLineMapper());
           flatFileItemReader.setResource(resource);
           
           //3.0 創(chuàng)建處理器processor
           HelloItemProcessor processor = new HelloItemProcessor();
           
           //4.0 創(chuàng)建writer
           FlatFileItemWriter<DeviceCommand> flatFileItemWriter = new FlatFileItemWriter<>();
           flatFileItemWriter.setResource(resource2);
           flatFileItemWriter.setLineAggregator(new HelloLineAggregator());
           //5.0 創(chuàng)建作業(yè)步驟step
           StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
           //流式編程
           Step step = stepBuilderFactory.get("step").<DeviceCommand,DeviceCommand>chunk(1)
                                                     .reader(flatFileItemReader)
                                                     .processor(processor)
                                                     .writer(flatFileItemWriter)
                                                     .build();
           
                                                              
           //6.0 創(chuàng)建作業(yè)job,關(guān)聯(lián)步驟step
           JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
//         Job job = jobBuilderFactory.get("job-" + Math.random()).start(step).build();
           Job job = jobBuilderFactory.get("job").start(step).build();
           int i = 5 / 0;
           
           LinkedHashMap<String, JobParameter> parameterMap = new LinkedHashMap<String,JobParameter>();
           JobParameter datetime = new JobParameter(dateFormatter(new Date())); 
           parameterMap.put("datetime", datetime);
           JobParameters paramters = new JobParameters(parameterMap);
           //7.0 啟動(dòng)任務(wù)
           try {
            jobLauncher.run(job, paramters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    public String dateFormatter(Date date) {
//      SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss");
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        return df.format(date);
    }

}
    • 定時(shí)器運(yùn)行

用常規(guī)spring啟動(dòng)方式來(lái)啟動(dòng):

public class App 
{
    public static void main( String[] args )
    {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"classpath:/spring/springBeans.xml"});
    }
}

根據(jù)定時(shí)器配置時(shí)間,作業(yè)會(huì)進(jìn)行數(shù)據(jù)處理尚辑。,作業(yè)操作完成之后辑鲤,就可以發(fā)現(xiàn)文件batch-data-target.csv中的內(nèi)容變成了如下內(nèi)容:

1|SENT
2|SENT
3|SENT
4|SENT
5|SENT
6|SENT
7|SENT
8|SENT
9|SENT
10|SENT
  • 讀取數(shù)據(jù)庫(kù)數(shù)據(jù)(關(guān)系數(shù)據(jù)庫(kù))

很多應(yīng)用場(chǎng)景且大量數(shù)據(jù)都是存在數(shù)據(jù)庫(kù)的,那么spring-batch是如何從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)杠茬。再次就介紹兩種:JdbcCursorItemReaderMyBatisBatchItemWriter月褥。前者是直接基于原始SQL進(jìn)行處理,后者是利用mybatis框架進(jìn)行整合查詢瓢喉。

    • 基于jdbc游標(biāo)和elastic分片處理

思路是:分別定義兩個(gè)job宁赤,分別利用JdbcCursorItemReader設(shè)置不同的查詢SQL來(lái)查詢不同的數(shù)據(jù)。利用elastic的分片來(lái)分別執(zhí)行不同的job栓票。

1.0 先定義兩個(gè)不同的job[在spring-batch.xml中]

包括三大核心組件的定義IPO,自定義查詢SQL决左。

    <!-- 配置作業(yè)STEP統(tǒng)一攔截器 -->
    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="readItemListener" />
            <batch:listener ref="processStepListener" />
        </batch:listeners>
    </batch:step>
    <bean id="abstractCursorReader" abstract="true"
        class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>
    <!-- 讀取行業(yè)偶數(shù)列表配置: start -->
    <batch:job id="industryEvenProcessJob" restartable="true">
        <batch:step id="industryEvenStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="industryEvenJdbcReader" processor="industryProcessor" writer="industryEvenJdbcWriter" commit-interval="50"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="industryEvenJdbcReader" scope="step" parent="abstractCursorReader">
        <property name="sql">
            <value><![CDATA[select id, name,en_name, tm_smp from t_industry WHERE id % ? != 0]]></value>
        </property>
        <property name="rowMapper" ref="industryRowMapper"/>
        <property name="preparedStatementSetter" ref="industryPreparedStatementSetter"/>
        <property name="fetchSize" value="20"/>
    </bean>
    
    <bean id="industryProcessor" class="newgo.job.dbmapper.IndustryProcessor"/>
    
    <bean id="industryEvenJdbcWriter"  class="newgo.job.dbmapper.IndustryEvenJdbcWriter"/>
    <bean id="industryRowMapper" class="newgo.job.dbmapper.IndustryRowMapper"/>
    <bean id="industryPreparedStatementSetter" class="newgo.job.dbmapper.IndustryPreparedStatementSetter"/> 
    <!-- 讀取行業(yè)偶數(shù)列表配置: end -->
    
    
    <!-- 讀取行業(yè)奇數(shù)列表配置: start -->
    <batch:job id="industryOddProcessJob" restartable="true">
        <batch:step id="industryOddStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="industryOddJdbcReader" processor="industryProcessor" writer="industryOddJdbcWriter" commit-interval="50"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="industryOddJdbcReader" scope="step" parent="abstractCursorReader">
        <property name="sql">
            <value><![CDATA[select id, name,en_name, tm_smp from t_industry WHERE id % ? = 0]]></value>
        </property>
        <property name="rowMapper" ref="industryRowMapper"/>
        <property name="preparedStatementSetter" ref="industryPreparedStatementSetter"/>
        <property name="fetchSize" value="20"/>
    </bean>
    
    <bean id="industryOddJdbcWriter"  class="newgo.job.dbmapper.IndustryOddJdbcWriter"/>
    <!-- 讀取行業(yè)偶數(shù)列表配置: end -->

2.0 兩個(gè)作業(yè)的IPO具體實(shí)現(xiàn)在newgo.job.dbmapper目錄下,這里代碼就不貼出來(lái)了走贪。
3.0 定義elastic的作業(yè)分片[elastic-job.xml]
分片是通過(guò)sharding-item-parameters來(lái)設(shè)定的佛猛。

<!-- 數(shù)據(jù)庫(kù)查詢行業(yè)作業(yè)A : 根據(jù)查詢?nèi)蝿?wù)分片,分別查詢奇偶行列 -->
    <job:simple id="elasIndustryJob" sharding-total-count="2" cron="2 * * * * ?" 
    sharding-item-parameters="0=even,1=odd"
    registry-center-ref="goPayCenter" class="newgo.job.IndustryOperatorJob" 
    job-exception-handler="newgo.handler.MyJobExceptionHandler"/> 

4.0 elastic的作業(yè)的分片實(shí)現(xiàn):

@Component
public class IndustryOperatorJob implements SimpleJob {
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    @Qualifier("industryEvenProcessJob")
    private Job industryEvenProcessJob;
    
    @Autowired
    @Qualifier("industryOddProcessJob")
    private Job industryOddProcessJob;
    
    @Override
    public void execute(ShardingContext context) {
        System.out.println(String.format("------IndustryQueryJob: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
        try {
            String shardingParameter = context.getShardingParameter();
            switch (shardingParameter) {
            case "odd":
                JobExecution oddJobResult = jobLauncher.run(industryOddProcessJob, new JobParameters());
                System.out.println("IndustryOddProcessJob start result: " + oddJobResult.toString());
                break;
            case "even":
                JobExecution evenJobResult = jobLauncher.run(industryEvenProcessJob, new JobParameters());
                System.out.println("IndustryEvenProcessJob start result: " + evenJobResult.toString());
                break;
            default:
                break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
    • 基于mybatis分頁(yè)查詢

基于mybatis更方便開發(fā)和維護(hù)坠狡,也是我們常規(guī)操作數(shù)據(jù)庫(kù)的途徑挚躯。下面例子就是通過(guò)mybatis來(lái)查詢數(shù)據(jù),并寫入文件中擦秽。

1.0 配置spring-batch的step[spring-batch.xml]

定義了mybatis的ItemReader實(shí)現(xiàn)码荔。其中queryId就是對(duì)應(yīng)mybatis接口方法名漩勤。

    <bean id="itemReader" scope="step" class="org.mybatis.spring.batch.MyBatisPagingItemReader">
       <property name="sqlSessionFactory" ref="sqlSessionFactory" />
        <property name="pageSize" value="100"/>
        <property name="queryId" value="" />
    </bean>

2.0 配置elastic的作業(yè)定時(shí)器[elastic-job.xml]

    <!-- 數(shù)據(jù)庫(kù)查詢行業(yè)作業(yè)B : 通過(guò)mybatis映射 -->
    <job:simple id="mybatisIndustryJob" sharding-total-count="1" cron="1 * * * * ?" 
    registry-center-ref="goPayCenter" class="newgo.job.MybatisIndustryJob" 
    job-exception-handler="newgo.handler.MyJobExceptionHandler"/>

3.0 作業(yè)job具體實(shí)現(xiàn)開發(fā)

@Component
public class MybatisIndustryJob  implements SimpleJob{
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private JobRepository jobRepository;
    
    @Autowired
    private SqlSessionFactory sqlSessionFactory;
    @Autowired
    private PlatformTransactionManager transactionManager;
    
    @Autowired
    private MybatisWriter mybatisWriter;
    
    @Override
    public void execute(ShardingContext context) {
        System.out.println(String.format("------MybatisIndustryJob: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
        MyBatisPagingItemReader<Industry> itemReader = new MyBatisPagingItemReader<Industry>();
        Map<String,Object> params = Maps.newHashMap();
        params.put("name", "五金");
        itemReader.setQueryId("newgo.dao.IndustryDAO.queryIndustryByName");
        itemReader.setPageSize(20);
        itemReader.setParameterValues(params);
        itemReader.setSqlSessionFactory(sqlSessionFactory);
        try {
            itemReader.afterPropertiesSet();
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
           //流式編程
           Step step = stepBuilderFactory.get("step-mybats").<Industry,IndustryFull>chunk(1)
                                                     .reader(itemReader)
                                                     .processor(new MybatisIndustryProcessor())
                                                     .writer(mybatisWriter)
                                                     .build();
           
                                                              
           JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
           Job job = jobBuilderFactory.get("mybatisIndustryJob").start(step).build();
           try {
            JobExecution jobResult = jobLauncher.run(job, new JobParameters());
            System.out.println("MybatisIndustryJob 執(zhí)行結(jié)果:" + jobResult.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

}

這里要注意的就是要設(shè)置itemReader.afterPropertiesSet();方法,不然mybatis在讀取數(shù)據(jù)的時(shí)候缩搅,sqlSessionTemplate未注入拋出NPE越败。并且在每次通jobLauncher.run啟動(dòng)的時(shí)候,可以根據(jù)返回值來(lái)查看異常信息硼瓣。在調(diào)試mybaatis查詢方法doPageRead時(shí)候究飞,發(fā)現(xiàn)內(nèi)部是通過(guò)CopyOnWriteArrayList存放數(shù)據(jù)的,也就是利用并發(fā)安全的寫時(shí)復(fù)制機(jī)制的集合來(lái)保存讀取到數(shù)據(jù)堂鲤,在高并發(fā)且讀遠(yuǎn)高于寫的時(shí)候亿傅,可以用該類型集合來(lái)保存數(shù)據(jù)。

4.0 處理器和輸出的實(shí)現(xiàn)類都是在newgo.job.mapper包下瘟栖,再次不在P出葵擎。

5.0 在spring batch的不同階段,都可以注入自定義的監(jiān)聽器來(lái)對(duì)每個(gè)階段數(shù)據(jù)進(jìn)行預(yù)處理,實(shí)現(xiàn)在包newgo.listener下半哟。

更多高級(jí)特性還需要在實(shí)際應(yīng)用中不斷調(diào)試和實(shí)踐酬滤。。寓涨。

elastic+spring-batch+mybatis源代碼-- 密碼: hvzr

參考:

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末盯串,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子戒良,更是在濱河造成了極大的恐慌体捏,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件糯崎,死亡現(xiàn)場(chǎng)離奇詭異几缭,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)拇颅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門奏司,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人樟插,你說(shuō)我怎么就攤上這事韵洋。” “怎么了黄锤?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵搪缨,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我鸵熟,道長(zhǎng)副编,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任流强,我火速辦了婚禮痹届,結(jié)果婚禮上呻待,老公的妹妹穿的比我還像新娘。我一直安慰自己队腐,他們只是感情好蚕捉,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著柴淘,像睡著了一般迫淹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上为严,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天敛熬,我揣著相機(jī)與錄音,去河邊找鬼第股。 笑死应民,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的炸茧。 我是一名探鬼主播瑞妇,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼稿静,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼梭冠!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起改备,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤控漠,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后悬钳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體盐捷,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年默勾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了碉渡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡母剥,死狀恐怖滞诺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情环疼,我是刑警寧澤习霹,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站炫隶,受9級(jí)特大地震影響淋叶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜伪阶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一煞檩、第九天 我趴在偏房一處隱蔽的房頂上張望处嫌。 院中可真熱鬧,春花似錦斟湃、人聲如沸锰霜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)癣缅。三九已至,卻和暖如春哄酝,著一層夾襖步出監(jiān)牢的瞬間友存,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工陶衅, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留屡立,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓搀军,卻偏偏與公主長(zhǎng)得像膨俐,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子罩句,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容