Spring Integration Java DSL:逐行教程

工程 | Artem Bilan | 2014 年 11 月 25 日 | ...

親愛的 Spring 社群成員們!

Spring Integration Java DSL 1.0 GA 釋出公告剛釋出,我希望透過基於經典的 Cafe Demo 整合示例的逐行教程向您介紹 Spring Integration Java DSL。我們在此描述了對 Spring Boot 的支援、Spring Framework 的 Java 和註解 配置、IntegrationFlow 特性,並向 Java 8 的 Lambda 支援致敬,它是 DSL 風格的靈感來源。當然,這一切都由 Spring Integration Core 專案提供支援。

對於那些尚不感興趣 Java 8 的讀者,我們提供了一個不使用 Lambda 的類似教程:Spring Integration Java DSL (pre Java 8): Line by line tutorial

但是,在我們開始描述 Cafe 演示應用程式之前,這裡有一個更簡短的示例供您入門...

@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {

	public static void main(String[] args) throws InterruptedException {
		ConfigurableApplicationContext ctx = 
                                 SpringApplication.run(Start.class, args);

		List<String> strings = Arrays.asList("foo", "bar");
		System.out.println(ctx.getBean(Upcase.class).upcase(strings));

		ctx.close();
	}

	@MessagingGateway
	public interface Upcase {

		@Gateway(requestChannel = "upcase.input")
		Collection<String> upcase(Collection<String> strings);

	}

	@Bean
	public IntegrationFlow upcase() {
	     return f -> f
		 	.split()                                         // 1
			.<String, String>transform(String::toUpperCase)  // 2
			.aggregate();                                    // 3
	}

}

我們將基礎設施(註解等)的描述留給主要的 cafe 流描述。這裡,我們希望您專注於最後一個 @Bean,即 IntegrationFlow 以及將訊息傳送到該流的閘道器方法。

main 方法中,我們將字串集合傳送到閘道器並將結果列印到標準輸出 (STDOUT)。該流首先將集合分割成單獨的 String (1);然後將每個字串轉換為大寫 (2),最後我們將它們重新聚合回一個集合 (3)。由於這是流的末尾,框架將聚合結果返回給閘道器,新的有效載荷成為閘道器方法的返回值。

等效的 XML 配置可能是...

<int:gateway service interface="foo.Upcase" 
                 default-request-channel="upcase.input">

<int:splitter input-channel="upcase.input" output-channel="transform"/>

<int:transformer expression="payload.toUpperCase()"
    input-channel="transform"
    output-channel="aggregate" />

<int:aggregator input-channle="aggregate" />

或者...

<int:gateway service interface="foo.Upcase" 
                default-request-channel="upcase.input">

<int:chain input-channel="upcase.input">
    <int:splitter />
    <int:transformer expression="payload.toUpperCase()" />
    <int:aggregator />
</int:chain>

## Cafe 演示

Cafe Demo 應用程式的目的是演示如何使用企業整合模式 (EIP) 來反映真實咖啡館中的訂單配送場景。透過此應用程式,我們處理幾種飲料訂單——熱飲和冰飲。執行應用程式後,我們可以在標準輸出 (System.out.println) 中看到冷飲比熱飲準備得更快。但是,整個訂單的配送會推遲到熱飲準備好之後。

為了反映領域模型,我們有幾個類:OrderOrderItemDrinkDelivery。它們都在整合場景中被提及,但我們在此不進行分析,因為它們足夠簡單。

我們應用程式的原始碼只放在一個類中;重要的行都用數字標註,對應於後面的註釋。

@SpringBootApplication               // 1
@IntegrationComponentScan            // 2
public class Application {

