領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多親愛的Spring社群!
就在 Spring Integration Java DSL 1.0 GA 釋出公告之後,我想透過一個基於經典 Cafe Demo 整合示例的逐行教程向大家介紹 Spring Integration Java DSL。我們將在這裡描述 Spring Boot 支援、Spring Framework Java 和註解 配置、IntegrationFlow 特性,並向作為 DSL 風格靈感的 Java 8 Lambda 支援致敬。當然,這一切都由 Spring Integration Core 專案提供支援。
對於那些對 Java 8 還不感興趣的使用者,我們提供了一個沒有 Lambda 的類似教程:Spring Integration Java DSL (pre Java 8):逐行教程。
但是,在我們開始描述 Cafe 演示應用程式之前,這裡有一個更短的示例,僅供入門...
@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext ctx =
SpringApplication.run(Start.class, args);
List<String> strings = Arrays.asList("foo", "bar");
System.out.println(ctx.getBean(Upcase.class).upcase(strings));
ctx.close();
}
@MessagingGateway
public interface Upcase {
@Gateway(requestChannel = "upcase.input")
Collection<String> upcase(Collection<String> strings);
}
@Bean
public IntegrationFlow upcase() {
return f -> f
.split() // 1
.<String, String>transform(String::toUpperCase) // 2
.aggregate(); // 3
}
}
我們將基礎設施(註解等)的描述留給主要的咖啡館流程描述。在這裡,我們希望您專注於最後一個 @Bean,即 IntegrationFlow 以及向該流程傳送訊息的閘道器方法。
在 main 方法中,我們將字串集合傳送到閘道器並將結果列印到標準輸出。該流程首先將集合拆分為單個 String(1);然後將每個字串轉換為大寫(2),最後我們將它們重新聚合回一個集合(3)。由於這是流程的結束,框架將聚合結果返回到閘道器,新負載成為閘道器方法的返回值。
等效的 XML 配置可能是...
<int:gateway service interface="foo.Upcase"
default-request-channel="upcase.input">
<int:splitter input-channel="upcase.input" output-channel="transform"/>
<int:transformer expression="payload.toUpperCase()"
input-channel="transform"
output-channel="aggregate" />
<int:aggregator input-channle="aggregate" />
或者...
<int:gateway service interface="foo.Upcase"
default-request-channel="upcase.input">
<int:chain input-channel="upcase.input">
<int:splitter />
<int:transformer expression="payload.toUpperCase()" />
<int:aggregator />
</int:chain>
##咖啡館演示
Cafe Demo 應用程式的目的是演示企業整合模式(EIP)如何用於反映真實咖啡館中的 order-delivery 場景。透過此應用程式,我們處理了幾種飲料訂單 - 熱飲和冰飲。執行應用程式後,我們可以在標準輸出(System.out.println)中看到冷飲比熱飲準備得更快。但是,整個訂單的交付會推遲到熱飲準備好之後。
為了反映領域模型,我們有幾個類:Order、OrderItem、Drink 和 Delivery。它們都在整合場景中提到,但我們不會在這裡分析它們,因為它們足夠簡單。
我們應用程式的原始碼只放在一個類中;重要行用數字標註,對應於下面的註釋
@SpringBootApplication // 1
@IntegrationComponentScan // 2
public class Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args);// 3
Cafe cafe = ctx.getBean(Cafe.class); // 4
for (int i = 1; i <= 100; i++) { // 5
Order order = new Order(i);
order.addItem(DrinkType.LATTE, 2, false); //hot
order.addItem(DrinkType.MOCHA, 3, true); //iced
cafe.placeOrder(order);
}
System.out.println("Hit 'Enter' to terminate"); // 6
System.in.read();
ctx.close();
}
@MessagingGateway // 7
public interface Cafe {
@Gateway(requestChannel = "orders.input") // 8
void placeOrder(Order order); // 9
}
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() { // 11
return Pollers.fixedDelay(1000).get();
}
@Bean
public IntegrationFlow orders() { // 12
return f -> f // 13
.split(Order.class, Order::getItems) // 14
.channel(c -> c.executor(Executors.newCachedThreadPool()))// 15
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16
.subFlowMapping("true", sf -> sf // 17
.channel(c -> c.queue(10)) // 18
.publishSubscribeChannel(c -> c // 19
.subscribe(s -> // 20
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21
.subscribe(sub -> sub // 22
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared cold drink #"
+ this.coldDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item) // 23
.handle(m -> System.out.println(m.getPayload())))))// 24
.subFlowMapping("false", sf -> sf // 25
.channel(c -> c.queue(10))
.publishSubscribeChannel(c -> c
.subscribe(s ->
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26
.subscribe(sub -> sub
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared hot drink #"
+ this.hotDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item)
.handle(m -> System.out.println(m.getPayload()))))))
.<OrderItem, Drink>transform(orderItem ->
new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots())) // 27
.aggregate(aggregator -> aggregator // 28
.outputProcessor(group -> // 29
new Delivery(group.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList()))) // 30
.correlationStrategy(m ->
((Drink) m.getPayload()).getOrderNumber()), null) // 31
.handle(CharacterStreamWritingMessageHandler.stdout()); // 32
}
}
逐行檢查程式碼...
1
@SpringBootApplication
這是 Spring Boot 1.2 中的新元註解。包括 @Configuration 和 @EnableAutoConfiguration。由於我們處於 Spring Integration 應用程式中,並且 Spring Boot 對其具有自動配置,因此會自動應用 @EnableIntegration,以初始化 Spring Integration 基礎設施,包括 Java DSL 的環境 - DslIntegrationConfigurationInitializer,它由 /META-INF/spring.factories 中的 IntegrationConfigurationBeanFactoryPostProcessor 獲取。
2
@IntegrationComponentScan
Spring Integration 的 @ComponentScan 類似物,用於根據介面掃描元件(Spring Framework 的 @ComponentScan 只檢視類)。Spring Integration 支援發現用 @MessagingGateway 註解的介面(參見下面的 #7)。
3
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
我們類的 main 方法旨在透過使用此類的配置啟動 Spring Boot 應用程式,並透過 Spring Boot 啟動 ApplicationContext。此外,它將命令列引數委託給 Spring Boot。例如,您可以指定 --debug 以檢視引導自動配置報告的日誌。
4
Cafe cafe = ctx.getBean(Cafe.class);
由於我們已經有了 ApplicationContext,我們可以開始與應用程式互動。而 Cafe 就是這個入口點 - 用 EIP 的術語來說是 gateway。閘道器只是介面,應用程式不與訊息 API 互動;它只處理領域(參見下面的 #7)。
5
for (int i = 1; i <= 100; i++) {
為了演示咖啡館的“工作”,我們發起了 100 個訂單,每個訂單包含兩種飲料——一種熱飲和一種冰飲。然後將 Order 傳送到 Cafe 閘道器。
6
System.out.println("Hit 'Enter' to terminate");
通常,Spring Integration 應用程式是非同步的,因此為了避免 main 執行緒過早退出,我們會阻塞 main 方法,直到透過命令列進行一些終端使用者互動。非守護執行緒會保持應用程式開啟,但 System.read() 為我們提供了一種乾淨關閉應用程式的機制。
7
@MessagingGateway
用於標記業務介面的註解,表示它是最終應用程式和整合層之間的 gateway。它是 Spring Integration XML 配置中 <gateway /> 元件的類比。Spring Integration 為此介面建立一個 Proxy 並將其作為 bean 填充到應用程式上下文中。此 Proxy 的目的是將引數包裝到 Message<?> 物件中,並根據提供的選項將其傳送到 MessageChannel。
8
@Gateway(requestChannel = "orders.input")
方法級別的註解,用於透過方法以及目標整合流區分業務邏輯。在此示例中,我們使用 orders.input 的 requestChannel 引用,它是我們 IntegrationFlow 輸入通道的 MessageChannel bean 名稱(參見下面的 #13)。
9
void placeOrder(Order order);
介面方法是最終應用程式與整合層互動的中心點。此方法具有 void 返回型別。這意味著我們的整合流是 one-way 的,我們只是將訊息傳送到整合流,而不等待回覆。
10
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();
兩個計數器,用於收集我們的咖啡館如何處理飲料的資訊。
11
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
default poller bean。它是 Spring Integration XML 配置中 <poller default="true"> 元件的類似物。對於 inputChannel 是 PollableChannel 的端點是必需的。在這種情況下,它對於兩個 Cafe queues(熱和冰)是必需的(參見下面的 #18)。這裡我們使用 DSL 專案中的 Pollers 工廠並使用其方法鏈式流暢 API 來構建輪詢器元資料。請注意,如果端點需要特定的 poller(而不是預設輪詢器),則可以直接從 IntegrationFlow 定義中使用 Pollers。
12
@Bean
public IntegrationFlow orders() {
IntegrationFlow bean 定義。它是 Spring Integration Java DSL 的核心元件,儘管它在執行時不扮演任何角色,只在 bean 註冊階段起作用。下面的所有其他程式碼都在 IntegrationFlow 物件中註冊 Spring Integration 元件(MessageChannel、MessageHandler、EventDrivenConsumer、MessageProducer、MessageSource 等),該物件由 IntegrationFlowBeanPostProcessor 解析以處理這些元件並在應用程式上下文中將它們註冊為必要的 bean(某些元素,例如通道可能已經存在)。
13
return f -> f
IntegrationFlow 是一個 Consumer 函式式介面,因此我們可以最小化程式碼並只關注整合場景需求。它的 Lambda 接受 IntegrationFlowDefinition 作為引數。此類別提供了一組全面的方法,可以組成 chain。我們將這些方法稱為 EIP-methods,因為它們為 EI 模式提供實現並從 Spring Integration Core 填充元件。在 bean 註冊階段,IntegrationFlowBeanPostProcessor 將此內聯(Lambda)IntegrationFlow 轉換為 StandardIntegrationFlow 並處理其元件。我們也可以使用 IntegrationFlows 工廠實現相同的功能(例如 IntegrationFlow.from("channelX"). ... .get()),但我們發現 Lambda 定義更優雅。使用 Lambda 的 IntegrationFlow 定義將 DirectChannel 作為流的 inputChannel 填充,並在此示例中以 orders.input(流 bean 名稱 + ".input")為名稱在應用程式上下文中註冊為 bean。這就是我們為 Cafe 閘道器使用該名稱的原因。
14
.split(Order.class, Order::getItems)
由於我們的整合流透過 orders.input 通道接受訊息,我們已準備好消費和處理它們。我們場景中的第一個 EIP 方法是 .split()。我們知道來自 orders.input 通道的訊息 payload 是一個 Order 領域物件,因此我們可以簡單地在這裡使用其型別並使用 Java 8 的 method-reference 特性。第一個引數是我們期望的訊息 payload 型別,第二個是 getItems() 方法的方法引用,它返回 Collection<OrderItem>。因此,這執行 split EI 模式,當我們將每個集合條目作為單獨的訊息傳送到下一個通道時。在後臺,.split() 方法註冊一個 MethodInvokingSplitter MessageHandler 實現和該 MessageHandler 的 EventDrivenConsumer,並將 orders.input 通道作為 inputChannel 連線。
15
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.channel() EIP 方法允許在端點之間指定具體的 MessageChannel,就像 Spring Integration XML 配置中透過 output-channel/input-channel 屬性對完成的那樣。預設情況下,DSL 整合流定義中的端點與 DirectChannel 連線,後者根據 IntegrationFlow bean 名稱和流鏈中的 index 獲取 bean 名稱。在這種情況下,我們使用另一個 Lambda 表示式,它從其 Channels 工廠中選擇一個特定的 MessageChannel 實現,並使用流暢的 API 進行配置。當前通道是一個 ExecutorChannel,以允許將訊息從 splitter 分發到單獨的 Thread,以便在下游流中並行處理它們。
16
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping
我們場景中的下一個 EIP 方法是 .route(),用於將 hot/iced 訂單專案傳送到不同的咖啡館廚房。我們在這裡再次使用方法引用(isIced())從傳入訊息中獲取 routingKey。第二個 Lambda 引數表示一個 router mapping——類似於 Spring Integration XML 配置中 <router> 元件的 <mapping> 子元素。但是,由於我們使用的是 Java,我們可以進一步利用其 Lambda 支援!Spring Integration Java DSL 為 router 引入了 subflow 定義,除了傳統的 channel mapping。每個子流根據路由執行,如果子流產生結果,則將其傳遞給路由器之後流定義中的下一個元素。
17
.subFlowMapping("true", sf -> sf
指定當前路由器 mappingKey 的整合流。在此示例中,我們有兩個子流 - hot 和 iced。子流是相同的 IntegrationFlow 函式式介面,因此我們可以像在頂層 IntegrationFlow 定義上一樣使用其 Lambda。子流與其父流沒有任何執行時依賴,它只是一個邏輯關係。
18
.channel(c -> c.queue(10))
我們已經知道 IntegrationFlow 的 Lambda 定義從 [FLOW_BEAN_NAME].input DirectChannel 開始,所以可能會有一個問題:“如果我們再次指定 .channel(),它如何在這裡工作?”。DSL 會處理這種情況,並用 BridgeHandler 和端點連線這兩個通道。在我們的示例中,我們在這裡使用受限的 QueueChannel 來反映真實生活中咖啡館廚房的繁忙狀態。這裡就是我們需要那個 global poller 來監聽此通道的下一個端點的地方。
19
.publishSubscribeChannel(c -> c
.publishSubscribeChannel() EIP 方法是 MessageChannels.publishSubscribe() 的 .channel() 變體,但具有 .subscribe() 選項,我們可以將子流指定為通道的訂閱者。沒錯,子流又一次!因此,子流可以指定到任何深度。無論是否存在 .subscribe() 子流,父流中的下一個端點也是此 .publishSubscribeChannel() 的訂閱者。由於我們已經處於 .route() 子流中,最後一個訂閱者是一個隱式的 BridgeHandler,它只是將訊息彈出到頂層——到一個類似的隱式 BridgeHandler,將訊息彈出到主流中的下一個 .transform() 端點。關於我們流的當前位置還有一點:前一個 EIP 方法是 .channel(c -> c.queue(10)),這個也是用於 MessageChannel 的。所以,它們再次透過一個隱式的 BridgeHandler 連線。在實際應用中,我們可以避免這個 .publishSubscribeChannel(),只需為咖啡館廚房提供一個 .handle(),但我們的目標是儘可能多地涵蓋 DSL 功能。這就是我們將廚房工作分配到同一 PublishSubscribeChannel 的多個子流的原因。
20
.subscribe(s ->
.subscribe() 方法接受一個 IntegrationFlow 作為引數,可以將其指定為 Lambda 以將訂閱者配置為 subflow。我們在這裡使用多個子流訂閱者,以避免多行 Lambda,並儘可能涵蓋一些 DSL 以及 Spring Integration 的功能。
21
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))
在這裡,我們使用一個簡單的 .handle() EIP 方法,只是為了阻塞當前執行緒一段時間,以演示咖啡館廚房準備飲料的速度。我們在這裡使用 Google Guava 的 Uninterruptibles.sleepUninterruptibly,以避免在 Lambda 表示式中使用 try...catch 塊,儘管您可以這樣做,並且您的 Lambda 將是多行的。或者您可以將該程式碼移動到一個單獨的方法中,並在這裡將其用作 method reference。
由於我們沒有在 .publishSubscribeChannel() 上使用任何 Executor,所有訂閱者都將在同一個執行緒上按順序執行;在我們的案例中,它是來自前一個 QueueChannel 上的 poller 的 TaskScheduler 執行緒之一。這就是為什麼這個 sleep 阻塞所有下游程序並允許演示那個限制為 10 的 QueueChannel 的 busy state。
22
.subscribe(sub -> sub
下一個子流訂閱者將在 iced 飲料的 1 秒 sleep 之後執行。我們在這裡使用另一個子流,因為前一個 .handle() 是 one-way 的,具有 MessageHandler 的 Lambda 的性質。這就是為什麼,為了繼續我們整個流程的處理,我們有幾個訂閱者:一些子流在完成工作後結束,不向父流返回任何內容。
23
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared cold drink #"
+ this.coldDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item)
當前訂閱者子流中的 transformer 用於將 OrderItem 轉換為友好的 STDOUT 訊息,以供下一個 .handle 使用。這裡我們看到了泛型與 Lambda 表示式的使用。這是透過 GenericTransformer 函式式介面實現的。
24
.handle(m -> System.out.println(m.getPayload())))))
這裡的 .handle() 只是為了演示如何使用 Lambda 表示式將 payload 列印到標準輸出。這是一個訊號,表明我們的飲料已準備好。之後,PublishSubscribeChannel 的最終(隱式)訂閱者只是將帶有 OrderItem 的訊息傳送到主流程中的 .transform()。
25
.subFlowMapping("false", sf -> sf
熱飲的 .subFlowMapping()。實際上,它與之前的冰飲子流類似,但具有特定的熱飲業務邏輯。
26
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))
熱飲的 sleepUninterruptibly。沒錯,我們需要更多時間來燒開水!
27
.<OrderItem, Drink>transform(orderItem ->
new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots()))
主要的 OrderItem 到 Drink transformer,在咖啡館廚房訂閱者完成飲料準備後,當 .route() 子流返回其結果時執行。
28
.aggregate(aggregator -> aggregator
.aggregate() EIP 方法提供了類似的選項來配置 AggregatingMessageHandler 及其端點,就像我們在使用 Spring Integration XML 配置時可以使用 <aggregator> 元件一樣。當然,使用 Java DSL,我們有更多的能力直接配置聚合器,而無需任何其他額外 bean。Lambda 又一次派上用場!從咖啡館業務邏輯的角度來看,我們為最初的 Order 組合 Delivery,因為我們在一開始就將原始訂單 .split() 為 OrderItem。
29
.outputProcessor(group ->
AggregatorSpec 的 .outputProcessor() 允許我們在聚合器完成分組後發出自定義結果。它類似於 <aggregator> 元件中的 ref/method 或 POJO 方法上的 @Aggregator 註解。我們的目標是為所有 Drink 組合一個 Delivery。
30
new Delivery(group.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList())))
如您所見,我們在這裡使用 Java 8 的 Stream 特性處理 Collection。我們遍歷已釋出的 MessageGroup 中的訊息,並將它們中的每一個轉換為其 Drink payload。Stream 的結果(.collect())(一個 Drink 列表)被傳遞給 Delivery 建構函式。包含此新 Delivery 負載的 Message 被髮送到我們咖啡館場景中的下一個端點。
31
.correlationStrategy(m ->
((Drink) m.getPayload()).getOrderNumber()), null)
.correlationStrategy() Lambda 演示了我們如何自定義聚合器行為。當然,我們在這裡可以只依賴 Spring Integration 內建的 SequenceDetails,它預設從流程開頭將 .split() 填充到每個拆分訊息中,但 CorrelationStrategy 的 Lambda 示例是為了說明而包含的。(使用 XML,我們可以使用 correlation-expression 或自定義 CorrelationStrategy)。此行中 .aggregate() EIP 方法的第二個引數是 endpointConfigurer,用於自定義選項,例如 autoStartup、requiresReply、adviceChain 等。我們在這裡使用 null 來表示我們依賴端點的預設選項。許多 EIP 方法提供了帶和不帶 endpointConfigurer 的過載版本,但 .aggregate() 需要一個端點引數,以避免對 AggregatorSpec Lambda 引數進行顯式型別轉換。
32
.handle(CharacterStreamWritingMessageHandler.stdout());
這是我們流程的終點—— Delivery 已交付給客戶!我們只是使用 Spring Integration Core 中開箱即用的 CharacterStreamWritingMessageHandler 將訊息 payload 列印到 STDOUT。這是一個展示 Spring Integration Core(及其模組)中現有元件如何透過 Java DSL 使用的案例。
好了,我們已經完成了基於 Spring Integration Java DSL 的 Cafe Demo 示例的描述。將其與 XML 示例 進行比較,以獲取更多關於 Spring Integration 的資訊。
這不是一個全面的 DSL 教程。我們在這裡不回顧 endpointConfigurer 選項、Transformers 工廠、IntegrationComponentSpec 層次結構、NamespaceFactories、如何指定多個 IntegrationFlow bean 並將它們連線到一個應用程式等,請參閱 參考手冊 以獲取更多資訊。
至少這份逐行教程應該向您展示 Spring Integration Java DSL 的基礎知識,以及它與 Spring Framework Java & 註解配置、Spring Integration 基礎和 Java 8 Lambda 支援的無縫融合!
另請參閱 si4demo,瞭解 Spring Integration(包括 Java DSL)的演進,如 2014 年 SpringOne/2GX 大會所示。(影片應很快釋出)。
一如既往,我們期待您的評論和反饋(StackOverflow (spring-integration 標籤)、Spring JIRA、GitHub),我們非常歡迎貢獻!
P.S. 即使本教程完全基於 Java 8 Lambda 支援,我們也不想錯過 Java 8 之前的使用者,我們將提供類似的非 Lambda 部落格文章。敬請期待!