領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多Spring Batch 5.2 的新版本帶來了海量新功能!Spring Batch 是處理大量但有限的順序資料訪問的引人注目的方式。例如:從 SQL 資料庫讀取資料並寫入 CSV,或者從 FTP 伺服器讀取資料並分析 MongoDB 資料——這就是批處理。你們都知道這是什麼。這項工作的一半(恕我雙關!)是整合各種資料來源和多個數據接收器。另一方面的考慮,正如你們可能想象到的,對於耗時且可能失敗的工作負載,是維護與每個批處理作業執行相關的持久且廣泛的元資料。同樣,在這個版本中我無法深入介紹所有新穎之處!所以,讓我們從宏觀角度看看一些新功能。
JobRepository。在不久的過去,它有兩個 JobRepository 實現:一個支援 JDBC,另一個透過記憶體中的 Map 支援“持久化”。Map 選項對於測試或持久化結果不太重要的純粹效能導向的工作負載來說很方便。我們移除了 Map 實現,建議人們使用像 H2 這樣的記憶體 SQL 資料庫配合 JDBC JobRepository。有些人追求純粹的效能,而 H2 選項不夠好。在這個版本中,我們引入了一個“無資源”的 JobRepository,它不儲存任何狀態,甚至不儲存在記憶體中。我們還為基於 JDBC 的 JobRepository 添加了一個持久化的替代方案,即引入了一個基於 MongoDB 的 JobRepository 實現。ItemReader 的 Spring Data JPA 查詢註冊提示提供了新的支援。ItemReader 時,為資料類——Kotlin data class 或 Java record 例項——提供了新的支援。Function<I,O>——適配到 ItemReader、ItemWriter 和 ItemProcessor 型別。CompositeItemReader<T>,它可以按順序從多個委託的 ItemReader<T> 中讀取資料。RecursiveCollectionLineAggregator 中支援可配置的行分隔符CompositeItemReader<T>讓我們來看看我最喜歡的兩個新功能:CompositeItemReader<T> 和 SEDA 友好的 BlockingQueueItemWriter 和 BlockingQueueItemReader 實現。
這是這個 Spring Batch 應用中唯一 Job 的定義
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Configuration
class BatchConfiguration {
static final BlockingQueue<Customer> CUSTOMERS = new LinkedBlockingQueue<>();
@Bean
Job job(JobRepository repository, Step one, Step two) {
return new JobBuilder("job", repository)//
.incrementer(new RunIdIncrementer()) //
.start(one)//
.next(two)//
.build();
}
}
這是一個簡單的作業,包含兩個 Step 例項,一個接著另一個。快速回顧一下:在 Spring Batch 中,Step 是一個工作單元。它描述了四件事:
ItemReader<T> 的例項表示)ItemWriter<T> 的例項表示)每個 Step 使用 ItemReader<I> 讀取一個塊(chunk)資料量,將一個類似集合的“塊”傳遞給 ItemProcessor<I,O> 進行任意的修改,然後將 ItemProcessor<I,O> 的輸出傳送給 ItemWriter<O>。I 和 O 可以代表相同的泛型型別,也可以代表不同的型別。然後,迴圈繼續,直到 ItemReader 中的所有資料都被耗盡。該步驟被認為是完成的,執行將繼續到下一個步驟。
在這個示例應用程式中,我們將從 customer 表讀取資料,讀取 id、name、os 和 language 記錄。我們還將從一個 .csv 檔案讀取類似的資料。我們將使用方便的新 CompositeItemReader<Customer> 來輕鬆完成這項工作,避免我們進行單獨的規範化步驟。
package com.example.bootiful_34.batch;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.CompositeItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.List;
@Configuration
@ImportRuntimeHints(StepOneConfiguration.CustomersCsvRuntimeHintsRegistrar.class)
class StepOneConfiguration {
private static final Resource CSV = new ClassPathResource("/customers.csv");
@Bean
FlatFileItemReader<Customer> customerCsvItemReader() {
return new FlatFileItemReaderBuilder<Customer>()//
.resource(CSV)
.delimited()
.names("id", "name", "language", "os")
.name("customerCsvItemReader")
.fieldSetMapper(fs -> new Customer(fs.readInt(0), fs.readString(1), fs.readString(2), fs.readString(3)))
.build();
}
@Bean
JdbcCursorItemReader<Customer> customerJdbcItemReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Customer>()//
.name("customerJdbcItemReader")//
.dataSource(dataSource)//
.sql("select id, name, language, os from customer")//
.rowMapper((rs, rowNum) -> new Customer(rs.getInt(1), rs.getString(2), rs.getString(3), rs.getString(4)))//
.build();
}
@Bean
CompositeItemReader<Customer> customerCompositeItemReader(JdbcCursorItemReader<Customer> customerJdbcItemReader,
FlatFileItemReader<Customer> customerCsvItemReader) {
return new CompositeItemReader<>(List.of(customerJdbcItemReader, customerCsvItemReader));
}
@Bean
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter() {
return new BlockingQueueItemWriter<>(BatchConfiguration.CUSTOMERS);
}
@Bean
Step one(JobRepository repository, PlatformTransactionManager txm,
CompositeItemReader<Customer> customerCompositeItemReader,
BlockingQueueItemWriter<Customer> customerBlockingQueueItemWriter) {
return new StepBuilder("one", repository)//
.<Customer, Customer>chunk(10, txm)//
.reader(customerCompositeItemReader)//
.writer(customerBlockingQueueItemWriter)//
.build();
}
static class CustomersCsvRuntimeHintsRegistrar implements RuntimeHintsRegistrar {
@Override
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
hints.resources().registerResource(CSV);
}
}
}
在這個例子中,我們有三個 ItemReader bean,但該步驟只消耗一個 CompositeItemReader<T> bean。它又會按順序讀取 FlatFileItemReader<Customer> 和 JdbcCursorItemReader<Customer> bean 中的任何資料。
在這個例子中,我們沒有配置 ItemProcessor<Customer,Customer>。
對於 ItemWriter<Customer>,我們使用了框架中另一個新穎的功能:BlockingQueueItemWriter<Customer>!這個想法很簡單:寫入器將資料寫入 Java 的 java.util.concurrent.BlockingQueue。BlockingQueue 變數是 BatchConfiguration 類中定義的 static final 變數,名為 CUSTOMERS。下一個步驟將有一個配置的 BlockingQueueItemReader<T>,它將從同一個 java.util.concurrent.BlockingQueue 中讀取。超級簡單,對吧?是的!但這將節省大量時間。
傳統上,Spring Batch 應用只具有與當前步驟相關的上下文。當資料透過作業流動時,Spring Batch Step 只提供了三次機會:從 ItemReader<I>、ItemProcessor<I,O> 和 ItemWriter<O>。想在資料寫入後進行更多處理?那得等到下一步!你已經把它寫到磁碟或其他持久化介質上了,然後你必須重新讀取它。Spring Batch 會跟蹤你的讀寫進度,那麼為什麼我們非要如此小心謹慎呢?為什麼我們必須如此頻繁地將所有內容持久化呢?
現在不再需要這樣了,因為 Spring Batch 支援將給定的 Step 輸出寫入 BlockingQueue。值得注意的是,BlockingQueue 例項還具有支援限制寫入資料量的額外好處。這與階段式事件驅動架構(SEDA)的風格非常契合。SEDA 背後的思想是根據資料透過的不同階段來定義工作。當資料從一個階段移動到另一個階段時,它會流入(有界)佇列。這些有界佇列提供反壓。如果工作被拒絕,或者在容量超出時被簡單地寫入磁碟,你就不會壓垮某個階段的處理器。這就是所謂的反壓,它對於可伸縮性至關重要。
每個階段只從佇列中獲取其工作。這提供了一種自然的負載平衡方式:啟動更多給定階段的處理器例項,工作就會在它們之間平均分配。你可以透過 Spring Batch 的遠端分割槽和分塊正規化將這種架構推向更遠,讓你能夠跨叢集分割工作。
這種架構通常與訊息系統相關——佇列通常被假定為訊息匯流排中的佇列(或主題);然而,這種架構背後的原則在批處理系統中同樣適用。
讓我們看看第二步!
package com.example.bootiful_34.batch;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
class StepTwoConfiguration {
@Bean
Step two(JobRepository repository, PlatformTransactionManager transactionManager,
BlockingQueueItemReader<Customer> blockingQueueItemReader, ItemWriter<Customer> customerItemWriter) {
return new StepBuilder("two", repository)//
.<Customer, Customer>chunk(10, transactionManager)//
.reader(blockingQueueItemReader)//
.writer(customerItemWriter)//
.build();
}
@Bean
BlockingQueueItemReader<Customer> blockingQueueItemReader() {
return new BlockingQueueItemReader<>(BatchConfiguration.CUSTOMERS);
}
@Bean
ItemWriter<Customer> customerItemWriter() {
return chunk -> chunk.forEach(System.out::println);
}
}
在這裡,我們定義了另一個 Step,它從同一個 BlockingQueue 讀取,然後簡單地打印出所有內容。
健壯、簡單、可擴充套件的批處理?你還能要求什麼?順便說一句,請記住 Spring Batch 的大部分輸入輸出功能,都從 Java 21 的虛擬執行緒中獲益良多,Spring Boot 已經支援了三年了!如果你使用 Java 21+,別忘了設定 spring.threads.virtual.enabled=true。(你至少在使用 Java 21,對吧?)