領先一步
VMware 提供培訓和認證,助您快速提升。
瞭解更多親愛的 Spring 社群成員,
我們很高興地宣佈,繼 Spring Integration 4.1 Release Candidate 釋出後不久,Spring Integration Java DSL 1.0 Release Candidate 現已可用。請使用 Maven 或 Gradle 配置 Milestone Repository,或下載 發行版歸檔檔案,來試用一下。
有關更多資訊,請參閱專案主頁。
此版本包含了許多新特性和改進,以及大量的 bug 修復。GA (General Availability) 版本計劃於 11 月中旬釋出。
以下是自上一個里程碑版本以來的主要變更摘要:
重構和破壞性變更
雖然仍支援更早的 Java 版本,但 Spring Integration Java DSL 主要針對 Java 8 及其 Lambda 支援。我們移除了一些 functional interfaces
,轉而使用 Java 8 中類似的介面,如 Consumer<T>
、Function<T, R>
等。當然,為了支援與舊 Java 版本的向後相容性,我們在 DSL 原始碼中實現了類似的介面。使用更改後的介面且 Java 版本低於 8 的使用者需要進行修改以解決編譯錯誤。例如:
從此
.handle(Integer.class, (p, h) -> p * 2,
new EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
到此
.handle(Integer.class, (p, h) -> p * 2,
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> spec) {
spec.poller(Pollers.cron("7 * * * * ?"));
}
})
當然,如果您在此處使用 Java 8 Lambda,則程式碼無需更改
.handle(Integer.class, (p, h) -> p * 2, e -> e.poller(Pollers.cron("7 * * * * ?")))
IntegrationFlows
現在只包含 from(...)
方法。.fromFixedMessageChannel()
已被 .from(String messageChannelName, boolean fixedSubscriber)
替換。
此外,為了解決一些包纏繞(package tangle)問題,我們將一些類移動到了不同的包中。
方法範圍函式
為了簡化 IDE 中的程式碼補全並避免對所需 Namespace Factory
進行冗餘搜尋,我們添加了帶有 Function<T, R>
引數的過載方法。例如,這些程式碼片段是等效的:
.....
.channel(Amqp.pollableChannel(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
.channel(c -> c.amqpPollable(this.rabbitConnectionFactory)
.queueName("amqpReplyChannel")
.channelTransacted(true))
....
其中變數 c
是 Channel
的“方法聚合器”物件,它委託給適當的 Namespace Factory
。其他類似的 Lambda 方法有:
IntegrationFlows.from(MessageSourcesFunction sources)
IntegrationFlows.from(MessageProducersFunction producers)
IntegrationFlows.from(MessagingGatewaysFunction gateways)
IntegrationFlowDefinition.handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters)
EndpointSpec.poller(Function<PollerFactory, PollerSpec> pollers)
FunctionExpression
Spring Integration 擁有出色的 Spring Expression Language (SpEL) 支援。由於 Java DSL 是純 (呃!) Java,因此在 expression
屬性的長字串中指定某些業務邏輯意義不大。受 Java 8 Lambda 支援的啟發,併為了追求最小化變更,我們引入了 FunctionExpression
——SpEL Expression
介面的一個實現——它接受一個 Function<T, R>
並在每次呼叫 getValue()
時委託給它。現在,DSL 中的許多元件都提供了 (Function<T, R> function)
方法,作為類似 SpEL 方法的替代方案。以下是 FtpInboundFileSynchronizingMessageSource
的 localFilename
屬性示例:
使用 SpEL
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilenameExpression("payload.toUpperCase() + '.a'")
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
使用 Lambda
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(s -> s.ftp(this.ftpSessionFactory)
.remoteDirectory("ftpSource")
.localFilename(f -> f.toUpperCase() + ".a")))
.channel(c -> c.queue("ftpInboundResultChannel"))
.get();
}
FunctionExpression
的其他有趣用法是 Enricher
和 HeaderEnricher
.enrich(e -> e.requestChannel("enrichChannel")
.requestPayload(Message::getPayload)
.propertyFunction("date", m -> new Date()))
FunctionExpression
也支援執行時型別轉換,與標準的 SpelExpression
類似。
子流(SubFlows)
我們為一些 if...else
和 publish-subscribe
元件引入了 SubFlow
支援。最簡單的例子是 .publishSubscribeChannel()
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
當然,相同的結果也可以透過單獨的 IntegrationFlow
@Bean
定義來實現,但我們希望您會發現子流風格的邏輯組合很有用。
類似的 publish-subscribe
子流組合由 .routeToRecipients()
提供。
另一個例子是在 .filter()
上使用 .discardFlow()
而不是 .discardChannel()
。
.route()
值得特別關注
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping()
繼續像常規 Router
對映一樣工作,但 .subFlowMapping()
將子流與主流繫結在一起。換句話說,任何路由器的子流在 .route()
之後都會返回主流程。
類似的“返回主流程”子流由 .gateway()
支援
@Bean
public IntegrationFlow gatewayFlow() {
return f ->
f.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway(gf -> gf.transform("From Gateway SubFlow: "::concat));
}
然而,這個 Gateway SubFlow 只是透過顯式的 DirectChannel
與主流程連線,並使用該通道作為 requestChannel
選項包裝到常規的 GatewayMessageHandler
中。
當然,子流可以任意深度巢狀,但我們不建議這樣做,因為實際上,即使在路由器的情況下,在流程中新增複雜的子流也會很快變得難以被人理解。
結論
自上一個里程碑版本以來,我們沒有新增更多 protocol specific adapters
。並非所有介面卡都會被 DSL 直接支援,儘管最常用的介面卡擁有頭等支援。但是,那些沒有頭等支援的介面卡可以使用 .handle()
輕鬆連線。正如我們之前討論的,我們正在尋求您的意見來優先實現剩餘的介面卡,所以請不要吝嗇分享您的想法和建議!
您可以從它們的原始碼和參考手冊中獲取關於這些類和現有類的更多資訊。
我們期待儘快收到您的評論和反饋(StackOverflow (spring-integration
標籤),Spring JIRA,GitHub),並在我們 GA 之前的幾周內報告您發現的問題。
一如既往,我們非常歡迎貢獻。