領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多關於 Spring Batch 和 Spring Integration 的使用者有一些共同的關注點,我們經常被問到它們是如何協同工作的。最近 Spring Batch Admin 1.0.0.M2 釋出了,它大量使用了 Spring Integration,因此是探討一些特定用例的一個很好的載體,這正是我們打算在本文中做的。
1.0.0.M2 版本的一部分是 Spring Batch Integration 模組,最近從 Spring Batch 遷移過來,並獲得了 Batch Admin 的新家。Batch-Integration 的許多交叉用例已在 Spring Batch Integration 中實現或演示。之所以選擇新的家園,是因為 Batch Admin 使用了 Batch Integration 的許多功能,因此使這些專案的釋出週期保持一致更有意義。
Spring Batch Admin 是來自 SpringSource 的開源專案。它旨在為開發人員提供 Web UI 和工具,用於構建自己的 UI 以與 Spring Batch 作業進行互動(例如,啟動、停止、調查失敗原因等)。最近的里程碑版本在計劃的功能方面已相當完善,但如果您有想法或想做出貢獻,請訪問 論壇 和 問題跟蹤器,並參與社群。
開箱即用的目標執行時是 servlet 容器的單個例項(例如 SpringSource tc Server),在該容器中,系統可以零配置或無需配置即可執行。但我們希望能夠支援基本用例的自定義和擴充套件,包括將部署擴充套件到伺服器叢集,而 Spring Integration 正是許多擴充套件點的關鍵。
Spring Batch 和 Spring Integration 之間的界限並非總是清晰的,但有一些可以遵循的指導方針。主要是:考慮粒度,並應用常見模式。其中一些常見模式將在本文中進行描述。更多模式已在 Spring Batch Integration 和 Spring Batch Admin 中實現(並且可能是未來文章的主題)。
為批處理過程新增訊息傳遞功能可以實現操作的自動化,以及關鍵關注點的分離和策略化。例如,訊息可以觸發作業執行,然後訊息的傳送可以透過多種方式公開。或者,當作業完成或失敗時,可能會觸發訊息的傳送,而這些訊息的消費者可能擁有與應用程式本身無關的操作關注點。
反之亦然:訊息傳遞也可以嵌入到作業中,但這超出了本文的範圍。例如:透過通道讀取或寫入專案進行處理。
以下是一些使用 Spring Integration 和 Spring Batch Integration 在 Batch Admin 中實現的用例。
Spring Integration 的優點在於訊息生產者和訊息消費者之間的關注點分離,一個很好的具體例子就是訊息觸發作業執行的能力。在這種情況下,消費者是完全通用的,並且是標準 Spring Batch 的一個非常薄的包裝器。JobLauncher(此處程式碼來自 Spring Batch Integration 的 JobLaunchingMessageHandler)
@ServiceActivator
public JobExecution launch(JobLaunchRequest request) {
Job job = request.getJob();
JobParameters jobParameters = request.getJobParameters();
return jobLauncher.run(job, jobParameters);
}
從上面的程式碼片段可以看出,包裝器非常薄,幾乎不值得一提,但它的優點在於它有一個清晰、明顯的責任,並且易於單獨測試。這個JobLaunchRequest物件是用於向JobLauncher傳送輸入引數的特殊包裝器,以便它們可以成為 Spring Integration 中訊息的有效負載。
的JobLaunchingMessageHandler已連線到 Spring Batch Admin 中的MessageChannel(在 Manager jar 中/META-INF/bootstrap/integration/launch-context.xml):
<service-activator input-channel="job-requests">
<beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
<beans:constructor-arg ref="jobLauncher" />
</beans:bean>
</service-activator>
此整合模式的消費者端已完成。這是一種本地方法,因為它不太適合遠端呼叫,因為JobLaunchRequest故意不是Serializable(因為作業不是)。
要本地啟動一個作業,我們只需要建立一個生產者並使用它向JobLaunchRequest傳送一個job-requests通道。Batch Admin Manager 模組中有一個整合測試,它就是這樣做的,但是這裡整合方法的真正強大之處在於能夠對請求進行策略化,並讓它們來自各種不同的生產者。
訊息可以透過多種方式傳送到 Batch Admin 中的job-requests通道。要擺脫本地呼叫並遠端公開作業,只需將傳入的請求以某種其他形式適配為JobLaunchRequest,Spring Integration 使這變得非常容易。這是我們稱之為通道重用的基本場景。示例是
Spring Integration HTTP 介面卡模組可用於透過 HTTP 接受輸入訊息
<http:inbound-channel-adapter name="/job-requests" channel="job-launches"
request-mapper="bodyInboundRequestMapper" view="reload-job-executions" />
此程式碼片段可以在 Batch Admin Manager 模組中找到(META-INF/servlet/integration-servlet.xml)。它公開了一個端點 URLhttp://.../batch/job-requests,我們可以透過提交瀏覽器表單來使用它來發送作業執行請求。
原則上,請求可以是我們喜歡的任何形式,因為我們可以在此介面卡下游和JobLaunchingMessageHandler上游的訊息進行轉換。在 Spring Batch Admin 中,轉換是透過另一個 POJO 訊息處理程式(StringToJobLaunchRequestAdapter)對介面卡的輸出進行的。
上面使用的同一個 HTTP 介面卡可以從 UN*X 命令列遠端啟動作業。這是使用 Spring Integration HTTP 介面卡的一種非常好的方式:您可以使用簡單的 shell 指令碼自動化許多操作。例如,如果應用程式在本地部署了一個名為“staging”的作業,那麼這將起作用。
$ echo staging[input.file=foo] | curl -v -d @- -H "Content-Type: text/plain" \
https://:8080/springone-web-demo/batch/job-requests
名為“staging”的作業以一個引數啟動(input.file=foo),其中foo是用於讀取輸入的檔案的絕對路徑。作業配置了一個專案讀取器,如下所示
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="linesToSkip" value="1" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
</property>
<property name="resource" value="#{jobParameters[input.file]}" />
</bean>
(此程式碼片段不在 Spring Batch Admin 示例中,但它在我們 2009 年 Spring One Americas 的演示中。)
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", file.getAbsolutePath()).toJobParameters();
return new JobLaunchRequest(job, jobParameters);
}
這個簡單的 POJO 方法宣告為@ServiceActivator(它也可以是@Transformer), 因此它可以插入訊息處理鏈,就在JobLaunchingMessageHandler之前,用於將File轉換為JobLaunchRequest.
一個失敗的作業通常可以在 Spring Batch 中重新啟動,並且此功能可透過 Spring Batch Admin UI 的 Web 瀏覽器訪問。它也可以在命令列上訪問,或者對任何能夠向名為job-restarts:
<channel id="job-restarts" />
<service-activator input-channel="job-restarts" output-channel="job-requests">
<beans:bean class="org.springframework.batch.admin.integration.JobNameToJobRestartRequestAdapter">
<beans:property name="jobLocator" ref="jobRegistry" />
<beans:property name="jobExplorer" ref="jobExplorer" />
</beans:bean>
</service-activator>
的 Spring Integration 通道傳送訊息的人都可以訪問。此通道只需要作業名稱,並且已公開為 HTTP 入站端點,因此從 UN*X 命令列,您可以這樣做。
$ echo staging | curl -v -d @- -H "Content-Type: text/plain" \
https://:8080/springone-web-demo/batch/job-restarts
如果一個作業因可恢復錯誤(例如呼叫遠端服務的超時或網路故障)而反覆失敗,您可能希望它自動重新啟動。重試可以在作業內部的低級別使用 Spring Batch 的某些功能來處理,但重試整個作業需要對執行時進行一些操作。這可以透過 Spring Integration 輕鬆完成,因為現在job-requests通道正在接受啟動請求。此處的端點將充當過濾器,查詢作業中已知可重試的失敗條件,然後作為重新啟動轉換器(如上面的示例)。因此,這樣的鏈將起作用
<chain input-channel="input-files" output-channel="job-requests"
xmlns="http://www.springframework.org/schema/integration">
<filter>
<bean class="...RetryableJobExecutionFilter"
xmlns="http://www.springframework.org/schema/beans">
<property name="pattern" value="(&s).*TimeoutException.*" />
</bean>
</filter>
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter"
xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="job1" />
</bean>
</service-activator>
</chain>
其中RetryableJobExecutionFilter可以實現如下
public boolean isRetryable(JobExecution jobExecution) {
boolean retryable = false;
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isLessThan(BatchStatus.STOPPED)) {
continue;
}
if (stepExecution.getExitStatus().getExitDescription().matches(pattern)) {
retryable = true;
break;
}
}
return retryable;
}
這個例子是在我們的 Spring One 演示中,它不在 Spring Batch Admin 中,儘管對於您需要的任何特定過濾器來說,實現它都是微不足道的。
檔案上傳到應用程式直接透過 Spring Batch Admin UI 支援。不建議使用 HTTP POST 上傳大檔案,主要是因為應用程式必須在記憶體中緩衝內容,但這是上傳小型或中型資料集以供 Spring Batch 處理的便捷功能。
實際的示例應用程式不使用 Spring Integration 的檔案輪詢器(但它可供想要按上述方式配置它的客戶端使用);相反,它在檔案上傳後使用直接的訊息觸發器。策略是讓 Manager 模組上傳檔案,然後向釋出-訂閱通道(input-files).
可以使用輸入檔案的任何作業都必須有一個上游元件訂閱該通道,並在檔案感興趣時傳遞它。在示例中,這是透過按父目錄名稱過濾檔案來完成的。
<chain input-channel="input-files" output-channel="job-requests"
xmlns="http://www.springframework.org/schema/integration">
<filter expression="payload.parent.name=='sample'" />
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter"
xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="job1" />
</bean>
</service-activator>
</chain>
如果輸入檔案有一個父目錄(可以在 Web UI 中設定)“sample”,那麼它將被管道傳輸到服務啟用器,該服務啟用器將其轉換為JobLaunchRequest並將其傳遞給JobLaunchingMessageHandler進行處理(如上所述)。
public interface FileSender {
void send(File file);
}
該介面沒有實現(單元測試中的存根除外),因為 Spring Integration 可以提供一個。
<gateway id="fileSender"
service-interface="org.springframework.batch.admin.service.FileSender"
default-request-channel="input-files" />
<beans:bean class="org.springframework.batch.admin.service.LocalFileService">
<beans:property name="fileSender" ref="fileSender" />
</beans:bean>
Spring Batch Admin 允許使用者上傳 Spring 配置檔案,以便從 UI 啟動和管理作業。這對於在執行時重新引數化作業非常有用,例如,在執行一系列效能測試以測量各種效能調整(如更改步驟中的提交間隔)的效果時。
為了接受配置檔案作為輸入,我們使用訊息通道,以便它可以在多種不同的輸入方法之間重用。配置進入一個名為job-configurations:
<service-activator input-channel="job-configurations" output-channel="job-registrations">
<beans:bean class="org.springframework.batch.admin.integration.JobConfigurationResourceLoader">
<beans:property name="jobRegistry" ref="jobRegistry" />
</beans:bean>
</service-activator>
這裡的服務啟用器只接受一個 SpringResource並將其視為配置檔案:載入一個ApplicationContext,掃描其中的作業元件並將其註冊到提供的登錄檔中。一旦進入登錄檔,就可以從 UI 中的主作業選單啟動作業,或透過job-requests通道,如上所述。
與輸入檔案一樣,配置檔案可以透過 HTTP 傳入,在這種情況下是作為檔案附件或純文字引數,也可以透過檔案輪詢傳入。輪詢用例在 Manager 模組中實現,因此值得快速檢視一下它的工作原理。在META-INF/bootstrap/integration/configuration-context.xml我們發現這個
<file:inbound-channel-adapter directory="target/config" channel="job-configuration-files"
filename-pattern=".*\.xml">
<poller max-messages-per-poll="1">
<cron-trigger expression="5/1 * * * * *" />
</poller>
</file:inbound-channel-adapter>
該介面卡將輪詢一個目錄(此處為演示目的硬編碼為“target/config”,但在實際應用程式中將進行引數化)並查詢檔名以“.xml”結尾的檔案。當一個匹配該模式的檔案到達時,它(作為java.io.File)被髮送到job-configuration-files通道。從那裡訊息被轉換,以便File變成一個Resource,並且可以傳送到job-configurations通道。
一旦您開始使用 Spring Integration 訊息來驅動許多應用程式功能,就經常能夠利用訊息流進行資訊或報告目的。例如,在作業啟動、停止(完成或失敗)時傳送訊息會很有用。使用 Spring Integration 的MessagePublishingInterceptor很容易做到。在 Spring Batch Admin Manager 中,該攔截器配置為傳送作業執行訊息
<aop:config>
<aop:advisor advice-ref="jobMessagePublishingInterceptor" pointcut="execution(* *..Job+.execute(..))" />
</aop:config>
<bean id="jobMessagePublishingInterceptor" class="org.springframework.integration.aop.MessagePublishingInterceptor"
xmlns="http://www.springframework.org/schema/beans">
<constructor-arg index="0">
<bean class="org.springframework.batch.admin.integration.TrivialExpressionSource" p:payload="#args[execution]" />
</constructor-arg>
<property name="defaultChannel" ref="job-operator" />
</bean>
每次作業被執行時,AOP 顧問會傳遞引數值(一個StepExecution)被髮送到job-operator通道。有興趣的各方可以訂閱該通道並獲取有關最近執行的訊息的資訊。Spring Batch Admin 開箱即用不處理這些訊息,只是在控制檯記錄它們,並在 UI 中列出它們以便檢查。構建自己的 Spring Batch Admin 之上應用程式的客戶端可能會發現這些訊息對於通知操作員或報告系統關於作業結果很有用。
設定資訊性訊息可能會導致新的應用程式功能出現:上面描述的作業重試功能是為 Spring One 實現的,它透過掛接一個端點來監聽job-operator通道。
我們希望本文能為您提供一些關於 Spring Integration 在 Batch 應用程式中使用方式的見解。上面幾乎所有的程式碼示例都以某種形式存在於 Spring Batch Admin 中,但這絕非故事的結局,並且在 Spring Batch Integration 和 Spring Batch Admin 專案中有更多示例。訪問 Batch Admin 網站以獲取更多資訊,並瞭解在哪裡獲取程式碼進行試用。InfoQ 上還有一個 影片,其中 Spring Batch 和 Spring Integration 的負責人(Dave Syer 和 Mark Fisher)在 Spring One 上介紹了本文中的一些主題。