Spring Batch 和 Spring Integration 的實際應用

工程 | Dave Syer | 2010 年 2 月 15 日 | ...

Spring BatchSpring Integration 的使用者有一些共同的擔憂,我們經常被問到它們如何協同工作。Spring Batch Admin 1.0.0.M2 最近釋出了,它大量使用了 Spring Integration,因此它是研究一些特定用例的良好載體,這正是我們計劃在本文中介紹的內容。

Spring Batch 整合

1.0.0.M2 版本釋出的一部分是 Spring Batch Integration 模組,最近從 Spring Batch 遷移過來,並在 Batch Admin 中找到了新的歸宿。許多 Batch-Integration 的交叉用例都在 Spring Batch Integration 中實現或演示。新歸宿的原因是 Batch Admin 大量使用了 Batch Integration 的功能,因此調整這些專案的釋出週期更有意義。

Spring Batch Admin

Spring Batch Admin 是 SpringSource 的一個開源專案。它旨在為開發人員提供一個 Web UI 和用於構建自己的 UI 的工具,以便與 Spring Batch 作業進行互動(啟動、停止、調查失敗原因等)。最近的里程碑版本在 1.0 的計劃功能方面已經相當完整,但如果您有任何想法或貢獻,請訪問 論壇問題追蹤器 並參與社群。

開箱即用的目標執行時是單個 servlet 容器例項(例如 SpringSource tc Server),在該容器中,系統無需任何配置即可工作。但我們希望能夠支援基本用例的定製和擴充套件,包括將部署擴充套件到伺服器叢集,而 Spring Integration 正被證明是許多擴充套件點的關鍵。

結合批處理和整合

Spring Batch 和 Spring Integration 之間的界限並非總是清晰的,但有一些可以遵循的指導方針。主要是:考慮粒度,並應用常見模式。本文描述了一些常見模式。更多模式在 Spring Batch Integration 和 Spring Batch Admin 中實現(也可能是未來文章的主題)。

為批處理過程新增訊息傳遞可以實現操作自動化,並分離和策略化關鍵關注點。例如,一條訊息可能觸發作業執行,然後訊息的傳送可以透過多種方式暴露。或者當作業完成或失敗時,可能會觸發傳送一條訊息,而這些訊息的消費者可能具有與應用程式本身無關的操作關注點。

反過來也一樣:訊息傳遞也可以嵌入到作業中,但這超出了本文的範圍。例如:透過通道讀寫待處理的專案。

以下是一些使用 Spring Integration 和 Spring Batch Integration 在 Batch Admin 中實現的用例。

模式:訊息觸發

