背景
因?yàn)橛衅髽I(yè)級(jí)批處理需求胎围,具體到應(yīng)用場(chǎng)景就是在三方支付系統(tǒng)中欲低,日切后要進(jìn)行清分結(jié)算的跑批處理。所以使用到成熟的spring batch韭畸,依托其強(qiáng)大且靈活的批量處理功能宇智,再加上elastic任務(wù)調(diào)度整合來(lái)實(shí)現(xiàn)清分結(jié)算業(yè)務(wù)流程。本文章就是分別說(shuō)明了elastic和spring-batch + mybatis的常規(guī)使用和兩者的整合胰丁,僅供大家參考随橘。。
項(xiàng)目整體結(jié)構(gòu)
就先把elastic和spring-batch+mybatis整合的項(xiàng)目工程結(jié)構(gòu)列出來(lái)锦庸,該工程實(shí)現(xiàn)包括:
- Elastic多種作業(yè)類型的開發(fā)和使用
- 通過(guò)Elastic調(diào)度spring-batch作業(yè)机蔗,將數(shù)據(jù)庫(kù)中存儲(chǔ)行業(yè)信息記錄按照id的奇偶分別查詢出來(lái),寫入兩個(gè)不同的文件中甘萧。(也就是通過(guò)elastic的分片作業(yè)開發(fā)實(shí)現(xiàn))
-
Elastic-console管理控制臺(tái)的界面手動(dòng)觸發(fā)操作
Elastic-Lite-Job
Elastic是一個(gè)分布式任務(wù)調(diào)度框架萝嘁,可以利用zookeeper作為調(diào)度中心,通過(guò)管理控制臺(tái)對(duì)任務(wù)進(jìn)行手動(dòng)關(guān)閉扬卷、觸發(fā)牙言、重啟,并支持并行調(diào)度和任務(wù)分片怪得,對(duì)spring支持也友好咱枉。具體參考官方文檔卑硫。
- 整體架構(gòu)圖(圖片來(lái)源官網(wǎng))
- 配置依賴
不多說(shuō),直接參考官網(wǎng)依賴maven進(jìn)行配置:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<latest.release.version>2.1.5</latest.release.version>
</properties>
<!-- 添加elastic-job -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${latest.release.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${latest.release.version}</version>
</dependency>
<!-- end -->
在pom.xml配置即可庞钢。
- 作業(yè)開發(fā)
配置了elastic的兩種作業(yè)類型:簡(jiǎn)單作業(yè)和流式處理作業(yè)。
SimpleJob對(duì)應(yīng)的實(shí)現(xiàn)類:
@Component
public class SimpleJobA implements SimpleJob{
@Override
public void execute(ShardingContext context) {
System.out.println(String.format("------SimpleJobA: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
}
}
數(shù)據(jù)流作業(yè)實(shí)現(xiàn)類:DataFlowJobEven和DataFlowJobOdd分別查詢數(shù)據(jù)庫(kù)行業(yè)表Industry的id奇偶記錄并輸出:
@Component
public class DataFlowJobEven implements DataflowJob<Industry>{
@Autowired
private IndustryService industryService;
@Resource
private IndustryDAO industry;
@Override
public List<Industry> fetchData(ShardingContext context) {
System.out.println(String.format("------DataFlowJobEven: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
List<Industry> retList = industry.queryListEven();
return retList;
}
@Override
public void processData(ShardingContext context, List<Industry> list) {
System.out.println("Even count: " + list.size());
for(Industry in : list) {
System.out.println(in.getId() + "--" +in.getName() +"--" + in.getEnname());
}
}
}
@Component
public class DataFlowJobOdd implements DataflowJob<Industry>{
@Resource
private IndustryService industryService;
@Resource
private IndustryDAO industry;
@Override
public List<Industry> fetchData(ShardingContext context) {
System.out.println(String.format("------DataFlowJobOdd: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
System.out.println(industry);
List<Industry> retList = industry.queryListOdd();
return retList;
}
@Override
public void processData(ShardingContext context, List<Industry> list) {
System.out.println("Odd count: " + list.size());
for(Industry in : list) {
System.out.println(in.getId() +" --" + in.getName() +"--" + in.getEnname());
}
}
}
其中作業(yè)流的會(huì)先調(diào)用fetchData方法查詢滿足條件的數(shù)據(jù)因谎,再傳入processData方法中進(jìn)行批量處理基括。
- 配置作業(yè)
關(guān)于配置文件中的注冊(cè)中心zookeeper需要自行安裝,單機(jī)安裝也挺簡(jiǎn)單的财岔,在此就不在說(shuō)明风皿。
文件名:elastic-job.xml。配置可以參考:elastic-job配置手冊(cè)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
">
<!-- 這里配置的zookeeper與elastic console有關(guān) -->
<reg:zookeeper server-lists="192.168.7.21:3181" id="goPayCenter"
namespace="payment-ls" base-sleep-time-milliseconds="1000"
max-retries="3" />
<!-- 作業(yè)配置A -->
<job:simple id="simpleJobA" sharding-total-count="2"
cron="5 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobA"
sharding-item-parameters="0=A,1=B" overwrite="true" description="清分批量處理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/>
<!-- 流數(shù)據(jù)作業(yè) -->
<job:dataflow sharding-total-count="1" cron="2 * * * * ?"
registry-center-ref="goPayCenter" id="dataFlowJobEven" class="newgo.job.DataFlowJobEven" streaming-process="false" overwrite="true" description="查詢偶數(shù)id數(shù)據(jù)"/>
<job:dataflow sharding-total-count="1" cron="2 * * * * ?"
registry-center-ref="goPayCenter" id="dataFlowJobOdd" class="newgo.job.DataFlowJobOdd" streaming-process="false" overwrite="true" description="查詢奇數(shù)id數(shù)據(jù)"/>
</beans>
通過(guò)xml節(jié)點(diǎn)就能指定配置簡(jiǎn)單作業(yè)和數(shù)據(jù)流作業(yè)匠璧。
- 控制臺(tái)調(diào)度
控制臺(tái)需要的運(yùn)行腳本:elastic-job-lite-console 密碼: ygfj
在windows中桐款,運(yùn)行bat腳本,就會(huì)運(yùn)行一個(gè)web項(xiàng)目夷恍,端口默認(rèn)8899魔眨,本地打開地址:http://localhost:8899
即可進(jìn)入控制臺(tái)。默認(rèn)是以root登錄
選擇Global Settings --》 Registry center,點(diǎn)擊Add按鈕酿雪,添加zookeeper注冊(cè)中心遏暴,添加參數(shù)要與elastic-job.xml中配置zookeeper的信息:
Name -> id,address:server-list,namespace:namespace。
<reg:zookeeper server-lists="192.168.7.21:3181" id="goPayCenter"
namespace="payment-ls" base-sleep-time-milliseconds="1000"
max-retries="3" />
點(diǎn)擊submit即可看到記錄指黎,點(diǎn)擊connect連接朋凉,即可看到左邊列表Job operation上有所有狀態(tài)的job作業(yè)信息,如下圖:
可以看到醋安,可以對(duì)所有任務(wù)進(jìn)行調(diào)度杂彭,手動(dòng)觸發(fā)多種操作。
- 任務(wù)異常監(jiān)聽處理
可以定義自定義監(jiān)聽器吓揪,配置在指定的任務(wù)job上亲怠,當(dāng)該job拋出異常的時(shí)候,就會(huì)進(jìn)入該攔截器進(jìn)行處理柠辞。配置自定義攔截器MyJobExceptionHandler:
public class MyJobExceptionHandler implements JobExceptionHandler{
@Override
public void handleException(String jobName, Throwable cause) {
//System.out.println("異常方法: " + cause.getStackTrace()[0].getMethodName());
System.out.println(String.format("任務(wù)[%s]調(diào)度異常", jobName) + ",異常類型: " + cause.toString());
}
}
結(jié)合job配置:在節(jié)點(diǎn)有配置屬性赁炎,設(shè)置即可。
<job:simple id="simpleJobA" sharding-total-count="2"
cron="5 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobA"
sharding-item-parameters="0=A,1=B" overwrite="true" description="清分批量處理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/>
Elastic整合SpringBathc
Elastic主要在任務(wù)調(diào)度钾腺、治理方面有優(yōu)勢(shì)徙垫,具體的作業(yè)操作則是使用spring batch。其中作業(yè)讀取數(shù)據(jù)可以從文件放棒、也可以通過(guò)mybatis讀取數(shù)據(jù)庫(kù)來(lái)處理數(shù)據(jù)姻报。
其中,elastic作業(yè)配置到elastic-job.xml
中间螟,spring batch配置文件配置在spring-batch.xml
中吴旋。
Spring Bath:是一個(gè)輕量級(jí)的损肛、完善的批處理框架,旨在幫助企業(yè)建立健壯、高效的批處理應(yīng)用荣瑟。更多參考官網(wǎng)
主要關(guān)心的是批處理任務(wù)中包含的主體對(duì)象治拿,每個(gè)對(duì)象負(fù)責(zé)一個(gè)任務(wù)處理中不同的過(guò)程,如下圖:(圖片來(lái)自官網(wǎng))
對(duì)于spring batch中每種不同主體作用笆焰,主要關(guān)注的是
job
劫谅,對(duì)應(yīng)一個(gè)任務(wù)單元。該job就可以映射成elastic中的job嚷掠,兩者都是表示一個(gè)任務(wù)處理捏检,那么就可以將spring batch的job整合到elastic的job中,即可實(shí)現(xiàn)elastic對(duì)這些job的調(diào)用處理不皆。從上圖中也可以知道一個(gè)job可以對(duì)應(yīng)多個(gè)step,每個(gè)step是由三大組件構(gòu)成贯城,也是也就對(duì)應(yīng)著IPO(輸入-處理-輸出)的抽象。我們?cè)趯?duì)數(shù)據(jù)的批量處理過(guò)程中霹娄,就可以通過(guò)自定義類能犯,實(shí)現(xiàn)這些接口來(lái)實(shí)現(xiàn)不同的業(yè)務(wù)需求。
主要的就是ItemReader犬耻、ItemProcessor悲雳、ItemWriter三者的實(shí)現(xiàn)。我們開發(fā)主要關(guān)注的也就是數(shù)據(jù)源數(shù)據(jù)是如何處理香追,最后如何輸出的合瓢。
-
讀取文件數(shù)據(jù)(Flat文件格式)
Flat類型文件是一種簡(jiǎn)單文本格式文件類型,通常經(jīng)過(guò)分隔符分割透典、或者定長(zhǎng)字段來(lái)描述數(shù)據(jù)格式晴楔。
本項(xiàng)目的源數(shù)據(jù)是batch-data-source.csv
文件,文件內(nèi)容:
1,PENDING
2,PENDING
3,PENDING
4,PENDING
5,PENDING
6,PENDING
7,PENDING
8,PENDING
9,PENDING
10,PENDING
需求就是通過(guò)job讀取該文件內(nèi)容峭咒,進(jìn)行處理税弃,將PENDING設(shè)置成SENT,寫入到新的文件batch-data-target.csv
凑队。
- LineMapper
Linemapper作用就是將文件中每一行的數(shù)據(jù)则果,轉(zhuǎn)換成POJO來(lái)批量處理。也就是文件內(nèi)容映射到對(duì)象處理器漩氨。
public class HelloLineMapper implements LineMapper<DeviceCommand>{
private static final String FILE_SPLIT_SIGN = ",";
@Override
public DeviceCommand mapLine(String line, int lineNumber) throws Exception {
String[] args = line.split(FILE_SPLIT_SIGN);
DeviceCommand dc = new DeviceCommand();
dc.setId(args[0]);
dc.setStatus(args[1]);
return dc;
}
}
其中西壮,文件映射成的bean屬性定義和文件內(nèi)容一一對(duì)應(yīng)的:
public class DeviceCommand {
private String id;
private String status;
//getter setter..
}
該linemapper會(huì)在ItemReader使用的時(shí)候,通過(guò)set設(shè)置到FlatFileItemReader
屬性中進(jìn)行使用叫惊。
- ItemProcessor
當(dāng)ItemReader
讀取到數(shù)據(jù)后款青,就會(huì)通過(guò)ItemProcessor對(duì)數(shù)據(jù)進(jìn)行處理,很有些Java8的stream流式編程的意思霍狰,前置的輸出就是下一個(gè)階段的輸入抡草,當(dāng)然兩者的操作對(duì)象類型必然是一致的饰及。
public class HelloItemProcessor implements ItemProcessor<DeviceCommand,DeviceCommand>{
@Override
public DeviceCommand process(DeviceCommand deviceCommand) throws Exception {
System.out.println("send command to device, id=" + deviceCommand.getId());
deviceCommand.setStatus("SENT");
return deviceCommand;
}
}
處理目地就是就將數(shù)據(jù)源的PENGDING更新成SENT再返回命令對(duì)象。同理康震,可以將處理后的對(duì)象作為輸入燎含,傳給ItemWriter
處理。
- LineAggregator
對(duì)于FlatItem來(lái)說(shuō)腿短,LineAggregator作用就是制定將處理器處理后的數(shù)據(jù)如何處理屏箍,按照什么樣的形式將每個(gè)對(duì)象寫入文件的每一行。該數(shù)據(jù)輸出規(guī)則也會(huì)通過(guò)屬性設(shè)置到FlatFileItemReader
中答姥。
public class HelloLineAggregator implements LineAggregator<DeviceCommand> {
@Override
public String aggregate(DeviceCommand deviceCommand) {
StringBuffer sb = new StringBuffer();
sb.append(deviceCommand.getId());
sb.append("|");
sb.append(deviceCommand.getStatus());
return sb.toString();
}
}
按照|
分隔符將內(nèi)容輸出到文件中每一行铣除。
- Job作業(yè)開發(fā)
在elastic-job.xml
中配置了作業(yè)B,如下谚咬,在其作業(yè)實(shí)現(xiàn)類中鹦付,就是使用了spring-batch的job定義實(shí)現(xiàn):包括使用上面定義的ItemReader、ItemProcessor择卦、ItemWriter敲长。
<!-- 作業(yè)配置B -->
<job:simple id="simpleJobB" sharding-total-count="2"
cron="10 * * * * ?" registry-center-ref="goPayCenter" class="newgo.job.SimpleJobB"
sharding-item-parameters="0=test1,1=test2" overwrite="true"
description="結(jié)算批量處理" job-exception-handler="newgo.handler.MyJobExceptionHandler"/>
spring-batch可以使用編程式開發(fā),也可以使用xml配置秉继,在此是直接在代碼里面構(gòu)建job,step祈噪。作業(yè)B中結(jié)合batch的具體實(shí)現(xiàn)如下:
@Component
public class SimpleJobB implements SimpleJob{
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager transactionManager;
@Override
public void execute(ShardingContext context) {
System.out.println(String.format("------SimpleJobB: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
ClassPathResource resource = new ClassPathResource("batch-data-source.csv");
FileSystemResource resource2 = new FileSystemResource("E:\\work\\newgo\\src\\main\\resources\\batch-data-target.csv");
//通過(guò)classPathResource不可寫入文件數(shù)據(jù)?
// ClassPathResource resource2 = new ClassPathResource("batch-data-target.csv");
//1.0 獲取任務(wù)啟動(dòng)器
//2.0 創(chuàng)建reader(Resource + 文件讀取映射器)
FlatFileItemReader<DeviceCommand> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setLineMapper(new HelloLineMapper());
flatFileItemReader.setResource(resource);
//3.0 創(chuàng)建處理器processor
HelloItemProcessor processor = new HelloItemProcessor();
//4.0 創(chuàng)建writer
FlatFileItemWriter<DeviceCommand> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setResource(resource2);
flatFileItemWriter.setLineAggregator(new HelloLineAggregator());
//5.0 創(chuàng)建作業(yè)步驟step
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
//流式編程
Step step = stepBuilderFactory.get("step").<DeviceCommand,DeviceCommand>chunk(1)
.reader(flatFileItemReader)
.processor(processor)
.writer(flatFileItemWriter)
.build();
//6.0 創(chuàng)建作業(yè)job,關(guān)聯(lián)步驟step
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
// Job job = jobBuilderFactory.get("job-" + Math.random()).start(step).build();
Job job = jobBuilderFactory.get("job").start(step).build();
int i = 5 / 0;
LinkedHashMap<String, JobParameter> parameterMap = new LinkedHashMap<String,JobParameter>();
JobParameter datetime = new JobParameter(dateFormatter(new Date()));
parameterMap.put("datetime", datetime);
JobParameters paramters = new JobParameters(parameterMap);
//7.0 啟動(dòng)任務(wù)
try {
jobLauncher.run(job, paramters);
} catch (Exception e) {
e.printStackTrace();
}
}
public String dateFormatter(Date date) {
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:MM:ss");
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
return df.format(date);
}
}
- 定時(shí)器運(yùn)行
用常規(guī)spring啟動(dòng)方式來(lái)啟動(dòng):
public class App
{
public static void main( String[] args )
{
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"classpath:/spring/springBeans.xml"});
}
}
根據(jù)定時(shí)器配置時(shí)間,作業(yè)會(huì)進(jìn)行數(shù)據(jù)處理尚辑。,作業(yè)操作完成之后辑鲤,就可以發(fā)現(xiàn)文件batch-data-target.csv
中的內(nèi)容變成了如下內(nèi)容:
1|SENT
2|SENT
3|SENT
4|SENT
5|SENT
6|SENT
7|SENT
8|SENT
9|SENT
10|SENT
- 讀取數(shù)據(jù)庫(kù)數(shù)據(jù)(關(guān)系數(shù)據(jù)庫(kù))
很多應(yīng)用場(chǎng)景且大量數(shù)據(jù)都是存在數(shù)據(jù)庫(kù)的,那么spring-batch是如何從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)杠茬。再次就介紹兩種:JdbcCursorItemReader
和MyBatisBatchItemWriter
月褥。前者是直接基于原始SQL進(jìn)行處理,后者是利用mybatis框架進(jìn)行整合查詢瓢喉。
- 基于jdbc游標(biāo)和elastic分片處理
思路是:分別定義兩個(gè)job宁赤,分別利用JdbcCursorItemReader設(shè)置不同的查詢SQL來(lái)查詢不同的數(shù)據(jù)。利用elastic的分片來(lái)分別執(zhí)行不同的job栓票。
1.0 先定義兩個(gè)不同的job[在spring-batch.xml中]
包括三大核心組件的定義IPO,自定義查詢SQL决左。
<!-- 配置作業(yè)STEP統(tǒng)一攔截器 -->
<batch:step id="abstractStep" abstract="true">
<batch:listeners>
<batch:listener ref="readItemListener" />
<batch:listener ref="processStepListener" />
</batch:listeners>
</batch:step>
<bean id="abstractCursorReader" abstract="true"
class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- 讀取行業(yè)偶數(shù)列表配置: start -->
<batch:job id="industryEvenProcessJob" restartable="true">
<batch:step id="industryEvenStep" parent="abstractStep">
<batch:tasklet>
<batch:chunk reader="industryEvenJdbcReader" processor="industryProcessor" writer="industryEvenJdbcWriter" commit-interval="50"/>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="industryEvenJdbcReader" scope="step" parent="abstractCursorReader">
<property name="sql">
<value><![CDATA[select id, name,en_name, tm_smp from t_industry WHERE id % ? != 0]]></value>
</property>
<property name="rowMapper" ref="industryRowMapper"/>
<property name="preparedStatementSetter" ref="industryPreparedStatementSetter"/>
<property name="fetchSize" value="20"/>
</bean>
<bean id="industryProcessor" class="newgo.job.dbmapper.IndustryProcessor"/>
<bean id="industryEvenJdbcWriter" class="newgo.job.dbmapper.IndustryEvenJdbcWriter"/>
<bean id="industryRowMapper" class="newgo.job.dbmapper.IndustryRowMapper"/>
<bean id="industryPreparedStatementSetter" class="newgo.job.dbmapper.IndustryPreparedStatementSetter"/>
<!-- 讀取行業(yè)偶數(shù)列表配置: end -->
<!-- 讀取行業(yè)奇數(shù)列表配置: start -->
<batch:job id="industryOddProcessJob" restartable="true">
<batch:step id="industryOddStep" parent="abstractStep">
<batch:tasklet>
<batch:chunk reader="industryOddJdbcReader" processor="industryProcessor" writer="industryOddJdbcWriter" commit-interval="50"/>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="industryOddJdbcReader" scope="step" parent="abstractCursorReader">
<property name="sql">
<value><![CDATA[select id, name,en_name, tm_smp from t_industry WHERE id % ? = 0]]></value>
</property>
<property name="rowMapper" ref="industryRowMapper"/>
<property name="preparedStatementSetter" ref="industryPreparedStatementSetter"/>
<property name="fetchSize" value="20"/>
</bean>
<bean id="industryOddJdbcWriter" class="newgo.job.dbmapper.IndustryOddJdbcWriter"/>
<!-- 讀取行業(yè)偶數(shù)列表配置: end -->
2.0 兩個(gè)作業(yè)的IPO具體實(shí)現(xiàn)在newgo.job.dbmapper目錄下,這里代碼就不貼出來(lái)了走贪。
3.0 定義elastic的作業(yè)分片[elastic-job.xml]
分片是通過(guò)sharding-item-parameters
來(lái)設(shè)定的佛猛。
<!-- 數(shù)據(jù)庫(kù)查詢行業(yè)作業(yè)A : 根據(jù)查詢?nèi)蝿?wù)分片,分別查詢奇偶行列 -->
<job:simple id="elasIndustryJob" sharding-total-count="2" cron="2 * * * * ?"
sharding-item-parameters="0=even,1=odd"
registry-center-ref="goPayCenter" class="newgo.job.IndustryOperatorJob"
job-exception-handler="newgo.handler.MyJobExceptionHandler"/>
4.0 elastic的作業(yè)的分片實(shí)現(xiàn):
@Component
public class IndustryOperatorJob implements SimpleJob {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("industryEvenProcessJob")
private Job industryEvenProcessJob;
@Autowired
@Qualifier("industryOddProcessJob")
private Job industryOddProcessJob;
@Override
public void execute(ShardingContext context) {
System.out.println(String.format("------IndustryQueryJob: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
try {
String shardingParameter = context.getShardingParameter();
switch (shardingParameter) {
case "odd":
JobExecution oddJobResult = jobLauncher.run(industryOddProcessJob, new JobParameters());
System.out.println("IndustryOddProcessJob start result: " + oddJobResult.toString());
break;
case "even":
JobExecution evenJobResult = jobLauncher.run(industryEvenProcessJob, new JobParameters());
System.out.println("IndustryEvenProcessJob start result: " + evenJobResult.toString());
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 基于mybatis分頁(yè)查詢
基于mybatis更方便開發(fā)和維護(hù)坠狡,也是我們常規(guī)操作數(shù)據(jù)庫(kù)的途徑挚躯。下面例子就是通過(guò)mybatis來(lái)查詢數(shù)據(jù),并寫入文件中擦秽。
1.0 配置spring-batch的step[spring-batch.xml]
定義了mybatis的ItemReader實(shí)現(xiàn)码荔。其中queryId
就是對(duì)應(yīng)mybatis接口方法名漩勤。
<bean id="itemReader" scope="step" class="org.mybatis.spring.batch.MyBatisPagingItemReader">
<property name="sqlSessionFactory" ref="sqlSessionFactory" />
<property name="pageSize" value="100"/>
<property name="queryId" value="" />
</bean>
2.0 配置elastic的作業(yè)定時(shí)器[elastic-job.xml]
<!-- 數(shù)據(jù)庫(kù)查詢行業(yè)作業(yè)B : 通過(guò)mybatis映射 -->
<job:simple id="mybatisIndustryJob" sharding-total-count="1" cron="1 * * * * ?"
registry-center-ref="goPayCenter" class="newgo.job.MybatisIndustryJob"
job-exception-handler="newgo.handler.MyJobExceptionHandler"/>
3.0 作業(yè)job具體實(shí)現(xiàn)開發(fā)
@Component
public class MybatisIndustryJob implements SimpleJob{
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobRepository jobRepository;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
private MybatisWriter mybatisWriter;
@Override
public void execute(ShardingContext context) {
System.out.println(String.format("------MybatisIndustryJob: Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s", Thread.currentThread().getId(),context.getShardingTotalCount(),context.getShardingItem()));
MyBatisPagingItemReader<Industry> itemReader = new MyBatisPagingItemReader<Industry>();
Map<String,Object> params = Maps.newHashMap();
params.put("name", "五金");
itemReader.setQueryId("newgo.dao.IndustryDAO.queryIndustryByName");
itemReader.setPageSize(20);
itemReader.setParameterValues(params);
itemReader.setSqlSessionFactory(sqlSessionFactory);
try {
itemReader.afterPropertiesSet();
} catch (Exception e1) {
e1.printStackTrace();
}
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
//流式編程
Step step = stepBuilderFactory.get("step-mybats").<Industry,IndustryFull>chunk(1)
.reader(itemReader)
.processor(new MybatisIndustryProcessor())
.writer(mybatisWriter)
.build();
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
Job job = jobBuilderFactory.get("mybatisIndustryJob").start(step).build();
try {
JobExecution jobResult = jobLauncher.run(job, new JobParameters());
System.out.println("MybatisIndustryJob 執(zhí)行結(jié)果:" + jobResult.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
這里要注意的就是要設(shè)置itemReader.afterPropertiesSet();方法,不然mybatis在讀取數(shù)據(jù)的時(shí)候缩搅,sqlSessionTemplate
未注入拋出NPE越败。并且在每次通jobLauncher.run
啟動(dòng)的時(shí)候,可以根據(jù)返回值來(lái)查看異常信息硼瓣。在調(diào)試mybaatis查詢方法doPageRead
時(shí)候究飞,發(fā)現(xiàn)內(nèi)部是通過(guò)CopyOnWriteArrayList
存放數(shù)據(jù)的,也就是利用并發(fā)安全的寫時(shí)復(fù)制機(jī)制的集合來(lái)保存讀取到數(shù)據(jù)堂鲤,在高并發(fā)且讀遠(yuǎn)高于寫的時(shí)候亿傅,可以用該類型集合來(lái)保存數(shù)據(jù)。
4.0 處理器和輸出的實(shí)現(xiàn)類都是在newgo.job.mapper
包下瘟栖,再次不在P出葵擎。
5.0 在spring batch的不同階段,都可以注入自定義的監(jiān)聽器來(lái)對(duì)每個(gè)階段數(shù)據(jù)進(jìn)行預(yù)處理,實(shí)現(xiàn)在包newgo.listener
下半哟。
更多高級(jí)特性還需要在實(shí)際應(yīng)用中不斷調(diào)試和實(shí)踐酬滤。。寓涨。
elastic+spring-batch+mybatis源代碼-- 密碼: hvzr
參考: