建立批處理服務

本指南將引導您完成建立基本批處理驅動解決方案的過程。

您將構建什麼

您將構建一個服務,該服務從 CSV 電子表格匯入資料,使用自定義程式碼對其進行轉換,並將最終結果儲存在資料庫中。

你需要什麼

如何完成本指南

與大多數 Spring 入門指南一樣,您可以從頭開始並完成每個步驟,也可以跳過您已熟悉的基本設定步驟。無論哪種方式,您最終都會得到可工作的程式碼。

從頭開始,請轉到從 Spring Initializr 開始

跳過基礎知識,請執行以下操作

完成時,您可以對照gs-batch-processing/complete中的程式碼檢查結果。

從 Spring Initializr 開始

您可以使用此預初始化專案並單擊“生成”以下載 ZIP 檔案。此專案已配置為符合本教程中的示例。

手動初始化專案

  1. 導航到 https://start.spring.io。此服務會為您拉取應用程式所需的所有依賴項,併為您完成大部分設定。

  2. 選擇 Gradle 或 Maven 以及您想要使用的語言。本指南假設您選擇了 Java。

  3. 單擊依賴項並選擇Spring BatchHyperSQL Database

  4. 單擊生成

  5. 下載生成的 ZIP 檔案,它是一個已根據您的選擇配置的應用程式存檔。

如果您的 IDE 集成了 Spring Initializr,您可以從 IDE 中完成此過程。
您還可以從 GitHub fork 專案,並在您的 IDE 或其他編輯器中開啟它。

業務資料

通常,您的客戶或業務分析師會提供一個電子表格。對於這個簡單的示例,您可以在src/main/resources/sample-data.csv中找到一些虛構的資料

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

此電子表格的每一行都包含一個名字和一個姓氏,用逗號分隔。這是一種相當常見的模式,Spring 可以無需自定義地處理。

接下來,您需要編寫一個 SQL 指令碼來建立一個表來儲存資料。您可以在src/main/resources/schema-all.sql中找到這樣的指令碼

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);
Spring Boot 在啟動期間會自動執行schema-@@platform@@.sql-all是所有平臺的預設值。

建立業務類

現在您可以看到資料輸入和輸出的格式,您可以編寫程式碼來表示一行資料,如下面的示例(來自src/main/java/com/example/batchprocessing/Person.java)所示

package com.example.batchprocessing;

public record Person(String firstName, String lastName) {

}

您可以透過建構函式使用名字和姓氏例項化Person記錄。

建立中間處理器

批處理中常見的範例是攝取資料,對其進行轉換,然後將其輸出到其他地方。在這裡,您需要編寫一個簡單的轉換器,將名稱轉換為大寫。以下列表(來自src/main/java/com/example/batchprocessing/PersonItemProcessor.java)顯示瞭如何執行此操作

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.infrastructure.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) {
    final String firstName = person.firstName().toUpperCase();
    final String lastName = person.lastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting ({}) into ({})", person, transformedPerson);

    return transformedPerson;
  }

}

PersonItemProcessor實現了 Spring Batch 的ItemProcessor介面。這使得將程式碼連線到您將在本指南後面定義的批處理作業變得容易。根據介面,您會收到一個傳入的Person物件,然後將其轉換為大寫的Person

輸入和輸出型別不必相同。事實上,在讀取一個數據源之後,有時應用程式的資料流需要不同的資料型別。

組合批處理作業

現在您需要組合實際的批處理作業。Spring Batch 提供了許多實用程式類,減少了編寫自定義程式碼的需求。相反,您可以專注於業務邏輯。

要配置您的作業,您必須首先建立一個 Spring @Configuration類,如src/main/java/com/example/batchprocessing/BatchConfiguration.java中的以下示例所示。此示例使用基於記憶體的資料庫,這意味著當它完成時,資料將消失。現在將以下 bean 新增到您的BatchConfiguration類中,以定義一個讀取器、一個處理器和一個寫入器

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names("firstName", "lastName")
    .targetType(Person.class)
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .beanMapped()
    .build();
}

第一段程式碼定義了輸入、處理器和輸出。

  • reader()建立了一個ItemReader。它查詢名為sample-data.csv的檔案,並解析每個行專案,其中包含足夠的資訊以將其轉換為Person

  • processor()建立了您之前定義的PersonItemProcessor的一個例項,旨在將資料轉換為大寫。

  • writer(DataSource)建立了一個ItemWriter。這個寫入器面向 JDBC 目標,並自動獲取由 Spring Boot 建立的DataSource。它包含插入單個Person所需的 SQL 語句,由 Java 記錄元件驅動。

最後一段(來自src/main/java/com/example/batchprocessing/BatchConfiguration.java)顯示了實際的作業配置

@Bean
public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
  return new JobBuilder(jobRepository)
    .listener(listener)
    .start(step1)
    .build();
}

@Bean
public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
          FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
  return new StepBuilder(jobRepository)
    .<Person, Person>chunk(3)
          .transactionManager(transactionManager)
    .reader(reader)
    .processor(processor)
    .writer(writer)
    .build();
}

