閃亮的 Spring Boot 3.4: Spring Batch

工程 | Josh Long | 2024年11月24日 | ...

新發布的 Spring Batch 5.2 帶來了大量新特性!Spring Batch 是一種處理大量但有限的順序資料訪問的有力方式。想想:從 SQL 資料庫讀取並寫入 CSV,或者從 FTP 伺服器讀取並對 MongoDB 進行分析——這就是批處理。你知道這是什麼。這項工作的一半(抱歉雙關語!)是將各種資料來源和多個數據匯集點整合起來。另一個方面,正如你可以想象到的,對於耗時很長且可能失敗的工作負載,是維護與每個批處理作業執行相關的持久且詳細的元資料。同樣,我無法深入詳細介紹此版本中的所有新穎特性!因此,讓我們從高層面看看其中一些特性。

  • 我們從一個 Job Repository 實現增加到三個——數數看:三個!最近的 Spring Batch 只支援基於 JDBC 的 JobRepository。在不久前,它有兩個 JobRepository 實現:一個支援 JDBC,另一個透過記憶體中的 Map 支援“持久化”。Map 選項對於測試或那些結果的永續性不如純效能重要的工作負載非常有用。我們移除了 Map 實現,建議人們使用像 H2 這樣的記憶體中 SQL 資料庫配合 JDBC JobRepository。有些人需要純粹的效能,H2 選項不夠好。在這個版本中,我們引入了一個“無資源”的 JobRepository,它不儲存任何狀態,甚至不在記憶體中。我們還添加了一個基於 JDBC 的 JobRepository 的持久替代方案,即基於 MongoDB 的 JobRepository 實現。
  • 使用 JPA ItemReader 時,新增了為 Spring Data JPA 查詢註冊提示的支援。
  • 使用基於 JDBC 的 ItemReader 時,新增了對資料類(Kotlin data class 或 Java record 例項)的支援。
  • 支援適配更多函式型別,不僅僅是 Function<I,O>,以適配 ItemReaderItemWriterItemProcessor 型別。
  • 使用阻塞佇列 Item Reader 和 Writer 實現併發步驟
  • 一個 CompositeItemReader<T>,可以順序地從多個委託的 ItemReader<T> 中讀取資料。
  • 作業註冊的簡化
  • RecursiveCollectionLineAggregator 中支援可配置的行分隔符

CompositeItemReader<T>

讓我們來看看我最喜歡的兩個新特性:CompositeItemReader<T> 和 SEDA 友好的 BlockingQueueItemWriter 以及 BlockingQueueItemReader 實現。

這是此 Spring Batch 應用中唯一的 Job 的定義

package com.example.bootiful_34.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Configuration
class BatchConfiguration {

	static final BlockingQueue<Customer> CUSTOMERS = new LinkedBlockingQueue<>();

	@Bean
	Job job(JobRepository repository, Step one, Step two) {
		return new JobBuilder("job", repository)//
			.incrementer(new RunIdIncrementer()) //
			.start(one)//
			.next(two)//
			.build();
	}

}

這是一個簡單的作業,包含兩個 Step 例項,一個流向另一個。快速提醒:在 Spring Batch 中,Step 是一個工作單元。它描述了四件事:

  • 多少資料構成一個“批次”工作?(在 Spring Batch 術語中這稱為“chunk”)
  • 資料的來源(由一個 ItemReader<T> 例項表示)
  • 資料的寫入目標(由一個 ItemWriter<T> 例項表示)
  • 一個處理來自源並將資料傳送到目標的處理器。

每個 Step 使用 ItemReader<I> 讀取一個 chunk 的資料量,將一個類似集合的東西(稱為 Chunk)傳遞給 ItemProcessor<I,O> 進行任意操作,然後將 ItemProcessor<I,O> 的輸出傳送給 ItemWriter<O>IO 可以代表相同的泛型型別或不同的型別。然後,迴圈繼續進行,直到 ItemReader 中的所有資料都被讀取完畢。該步驟被視為完成,執行將轉移到下一個步驟。

在這個示例應用中,我們將從 customer 表讀取資料,讀取 idnameoslanguage 記錄。我們將從一個 .csv 檔案讀取類似的資料。我們將使用方便的新特性 CompositeItemReader<Customer> 來快速完成這項工作,從而避免進行單獨的規範化步驟。

package com.example.bootiful_34.batch;

import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.List;

@Configuration
@ImportRuntimeHints(StepOneConfiguration.CustomersCsvRuntimeHintsRegistrar.class)
class StepOneConfiguration {

	private static final Resource CSV = new ClassPathResource("/customers.csv");

	@Bean
	FlatFileItemReader<Customer> customerCsvItemReader() {
		return new FlatFileItemReaderBuilder<Customer>()//
			.resource(CSV)
			.delimited()
			.names("id", "name", "language", "os")
			.name("customerCsvItemReader")
			.fieldSetMapper(fs -> new Customer(fs.readInt(0), fs.readString(1), fs.readString(2), fs.readString(3)))
			.build();
	}

	@Bean
	JdbcCursorItemReader<Customer> customerJdbcItemReader(DataSource dataSource) {
		return new JdbcCursorItemReaderBuilder<Customer>()//
			.name("customerJdbcItemReader")//
			.dataSource(dataSource)//
			.sql("select id, name, language, os from customer")//
			.rowMapper((rs, rowNum) -> new Customer(rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))//
			.build();
	}