Spring Integration 的優點在於訊息生產者和訊息消費者之間的關注點分離,一個很好的具體例子是訊息觸發作業執行的能力。在這種情況下,消費者是完全通用的,並且是圍繞標準 Spring Batch 的一個非常薄的包裝器JobLauncher(這裡的程式碼來自 Spring Batch Integration 的 JobLaunchingMessageHandler

@ServiceActivator
public JobExecution launch(JobLaunchRequest request) {

    Job job = request.getJob();
    JobParameters jobParameters = request.getJobParameters();

    return jobLauncher.run(job, jobParameters);
    
}

正如您從上面的程式碼片段中可以看到的,這個包裝器非常薄,幾乎不值一提,但它的優點是具有非常清晰和明顯的職責,並且易於獨立測試。JobLaunchRequest物件是一個特殊的包裝器,用於傳遞給JobLauncher的輸入引數,以便它們可以構成 Spring Integration 訊息的有效載荷。

這個JobLaunchingMessageHandler被連線到一個MessageChannel在 Spring Batch Admin 中(位於 Manager jar 的/META-INF/bootstrap/integration/launch-context.xml):

<service-activator input-channel="job-requests">
    <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
        <beans:constructor-arg ref="jobLauncher" />
    </beans:bean>
</service-activator>

至此,這種整合模式的消費者端就完成了。這是一種本地方法,因為它不太適合遠端呼叫,因為JobLaunchRequest故意不是Serializable(因為Job不是)。

要啟動一個Job,我們在本地所需要做的就是建立一個生產者並用它來發送一個JobLaunchRequestjob-requests通道。Batch Admin Manager 模組中有一個整合測試正是這樣做的,但這裡整合方法的真正強大之處在於能夠策略化請求並讓它們來自各種不同的生產者。

模式:通道複用

一條訊息可以傳送到job-requestsBatch Admin 中的通道,方法有很多。要打破本地呼叫的限制並遠端暴露作業,所需要做的就是將其他形式的傳入請求適配為一個JobLaunchRequest,而 Spring Integration 使這一切變得非常容易。這是我們稱之為通道複用的模式的基本場景。例如:

HTTP: 瀏覽器

Spring Integration HTTP 介面卡模組可用於透過 HTTP 接收輸入訊息

<http:inbound-channel-adapter name="/job-requests" channel="job-launches" 
    request-mapper="bodyInboundRequestMapper" view="reload-job-executions" />

此程式碼片段可在 Batch Admin Manager 模組中找到(位於META-INF/servlet/integration-servlet.xml)。它暴露了一個端點 URLhttp://.../batch/job-requests,我們可以透過在瀏覽器中提交表單來發送作業執行請求。

原則上,請求可以是任何我們喜歡的形式,因為我們可以在此介面卡的下游和JobLaunchingMessageHandler的上游轉換訊息。在 Spring Batch Admin 中,轉換是透過另一個 POJO 訊息處理程式(StringToJobLaunchRequestAdapter)對介面卡輸出進行的。

HTTP: 命令列

上面使用的同一個 HTTP 介面卡可以用來從 UN*X 命令列遠端啟動作業。這是使用 Spring Integration HTTP 介面卡的一個非常棒的方法:你可以使用簡單的 shell 指令碼自動化許多操作。例如,如果應用程式在本地部署,並且有一個名為 "staging" 的作業,這將起作用

$ echo staging[input.file=foo] | curl -v -d @- -H "Content-Type: text/plain" \
  https://:8080/springone-web-demo/batch/job-requests

名為 "staging" 的作業使用一個引數啟動(input.file=foo),其中foo是要作為輸入讀取的檔案的絕對路徑。該作業配置瞭如下的 item reader

<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="linesToSkip" value="1" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
    </property>
    <property name="resource" value="#{jobParameters[input.file]}" />
</bean>

(此程式碼片段不在 Spring Batch Admin 示例中,但在我們在 Spring One Americas 2009 上進行的演示中出現過。)

檔案輪詢

Spring Integration 可以輪詢目錄中的檔案(使用檔案介面卡模組)。生成的訊息只需要適配到job-requests通道。Spring Batch Admin 在一個簡單的訊息處理程式中實現了這一點(FileToJobLaunchRequestAdapte):
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
    JobParameters jobParameters = new JobParametersBuilder().addString(
            "input.file", file.getAbsolutePath()).toJobParameters();
    return new JobLaunchRequest(job, jobParameters);
}

這個簡單的 POJO 方法被宣告為@ServiceActivator(它也可以是@Transformer),因此它可以插入到訊息處理鏈中,緊接在JobLaunchingMessageHandler之前,將一個File轉換成一個JobLaunchRequest.

重啟

失敗的作業通常可以在 Spring Batch 中重新啟動,這個功能可以透過 Spring Batch Admin UI 的網頁瀏覽器使用。它也可以在命令列中使用,或者對任何可以向名為job-restarts:

<channel id="job-restarts" />
    <service-activator input-channel="job-restarts" output-channel="job-requests">
    <beans:bean class="org.springframework.batch.admin.integration.JobNameToJobRestartRequestAdapter">
        <beans:property name="jobLocator" ref="jobRegistry" />
        <beans:property name="jobExplorer" ref="jobExplorer" />
    </beans:bean>
</service-activator>

的所有通道傳送訊息的人使用。這個通道只需要作業名稱,並且已被暴露為一個 HTTP 入站端點,所以從 UN*X 命令列你可以這樣做

$ echo staging | curl -v -d @- -H "Content-Type: text/plain" \
  https://:8080/springone-web-demo/batch/job-restarts

重試

