Source code analysis: how does a Spring Batch job run?
- August 9, 2022
- Notes
- springbatch, springboot, Web Advanced, Source Code Exploration - Implementation
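Note: the demo code in this article is excerpted from the companion source code of the book 《Spring Batch批處理框架》 (Spring Batch Batch-Processing Framework), adapted to a newer Spring Boot version, and is fully open source.
I won't repeat Spring Batch's background and usage here; this article assumes readers have already used the batch framework.
This article only covers the plain chunk-oriented step (ChunkStep); partitioning, asynchronous processing and similar features are out of scope.
1. Table structure
Most Spring-family framework code is long-winded and dizzying to read, so let's lay out the overall flow first and read the source afterwards. Along the way, this also explains the storage table structure.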
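- Each job name, combined with the MD5 hash of its (identifying) run parameters, defines one job_instance, stored in the batch_job_instance table;
- Each time a job_instance runs, a new job_execution is created and stored in the batch_job_execution / batch_job_execution_context tables;
- Extension: how does a job resume after a restart? Answer: when the run is recognized as a restart, the new job_execution is created with the runtime ExecutionContext of the old job_execution (in plain terms: when a train breaks down, only the locomotive is replaced; the cars and their cargo stay the same).
- The job_execution runs the steps one by one in the order defined by the job's flow, turning each into a step_execution stored in the batch_step_execution / batch_step_execution_context tables;
- While a step runs, it maintains its own running status; when an exception occurs or the whole list of steps has finished, the job_execution status is updated;
- Before and after each step, and before and after each job_execution, the registered listeners are notified via callbacks.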
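To make the "job name + MD5 of the parameters" rule concrete, here is a minimal, dependency-free sketch. It is modeled loosely on the idea behind Spring Batch's DefaultJobKeyGenerator (sort the identifying parameters by key, join them as `key=value;`, hash the result with MD5). The class `JobKeySketch` and its methods are hypothetical, and the exact string that Spring Batch hashes may differ between versions. It also shows why the demo test in section 2, which adds `currentTimeMillis` as a parameter, creates a new job_instance on every run.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: how a job_instance key could be derived from identifying parameters.
class JobKeySketch {

    // Join "key=value;" pairs in sorted key order, then return the MD5 digest as 32 hex chars.
    // Sorting makes the key independent of the order in which parameters were added.
    static String jobKey(Map<String, String> identifyingParams) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<>(identifyingParams).entrySet()) {
            String value = e.getValue() == null ? "" : e.getValue();
            sb.append(e.getKey()).append('=').append(value).append(';');
        }
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("MD5 is not available", ex);
        }
    }

    // A job_instance is identified by the pair (job name, job key).
    static String instanceId(String jobName, Map<String, String> identifyingParams) {
        return jobName + ":" + jobKey(identifyingParams);
    }

    public static void main(String[] args) {
        Map<String, String> run1 = Map.of("batchNo", "2022080402", "currentTimeMillis", "1");
        Map<String, String> run2 = Map.of("batchNo", "2022080402", "currentTimeMillis", "2");
        // Different currentTimeMillis values give different keys, i.e. two distinct job_instances.
        System.out.println(instanceId("billJob", run1));
        System.out.println(instanceId("billJob", run2));
    }
}
```

Dropping the timestamp parameter would make repeated launches with the same `batchNo` map to the same job_instance, which is exactly the case the restart checks in `SimpleJobLauncher#run` deal with.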
Tables used by the framework
batch_job_instance
batch_job_execution
batch_job_execution_context
batch_job_execution_params
batch_step_execution
batch_step_execution_context
batch_job_seq
batch_step_execution_seq
batch_job_execution_seq
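2. API entry point
First, let's see how the API that launches a job is called. It looks very simple: just pass in the job and its parameters.
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("billJob")
private Job job;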
@Test
public void billJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("currentTimeMillis", System.currentTimeMillis())
.addString("batchNo","2022080402")
.toJobParameters();
JobExecution result = jobLauncher.run(job, jobParameters);
System.out.println(result.toString());
Thread.sleep(6000);
}
<!-- Bill job -->
<batch:job id="billJob">
<batch:step id="billStep">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="csvItemReader" writer="csvItemWriter" processor="creditBillProcessor" commit-interval="3">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
org.springframework.batch.core.launch.support.SimpleJobLauncher#run
// Partly simplified (parameter checks and logging removed)
@Override
public JobExecution run(final Job job, final JobParameters jobParameters){
final JobExecution jobExecution;
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
// A previous execution exists, so this request is a job restart: check it first
if (lastExecution != null) {
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}
/* Check the status of the stepExecutions
* validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
* retrieve the previous execution and check
*/
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning() || status == BatchStatus.STOPPING) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ lastExecution);
} else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. ");
}
}
}
// Check jobParameters
job.getJobParametersValidator().validate(jobParameters);
// Create the JobExecution; for the same job + parameters, only one execution can be running
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
// SyncTaskExecutor looks asynchronous, but it actually runs the task synchronously on the calling thread (this is pluggable)
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Key entry point, see [org.springframework.batch.core.job.AbstractJob#execute]
job.execute(jobExecution);
if (logger.isInfoEnabled()) {
Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime());
}
}
catch (Throwable t) {
rethrow(t);
}
}
private void rethrow(Throwable t) {
// Rethrowing of the various exception types omitted
throw new IllegalStateException(t);
}
});
}
catch (TaskRejectedException e) {
// Update the status of the job_execution
jobExecution.upgradeStatus(BatchStatus.FAILED);
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
jobRepository.update(jobExecution);
}
return jobExecution;
}
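The restart pre-checks in `SimpleJobLauncher#run` boil down to a small decision table, and the default `SyncTaskExecutor` simply runs the task on the calling thread. The sketch below models both without Spring. `StepStatus`, `LauncherSketch` and `checkRestart` are hypothetical simplifications: the real code throws `JobRestartException` / `JobExecutionAlreadyRunningException` rather than `IllegalStateException`, and looks the previous execution up in the `JobRepository` instead of taking it as arguments.

```java
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical mirror of the BatchStatus values used by the restart checks.
enum StepStatus {
    COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN;

    // STARTING and STARTED count as running; STOPPING is checked separately, as in the snippet above.
    boolean isRunning() {
        return this == STARTING || this == STARTED;
    }
}

class LauncherSketch {

    // Synchronous executor: runs the task on the caller's thread, like Spring's SyncTaskExecutor.
    static final Executor SYNC = Runnable::run;

    // Simplified version of the checks SimpleJobLauncher#run performs before creating a new execution.
    static void checkRestart(boolean hasLastExecution, boolean restartable, List<StepStatus> lastStepStatuses) {
        if (!hasLastExecution) {
            return; // first run of this job_instance: nothing to check
        }
        if (!restartable) {
            throw new IllegalStateException("JobInstance already exists and is not restartable");
        }
        for (StepStatus status : lastStepStatuses) {
            if (status.isRunning() || status == StepStatus.STOPPING) {
                throw new IllegalStateException("A job execution for this job is already running");
            }
            if (status == StepStatus.UNKNOWN) {
                throw new IllegalStateException("Cannot restart step from UNKNOWN status");
            }
        }
    }
}
```

Because `SYNC` runs the job on the caller's thread, `jobLauncher.run(...)` only returns after the job has finished, so the `Thread.sleep(6000)` in the demo test is not strictly needed with the default executor.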
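3. Digging into the code flow
That was a quick look at the API entry point. Quite a few subclasses are involved, so let's keep going.
Overall code flow
- org.springframework.batch.core.launch.support.SimpleJobLauncher#run: the entry API, builds the jobExecution
- org.springframework.batch.core.job.AbstractJob#execute: runs the jobExecution and performs the listener pre-processing
- FlowJob#doExecute -> SimpleFlow#start: processes the steps one by one in order and builds each stepExecution
- JobFlowExecutor#executeStep -> SimpleStepHandler#handleStep -> AbstractStep#execute: executes the stepExecution
- TaskletStep#doExecute: uses RepeatTemplate to call TransactionTemplate, so the work runs inside a transaction
- Inner class TaskletStep.ChunkTransactionCallback#doInTransaction
- Repeatedly calls ChunkOrientedTasklet#execute to run the read-process-write methods:
- Gets the inputs from the configured reader; in this article, a flat-file reader that reads a CSV file
- Iterates over the inputs, passing each item to the processor
- Calls the writer to write all outputs in one go
- Readers differ in implementation; by keeping state such as the number of lines already read, a reader can support partitioning and count-based chunks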
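Stripped of transactions, listeners and repository calls, the call chain above reduces to a nested loop: read up to commit-interval items, process them one by one, write the chunk in one go, and repeat until the reader reports the end of input. The sketch below is a hypothetical model of that shape (`ChunkLoopSketch` is not a Spring Batch class). A `Supplier` returning null stands in for `ItemReader#read` reaching the end of input, and a null from the processor means the item is filtered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of the chunk-oriented read-process-write loop.
class ChunkLoopSketch {

    // Returns every written chunk so the chunk boundaries (one transaction each) are visible.
    static <I, O> List<List<O>> run(Supplier<I> reader, Function<I, O> processor, int commitInterval) {
        List<List<O>> written = new ArrayList<>();
        boolean end = false;
        while (!end) {
            // Read phase: collect up to commitInterval items; null means the input is exhausted.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval) {
                I item = reader.get();
                if (item == null) {
                    end = true;
                    break;
                }
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                break; // nothing left to process
            }
            // Process phase: one item at a time; a null result filters the item out.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // Write phase: the whole chunk at once (in Spring Batch this is followed by a commit).
            written.add(outputs);
        }
        return written;
    }
}
```

With 7 input records and `commit-interval="3"` as in the demo config, this produces chunks of 3, 3 and 1 items.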
3.1 How a JobExecution is processed
org.springframework.batch.core.job.AbstractJob#execute
/**
* Run the specified job, handling all listener and repository calls, and
* delegating the actual processing to {@link #doExecute(JobExecution)}.
*
* @see Job#execute(JobExecution)
* @throws StartLimitExceededException
* if start limit of one of the steps was exceeded
*/
@Override
public final void execute(JobExecution execution) {
// Register the execution with the synchronization manager so its JobContext is bound to the current thread
JobSynchronizationManager.register(execution);
// Timers that record the duration
LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs",
Tag.of("name", execution.getJobInstance().getJobName()));
LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
Timer.Sample timerSample = BatchMetrics.createTimerSample();
try {
// Validate the parameters again
jobParametersValidator.validate(execution.getJobParameters());
if (execution.getStatus() != BatchStatus.STOPPING) {
// Update the job status in the DB
execution.setStartTime(new Date());
updateStatus(execution, BatchStatus.STARTED);
// Call beforeJob on all listeners
listener.beforeJob(execution);
try {
doExecute(execution);
} catch (RepeatException e) {
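throw e.getCause(); // presumably to unwrap: RepeatException wraps exceptions raised inside the repeat framework, so rethrowing the cause lets the catch blocks below see the real exception (e.g. JobInterruptedException)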
}
} else {
// The status is BatchStatus.STOPPING, meaning the job was already stopped: set it straight to STOPPED
// The job was already stopped before we even got this far. Deal
// with it in the same way as any other interruption.
execution.setStatus(BatchStatus.STOPPED);
execution.setExitStatus(ExitStatus.COMPLETED);
}
} catch (JobInterruptedException e) {
// Job was interrupted: STOPPED
execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
execution.addFailureException(e);
} catch (Throwable t) {
// Failed for any other reason: FAILED
logger.error("Encountered fatal error executing job", t);
execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
execution.setStatus(BatchStatus.FAILED);
execution.addFailureException(t);
} finally {
try {
if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
&& execution.getStepExecutions().isEmpty()) {
ExitStatus exitStatus = execution.getExitStatus();
ExitStatus newExitStatus =
ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
execution.setExitStatus(exitStatus.and(newExitStatus));
}
// Timer: compute the total duration
timerSample.stop(BatchMetrics.createTimer("job", "Job duration",
Tag.of("name", execution.getJobInstance().getJobName()),
Tag.of("status", execution.getExitStatus().getExitCode())
));
longTaskTimerSample.stop();
execution.setEndTime(new Date());
try {
// Call afterJob on all listeners; a failure here does not affect the job outcome
listener.afterJob(execution);
} catch (Exception e) {
logger.error("Exception encountered in afterJob callback", e);
}
// Persist to the DB
jobRepository.update(execution);
} finally {
// Release the synchronization registration
JobSynchronizationManager.release();
}
}
}
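`AbstractJob#execute` is a template method: status bookkeeping, listener callbacks and failure handling wrap the subclass's `doExecute`. The sketch below keeps only that skeleton. `AbstractJobSketch`, `ExecutionSketch` and `JobListenerSketch` are hypothetical, statuses are plain strings, and `InterruptedException` stands in for `JobInterruptedException`. Unlike the real class, it sets COMPLETED itself; in Spring Batch the final status comes from the step flow inside `doExecute`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener contract, in the spirit of JobExecutionListener#beforeJob/afterJob.
interface JobListenerSketch {
    void beforeJob(ExecutionSketch execution);

    void afterJob(ExecutionSketch execution);
}

// Hypothetical execution record: a status string plus the failures collected along the way.
class ExecutionSketch {
    String status = "STARTING";
    final List<Throwable> failures = new ArrayList<>();
}

// Hypothetical skeleton of AbstractJob#execute: bookkeeping and callbacks around doExecute.
abstract class AbstractJobSketch {
    private final JobListenerSketch listener;

    AbstractJobSketch(JobListenerSketch listener) {
        this.listener = listener;
    }

    // The actual step flow; in Spring Batch this role is played by FlowJob/SimpleJob#doExecute.
    protected abstract void doExecute(ExecutionSketch execution) throws Exception;

    final void execute(ExecutionSketch execution) {
        try {
            execution.status = "STARTED";
            listener.beforeJob(execution);
            doExecute(execution);
            execution.status = "COMPLETED"; // simplification: the real final status comes from the steps
        } catch (InterruptedException e) { // stand-in for JobInterruptedException
            execution.status = "STOPPED";
            execution.failures.add(e);
        } catch (Throwable t) {
            execution.status = "FAILED";
            execution.failures.add(t);
        } finally {
            try {
                listener.afterJob(execution); // a failing afterJob must not change the outcome
            } catch (Exception e) {
                System.err.println("Exception encountered in afterJob callback: " + e);
            }
            // The real method would now persist the execution and release the synchronization manager.
        }
    }
}
```

The design point to notice is the nested try in `finally`: an exception from `afterJob` is logged and swallowed, so listener bugs cannot turn a finished job into a failed one or skip the final repository update.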
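3.2 When is the Reader called?
SimpleChunkProvider#provide calls the reader repeatedly and wraps the results in a Chunk, which it returns.
There are a few details here that this article does not go into:
- How is the number of items read per chunk controlled?
- How does reading stop once the last line has been read?
- How do you skip the first N (header) lines of a file?
- The read count is recorded in StepContribution
org.springframework.batch.core.step.item.ChunkOrientedTasklet#execute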
@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
@SuppressWarnings("unchecked")
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
inputs = chunkProvider.provide(contribution);
if (buffering) {
chunkContext.setAttribute(INPUTS_KEY, inputs);
}
}
chunkProcessor.process(contribution, inputs);
chunkProvider.postProcess(contribution, inputs);
// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
logger.debug("Inputs still busy");
return RepeatStatus.CONTINUABLE;
}
chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();
if (logger.isDebugEnabled()) {
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
}
return RepeatStatus.continueIf(!inputs.isEnd());
}
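The three questions above can be answered with a small model. This is a hypothetical sketch, not `SimpleChunkProvider` itself: in Spring Batch the chunk size comes from a RepeatTemplate completion policy, and header skipping is a reader concern (for example `FlatFileItemReader`'s `linesToSkip`), whereas here both are folded into one class for brevity. `ChunkSketch`, `ContributionSketch` and `ChunkProviderSketch` are invented names. When the reader returns null, the chunk is marked as the end, which is what lets `ChunkOrientedTasklet#execute` above return `RepeatStatus.continueIf(!inputs.isEnd())`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical chunk holder: the items read plus an "end of input" flag.
class ChunkSketch<T> {
    final List<T> items = new ArrayList<>();
    boolean end;
}

// Hypothetical contribution that only tracks how many items were read.
class ContributionSketch {
    int readCount;
}

// Hypothetical provider answering the three questions above.
class ChunkProviderSketch<T> {
    private final Supplier<T> reader; // returns null at end of input, like ItemReader#read
    private final int chunkSize;      // plays the role of commit-interval
    private int linesToSkip;          // header lines to discard before the first item

    ChunkProviderSketch(Supplier<T> reader, int chunkSize, int linesToSkip) {
        this.reader = reader;
        this.chunkSize = chunkSize;
        this.linesToSkip = linesToSkip;
    }

    ChunkSketch<T> provide(ContributionSketch contribution) {
        // Skip the header once, before the first real item is read.
        while (linesToSkip > 0) {
            linesToSkip--;
            reader.get();
        }
        ChunkSketch<T> chunk = new ChunkSketch<>();
        // Stop as soon as the chunk is full...
        while (chunk.items.size() < chunkSize) {
            T item = reader.get();
            // ...or as soon as the reader signals the end of input with null.
            if (item == null) {
                chunk.end = true;
                break;
            }
            contribution.readCount++; // header lines are not counted
            chunk.items.add(item);
        }
        return chunk;
    }
}
```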
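3.3 When are the Processor/Writer called?
Wrapped by RepeatTemplate and the enclosing transaction template, processing goes through SimpleChunkProcessor:
- A number of items are read and packaged into a Chunk
- The items are iterated, calling the processor once per item
- The StepListeners are notified, with their before/after methods called around each call
// org.springframework.batch.core.step.item.SimpleChunkProcessor (unrelated code omitted...)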
@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
// If there is no input we don't have to do anything more
if (isComplete(inputs)) {
return;
}
// Make the transformation, calling remove() on the inputs iterator if
// any items are filtered. Might throw exception and cause rollback.
Chunk<O> outputs = transform(contribution, inputs);
// Adjust the filter count based on available data
contribution.incrementFilterCount(getFilterCount(inputs, outputs));
// Adjust the outputs if necessary for housekeeping purposes, and then
// write them out...
write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
}
// Iterate over the items, calling the processor once per item
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
Chunk<O> outputs = new Chunk<>();
for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item = iterator.next();
O output;
String status = BatchMetrics.STATUS_SUCCESS;
try {
output = doProcess(item);
}
catch (Exception e) {
/*
* For a simple chunk processor (no fault tolerance) we are done here, so prevent any more processing of these inputs.
*/
inputs.clear();
status = BatchMetrics.STATUS_FAILURE;
throw e;
}
if (output != null) {
outputs.add(output);
}
else {
iterator.remove();
}
}
return outputs;
}
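To see the filtering rule in isolation: a null from the processor removes the item from the inputs (the `iterator.remove()` branch above), and an exception clears the inputs and propagates. The sketch below is a hypothetical, dependency-free version of `transform`; `TransformSketch` and `TransformResult` are not Spring Batch classes, and the sketch counts filtered items directly rather than reproducing `getFilterCount`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical result holder: processed outputs plus the number of filtered items.
final class TransformResult<O> {
    final List<O> outputs = new ArrayList<>();
    int filterCount;
}

class TransformSketch {
    // inputs must be mutable: filtered items are removed from it, as with Chunk's iterator.
    static <I, O> TransformResult<O> transform(List<I> inputs, Function<I, O> processor) {
        TransformResult<O> result = new TransformResult<>();
        for (Iterator<I> it = inputs.iterator(); it.hasNext();) {
            I item = it.next();
            O out;
            try {
                out = processor.apply(item);
            } catch (RuntimeException e) {
                // Non-fault-tolerant behavior: abandon the rest of this chunk and propagate.
                inputs.clear();
                throw e;
            }
            if (out != null) {
                result.outputs.add(out);
            } else {
                it.remove();          // null means "filtered": drop the item from the inputs
                result.filterCount++;
            }
        }
        return result;
    }
}
```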
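4. How does each step hook into transaction handling?
TaskletStep#doExecute wraps the transactional work in a TransactionTemplate.
This is standard transaction handling in a functional style: the actual processing method is invoked through the action's callback:
- Obtain a transaction from the transactionManager
- Run the operation
- If no exception is thrown, commit the transaction
- If an exception is thrown, roll back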
// org.springframework.batch.core.step.tasklet.TaskletStep#doExecute
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
// How the transaction is started
// org.springframework.transaction.support.TransactionTemplate#execute
@Override
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else {
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
result = action.doInTransaction(status);
}
catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
this.transactionManager.commit(status);
return result;
}
}
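The commit/rollback contract of `TransactionTemplate#execute` can be observed without a database by recording calls on a fake transaction manager. `FakeTxManager` and `TxTemplateSketch` are hypothetical. This sketch uses a `Supplier` callback, so it only handles unchecked exceptions, whereas the real template also rolls back on checked ones and wraps them in `UndeclaredThrowableException`, as shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical in-memory transaction manager that only records what happened.
class FakeTxManager {
    final List<String> log = new ArrayList<>();

    void begin() { log.add("begin"); }

    void commit() { log.add("commit"); }

    void rollback() { log.add("rollback"); }
}

// Hypothetical template following the same shape as TransactionTemplate#execute above.
class TxTemplateSketch {
    private final FakeTxManager txManager;

    TxTemplateSketch(FakeTxManager txManager) {
        this.txManager = txManager;
    }

    <T> T execute(Supplier<T> action) {
        txManager.begin();
        T result;
        try {
            result = action.get();
        } catch (RuntimeException | Error ex) {
            txManager.rollback(); // failure: roll back, then let the exception propagate
            throw ex;
        }
        txManager.commit(); // commit only after the callback returned normally
        return result;
    }
}
```

In the step, the callback is one whole chunk (read, process, write, update the StepExecution), so a writer failure rolls back the entire chunk rather than a single item.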
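5. How is the number of items per chunk (and per transaction) controlled?
When configuring the job, there is a step-level parameter, [commit-interval], that controls how many items are processed in each transaction window before it is committed.
After processing each item, RepeatTemplate#executeInternal compares the number of items processed so far with the configured chunk size; once that size is reached, it stops looping and the transaction is ready to be committed.
When the step bean is initialized, a SimpleCompletionPolicy is created (chunkSize takes the configured value if there is one; the default is 5).
At the start of each chunk, SimpleCompletionPolicy#start is called, creating a new RepeatContextSupport whose count is used for counting.
Source (simplified): org.springframework.batch.repeat.support.RepeatTemplate#executeInternal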
/**
* Internal convenience method to loop over interceptors and batch
* callbacks.
* @param callback the callback to process each element of the loop.
*/
private RepeatStatus executeInternal(final RepeatCallback callback) {
// Reset the termination policy if there is one...
// This calls completionPolicy.start, which resets the chunk counter
RepeatContext context = start();
// Make sure if we are already marked complete before we start then no processing takes place.
// The running flag decides whether the next item is processed
boolean running = !isMarkedComplete(context);
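// Listener handling omitted....
// Return value, default is to allow continued processing.
RepeatStatus result = RepeatStatus.CONTINUABLE;
RepeatInternalState state = createInternalState(context);
// Exceptions that were handled but still need to be rethrown
Collection<Throwable> deferred = new ArrayList<>();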
try {
while (running) {
/*
* Run the before interceptors here, not in the task executor so
* that they all happen in the same thread - it's easier for
* tracking batch status, amongst other things.
*/
// Listener handling omitted....
if (running) {
try {
// callback is the actual processing method, in a functional style
result = getNextResult(context, callback, state);
executeAfterInterceptors(context, result);
}
catch (Throwable throwable) {
doHandle(throwable, context, deferred);
}
// Check whether the current chunk is done, deciding whether to process the next item
// N.B. the order may be important here:
if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
running = false;
}
}
}
result = result.and(waitForResults(state));
// Throwables handling omitted....
// Explicitly drop any references to internal state...
state = null;
}
finally {
// Code omitted...
}
return result;
}
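The counting logic described above (start() resets a counter at the beginning of each chunk, and the loop stops once the count reaches the chunk size or the callback reports there is nothing left) can be modeled in a few lines. `CountingCompletionPolicy` and `RepeatSketch` are hypothetical stand-ins for SimpleCompletionPolicy and `RepeatTemplate#executeInternal`; the boolean returned by the callback plays the role of `RepeatStatus.CONTINUABLE` / `FINISHED`, and listeners, exception handling and asynchronous execution are left out.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for SimpleCompletionPolicy: a chunk is complete after chunkSize items.
class CountingCompletionPolicy {
    private final int chunkSize;
    private int count;

    CountingCompletionPolicy(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    void start() { count = 0; }   // called at the beginning of every chunk

    void update() { count++; }    // called after every callback invocation

    boolean isComplete() { return count >= chunkSize; }
}

// Hypothetical, single-threaded reduction of RepeatTemplate#executeInternal.
class RepeatSketch {
    // callback returns true while there may be more work (CONTINUABLE), false when done (FINISHED).
    // Returns the number of callback invocations in this chunk.
    static int executeChunk(CountingCompletionPolicy policy, BooleanSupplier callback) {
        policy.start();
        int iterations = 0;
        boolean running = true;
        while (running) {
            boolean continuable = callback.getAsBoolean();
            iterations++;
            policy.update();
            // Stop when the callback is finished or the chunk has reached its size.
            if (!continuable || policy.isComplete()) {
                running = false;
            }
        }
        return iterations;
    }
}
```

Calling `executeChunk` repeatedly with the same policy object mirrors the outer loop: each call is one transaction window, and the counter restarts at zero every time.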
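Summary
The JSR-352 standard defines the basic model for batch processing in Java, including batch metadata such as JobExecutions, JobInstances and StepExecutions. On top of this model, Spring Batch provides many base components and extension points:
- Comprehensive base components
- Spring Batch ships many such components, e.g. ItemReaders, ItemWriters and PartitionHandlers, covering all kinds of data sources and environments.
- Rich configuration
- JSR-352 defines an XML-based job configuration model; Spring Batch additionally offers a (type-safe) Java-based configuration style
- Scalability
- Among the scaling options, Local Partitioning is already part of JSR-352, but there should be more choices, such as the Multi-threaded Step, Remote Partitioning, Parallel Step and Remote Chunking options offered by Spring Batch
- Extension points
- A solid listener pattern provides hooks before and after step/job execution, letting developers customize the batch flow.
JSR-352 was released as part of Java EE 7 in 2013; by 2022 that is almost 10 years ago, and Spring is also exploring new batch-processing models, such as Spring Attic / Spring Cloud Data Flow. //docs.spring.io/spring-batch/docs/current/reference/html/jsr-352.html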
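Extensions
1. How is the Job/Step runtime context stored and controlled?
While a job runs, its runtime information is kept in a JobContext; similarly, a running step has a StepContext. You can store parameters in a context and use them within the job or step.
Looking at the JobContext/StepContext source, the Execution is held in an ordinary field, which at first glance looks like a thread-safety problem, since in production several jobs often run concurrently.
Spring Batch uses several mechanisms to keep this concurrency-safe:
- When each job starts, a new JobContext is created from its JobExecution, so each job thread works with its own object.
- JobSynchronizationManager holds a ConcurrentHashMap whose key is the JobExecution and whose value is the JobContext
- When the job ends, the key-value pair for the current JobExecution is removed
This also shows that if you store a lot of business data in the JobExecution (its context), that data stays reachable and cannot be garbage-collected while the execution is registered, which can lead to an OOM. So keep only lean data in the context.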
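The synchronization-manager idea (a concurrent map from execution to context, plus the execution bound to the current thread, cleaned up on release) can be sketched as follows. `SyncManagerSketch` is hypothetical and much simpler than the real class, which, as far as I know, keeps a per-thread stack and reference counts so that nested registrations work. The sketch also makes the memory point visible: whatever is put into a context stays reachable from the map until `release()` is called.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified registry: execution -> context, with the current execution bound per thread.
class SyncManagerSketch<E, C> {
    private final Map<E, C> contexts = new ConcurrentHashMap<>();
    private final ThreadLocal<E> current = new ThreadLocal<>();
    private final Function<E, C> contextFactory;

    SyncManagerSketch(Function<E, C> contextFactory) {
        this.contextFactory = contextFactory;
    }

    // Bind the execution to the calling thread and create its context once.
    C register(E execution) {
        current.set(execution);
        return contexts.computeIfAbsent(execution, contextFactory);
    }

    // Context of the execution registered on this thread, or null if there is none.
    C getContext() {
        E execution = current.get();
        return execution == null ? null : contexts.get(execution);
    }

    // Drop the map entry and the thread binding so the context can be garbage-collected.
    void release() {
        E execution = current.get();
        if (execution != null) {
            contexts.remove(execution);
            current.remove();
        }
    }

    int registeredCount() {
        return contexts.size();
    }
}
```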
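2. When an exception occurs during step execution, how is the runtime state protected?
The source uses various synchronization controls, locking and an oldVersion copy, and is fairly complex overall (org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction)
- oldVersion copy: a snapshot of the StepExecution is taken before the chunk runs; if the chunk fails, the state falls back to this snapshot, i.e. processing continues from the last checkpoint
// Excerpt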
oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
copy(stepExecution, oldVersion);
private void copy(final StepExecution source, final StepExecution target) {
target.setVersion(source.getVersion());
target.setWriteCount(source.getWriteCount());
target.setFilterCount(source.getFilterCount());
target.setCommitCount(source.getCommitCount());
target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
}
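The snapshot-and-restore idea behind oldVersion can be shown without Spring: copy the counters and a fresh copy of the context map before the chunk, and copy them back if the chunk fails. `StepStateSketch` and `ChunkAttemptSketch` are hypothetical; like `new ExecutionContext(source)`, the map copy is shallow (entries are copied, the values themselves are not cloned), and the restore here happens in a catch block rather than in a transaction-completion callback.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the StepExecution fields that copy() transfers.
class StepStateSketch {
    int version;
    int writeCount;
    int filterCount;
    int commitCount;
    Map<String, Object> context = new HashMap<>();

    // Same idea as copy() above: counters by value, plus a *new* context map so the snapshot is independent.
    static void copy(StepStateSketch source, StepStateSketch target) {
        target.version = source.version;
        target.writeCount = source.writeCount;
        target.filterCount = source.filterCount;
        target.commitCount = source.commitCount;
        target.context = new HashMap<>(source.context);
    }
}

class ChunkAttemptSketch {
    // Snapshot before the chunk; if the chunk fails (its transaction would roll back), restore the snapshot.
    static void runChunk(StepStateSketch state, Runnable chunkWork) {
        StepStateSketch oldVersion = new StepStateSketch();
        StepStateSketch.copy(state, oldVersion);
        try {
            chunkWork.run();
        } catch (RuntimeException e) {
            StepStateSketch.copy(oldVersion, state);
            throw e;
        }
    }
}
```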
- Semaphore control: after each chunk has run, the lock must be acquired before the stepExecution is updated
- Shared semaphore per step execution, so other step executions can run in parallel without needing the lockSemaphore (org.springframework.batch.core.step.tasklet.TaskletStep#doExecute)
// Unrelated code omitted
try {
try {
// Run the read-process-write logic
result = tasklet.execute(contribution, chunkContext);
if (result == null) {
result = RepeatStatus.FINISHED;
}
}
catch (Exception e) {
// Omitted...
}
}
finally {
// If the step operations are asynchronous then we need to synchronize changes to the step execution (at a
// minimum). Take the lock *before* changing the step execution.
try {
// Acquire the lock
semaphore.acquire();
locked = true;
}
catch (InterruptedException e) {
logger.error("Thread interrupted while locking for repository update");
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setTerminateOnly();
Thread.currentThread().interrupt();
}
stepExecution.apply(contribution);
}
stepExecutionUpdated = true;
stream.update(stepExecution.getExecutionContext());
try {
// Update the execution context and the status in the DB
// Going to attempt a commit. If it fails this flag will stay false and we can use that later.
getJobRepository().updateExecutionContext(stepExecution);
stepExecution.incrementCommitCount();
getJobRepository().update(stepExecution);
}
catch (Exception e) {
// If we get to here there was a problem saving the step execution and we have to fail.
String msg = "JobRepository failure forcing rollback";
logger.error(msg, e);
throw new FatalStepExecutionException(msg, e);
}
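The semaphore pattern in the excerpt (take the lock before applying the chunk's contribution to the shared StepExecution, so concurrently running chunks cannot lose each other's updates) can be reproduced on its own. `GuardedStepExecutionSketch` is hypothetical; it releases the permit right after the in-memory update, whereas in the excerpt above the permit is still held while the repository is updated.

```java
import java.util.concurrent.Semaphore;

// Hypothetical shared step state whose updates are guarded by a one-permit semaphore.
class GuardedStepExecutionSketch {
    private final Semaphore lock = new Semaphore(1); // one permit per step execution
    private int readCount;
    private int writeCount;
    private int commitCount;

    // Apply one chunk's contribution; acquire first so the read-modify-write cannot interleave.
    void apply(int reads, int writes) throws InterruptedException {
        lock.acquire();
        try {
            readCount += reads;
            writeCount += writes;
            commitCount++;
        } finally {
            lock.release();
        }
    }

    // Intended to be read after the worker threads have been joined.
    int readCount() { return readCount; }

    int writeCount() { return writeCount; }

    int commitCount() { return commitCount; }
}
```

Without the semaphore, two threads finishing chunks at the same moment could both read the old `commitCount` and write back the same incremented value, silently losing one commit in the bookkeeping.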