  public static void main(String[] args) throws Exception {
  	ConfigurableApplicationContext ctx =
  	              SpringApplication.run(Application.class, args);// 3

  	Cafe cafe = ctx.getBean(Cafe.class);                         // 4
  	for (int i = 1; i <= 100; i++) {                             // 5
       Order order = new Order(i);
       order.addItem(DrinkType.LATTE, 2, false); //hot
       order.addItem(DrinkType.MOCHA, 3, true);  //iced
       cafe.placeOrder(order);
  	}

  	System.out.println("Hit 'Enter' to terminate");              // 6
  	System.in.read();
  	ctx.close();
  }

  @MessagingGateway                                              // 7
  public interface Cafe {

  	@Gateway(requestChannel = "orders.input")                    // 8
  	void placeOrder(Order order);                                // 9

  }

  private AtomicInteger hotDrinkCounter = new AtomicInteger();

  private AtomicInteger coldDrinkCounter = new AtomicInteger();  // 10

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {                               // 11
  	return Pollers.fixedDelay(1000).get();
  }

  @Bean
  public IntegrationFlow orders() {                             // 12
  	return f -> f                                               // 13
  	  .split(Order.class, Order::getItems)                      // 14
  	  .channel(c -> c.executor(Executors.newCachedThreadPool()))// 15
  	  .<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16
  	    .subFlowMapping("true", sf -> sf                        // 17
  	      .channel(c -> c.queue(10))                            // 18
  	      .publishSubscribeChannel(c -> c                       // 19
  	        .subscribe(s ->                                     // 20
  	          s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21
  	        .subscribe(sub -> sub                               // 22
  	          .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared cold drink #"
  	              + this.coldDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)                                 // 23
  	          .handle(m -> System.out.println(m.getPayload())))))// 24
  	    .subFlowMapping("false", sf -> sf                        // 25
  	      .channel(c -> c.queue(10))
  	      .publishSubscribeChannel(c -> c
  	        .subscribe(s ->
  	          s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26
  	        .subscribe(sub -> sub
  	          .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared hot drink #"
  	              + this.hotDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)
  	          .handle(m -> System.out.println(m.getPayload()))))))
  	  .<OrderItem, Drink>transform(orderItem ->
  	    new Drink(orderItem.getOrderNumber(),
  	      orderItem.getDrinkType(),
  	      orderItem.isIced(),
  	      orderItem.getShots()))                                // 27
  	  .aggregate(aggregator -> aggregator                       // 28
  	    .outputProcessor(group ->                               // 29
  	      new Delivery(group.getMessages()
  	        .stream()
  	        .map(message -> (Drink) message.getPayload())
  	        .collect(Collectors.toList())))                     // 30
  	    .correlationStrategy(m ->
  	      ((Drink) m.getPayload()).getOrderNumber()), null)     // 31
  	  .handle(CharacterStreamWritingMessageHandler.stdout());   // 32
  }

}

逐行檢查程式碼...

1

@SpringBootApplication

這是 Spring Boot 1.2 中的一個新元註解。它包括 @Configuration@EnableAutoConfiguration。由於我們處於 Spring Integration 應用程式中,並且 Spring Boot 為其提供了自動配置,因此會自動應用 @EnableIntegration,以初始化 Spring Integration 基礎設施,包括 Java DSL 的環境 - DslIntegrationConfigurationInitializer,它由 /META-INF/spring.factories 中的 IntegrationConfigurationBeanFactoryPostProcessor 拾取。

2

@IntegrationComponentScan

這是 Spring Integration 中與 @ComponentScan 類似的註解,用於掃描基於介面的元件(Spring Framework 的 @ComponentScan 只檢視類)。Spring Integration 支援發現使用 @MessagingGateway 註解的介面(見下文 #7)。

3

ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);

我們類的 main 方法旨在使用此類的配置啟動 Spring Boot 應用程式,並透過 Spring Boot 啟動一個 ApplicationContext。此外,它還將命令列引數委託給 Spring Boot。例如,您可以指定 --debug 來檢視啟動自動配置報告的日誌。

4

Cafe cafe = ctx.getBean(Cafe.class);

