搶佔先機
VMware 提供培訓和認證,助您快速提升。
瞭解更多尊敬的 Spring 社群成員:
我們高興地宣佈 Spring Integration Java DSL 1.1 里程碑 1 現已釋出。使用 里程碑倉庫 並配合 Maven 或 Gradle 進行早期訪問試用。
compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"
老實說,1.1
版本計劃的許多功能尚未實現,但感謝我們的朋友 Josh Long 的鼓勵以及最近宣佈的 Apache Kafka 支援(Spring Integration Kafka 支援 1.1 正式釋出,Spring XD 1.1.1 釋出),我們釋出此里程碑 1 版本主要是為了在 Java 配置 DSL 中展示對 Apache Kafka 的支援。
在本文中,我們將介紹此版本中的該功能及其他功能。
讓我們從 Spring Integration Java DSL 的 KafkaTests
類中的一個“簡單”示例開始
@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
return new DefaultConnectionFactory(
new ZookeeperConfiguration(zookeeper.connectString()));
}
@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
MetadataStoreOffsetManager offsetManager =
new MetadataStoreOffsetManager(connectionFactory);
// start reading at the end of the
offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
return offsetManager;
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(
ConnectionFactory connectionFactory,
OffsetManager offsetManager) {
return IntegrationFlows
.from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
.autoCommitOffset(false)
.payloadDecoder(String::new)
.keyDecoder(b -> Integer.valueOf(new String(b)))
.configureListenerContainer(c ->
c.offsetManager(offsetManager)
.maxFetch(100)))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults"))
.get();
}
EmbeddedZookeeper
是 Apache Kafka test
artifact(在本例中是 testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test'
)的一部分,它與 kafka.utils.TestUtils
等許多其他功能一起,對於單元測試非常有用。ConnectionFactory
和 OffsetManager
的更多資訊,請參閱 Spring Integration Kafka 專案。IntegrationFlow
bean 定義。Spring Integration Java DSL 提供了一個名稱空間工廠 - Kafka
- 它利用 IntegrationComponentSpec
實現來支援 Spring Integration Kafka 介面卡,例如用於 KafkaMessageDrivenChannelAdapter
的 KafkaMessageDrivenChannelAdapterSpec
。method-chain
的選項委託給底層的 KafkaMessageDrivenChannelAdapter
例項。.payloadDecoder(String::new)
這一行。kafka.serializer.Decoder
是一個 Scala trait
,它被編譯成一個 Java 介面(不是類!),所以我們可以在這裡將它表示為一個 Java 8 lambda 方法。.configureListenerContainer()
是一個 lambda 感知方法,用於分離 KafkaMessageListenerContainer
特定選項的關注點。Kafka
名稱空間工廠中的其他自解釋工廠方法包括用於 KafkaHighLevelConsumerMessageSource
輪詢介面卡的 .inboundChannelAdapter(...)
和用於 KafkaProducerMessageHandler
的 .outboundChannelAdapter(...)
。請查閱它們的 JavaDocs 以獲取更多資訊。
有關更多資訊,請檢視 Josh Long 關於使用 Apache Kafka 和 Spring 構建整合和資料處理管道的帖子!
社群提供了許多很棒的反饋(網路研討會回放:Spring Integration Java DSL 介紹),其中很多是關於 bean 方法呼叫元件(服務、轉換器、路由器等)。我們清楚地聽到了你們的意見:元件方法選擇已得到改進。下面是一個示例,它類似於 XML 配置中的 <int:service-activator input-channel="greetingChannel" ref="greetingService"/>
@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {
@Autowired
private GreetingService greetingService;
@Bean
public IntegrationFlow greetingFlow() {
return IntegrationFlows.from("greetingChannel")
.handle(this.greetingService)
.get();
}
}
@Component
public class GreetingService {
public void greeting(String payload) {
System.out.println("Hello " + payload);
}
}
在這裡,greeting
方法將由框架自動選擇。另一種方法是使用 methodName
引數,以便在存在歧義的情況下指定方法。許多其他 EIP 實現也引入了類似的 POJO 方法呼叫 EIP 方法,例如 transform(Object service, String methodName)
、split(Object service)
等。
Spring Integration Java DSL 也遵循 Spring Integration 訊息註解,例如 @ServiceActivator
、@Router
、@Filter
等,甚至包括 @Payload
、@Header
。請查閱 IntegrationFlowDefinition
的 JavaDocs 以獲取更多資訊。
毫不奇怪,由於 IntegrationFlow
是一個介面,我們可以直接提供其實現作為自定義元件,並且它在 Spring Integration Java DSL 環境中可以按原樣工作
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
這類似於 @Bean
定義,但這種方法有助於我們的元件保持更鬆散的耦合。
但是,等等,還有更多!IntegrationFlow
實現(例如 @Bean
定義中的 lambda)僅限於 DirectChannel
輸入通道。我們在這裡更進一步,引入了 IntegrationFlowAdapter
。這是我最喜歡的一個示例,演示了它的用法
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new tomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this, "messageSource",
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("foo", "FOO"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "B,A,R";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> foo) {
return foo.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String foo) {
return payload + ":" + foo;
}
}
當然,有了 POJO 方法呼叫支援(參見上文),可以輕鬆構建流程。
Spring Framework 和 Spring Integration 長期以來一直支援動態語言,並且主要與 XML Spring 配置關聯。從 Java 程式碼處理指令碼(如 Groovy、Ruby、JavaScript 等)可能看起來很奇怪,但我們發現它是一個在執行時重新載入功能的有用工具,尤其是在 Java lambda 不夠動態的情況下。讓我們看看 Spring Integration Java DSL 中的 Scripts
名稱空間工廠
@Configuration
@EnableIntegration
public class ScriptsConfiguration {
@Value("com/my/project/integration/scripts/splitterScript.groovy")
private Resource splitterScript;
@Bean
public PollableChannel results() {
return new QueueChannel();
}
@Bean
public IntegrationFlow scriptSplitter() {
return f -> f
.split(Scripts.script(this.splitterScript)
.refreshCheckDelay(10000)
.variable("foo", "bar"))
.channel(results());
}
}
這種指令碼支援使我們能夠僅處理外部資源,這些資源可以在執行時更改和重新載入。Spring Integration Scripting 模組支援的 inline
指令碼沒有意義,因為對於這種情況我們有 Java 8 lambda。
Wire Tap EI 模式在 Spring Integration 中作為 ChannelInterceptor
實現,可以像這樣注入到任何 MessageChannel
中作為攔截器
@Bean
public MessageChannel myChannel() {
return MessageChannels.direct()
.interceptor(new WireTap(loggerChannel()))
.get();
}
IntegrationFlow
定義允許我們省略 EIP 元件之間的 MessageChannel
宣告,因此我們引入了內聯的 .wireTap()
EIP 方法,以便為這些匿名通道注入 WireTap
。以下是一些示例
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlows.from("tappedChannel1")
.wireTap("tapChannel",
wt -> wt.selector(m -> m.getPayload().equals("foo")))
.channel("nullChannel")
.get();
}
@Bean
public IntegrationFlow wireTapFlow2() {
return f -> f
.wireTap(sf -> sf
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("wireTapSubflowResult")))
.channel("nullChannel");
}
請參閱 IntegrationFlowDefinition.wireTap()
方法的 JavaDocs 以獲取更多資訊,並且不要錯過我們在 GitHub 專案頁面上的測試用例。
1.1 版本還有很多工作要做,例如進一步簡化 .aggregate()
等配置,注入外部子流的能力,將 IntegrationComponentSpec
實現配置為單獨的 @Bean
以簡化目標流定義的能力,更多特定協議的名稱空間工廠等等。請隨時透過 StackOverflow、JIRA 和 GitHub issues 聯絡我們,分享您的想法和建議!
專案頁面 | JIRA | 問題 | [貢獻] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration
標籤)