如果作業因可恢復的錯誤(例如呼叫遠端服務時超時或網路故障)重複失敗,您可能希望它自動重新啟動。重試可以在作業內部使用 Spring Batch 的一些特性在低級別處理,但要重試整個作業需要對執行時進行一些操作。這可以透過 Spring Integration 簡單地實現,現在job-requests通道正在接受啟動請求。為此目的的端點將充當一個過濾器,查詢已知可重試的作業失敗條件,然後作為一個重啟轉換器(如上面的示例)。因此,像這樣的鏈將起作用

<chain input-channel="input-files" output-channel="job-requests" 
        xmlns="http://www.springframework.org/schema/integration">
    <filter>
        <bean class="...RetryableJobExecutionFilter" 
            xmlns="http://www.springframework.org/schema/beans">
            <property name="pattern" value="(&amp;s).*TimeoutException.*" />
        </bean>
	</filter>
    <service-activator>
        <bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter" 
            xmlns="http://www.springframework.org/schema/beans">
            <property name="job" ref="job1" />
        </bean>
    </service-activator>
</chain>

其中RetryableJobExecutionFilter可能這樣實現

public boolean isRetryable(JobExecution jobExecution) {
    boolean retryable = false;
    for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
        if (stepExecution.getStatus().isLessThan(BatchStatus.STOPPED)) {
            continue;
        }
        if (stepExecution.getExitStatus().getExitDescription().matches(pattern)) {
            retryable = true;
            break;
        }
    }
    return retryable;
}

這個例子在我們的 Spring One 演示中出現過;它不在 Spring Batch Admin 中,但對於您需要的任何特定過濾器來說,實現起來都很簡單。

輸入檔案上傳

透過 Spring Batch Admin UI 直接支援將檔案上傳到應用程式。不建議使用 HTTP POST 上傳大檔案,主要是因為應用程式必須在記憶體中緩衝內容,但這對於上傳小型或中型資料集供 Spring Batch 處理來說是一個不錯的功能。

示例應用程式實際上並沒有使用 Spring Integration 的檔案輪詢器(但對於希望按上述方式配置它的客戶端來說,它是可用的);而是使用檔案上傳後的直接訊息觸發。策略是 Manager 模組上傳檔案,然後向釋出-訂閱通道(input-files).

)傳送一條訊息。任何可以使用輸入檔案的作業,只需要有一個上游元件訂閱該通道,並在檔案符合條件時將其傳遞下去。在示例中,這是透過按檔案的父目錄名稱過濾檔案來完成的

<chain input-channel="input-files" output-channel="job-requests" 
        xmlns="http://www.springframework.org/schema/integration">
    <filter expression="payload.parent.name=='sample'" />
    <service-activator>
        <bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter" 
            xmlns="http://www.springframework.org/schema/beans">
            <property name="job" ref="job1" />
        </bean>
    </service-activator>
</chain>

如果輸入檔案的父目錄(可在 Web UI 中設定)是 "sample",那麼它將被匯入到服務啟用器中,該啟用器將其轉換為一個JobLaunchRequest,並將其傳送出去由JobLaunchingMessageHandler處理(如前所述)。

模式:POJO 訊息處理

將檔案傳送到input-files通道是在 Batch Admin Manager 中完成的,秉承了 Spring 應用程式的最佳傳統,透過簡單的 POJO 和基於介面的元件實現。有一個FileService介面和一個本地實現,該實現使用一個臨時目錄來處理透過 HTTP 傳入的檔案。檔案上傳後,服務透過一個帶有自定義介面的簡單訊息閘道器傳送它
public interface FileSender {
    
    void send(File file);

}

該介面沒有實現(除了單元測試中的存根),因為 Spring Integration 可以提供一個

<gateway id="fileSender" 
    service-interface="org.springframework.batch.admin.service.FileSender"
    default-request-channel="input-files" />

<beans:bean class="org.springframework.batch.admin.service.LocalFileService">
    <beans:property name="fileSender" ref="fileSender" />
</beans:bean>

配置檔案上傳

Spring Batch Admin 允許使用者上傳 Spring 配置檔案,以便從 UI 啟動和管理作業。這對於在執行時重新引數化作業非常有用,例如在執行效能測試套件以衡量各種效能調整(如更改步驟中的提交間隔)的效果時。

