保持領先
VMware 提供培訓和認證,助您加速進步。
瞭解更多新發布的 Spring Batch 5.2 帶來了大量新特性!Spring Batch 是一種處理大量但有限的順序資料訪問的有力方式。想想:從 SQL 資料庫讀取並寫入 CSV,或者從 FTP 伺服器讀取並對 MongoDB 進行分析——這就是批處理。你知道這是什麼。這項工作的一半(抱歉雙關語!)是將各種資料來源和多個數據匯集點整合起來。另一個方面,正如你可以想象到的,對於耗時很長且可能失敗的工作負載,是維護與每個批處理作業執行相關的持久且詳細的元資料。同樣,我無法深入詳細介紹此版本中的所有新穎特性!因此,讓我們從高層面看看其中一些特性。
JobRepository
。在不久前,它有兩個 JobRepository
實現:一個支援 JDBC,另一個透過記憶體中的 Map
支援“持久化”。Map
選項對於測試或那些結果的永續性不如純效能重要的工作負載非常有用。我們移除了 Map
實現,建議人們使用像 H2 這樣的記憶體中 SQL 資料庫配合 JDBC JobRepository
。有些人需要純粹的效能,H2 選項不夠好。在這個版本中,我們引入了一個“無資源”的 JobRepository
,它不儲存任何狀態,甚至不在記憶體中。我們還添加了一個基於 JDBC 的 JobRepository
的持久替代方案,即基於 MongoDB 的 JobRepository
實現。ItemReader
時,新增了為 Spring Data JPA 查詢註冊提示的支援。ItemReader
時,新增了對資料類(Kotlin data class
或 Java record
例項)的支援。Function<I,O>
,以適配 ItemReader
、ItemWriter
和 ItemProcessor
型別。CompositeItemReader<T>
,可以順序地從多個委託的 ItemReader<T>
中讀取資料。RecursiveCollectionLineAggregator
中支援可配置的行分隔符CompositeItemReader<T>
讓我們來看看我最喜歡的兩個新特性:CompositeItemReader<T>
和 SEDA 友好的 BlockingQueueItemWriter
以及 BlockingQueueItemReader
實現。
這是此 Spring Batch 應用中唯一的 Job
的定義
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Configuration
class BatchConfiguration {
static final BlockingQueue<Customer> CUSTOMERS = new LinkedBlockingQueue<>();
@Bean
Job job(JobRepository repository, Step one, Step two) {
return new JobBuilder("job", repository)//
.incrementer(new RunIdIncrementer()) //
.start(one)//
.next(two)//
.build();
}
}
這是一個簡單的作業,包含兩個 Step
例項,一個流向另一個。快速提醒:在 Spring Batch 中,Step
是一個工作單元。它描述了四件事:
ItemReader<T>
例項表示)ItemWriter<T>
例項表示)每個 Step
使用 ItemReader<I>
讀取一個 chunk 的資料量,將一個類似集合的東西(稱為 Chunk)傳遞給 ItemProcessor<I,O>
進行任意操作,然後將 ItemProcessor<I,O>
的輸出傳送給 ItemWriter<O>
。I
和 O
可以代表相同的泛型型別或不同的型別。然後,迴圈繼續進行,直到 ItemReader
中的所有資料都被讀取完畢。該步驟被視為完成,執行將轉移到下一個步驟。
在這個示例應用中,我們將從 customer
表讀取資料,讀取 id
、name
、os
和 language
記錄。我們也將從一個 .csv
檔案讀取類似的資料。我們將使用方便的新特性 CompositeItemReader<Customer>
來快速完成這項工作,從而避免進行單獨的規範化步驟。
package com.example.bootiful_34.batch;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.List;
@Configuration
@ImportRuntimeHints(StepOneConfiguration.CustomersCsvRuntimeHintsRegistrar.class)
class StepOneConfiguration {
private static final Resource CSV = new ClassPathResource("/customers.csv");
@Bean
FlatFileItemReader<Customer> customerCsvItemReader() {
return new FlatFileItemReaderBuilder<Customer>()//
.resource(CSV)
.delimited()
.names("id", "name", "language", "os")
.name("customerCsvItemReader")
.fieldSetMapper(fs -> new Customer(fs.readInt(0), fs.readString(1), fs.readString(2), fs.readString(3)))
.build();
}
@Bean
JdbcCursorItemReader<Customer> customerJdbcItemReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Customer>()//
.name("customerJdbcItemReader")//
.dataSource(dataSource)//
.sql("select id, name, language, os from customer")//
.rowMapper((rs, rowNum) -> new Customer(rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))//
.build();
}
@Bean
CompositeItemReader<Customer> customerCompositeItemReader(JdbcCursorItemReader<Customer> customerJdbcItemReader,
FlatFileItemReader<Customer> customerCsvItemReader) {
return new CompositeItemReader<>(List.of(customerJdbcItemReader, customerCsvItemReader));
}
@Bean
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter() {
return new BlockingQueueItemWriter<>(BatchConfiguration.CUSTOMERS);
}
@Bean
Step one(JobRepository repository, PlatformTransactionManager txm,
CompositeItemReader<Customer> customerCompositeItemReader,
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter) {
return new StepBuilder("one", repository)//
.<Customer, Customer>chunk(10, txm)//
.reader(customerCompositeItemReader)//
.writer(customerBlockingQueueItemWriter)//
.build();
}
static class CustomersCsvRuntimeHintsRegistrar implements RuntimeHintsRegistrar {
@Override
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
hints.resources().registerResource(CSV);
}
}
}
在這個示例中,我們有三個 ItemReader
bean,但步驟只消費一個 CompositeItemReader<T>
bean。它將依次讀取來自 FlatFileItemReader<Customer>
和 JdbcCursorItemReader<Customer>
bean 的任何資料。
在這個示例中,我們沒有配置 ItemProcessor<Customer,Customer>
。
對於 ItemWriter<Customer>
,我們使用了框架中另一個新穎的補充:BlockingQueueItemWriter<Customer>
!其思想很簡單:writer 將資料寫入一個 Java java.util.concurrent.BlockingQueue
。BlockingQueue
變數是一個定義在 BatchConfiguration
類中的 static final
變數,名為 CUSTOMERS
。下一個步驟將配置一個 BlockingQueueItemReader<T>
,它將從同一個 java.util.concurrent.BlockingQueue
中讀取。超簡單,對嗎?是的!但這將節省大量時間。
傳統上,Spring Batch 應用的上下文只與當前步驟相關聯。隨著資料流經一個作業,Spring Batch 的 Step
只給你三次處理資料的機會:從 ItemReader<I>
、ItemProcessor<I,O>
和 ItemWriter<O>
。想在資料寫入後進行更多處理?那就得等到下一個步驟!你已經將資料寫入磁碟或其他持久介質,然後你必須重新讀取它。Spring Batch 會跟蹤你在讀取和寫入方面進行到多遠,那我們為何要如此小心?為何我們需要如此頻繁地將所有內容持久化?
現在情況不再如此了,因為 Spring Batch 支援將給定 Step
的輸出寫入 BlockingQueue
。值得注意的是,BlockingQueue
例項有一個額外的好處,即支援對寫入資料量設定限制。這與分級事件驅動架構(SEDA)的風格配合得很好。SEDA 背後的思想是根據資料流經的不同階段來定義工作。隨著資料從一個階段移動到另一個階段,它流入(有界的)佇列。這些有界佇列提供反壓。如果工作被拒絕或在容量超出時簡單地寫入磁碟,你無法壓垮給定階段的處理器。這稱為反壓,對於可伸縮性至關重要。
每個階段僅從一個佇列中獲取工作。這提供了一種天然的負載均衡方式:啟動更多給定階段處理器的例項,工作就會均勻地分配給它們。你甚至可以結合 Spring Batch 的遠端分割槽和分塊範例,將工作分配到叢集中,進一步擴充套件這種架構。
這種架構通常與訊息系統相關聯——佇列通常被假定為訊息匯流排中的佇列(或主題);然而,該架構背後的原則在批處理系統中也同樣適用。
讓我們看看步驟二!
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
class StepTwoConfiguration {
@Bean
Step two(JobRepository repository, PlatformTransactionManager transactionManager,
BlockingQueueItemReader<Customer> blockingQueueItemReader, ItemWriter<Customer> customerItemWriter) {
return new StepBuilder("two", repository)//
.<Customer, Customer>chunk(10, transactionManager)//
.reader(blockingQueueItemReader)//
.writer(customerItemWriter)//
.build();
}
@Bean
BlockingQueueItemReader<Customer> blockingQueueItemReader() {
return new BlockingQueueItemReader<>(BatchConfiguration.CUSTOMERS);
}
@Bean
ItemWriter<Customer> customerItemWriter() {
return chunk -> chunk.forEach(System.out::println);
}
}
在這裡,我們定義了另一個 Step
,它從同一個 BlockingQueue
中讀取資料,然後簡單地將所有內容打印出來。
強大、簡單、可擴充套件的批處理?夫復何求?順便說一句,請記住,Spring Batch 的大部分工作——輸入和輸出——都從 Java 21 的虛擬執行緒中獲益巨大,Spring Boot 現在已經支援虛擬執行緒三個版本了!如果你使用的是 Java 21+(你肯定至少用的是 Java 21,對嗎?),別忘了設定 spring.threads.virtual.enabled=true
。