由於我們已經有一個 ApplicationContext,我們可以開始與應用程式互動。而 Cafe 就是那個入口點——在 EIP 術語中稱為閘道器。閘道器只是介面,應用程式不與訊息 API 互動;它只處理領域(見下文 #7)。

5

for (int i = 1; i <= 100; i++) {

為了演示咖啡館的“工作”,我們初始化了 100 個訂單,每個訂單包含兩種飲料——一杯熱飲和一杯冰飲。然後將 Order 傳送到 Cafe 閘道器。

6

System.out.println("Hit 'Enter' to terminate");

通常 Spring Integration 應用程式是非同步的,因此為了避免從 main 執行緒過早退出,我們阻塞 main 方法,直到透過命令列進行一些終端使用者互動。非守護執行緒會保持應用程式開啟,但 System.read() 提供了一種乾淨地關閉應用程式的機制。

7

@MessagingGateway

此註解用於標記一個業務介面,表示它是終端應用程式和整合層之間的閘道器。它類似於 Spring Integration XML 配置中的 <gateway /> 元件。Spring Integration 為此介面建立一個 Proxy,並將其作為一個 Bean 填充到應用程式上下文中。此 Proxy 的目的是將引數包裝到 Message<?> 物件中,並根據提供的選項將其傳送到 MessageChannel

8

@Gateway(requestChannel = "orders.input")

此方法級別的註解用於透過方法和目標整合流來區分業務邏輯。在此示例中,我們使用了 requestChannel 引用 orders.input,它是我們 IntegrationFlow 輸入通道的 MessageChannel Bean 名稱(見下文 #13)。

9

void placeOrder(Order order);

該介面方法是終端應用程式與整合層互動的中心點。此方法返回型別為 void。這意味著我們的整合流是單向的,我們只將訊息傳送到整合流,但不等待回覆。

10

private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();

兩個計數器,用於收集我們的咖啡館如何處理飲料的資訊。

11

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {

預設poller bean。它類似於 Spring Integration XML 配置中的 <poller default="true"> 元件。對於 inputChannelPollableChannel 的端點是必需的。在這種情況下,對於咖啡館的兩個佇列——熱飲和冰飲(見下文 #18)是必要的。在這裡,我們使用 DSL 專案中的 Pollers 工廠,並使用其方法鏈式流暢 API 來構建輪詢器元資料。請注意,如果端點需要特定的 poller(而不是預設的 poller),可以直接從 IntegrationFlow 定義中使用 Pollers

12

@Bean
public IntegrationFlow orders() {

IntegrationFlow bean 定義。它是 Spring Integration Java DSL 的核心元件,儘管它在執行時不發揮任何作用,只在 Bean 註冊階段發揮作用。下面的所有其他程式碼都在 IntegrationFlow 物件中註冊 Spring Integration 元件(MessageChannelMessageHandlerEventDrivenConsumerMessageProducerMessageSource 等),IntegrationFlowBeanPostProcessor 會解析該物件以處理這些元件,並在必要時將它們註冊為應用程式上下文中的 Bean(某些元素,如通道,可能已經存在)。

13

return f -> f

IntegrationFlow 是一個 Consumer 函式式介面,因此我們可以最小化程式碼,只專注於整合場景的需求。它的 Lambda 接受 IntegrationFlowDefinition 作為引數。此類提供了一整套可以組合成的方法。我們將這些方法稱為 EIP 方法,因為它們提供了企業整合模式 (EI patterns) 的實現,並填充了來自 Spring Integration Core 的元件。在 Bean 註冊階段,IntegrationFlowBeanPostProcessor 會將這個內聯 (Lambda) IntegrationFlow 轉換為 StandardIntegrationFlow 並處理其元件。我們也可以使用 IntegrationFlows 工廠實現同樣的效果(例如 IntegrationFlow.from("channelX"). ... .get()),但我們認為 Lambda 定義更優雅。使用 Lambda 定義的 IntegrationFlow 會填充一個 DirectChannel 作為流的 inputChannel,並在我們的示例中以名稱 orders.input (流 Bean 名稱 + ".input") 註冊到應用程式上下文中。這就是我們在 Cafe 閘道器中使用該名稱的原因。

14

.split(Order.class, Order::getItems)

由於我們的整合流透過 orders.input 通道接收訊息,因此我們已準備好消費和處理它們。場景中的第一個 EIP 方法是 .split()。我們知道來自 orders.input 通道的訊息有效載荷是一個 Order 領域物件,因此我們可以在此處直接使用其型別並利用 Java 8 的方法引用特性。第一個引數是我們期望的訊息有效載荷型別,第二個是對 getItems() 方法的方法引用,該方法返回 Collection<OrderItem>。因此,這實現了分割 EI 模式,我們將集合中的每個條目作為單獨的訊息傳送到下一個通道。在後臺,.split() 方法註冊了一個 MethodInvokingSplitter MessageHandler 實現和該 MessageHandlerEventDrivenConsumer,並將 orders.input 通道連線為 inputChannel

15

.channel(c -> c.executor(Executors.newCachedThreadPool()))

.channel() EIP 方法允許指定端點之間的具體 MessageChannel,這類似於 Spring Integration XML 配置中透過 output-channel/input-channel 屬性對來實現。預設情況下,DSL 整合流定義中的端點與 DirectChannel 連線,這些通道根據 IntegrationFlow bean 名稱和流鏈中的索引獲取 bean 名稱。在此示例中,我們使用另一個 Lambda 表示式,它從 Channels 工廠中選擇一個特定的 MessageChannel 實現,並使用流暢 API 對其進行配置。當前的通道是一個 ExecutorChannel,允許將來自 splitter 的訊息分發到單獨的執行緒中,以便在下游流中並行處理它們。

16

.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping

我們場景中的下一個 EIP 方法是 .route(),用於將熱飲/冰飲訂單項傳送到不同的咖啡館廚房。我們再次在此使用方法引用 (isIced()) 從接收訊息中獲取 routingKey。第二個 Lambda 引數表示一個路由器對映——類似於 Spring Integration XML 配置中 <router> 元件的 <mapping> 子元素。然而,由於我們使用的是 Java,我們可以進一步利用其 Lambda 支援!Spring Integration Java DSL 除了傳統的通道對映外,還為路由器引入了子流定義。每個子流根據路由執行,如果子流產生結果,則結果會傳遞給路由器之後流定義中的下一個元素。

17

.subFlowMapping("true", sf -> sf 

指定當前路由器 mappingKey 的整合流。在此示例中,我們有兩個子流 - hoticed。子流是相同的 IntegrationFlow 函式式介面,因此我們可以完全按照我們在頂層 IntegrationFlow 定義中使用 Lambda 的方式使用它。子流與其父流沒有執行時依賴關係,它只是一種邏輯關係。

18

.channel(c -> c.queue(10))

我們已經知道 IntegrationFlow 的 Lambda 定義從 [FLOW_BEAN_NAME].input DirectChannel 開始,所以可能會有人問“如果再次指定 .channel() 在這裡如何工作?”。DSL 會處理這種情況,並使用 BridgeHandler 和端點連線這兩個通道。在我們的示例中,我們在此使用了一個受限的 QueueChannel 來反映現實生活中咖啡館廚房的繁忙狀態。這裡就是我們需要那個全域性輪詢器來監聽此通道的下一個端點的地方。

19

.publishSubscribeChannel(c -> c

.publishSubscribeChannel() EIP 方法是 .channel() 的一個變體,用於 MessageChannels.publishSubscribe(),但帶有 .subscribe() 選項,我們可以在其中將子流指定為通道的訂閱者。沒錯,子流又一次!因此,子流可以指定到任何深度。無論是否存在 .subscribe() 子流,父流中的下一個端點也是此 .publishSubscribeChannel() 的訂閱者。由於我們已經處於 .route() 子流中,最後一個訂閱者是一個隱式的 BridgeHandler,它只是將訊息彈出到頂層——類似於另一個隱式的 BridgeHandler,將訊息彈出到主流中的下一個 .transform() 端點。關於我們流的當前位置還有一點需要注意:上一個 EIP 方法是 .channel(c -> c.queue(10)),這個也是用於 MessageChannel。所以,它們也再次透過隱式的 BridgeHandler 連線起來。在實際應用中,我們可以僅透過一個用於咖啡館廚房的 .handle() 來避免此 .publishSubscribeChannel(),但我們在此的目標是儘可能多地涵蓋 DSL 特性。這就是我們將廚房工作分配到同一 PublishSubscribeChannel 的多個子流中的原因。

20

.subscribe(s ->

.subscribe() 方法接受一個 IntegrationFlow 作為引數,可以指定為 Lambda 以將訂閱者配置為子流。我們在此使用了多個子流訂閱者,以避免多行 Lambda 並涵蓋一些 DSL 和 Spring Integration 功能。

21

s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))

在此,我們使用了一個簡單的 .handle() EIP 方法來阻塞當前執行緒一段時間,以演示咖啡館廚房準備飲料的速度。我們在此使用了 Google Guava 的 Uninterruptibles.sleepUninterruptibly,以避免在 Lambda 表示式中使用 try...catch 塊,儘管您也可以這樣做,並且您的 Lambda 將是多行的。或者您可以將該程式碼移到一個單獨的方法中,並在此處使用方法引用

由於我們沒有在 .publishSubscribeChannel() 上使用任何 Executor,所有訂閱者將按順序在同一執行緒上執行;在我們的示例中,它是來自前一個 QueueChannel 上的 poller 的一個 TaskScheduler 執行緒。這就是為什麼這個 sleep 會阻塞所有下游程序,並允許展示那個限制為 10 個元素的 QueueChannel繁忙狀態

22

.subscribe(sub -> sub

下一個子流訂閱者將在為冰飲 sleep 1 秒後執行。我們在這裡使用另一個子流,因為前一個的 .handle() 對於 MessageHandler 的 Lambda 特性來說是單向的。因此,為了推進整個流的處理,我們有多個訂閱者:一些子流在完成工作後就結束了,不返回任何東西給父流。

23

 .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared cold drink #"
  	              + this.coldDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)         

當前訂閱者子流中的 transformer 用於將 OrderItem 轉換為適合下一個 .handle 使用的友好 STDOUT 訊息。在這裡,我們看到了泛型與 Lambda 表示式的結合使用。這是透過 GenericTransformer 函式式介面實現的。

24

.handle(m -> System.out.println(m.getPayload())))))

這裡的 .handle() 僅僅是為了演示如何使用 Lambda 表示式將有效載荷列印到 STDOUT。這是我們的飲料準備好的訊號。之後,PublishSubscribeChannel 的最終(隱式)訂閱者會將包含 OrderItem 的訊息傳送到主流程中的 .transform()

25

.subFlowMapping("false", sf -> sf

用於熱飲.subFlowMapping()。實際上,它與前面的冰飲子流類似,但具有特定的熱飲業務邏輯。

26

s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))

用於熱飲sleepUninterruptibly。沒錯,我們需要更多時間來燒水!

27

 .<OrderItem, Drink>transform(orderItem ->
  	    new Drink(orderItem.getOrderNumber(),
  	      orderItem.getDrinkType(),
  	      orderItem.isIced(),
  	      orderItem.getShots()))      

主要的 OrderItemDrinktransformer,在咖啡館廚房訂閱者完成飲料準備後,當 .route() 子流返回其結果時執行。

28

.aggregate(aggregator -> aggregator

.aggregate() EIP 方法提供了類似的選項來配置 AggregatingMessageHandler 及其端點,就像我們在使用 Spring Integration XML 配置時使用 <aggregator> 元件一樣。當然,使用 Java DSL,我們可以直接在原地配置聚合器,無需任何額外的 Bean。Lambda 又一次派上用場了!從咖啡館業務邏輯的角度來看,由於我們在開始時將原始訂單分割成了 OrderItems,因此我們為初始 Order 組裝了 Delivery

29

.outputProcessor(group -> 

AggregatorSpec.outputProcessor() 允許我們在聚合器完成組後發出自定義結果。它類似於 <aggregator> 元件或 POJO 方法上的 @Aggregator 註解的 ref/method。我們在此的目標是為所有飲料組裝一個 Delivery

30

new Delivery(group.getMessages()
  	        .stream()
  	        .map(message -> (Drink) message.getPayload())
  	        .collect(Collectors.toList())))    

如您所見,我們在此使用了 Java 8 的 Stream 特性來處理 Collection。我們迭代來自已釋放的 MessageGroup 的訊息,並將每條訊息轉換為其 Drink 有效載荷 (map)。Stream (.collect()) 的結果(一個 Drink 列表)被傳遞給 Delivery 建構函式。包含此新 Delivery 有效載荷的 Message 被髮送到我們 Cafe 場景中的下一個端點。

31

.correlationStrategy(m ->
  	      ((Drink) m.getPayload()).getOrderNumber()), null)

.correlationStrategy() Lambda 演示了我們如何自定義聚合器的行為。當然,我們在此可以僅依賴 Spring Integration 內建的 SequenceDetails,它在流的開始時預設從 .split() 生成並填充到每條分割後的訊息中,但為了說明,此處包含了 CorrelationStrategy 的 Lambda 示例。(使用 XML,我們可以使用 correlation-expression 或自定義 CorrelationStrategy)。此行中 .aggregate() EIP 方法的第二個引數用於 endpointConfigurer,以自定義選項,例如 autoStartuprequiresReplyadviceChain 等。我們在此使用 null 來表明我們依賴端點的預設選項。許多 EIP 方法提供了帶或不帶 endpointConfigurer 的過載版本,但 .aggregate() 需要一個端點引數,以避免對 AggregatorSpec Lambda 引數進行顯式轉換。

32

.handle(CharacterStreamWritingMessageHandler.stdout());

這是我們流的末尾 - Delivery 已交付給客戶!我們在此使用 Spring Integration Core 中開箱即用的 CharacterStreamWritingMessageHandler 將訊息有效載荷列印到 STDOUT。這展示瞭如何從 Java DSL 中使用 Spring Integration Core(及其模組)中現有元件。

好了,我們已經完成了基於 Spring Integration Java DSL 的 Cafe Demo 示例的描述。將其與 XML 示例 進行比較,以獲取更多關於 Spring Integration 的資訊。

這不是一個全面的 DSL 功能教程。我們在此沒有回顧 endpointConfigurer 選項、Transformers 工廠、IntegrationComponentSpec 層次結構、NamespaceFactories、如何指定多個 IntegrationFlow bean 並將它們連線到一個應用程式等等。有關更多資訊,請參閱參考手冊

至少,這篇逐行教程應該向您展示 Spring Integration Java DSL 的基礎知識,以及它在 Spring Framework Java 和註解配置、Spring Integration 基礎和 Java 8 Lambda 支援之間的無縫融合!

另請參閱 si4demo,瞭解 Spring Integration 的演變,包括 Java DSL,這在 2014 年的 SpringOne/2GX 大會上有所展示。(影片應很快可用)。

一如既往,我們期待您的評論和反饋(StackOverflow (spring-integration 標籤)、Spring JIRAGitHub),我們非常歡迎貢獻

附言:即使本教程完全基於 Java 8 Lambda 支援,我們也不想忽略 Java 8 之前的使用者,我們將提供類似的非 Lambda 部落格文章。敬請關注!

訂閱 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速發展。

瞭解更多

獲得支援

Tanzu Spring 透過一個簡單的訂閱,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部