第一個方法定義作業,第二個方法定義單個步驟。作業由步驟構建,其中每個步驟都可以包含一個讀取器、一個處理器和一個寫入器。

然後您列出每個步驟(儘管此作業只有一個步驟)。作業結束,Java API 生成了一個完美配置的作業。

在步驟定義中,您定義每次寫入多少資料。在這種情況下,它每次寫入最多三條記錄。接下來,您透過使用之前注入的 bean 來配置讀取器、處理器和寫入器。

chunk()<Person,Person>為字首,因為它是一個泛型方法。這表示每個“塊”處理的輸入和輸出型別,並與ItemReader<Person>ItemWriter<Person>對齊。

批處理配置的最後一部分是獲取作業完成通知的方式。以下示例(來自src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)顯示了這樣一個類

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate
          .query("SELECT first_name, last_name FROM people", new DataClassRowMapper<>(Person.class))
          .forEach(person -> log.info("Found <{}> in the database.", person));
    }
  }
}

JobCompletionNotificationListener監聽作業何時BatchStatus.COMPLETED,然後使用JdbcTemplate檢查結果。

使應用程式可執行

雖然批處理可以嵌入到 Web 應用程式和 WAR 檔案中,但下面演示的更簡單的方法建立了一個獨立應用程式。您將所有內容打包在一個由一箇舊的 Java main()方法驅動的單個可執行 JAR 檔案中。

Spring Initializr 為您建立了一個應用程式類。對於這個簡單的示例,它無需進一步修改即可工作。以下列表(來自src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)顯示了應用程式類

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
  }
}

@SpringBootApplication 是一個方便的註解,它添加了以下所有內容

  • @Configuration:將類標記為應用程式上下文的 bean 定義源。

  • @EnableAutoConfiguration:告訴 Spring Boot 根據類路徑設定、其他 bean 和各種屬性設定開始新增 bean。例如,如果 spring-webmvc 在類路徑中,此註解會將應用程式標記為 Web 應用程式並激活關鍵行為,例如設定 DispatcherServlet

  • @ComponentScan:告訴 Spring 在 com/example 包中查詢其他元件、配置和服務,使其能夠找到控制器。

main() 方法使用 Spring Boot 的 SpringApplication.run() 方法啟動應用程式。您是否注意到沒有一行 XML?也沒有 web.xml 檔案。這個 Web 應用程式是 100% 純 Java,您不必處理任何管道或基礎設施的配置。

請注意,SpringApplication.exit()System.exit()確保 JVM 在作業完成後退出。有關更多詳細資訊,請參閱Spring Boot 參考文件中的應用程式退出部分

出於演示目的,有程式碼注入JdbcTemplate,查詢資料庫,並打印出批處理作業插入的人員姓名。

請注意應用程式如何不使用@EnableBatchProcessing註解。以前,@EnableBatchProcessing可用於啟用 Spring Boot 對 Spring Batch 的自動配置。現在可以定義一個帶有@EnableBatchProcessing註解或擴充套件 Spring Batch 的DefaultBatchConfiguration的 bean 來告訴自動配置回退,從而允許應用程式完全控制 Spring Batch 的配置方式。

構建可執行 JAR

您可以使用 Gradle 或 Maven 從命令列執行應用程式。您還可以構建一個包含所有必要依賴項、類和資源並執行的單個可執行 JAR 檔案。構建可執行 JAR 使在整個開發生命週期中,跨不同環境等,輕鬆交付、版本化和部署服務作為應用程式。

如果您使用 Gradle,您可以透過使用 ./gradlew bootRun 執行應用程式。或者,您可以透過使用 ./gradlew build 構建 JAR 檔案,然後按如下方式執行 JAR 檔案

java -jar build/libs/gs-batch-processing-0.1.0.jar

如果您使用 Maven,您可以透過使用 ./mvnw spring-boot:run 執行應用程式。或者,您可以使用 ./mvnw clean package 構建 JAR 檔案,然後按如下方式執行 JAR 檔案

java -jar target/gs-batch-processing-0.1.0.jar
這裡描述的步驟建立了一個可執行的 JAR。您還可以構建一個經典的 WAR 檔案

作業為每個被轉換的人員列印一行。作業執行後,您還可以看到查詢資料庫的輸出。它應該類似於以下輸出

Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
Found <Person[firstName=JILL, lastName=DOE]> in the database.
Found <Person[firstName=JOE, lastName=DOE]> in the database.
Found <Person[firstName=JUSTIN, lastName=DOE]> in the database.
Found <Person[firstName=JANE, lastName=DOE]> in the database.
Found <Person[firstName=JOHN, lastName=DOE]> in the database.

總結

恭喜!您構建了一個批處理作業,該作業從電子表格中攝取資料,對其進行處理,並將其寫入資料庫。

另請參閱

以下指南也可能有所幫助

想寫新指南或為現有指南做貢獻嗎?請檢視我們的貢獻指南

所有指南的程式碼均採用 ASLv2 許可,文字內容採用署名-禁止演繹知識共享許可

獲取程式碼