­

通過例子講解Spring Batch入門,優秀的批處理框架

1 前言

歡迎訪問南瓜慢說 www.pkslow.com獲取更多精彩文章!

Spring相關文章:Springboot-Cloud相關

Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量資訊、定時處理大量數據等場景十分簡便。

結合調度框架能更大地發揮Spring Batch的作用。

2 Spring Batch的概念知識

2.1 分層架構

Spring Batch的分層架構圖如下:

可以看到它分為三層,分別是:

  • Application應用層:包含了所有任務batch jobs和開發人員自定義的程式碼,主要是根據項目需要開發的業務流程等。
  • Batch Core核心層:包含啟動和管理任務的運行環境類,如JobLauncher等。
  • Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader寫出writer、重試框架等。

2.2 關鍵概念

理解下圖所涉及的概念至關重要,不然很難進行後續開發和問題分析。

2.2.1 JobRepository

專門負責與資料庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch是需要依賴資料庫來管理的。

2.2.2 任務啟動器JobLauncher

負責啟動任務Job

2.2.3 任務Job

Job是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內容。

上圖介紹了Job的一些相關概念:

  • Job:封裝處理實體,定義過程邏輯。
  • JobInstanceJob的運行實例,不同的實例,參數不同,所以定義好一個Job後可以通過不同參數運行多次。
  • JobParameters:與JobInstance相關聯的參數。
  • JobExecution:代表Job的一次實際執行,可能成功、可能失敗。

所以,開發人員要做的事情,就是定義Job

2.2.4 步驟Step

Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執行,才代表Job執行完成。

通過定義Step來組裝Job可以更靈活地實現複雜的業務邏輯。

2.2.5 輸入——處理——輸出

所以,定義一個Job關鍵是定義好一個或多個Step,然後把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item ReaderItem ProcessorItem Writer。比如通過Item Reader從文件輸入數據,然後通過Item Processor進行業務處理和數據轉換,最後通過Item Writer寫到資料庫中去。

Spring Batch為我們提供了許多開箱即用的ReaderWriter,非常方便。

3 程式碼實例

理解了基本概念後,就直接通過程式碼來感受一下吧。整個項目的功能是從多個csv文件中讀數據,處理後輸出到一個csv文件。

3.1 基本框架

添加依賴:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

需要添加Spring Batch的依賴,同時使用H2作為記憶體資料庫比較方便,實際生產肯定是要使用外部的資料庫,如OraclePostgreSQL

入口主類:

@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {
    public static void main(String[] args) {
        SpringApplication.run(PkslowBatchJobMain.class, args);
    }
}

也很簡單,只是在Springboot的基礎上添加註解@EnableBatchProcessing

領域實體類Employee

package com.pkslow.batch.entity;
public class Employee {
    String id;
    String firstName;
    String lastName;
}

對應的csv文件內容如下:

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller

3.2 輸入——處理——輸出

3.2.1 讀取ItemReader

因為有多個輸入文件,所以定義如下:

@Value("input/inputData*.csv")
private Resource[] inputResources;

@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader()
{
  MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
  resourceItemReader.setResources(inputResources);
  resourceItemReader.setDelegate(reader());
  return resourceItemReader;
}

@Bean
public FlatFileItemReader<Employee> reader()
{
  FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
  //跳過csv文件第一行,為表頭
  reader.setLinesToSkip(1);
  reader.setLineMapper(new DefaultLineMapper() {
    {
      setLineTokenizer(new DelimitedLineTokenizer() {
        {
          //欄位名
          setNames(new String[] { "id", "firstName", "lastName" });
        }
      });
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
        {
          //轉換化後的目標類
          setTargetType(Employee.class);
        }
      });
    }
  });
  return reader;
}

這裡使用了FlatFileItemReader,方便我們從文件讀取數據。

3.2.2 處理ItemProcessor

為了簡單演示,處理很簡單,就是把最後一列轉為大寫:

public ItemProcessor<Employee, Employee> itemProcessor() {
  return employee -> {
    employee.setLastName(employee.getLastName().toUpperCase());
    return employee;
  };
}

3.2.3 輸出ItremWriter

比較簡單,程式碼及注釋如下:

private Resource outputResource = new FileSystemResource("output/outputData.csv");

@Bean
public FlatFileItemWriter<Employee> writer()
{
  FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
  writer.setResource(outputResource);
  //是否為追加模式
  writer.setAppendAllowed(true);
  writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
    {
      //設置分割符
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
        {
          //設置欄位
          setNames(new String[] { "id", "firstName", "lastName" });
        }
      });
    }
  });
  return writer;
}

3.3 Step

有了Reader-Processor-Writer後,就可以定義Step了:

@Bean
public Step csvStep() {
  return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
    .reader(multiResourceItemReader())
    .processor(itemProcessor())
    .writer(writer())
    .build();
}

這裡有一個chunk的設置,值為5,意思是5條記錄後再提交輸出,可以根據自己需求定義。

3.4 Job

完成了Step的編碼,定義Job就容易了:

@Bean
public Job pkslowCsvJob() {
  return jobBuilderFactory
    .get("pkslowCsvJob")
    .incrementer(new RunIdIncrementer())
    .start(csvStep())
    .build();
}

3.5 運行

完成以上編碼後,執行程式,結果如下:

成功讀取數據,並將最後欄位轉為大寫,並輸出到outputData.csv文件。

4 監聽Listener

可以通過Listener介面對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日誌;處理完成,就通知下游拿數據等。

我們分別對ReadProcessWrite事件進行監聽,對應分別要實現ItemReadListener介面、ItemProcessListener介面和ItemWriteListener介面。因為程式碼比較簡單,就是列印一下日誌,這裡只貼出ItemWriteListener的實現程式碼:

public class PkslowWriteListener implements ItemWriteListener<Employee> {
    private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
    @Override
    public void beforeWrite(List<? extends Employee> list) {
        logger.info("beforeWrite: " + list);
    }

    @Override
    public void afterWrite(List<? extends Employee> list) {
        logger.info("afterWrite: " + list);
    }

    @Override
    public void onWriteError(Exception e, List<? extends Employee> list) {
        logger.info("onWriteError: " + list);
    }
}

把實現的監聽器listener整合到Step中去:

@Bean
public Step csvStep() {
  return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
    .reader(multiResourceItemReader())
    .listener(new PkslowReadListener())
    .processor(itemProcessor())
    .listener(new PkslowProcessListener())
    .writer(writer())
    .listener(new PkslowWriteListener())
    .build();
}

執行後看一下日誌:

這裡就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。

5 總結

Spring Batch還有許多優秀的特性,如面對大量數據時的並行處理。本文主要入門介紹為主,不一一介紹,後續會專門講解。


歡迎關注微信公眾號<南瓜慢說>,將持續為你更新…

多讀書,多分享;多寫作,多整理。

Tags: