建立批次服務

本指南將引導您完成建立一個基本批處理驅動解決方案的過程。

您將構建什麼

您將構建一個服務,該服務從 CSV 電子表格匯入資料,使用自定義程式碼對其進行轉換,並將最終結果儲存在資料庫中。

您需要什麼

如何完成本指南

與大多數 Spring 入門指南一樣,您可以從頭開始完成每個步驟,也可以跳過您已熟悉的基本設定步驟。無論哪種方式,您最終都會得到可執行的程式碼。

從頭開始,請繼續閱讀從 Spring Initializr 開始

跳過基礎部分,請執行以下操作

完成後,您可以對照 gs-batch-processing/complete 中的程式碼檢查您的結果。

從 Spring Initializr 開始

您可以使用此預初始化專案,然後點選 Generate 下載 ZIP 檔案。該專案已配置好,適合本教程中的示例。

手動初始化專案

  1. 導航到 https://start.spring.io。此服務會拉取您應用所需的所有依賴項,併為您完成大部分設定。

  2. 選擇 Gradle 或 Maven 以及您想使用的語言。本指南假設您選擇了 Java。

  3. 點選 Dependencies(依賴項),然後選擇 Spring Batch 和 HyperSQL Database。

  4. 點選 Generate(生成)。

  5. 下載生成的 ZIP 檔案,其中包含按您選擇配置的應用存檔。

如果您的 IDE 集成了 Spring Initializr,您可以直接在 IDE 中完成此過程。
您也可以從 GitHub fork 該專案,並在您的 IDE 或其他編輯器中開啟它。

業務資料

通常,您的客戶或業務分析師會提供一個電子表格。對於這個簡單的示例,您可以在 src/main/resources/sample-data.csv 中找到一些虛構的資料。

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

此電子表格的每一行包含一個名字和一個姓氏,用逗號分隔。這是一種相當常見的模式,Spring 可以無需定製地處理。

接下來,您需要編寫一個 SQL 指令碼來建立一個表以儲存資料。您可以在 src/main/resources/schema-all.sql 中找到這樣的指令碼。

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);
Spring Boot 在啟動時會自動執行 schema-@@platform@@.sql-all 是所有平臺的預設值。

建立業務類

既然您可以看到資料輸入和輸出的格式,您就可以編寫程式碼來表示一行資料,如下例(來自 src/main/java/com/example/batchprocessing/Person.java)所示。

package com.example.batchprocessing;

public record Person(String firstName, String lastName) {

}

您可以透過建構函式例項化 Person 記錄,提供名字和姓氏。

建立一箇中間處理器

批處理中一個常見的正規化是攝取資料、對其進行轉換,然後將其輸出到其他地方。在這裡,您需要編寫一個簡單的轉換器,將名稱轉換為大寫。以下列表(來自 src/main/java/com/example/batchprocessing/PersonItemProcessor.java)展示瞭如何實現這一點。

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) {
    final String firstName = person.firstName().toUpperCase();
    final String lastName = person.lastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting ({}) into ({})", person, transformedPerson);

    return transformedPerson;
  }

}

PersonItemProcessor 實現了 Spring Batch 的 ItemProcessor 介面。這使得將程式碼連線到您將在本指南後面定義的批處理作業中變得容易。根據介面規定,您會接收一個傳入的 Person 物件,然後將其轉換為一個大寫的 Person 物件。

輸入和輸出型別不一定相同。事實上,在讀取一個數據源後,有時應用程式的資料流需要不同的資料型別。

組織一個批處理作業

現在您需要將實際的批處理作業組織起來。Spring Batch 提供了許多實用類,減少了編寫自定義程式碼的需求。相反,您可以專注於業務邏輯。

要配置您的作業,您首先必須建立一個 Spring 的 @Configuration 類,如下例(位於 src/main/java/com/example/batchprocessing/BatchConfiguration.java)所示。本示例使用記憶體資料庫,這意味著完成後資料就會消失。現在向您的 BatchConfiguration 類新增以下 bean,以定義一個讀取器、一個處理器和一個寫入器。

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names("firstName", "lastName")
    .targetType(Person.class)
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .beanMapped()
    .build();
}

第一部分程式碼定義了輸入、處理器和輸出。

  • reader() 建立一個 ItemReader。它查詢名為 sample-data.csv 的檔案,並解析每一行,提取足夠的資訊將其轉換為 Person 物件。

  • processor() 建立一個您之前定義的 PersonItemProcessor 例項,用於將資料轉換為大寫。

  • writer(DataSource) 建立一個 ItemWriter。這個寫入器面向 JDBC 目標,並自動獲取 Spring Boot 建立的 DataSource。它包含了插入單個 Person 物件所需的 SQL 語句,由 Java record 元件驅動。

最後一部分(來自 src/main/java/com/example/batchprocessing/BatchConfiguration.java)展示了實際的作業配置。

@Bean
public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
  return new JobBuilder("importUserJob", jobRepository)
    .listener(listener)
    .start(step1)
    .build();
}

@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
          FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
  return new StepBuilder("step1", jobRepository)
    .<Person, Person>chunk(3, transactionManager)
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .build();
}

第一個方法定義了作業,第二個方法定義了一個步驟。作業由步驟組成,每個步驟可以包含一個讀取器、一個處理器和一個寫入器。

然後列出每個步驟(儘管這個作業只有一個步驟)。作業結束,Java API 會生成一個完美配置的作業。

在步驟定義中,您定義了每次寫入多少資料。在本例中,它每次最多寫入三個記錄。接下來,您使用先前注入的 bean 配置讀取器、處理器和寫入器。

chunk() 前面帶有 <Person,Person>,因為它是泛型方法。這表示每次“塊”處理的輸入和輸出型別,並與 ItemReader<Person>ItemWriter<Person> 對齊。

批處理配置的最後一部分是獲取作業完成通知的方式。以下示例(來自 src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)展示了這樣一個類。

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate
          .query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
          .forEach(person -> log.info("Found <{}> in the database.", person));
    }
  }
}

JobCompletionNotificationListener 監聽作業何時狀態為 BatchStatus.COMPLETED,然後使用 JdbcTemplate 檢查結果。

使應用程式可執行

儘管批處理可以嵌入到 Web 應用和 WAR 檔案中,但下面演示的更簡單方法建立了一個獨立應用程式。您將所有內容打包到一個可執行 JAR 檔案中,由一個傳統的 Java main() 方法驅動。

Spring Initializr 為您建立了一個應用程式類。對於這個簡單的示例,它無需進一步修改即可工作。以下列表(來自 src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)展示了應用程式類。

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
  }
}

@SpringBootApplication 是一個便利註解,它添加了以下所有內容

  • @Configuration:將類標記為應用上下文的 bean 定義源。

  • @EnableAutoConfiguration:告訴 Spring Boot 根據類路徑設定、其他 bean 和各種屬性設定開始新增 bean。例如,如果 spring-webmvc 在類路徑中,此註解會將應用程式標記為 Web 應用程式並激活關鍵行為,例如設定 DispatcherServlet

  • @ComponentScan:告訴 Spring 在 com/example 包中查詢其他元件、配置和服務,使其能夠找到控制器。

main() 方法使用 Spring Boot 的 SpringApplication.run() 方法啟動應用程式。您注意到沒有一行 XML 嗎?也沒有 web.xml 檔案。這個 Web 應用程式是 100% 純 Java,您不必處理任何管道或基礎設施的配置。

注意,SpringApplication.exit()System.exit() 確保 JVM 在作業完成後退出。有關更多詳細資訊,請參閱 Spring Boot 參考文件中的應用程式退出部分

出於演示目的,提供了注入 JdbcTemplate、查詢資料庫並打印出批處理作業插入的人員姓名的程式碼。

注意應用程式如何不使用 @EnableBatchProcessing 註解。以前,可以使用 @EnableBatchProcessing 來啟用 Spring Boot 對 Spring Batch 的自動配置。現在可以定義一個帶有 @EnableBatchProcessing 註解或擴充套件 Spring Batch 的 DefaultBatchConfiguration 的 bean 來告訴自動配置回退,從而允許應用程式完全控制 Spring Batch 的配置方式。

構建可執行 JAR

您可以使用 Gradle 或 Maven 從命令列執行應用程式。您也可以構建一個包含所有必要依賴項、類和資源的單個可執行 JAR 檔案並執行它。構建可執行 JAR 使服務在整個開發生命週期、跨不同環境等過程中易於交付、版本控制和部署為應用程式。

如果您使用 Gradle,可以透過執行 ./gradlew bootRun 來執行應用程式。或者,您可以使用 ./gradlew build 構建 JAR 檔案,然後如下所示執行 JAR 檔案:

java -jar build/libs/gs-batch-processing-0.1.0.jar

如果您使用 Maven,可以透過執行 ./mvnw spring-boot:run 來執行應用程式。或者,您可以使用 ./mvnw clean package 構建 JAR 檔案,然後如下所示執行 JAR 檔案:

java -jar target/gs-batch-processing-0.1.0.jar
此處描述的步驟建立了一個可執行的 JAR。您也可以構建一個傳統的 WAR 檔案

作業會為每個經過轉換的人員列印一行。作業執行後,您還可以看到查詢資料庫的輸出。它應該類似於以下輸出:

Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
Found <Person[firstName=JILL, lastName=DOE]> in the database.
Found <Person[firstName=JOE, lastName=DOE]> in the database.
Found <Person[firstName=JUSTIN, lastName=DOE]> in the database.
Found <Person[firstName=JANE, lastName=DOE]> in the database.
Found <Person[firstName=JOHN, lastName=DOE]> in the database.

總結

恭喜!您構建了一個批處理作業,該作業從電子表格中攝取資料、對其進行處理並寫入資料庫。

另請參閱

以下指南也可能對您有幫助

想編寫新指南或為現有指南做貢獻?請檢視我們的貢獻指南

所有指南的程式碼都以 ASLv2 許可證釋出,其文字內容以 署名-禁止演繹(Attribution, NoDerivatives)知識共享許可協議 釋出。

獲取程式碼