Spring integration Java DSL 1.1 M1 已釋出

釋出 | Artem Bilan | 2015年4月15日 | ...

尊敬的 Spring 社群成員:

我們高興地宣佈 Spring Integration Java DSL 1.1 里程碑 1 現已釋出。使用 里程碑倉庫 並配合 Maven 或 Gradle 進行早期訪問試用。

compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"

老實說,1.1 版本計劃的許多功能尚未實現,但感謝我們的朋友 Josh Long鼓勵以及最近宣佈的 Apache Kafka 支援(Spring Integration Kafka 支援 1.1 正式釋出Spring XD 1.1.1 釋出),我們釋出此里程碑 1 版本主要是為了在 Java 配置 DSL 中展示對 Apache Kafka 的支援。

在本文中,我們將介紹此版本中的該功能及其他功能。

Apache Kafka 支援

讓我們從 Spring Integration Java DSL 的 KafkaTests 類中的一個“簡單”示例開始

@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
        return new DefaultConnectionFactory(
                new ZookeeperConfiguration(zookeeper.connectString()));
}

@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
        MetadataStoreOffsetManager offsetManager =
                           new MetadataStoreOffsetManager(connectionFactory);
        // start reading at the end of the
       offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
       return offsetManager;
}

@Bean
public IntegrationFlow listeningFromKafkaFlow(
                    ConnectionFactory connectionFactory,
                    OffsetManager offsetManager) {
     return IntegrationFlows
         .from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
                  .autoCommitOffset(false)
		  .payloadDecoder(String::new)
		  .keyDecoder(b -> Integer.valueOf(new String(b)))
		  .configureListenerContainer(c ->
				c.offsetManager(offsetManager)
						.maxFetch(100)))
         .<String, String>transform(String::toUpperCase)
         .channel(c -> c.queue("listeningFromKafkaResults"))
         .get();
}
  • EmbeddedZookeeper 是 Apache Kafka test artifact(在本例中是 testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test')的一部分,它與 kafka.utils.TestUtils 等許多其他功能一起,對於單元測試非常有用。
  • 有關 ConnectionFactoryOffsetManager 的更多資訊,請參閱 Spring Integration Kafka 專案。
  • 上述配置中最重要的部分是 IntegrationFlow bean 定義。Spring Integration Java DSL 提供了一個名稱空間工廠 - Kafka - 它利用 IntegrationComponentSpec 實現來支援 Spring Integration Kafka 介面卡,例如用於 KafkaMessageDrivenChannelAdapterKafkaMessageDrivenChannelAdapterSpec
  • 這是一個構建器模式的例子,該規範只是將來自 method-chain 的選項委託給底層的 KafkaMessageDrivenChannelAdapter 例項。
  • 對於那些不熟悉 Scala(Apache Kafka 的編寫語言)的人,比如我自己,請注意 .payloadDecoder(String::new) 這一行。kafka.serializer.Decoder 是一個 Scala trait,它被編譯成一個 Java 介面(不是類!),所以我們可以在這裡將它表示為一個 Java 8 lambda 方法。
  • .configureListenerContainer() 是一個 lambda 感知方法,用於分離 KafkaMessageListenerContainer 特定選項的關注點。

Kafka 名稱空間工廠中的其他自解釋工廠方法包括用於 KafkaHighLevelConsumerMessageSource 輪詢介面卡的 .inboundChannelAdapter(...) 和用於 KafkaProducerMessageHandler.outboundChannelAdapter(...)。請查閱它們的 JavaDocs 以獲取更多資訊。

有關更多資訊,請檢視 Josh Long 關於使用 Apache Kafka 和 Spring 構建整合和資料處理管道的帖子!

POJO 方法呼叫

社群提供了許多很棒的反饋(網路研討會回放:Spring Integration Java DSL 介紹),其中很多是關於 bean 方法呼叫元件(服務、轉換器、路由器等)。我們清楚地聽到了你們的意見:元件方法選擇已得到改進。下面是一個示例,它類似於 XML 配置中的 <int:service-activator input-channel="greetingChannel" ref="greetingService"/>


@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {

	@Autowired
	private GreetingService greetingService;

	@Bean
	public IntegrationFlow greetingFlow() {
		return IntegrationFlows.from("greetingChannel")
				.handle(this.greetingService)
				.get();
	}

}

@Component
public class GreetingService {

   public void greeting(String payload) {
        System.out.println("Hello " + payload);
   }
}

在這裡,greeting 方法將由框架自動選擇。另一種方法是使用 methodName 引數,以便在存在歧義的情況下指定方法。許多其他 EIP 實現也引入了類似的 POJO 方法呼叫 EIP 方法,例如 transform(Object service, String methodName)split(Object service) 等。

Spring Integration Java DSL 也遵循 Spring Integration 訊息註解,例如 @ServiceActivator@Router@Filter 等,甚至包括 @Payload@Header。請查閱 IntegrationFlowDefinition 的 JavaDocs 以獲取更多資訊。

IntegrationFlowAdapter

毫不奇怪,由於 IntegrationFlow 是一個介面,我們可以直接提供其實現作為自定義元件,並且它在 Spring Integration Java DSL 環境中可以按原樣工作

@Component
public class MyFlow implements IntegrationFlow {

	@Override
	public void configure(IntegrationFlowDefinition<?> f) {
		f.<String, String>transform(String::toUpperCase);
	}

}

這類似於 @Bean 定義,但這種方法有助於我們的元件保持更鬆散的耦合。

但是,等等,還有更多!IntegrationFlow 實現(例如 @Bean 定義中的 lambda)僅限於 DirectChannel 輸入通道。我們在這裡更進一步,引入了 IntegrationFlowAdapter。這是我最喜歡的一個示例,演示了它的用法

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

        private final AtomicBoolean invoked = new tomicBoolean();

        public Date nextExecutionTime(TriggerContext triggerContext) {
              return this.invoked.getAndSet(true) ? null : new Date();
       }

       @Override
       protected IntegrationFlowDefinition<?> buildFlow() {
          return from(this, "messageSource",
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                   .split(this)
		   .transform(this)
		   .aggregate(a -> a.processor(this, null), null)
		   .enrichHeaders(Collections.singletonMap("foo", "FOO"))
		   .filter(this)
		   .handle(this)
		   .channel(c -> c.queue("myFlowAdapterOutput"));
      }

      public String messageSource() {
	       return "B,A,R";
      }

      @Splitter
      public String[] split(String payload) {
           return StringUtils.commaDelimitedListToStringArray(payload);
      }

      @Transformer
      public String transform(String payload) {
           return payload.toLowerCase();
      }

      @Aggregator
      public String aggregate(List<String> payloads) {
             return payloads.stream().collect(Collectors.joining());
      }

      @Filter
      public boolean filter(@Header Optional<String> foo) {
              return foo.isPresent();
      }

      @ServiceActivator
      public String handle(String payload, @Header String foo) {
             return payload + ":" + foo;
      }

}

當然,有了 POJO 方法呼叫支援(參見上文),可以輕鬆構建流程。

動態語言(指令碼)支援

Spring FrameworkSpring Integration 長期以來一直支援動態語言,並且主要與 XML Spring 配置關聯。從 Java 程式碼處理指令碼(如 Groovy、Ruby、JavaScript 等)可能看起來很奇怪,但我們發現它是一個在執行時重新載入功能的有用工具,尤其是在 Java lambda 不夠動態的情況下。讓我們看看 Spring Integration Java DSL 中的 Scripts 名稱空間工廠

@Configuration
@EnableIntegration
public class ScriptsConfiguration {

	@Value("com/my/project/integration/scripts/splitterScript.groovy")
	private Resource splitterScript;

	@Bean
	public PollableChannel results() {
		return new QueueChannel();
	}

	@Bean
	public IntegrationFlow scriptSplitter() {
		return f -> f
        	             .split(Scripts.script(this.splitterScript)
                                              .refreshCheckDelay(10000)
                                              .variable("foo", "bar"))
                             .channel(results());
	}

}

這種指令碼支援使我們能夠僅處理外部資源,這些資源可以在執行時更改和重新載入。Spring Integration Scripting 模組支援的 inline 指令碼沒有意義,因為對於這種情況我們有 Java 8 lambda。

內聯 WireTap

Wire Tap EI 模式Spring Integration 中作為 ChannelInterceptor 實現,可以像這樣注入到任何 MessageChannel 中作為攔截器

@Bean
public MessageChannel myChannel() {
     return MessageChannels.direct()
                .interceptor(new WireTap(loggerChannel()))
                .get();
}

IntegrationFlow 定義允許我們省略 EIP 元件之間的 MessageChannel 宣告,因此我們引入了內聯的 .wireTap() EIP 方法,以便為這些匿名通道注入 WireTap。以下是一些示例

@Bean
public IntegrationFlow wireTapFlow1() {
	return IntegrationFlows.from("tappedChannel1")
		.wireTap("tapChannel",
                         wt -> wt.selector(m -> m.getPayload().equals("foo")))
		.channel("nullChannel")
		.get();
}

@Bean
public IntegrationFlow wireTapFlow2() {
	return f -> f
		.wireTap(sf -> sf
			.<String, String>transform(String::toUpperCase)
			.channel(c -> c.queue("wireTapSubflowResult")))
		.channel("nullChannel");
}

請參閱 IntegrationFlowDefinition.wireTap() 方法的 JavaDocs 以獲取更多資訊,並且不要錯過我們在 GitHub 專案頁面上的測試用例。

總結

1.1 版本還有很多工作要做,例如進一步簡化 .aggregate() 等配置,注入外部子流的能力,將 IntegrationComponentSpec 實現配置為單獨的 @Bean 以簡化目標流定義的能力,更多特定協議的名稱空間工廠等等。請隨時透過 StackOverflow、JIRA 和 GitHub issues 聯絡我們,分享您的想法和建議!

專案頁面 | JIRA | 問題 | [貢獻] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration 標籤)

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助您快速提升。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一次簡單訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部