	@Bean
	CompositeItemReader<Customer> customerCompositeItemReader(JdbcCursorItemReader<Customer> customerJdbcItemReader,
			FlatFileItemReader<Customer> customerCsvItemReader) {
		return new CompositeItemReader<>(List.of(customerJdbcItemReader, customerCsvItemReader));
	}

	@Bean
	BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter() {
		return new BlockingQueueItemWriter<>(BatchConfiguration.CUSTOMERS);
	}

	@Bean
	Step one(JobRepository repository, PlatformTransactionManager txm,
			CompositeItemReader<Customer> customerCompositeItemReader,
			BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter) {
		return new StepBuilder("one", repository)//
			.<Customer, Customer>chunk(10, txm)//
			.reader(customerCompositeItemReader)//
			.writer(customerBlockingQueueItemWriter)//
			.build();
	}

	static class CustomersCsvRuntimeHintsRegistrar implements RuntimeHintsRegistrar {

		@Override
		public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
			hints.resources().registerResource(CSV);
		}

	}

}

在這個示例中,我們有三個 ItemReader bean,但步驟只消費一個 CompositeItemReader<T> bean。它將依次讀取來自 FlatFileItemReader<Customer>JdbcCursorItemReader<Customer> bean 的任何資料。

在這個示例中,我們沒有配置 ItemProcessor<Customer,Customer>

分級事件驅動架構(SEDA)和批處理?是的!

對於 ItemWriter<Customer>,我們使用了框架中另一個新穎的補充:BlockingQueueItemWriter<Customer>!其思想很簡單:writer 將資料寫入一個 Java java.util.concurrent.BlockingQueueBlockingQueue 變數是一個定義在 BatchConfiguration 類中的 static final 變數,名為 CUSTOMERS。下一個步驟將配置一個 BlockingQueueItemReader<T>,它將從同一個 java.util.concurrent.BlockingQueue讀取。超簡單,對嗎?是的!但這將節省大量時間。

傳統上,Spring Batch 應用的上下文只與當前步驟相關聯。隨著資料流經一個作業,Spring Batch 的 Step 只給你三次處理資料的機會:從 ItemReader<I>ItemProcessor<I,O>ItemWriter<O>。想在資料寫入後進行更多處理?那就得等到下一個步驟!你已經將資料寫入磁碟或其他持久介質,然後你必須重新讀取它。Spring Batch 會跟蹤你在讀取和寫入方面進行到多遠,那我們為何要如此小心?為何我們需要如此頻繁地將所有內容持久化?

現在情況不再如此了,因為 Spring Batch 支援將給定 Step 的輸出寫入 BlockingQueue。值得注意的是,BlockingQueue 例項有一個額外的好處,即支援對寫入資料量設定限制。這與分級事件驅動架構(SEDA)的風格配合得很好。SEDA 背後的思想是根據資料流經的不同階段來定義工作。隨著資料從一個階段移動到另一個階段,它流入(有界的)佇列。這些有界佇列提供反壓。如果工作被拒絕或在容量超出時簡單地寫入磁碟,你無法壓垮給定階段的處理器。這稱為反壓,對於可伸縮性至關重要。

每個階段僅從一個佇列中獲取工作。這提供了一種天然的負載均衡方式:啟動更多給定階段處理器的例項,工作就會均勻地分配給它們。你甚至可以結合 Spring Batch 的遠端分割槽和分塊範例,將工作分配到叢集中,進一步擴充套件這種架構。

這種架構通常與訊息系統相關聯——佇列通常被假定為訊息匯流排中的佇列(或主題);然而,該架構背後的原則在批處理系統中也同樣適用。

讓我們看看步驟二!

package com.example.bootiful_34.batch;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
class StepTwoConfiguration {

	@Bean
	Step two(JobRepository repository, PlatformTransactionManager transactionManager,
			BlockingQueueItemReader<Customer> blockingQueueItemReader, ItemWriter<Customer> customerItemWriter) {
		return new StepBuilder("two", repository)//
			.<Customer, Customer>chunk(10, transactionManager)//
			.reader(blockingQueueItemReader)//
			.writer(customerItemWriter)//
			.build();
	}

	@Bean
	BlockingQueueItemReader<Customer> blockingQueueItemReader() {
		return new BlockingQueueItemReader<>(BatchConfiguration.CUSTOMERS);
	}

	@Bean
	ItemWriter<Customer> customerItemWriter() {
		return chunk -> chunk.forEach(System.out::println);
	}

}

在這裡,我們定義了另一個 Step,它從同一個 BlockingQueue 中讀取資料,然後簡單地將所有內容打印出來。

強大、簡單、可擴充套件的批處理?夫復何求?順便說一句,請記住,Spring Batch 的大部分工作——輸入和輸出——都從 Java 21 的虛擬執行緒中獲益巨大,Spring Boot 現在已經支援虛擬執行緒三個版本了!如果你使用的是 Java 21+(你肯定至少用的是 Java 21,對嗎?),別忘了設定 spring.threads.virtual.enabled=true

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

保持領先

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一份簡單訂閱,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視所有