通過例子講解Spring Batch入門,優秀的批處理框架
1 前言
歡迎訪問南瓜慢說 www.pkslow.com獲取更多精彩文章!
Spring相關文章:
Springboot-Cloud相關
Spring Batch
是一個輕量級的、完善的批處理框架,作為Spring
體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量資訊、定時處理大量數據等場景十分簡便。
結合調度框架能更大地發揮Spring Batch
的作用。
2 Spring Batch的概念知識
2.1 分層架構
Spring Batch
的分層架構圖如下:
可以看到它分為三層,分別是:
Application
應用層:包含了所有任務batch jobs
和開發人員自定義的程式碼,主要是根據項目需要開發的業務流程等。Batch Core
核心層:包含啟動和管理任務的運行環境類,如JobLauncher
等。Batch Infrastructure
基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader
和寫出writer
、重試框架等。
2.2 關鍵概念
理解下圖所涉及的概念至關重要,不然很難進行後續開發和問題分析。
2.2.1 JobRepository
專門負責與資料庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch
是需要依賴資料庫來管理的。
2.2.2 任務啟動器JobLauncher
負責啟動任務Job
。
2.2.3 任務Job
Job
是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job
所定義的內容。
上圖介紹了Job
的一些相關概念:
Job
:封裝處理實體,定義過程邏輯。JobInstance
:Job
的運行實例,不同的實例,參數不同,所以定義好一個Job
後可以通過不同參數運行多次。JobParameters
:與JobInstance
相關聯的參數。JobExecution
:代表Job
的一次實際執行,可能成功、可能失敗。
所以,開發人員要做的事情,就是定義Job
。
2.2.4 步驟Step
Step
是對Job
某個過程的封裝,一個Job
可以包含一個或多個Step
,一步步的Step
按特定邏輯執行,才代表Job
執行完成。
通過定義Step
來組裝Job
可以更靈活地實現複雜的業務邏輯。
2.2.5 輸入——處理——輸出
所以,定義一個Job
關鍵是定義好一個或多個Step
,然後把它們組裝好即可。而定義Step
有多種方法,但有一種常用的模型就是輸入——處理——輸出
,即Item Reader
、Item Processor
和Item Writer
。比如通過Item Reader
從文件輸入數據,然後通過Item Processor
進行業務處理和數據轉換,最後通過Item Writer
寫到資料庫中去。
Spring Batch
為我們提供了許多開箱即用的Reader
和Writer
,非常方便。
3 程式碼實例
理解了基本概念後,就直接通過程式碼來感受一下吧。整個項目的功能是從多個csv
文件中讀數據,處理後輸出到一個csv
文件。
3.1 基本框架
添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
需要添加Spring Batch
的依賴,同時使用H2
作為記憶體資料庫比較方便,實際生產肯定是要使用外部的資料庫,如Oracle
、PostgreSQL
。
入口主類:
@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {
public static void main(String[] args) {
SpringApplication.run(PkslowBatchJobMain.class, args);
}
}
也很簡單,只是在Springboot
的基礎上添加註解@EnableBatchProcessing
。
領域實體類Employee
:
package com.pkslow.batch.entity;
public class Employee {
String id;
String firstName;
String lastName;
}
對應的csv
文件內容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
3.2 輸入——處理——輸出
3.2.1 讀取ItemReader
因為有多個輸入文件,所以定義如下:
@Value("input/inputData*.csv")
private Resource[] inputResources;
@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader()
{
MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
resourceItemReader.setResources(inputResources);
resourceItemReader.setDelegate(reader());
return resourceItemReader;
}
@Bean
public FlatFileItemReader<Employee> reader()
{
FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
//跳過csv文件第一行,為表頭
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
//欄位名
setNames(new String[] { "id", "firstName", "lastName" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
{
//轉換化後的目標類
setTargetType(Employee.class);
}
});
}
});
return reader;
}
這裡使用了FlatFileItemReader
,方便我們從文件讀取數據。
3.2.2 處理ItemProcessor
為了簡單演示,處理很簡單,就是把最後一列轉為大寫:
public ItemProcessor<Employee, Employee> itemProcessor() {
return employee -> {
employee.setLastName(employee.getLastName().toUpperCase());
return employee;
};
}
3.2.3 輸出ItremWriter
比較簡單,程式碼及注釋如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv");
@Bean
public FlatFileItemWriter<Employee> writer()
{
FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
writer.setResource(outputResource);
//是否為追加模式
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
{
//設置分割符
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
{
//設置欄位
setNames(new String[] { "id", "firstName", "lastName" });
}
});
}
});
return writer;
}
3.3 Step
有了Reader-Processor-Writer
後,就可以定義Step
了:
@Bean
public Step csvStep() {
return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.processor(itemProcessor())
.writer(writer())
.build();
}
這裡有一個chunk
的設置,值為5
,意思是5條記錄後再提交輸出,可以根據自己需求定義。
3.4 Job
完成了Step
的編碼,定義Job
就容易了:
@Bean
public Job pkslowCsvJob() {
return jobBuilderFactory
.get("pkslowCsvJob")
.incrementer(new RunIdIncrementer())
.start(csvStep())
.build();
}
3.5 運行
完成以上編碼後,執行程式,結果如下:
成功讀取數據,並將最後欄位轉為大寫,並輸出到outputData.csv
文件。
4 監聽Listener
可以通過Listener
介面對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日誌;處理完成,就通知下游拿數據等。
我們分別對Read
、Process
和Write
事件進行監聽,對應分別要實現ItemReadListener
介面、ItemProcessListener
介面和ItemWriteListener
介面。因為程式碼比較簡單,就是列印一下日誌,這裡只貼出ItemWriteListener
的實現程式碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> {
private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
@Override
public void beforeWrite(List<? extends Employee> list) {
logger.info("beforeWrite: " + list);
}
@Override
public void afterWrite(List<? extends Employee> list) {
logger.info("afterWrite: " + list);
}
@Override
public void onWriteError(Exception e, List<? extends Employee> list) {
logger.info("onWriteError: " + list);
}
}
把實現的監聽器listener
整合到Step
中去:
@Bean
public Step csvStep() {
return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.listener(new PkslowReadListener())
.processor(itemProcessor())
.listener(new PkslowProcessListener())
.writer(writer())
.listener(new PkslowWriteListener())
.build();
}
執行後看一下日誌:
這裡就能明顯看到之前設置的chunk
的作用了。Writer
每次是處理5條記錄,如果一條輸出一次,會對IO
造成壓力。
5 總結
Spring Batch
還有許多優秀的特性,如面對大量數據時的並行處理。本文主要入門介紹為主,不一一介紹,後續會專門講解。
歡迎關注微信公眾號<南瓜慢說>,將持續為你更新…
多讀書,多分享;多寫作,多整理。