領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多親愛的 Spring 社群:
我們很高興地宣佈,在 Spring Integration 4.1 Release Candidate 釋出後不久,Spring Integration Java DSL 1.0 Release Candidate 現已釋出。請使用 Maven 或 Gradle 的 Milestone Repository,或下載 分發存檔 來進行試用。
有關更多資訊,請參閱專案的 主頁。
此次釋出包含許多新功能和改進,以及一些 bug 修復。GA 版本計劃在11月中旬釋出。
以下是自 上一個里程碑 以來的主要更改摘要
重構和破壞性更改
儘管仍然支援早期 Java 版本,但 Spring Integration Java DSL 主要面向 Java 8 及其 Lambda 支援。我們已刪除了一些 函式式介面,轉而使用 Java 8 中的類似介面:Consumer<T>、Function<T, R> 等。當然,為了支援與舊 Java 版本的向後相容性,我們在 DSL 原始碼中實現了類似的介面。使用舊於 8 的 Java 版本且使用了更改後接口的使用者需要進行修改以修復其編譯錯誤。例如
從這裡
.handle(Integer.class, (p, h) -> p * 2,
new EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
到這裡
.handle(Integer.class, (p, h) -> p * 2,
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
當然,如果您在這裡使用 Java 8 Lambda,程式碼將無需更改
.handle(Integer.class, (p, h) -> p * 2, e -> e.poller(Pollers.cron("7 * * * * ?")))
IntegrationFlows 現在只包含 from(...) 方法。.fromFixedMessageChannel() 已被 .from(String messageChannelName, boolean fixedSubscriber) 替換。
此外,為了解決一些包纏繞問題,我們將一些類移動到了不同的包中。
方法作用域函式
為了簡化 IDE 的程式碼補全,並避免冗餘地搜尋所需的 名稱空間工廠,我們添加了帶有 Function<T, R> 引數的過載方法。例如,以下程式碼片段是等效的
.....
.channel(Amqp.pollableChannel(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
.channel(c -> c.amqpPollable(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
其中 c 變數是 Channel 的“method-aggregator”物件,它委託給相應的 名稱空間工廠。其他類似的 Lambda 方法包括
IntegrationFlows.from(MessageSourcesFunction sources)IntegrationFlows.from(MessageProducersFunction producers)IntegrationFlows.from(MessagingGatewaysFunction gateways)IntegrationFlowDefinition.handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters)EndpointSpec.poller(Function<PollerFactory, PollerSpec> pollers)FunctionExpression
Spring Integration 具有強大的 Spring Expression Language (SpEL) 支援。由於 Java DSL 是純粹的(哦!)Java,因此在 expression 屬性中用長字串指定業務邏輯並沒有太大意義。受 Java 8 Lambda 支援的啟發,並追求最小改動的目標,我們引入了 FunctionExpression - SpEL Expression 介面的實現 - 它接受一個 Function<T, R> 並在每次 getValue() 時委託給它。現在,DSL 中的許多元件都提供 (Function<T, R> function) 方法作為類似 SpEL 方法的替代。以下是 FtpInboundFileSynchronizingMessageSource 的 localFilename 屬性的示例
使用 SpEL
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilenameExpression("payload.toUpperCase() + '.a'")
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
使用 Lambda
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilename(f -> f.toUpperCase() + ".a")))
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
FunctionExpression 的其他有趣用法包括 Enricher 和 HeaderEnricher
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
FunctionExpression 還支援執行時型別轉換,就像標準的 SpelExpression 一樣。
子流程
我們為一些 if...else 和 publish-subscribe 元件引入了 SubFlow 支援。最簡單的例子是 .publishSubscribeChannel()
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
當然,使用單獨的 IntegrationFlow @Bean 定義也可以達到相同的結果,但我們希望您會發現子流程風格的邏輯組合很有用。
類似的 publish-subscribe 子流程組合由 .routeToRecipients() 提供。
另一個例子是 .filter() 上的 .discardFlow() 而不是 .discardChannel()。
.route() 值得特別關注
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping() 繼續像常規 Router 對映一樣工作,但 .subFlowMapping() 將該子流程與主流程繫結。換句話說,任何路由器的子流程在 .route() 之後都會返回主流程。
類似的“返回主流程”子流程由 .gateway() 支援
@Bean
public IntegrationFlow gatewayFlow() {
return f ->
f.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway(gf -> gf.transform("From Gateway SubFlow: "::concat));
}
然而,這個 Gateway 子流程只是透過顯式的 DirectChannel 與主流程連線,並使用該通道作為 requestChannel 選項包裝成常規的 GatewayMessageHandler。
當然,子流程可以巢狀任意深度,但我們不建議這樣做,因為事實上,即使在路由器的情況下,在流程中新增複雜的子流程也會很快變得難以人工解析。
結論
自上一個里程碑以來,我們沒有新增更多的 協議特定介面卡。並非所有介面卡都將直接由 DSL 支援,儘管最常用的介面卡享有第一類支援。但是,那些不享有第一類支援的介面卡可以透過 .handle() 輕鬆連線。正如我們之前討論過的,我們正在尋找輸入來確定剩餘介面卡的實現優先順序,所以請不要猶豫分享您的想法和建議!
您可以透過其 原始碼 和 參考手冊 獲取有關這些類和現有類的更多資訊。
我們期待儘快收到您的評論和反饋(StackOverflow (spring-integration 標籤)、Spring JIRA、GitHub),並在我們未來幾周的 GA 版本釋出前報告您發現的問題。
一如既往,我們非常歡迎 貢獻。