為了接受輸入的配置檔案,我們使用一個訊息通道,這樣它就可以被多種不同的輸入方法複用。配置透過一個名為job-configurations:

<service-activator input-channel="job-configurations" output-channel="job-registrations">
	<beans:bean class="org.springframework.batch.admin.integration.JobConfigurationResourceLoader">
		<beans:property name="jobRegistry" ref="jobRegistry" />
	</beans:bean>
</service-activator>

這裡的服務啟用器只接受一個 SpringResource並將其視為一個配置檔案:載入一個ApplicationContext,掃描其中的Job元件,並將其註冊到提供的登錄檔中。一旦註冊到登錄檔中,作業就可以從 UI 的主 Jobs 選單啟動,或者透過job-requests通道啟動,如上所述。

就像輸入檔案一樣,配置檔案可以透過 HTTP 傳入,在這種情況下可以是檔案附件或純文字引數,也可以透過檔案輪詢傳入。輪詢用例在 Manager 模組中實現,所以值得快速看一下它是如何工作的。在META-INF/bootstrap/integration/configuration-context.xml中,我們找到了這個

<file:inbound-channel-adapter directory="target/config" channel="job-configuration-files"
	filename-pattern=".*\.xml">
	<poller max-messages-per-poll="1">
		<cron-trigger expression="5/1 * * * * *" />
	</poller>
</file:inbound-channel-adapter>

介面卡將輪詢一個目錄(這裡為演示目的硬編碼為 "target/config",但在實際應用程式中會進行引數化)並查詢名稱以 ".xml" 結尾的檔案。當符合該模式的檔案到達時,它被髮送(作為java.io.File)到job-configuration-files通道。訊息從那裡被轉換,以便File變成一個Resource,並且可以將其傳送到job-configurations通道。

模式:資訊訊息

一旦您開始使用 Spring Integration 訊息來驅動許多應用程式功能,通常可以方便地接入訊息流,以獲取資訊或用於報告目的。例如,在作業開始、停止(完成或失敗)時傳送一條訊息會很有用。使用MessagePublishingInterceptor可以輕鬆實現這一點。在 Spring Batch Admin Manager 中,該攔截器被配置為傳送作業執行訊息


<aop:config>
	<aop:advisor advice-ref="jobMessagePublishingInterceptor" pointcut="execution(* *..Job+.execute(..))" />
</aop:config>

<bean id="jobMessagePublishingInterceptor" class="org.springframework.integration.aop.MessagePublishingInterceptor"
	xmlns="http://www.springframework.org/schema/beans">
	<constructor-arg index="0">
		<bean class="org.springframework.batch.admin.integration.TrivialExpressionSource" p:payload="#args[execution]" />
	</constructor-arg>
	<property name="defaultChannel" ref="job-operator" />
</bean>

每當Job被執行時,AOP 通知器會傳遞引數值(一個JobExecution)到)到 job-operator通道。然後,感興趣的各方可以訂閱該通道並獲取有關最近執行訊息的資訊。Spring Batch Admin 開箱即用時不會對這些訊息進行任何處理,除了在控制檯記錄它們,並在 UI 中列出以便檢查。在 Spring Batch Admin 之上構建自己應用程式的客戶端可能會發現這些訊息有助於通知操作員或報告系統關於作業結果的資訊。

設定資訊訊息可能會帶來開啟新應用程式功能的副作用:上面描述的作業重試功能就是透過連線一個端點來監聽)到 job-operator通道。

我們希望本文能讓您對 Spring Integration 在批處理應用程式中的一些用法有所瞭解。上面幾乎所有的程式碼示例都以某種形式存在於 Spring Batch Admin 中,但這絕不是故事的結尾,在 Spring Batch Integration 和 Spring Batch Admin 專案中還有更多示例。訪問 Batch Admin 網站獲取更多資訊並瞭解在哪裡可以獲得程式碼進行嘗試。InfoQ 上還有一個關於本文部分主題的 影片,由 Spring Batch 和 Spring Integration 的負責人(Dave Syer 和 Mark Fisher)在 Spring One 大會上展示。

訂閱 Spring 新聞通訊

保持與 Spring 新聞通訊的聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助您快速提升。

瞭解更多

獲取支援

Tanzu Spring 在一項簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部