建立 Supplier Function 並生成 Spring Cloud Stream Source

工程 | Soby Chacko | 2020 年 7 月 27 日 | ...

本文是系列部落格的第 3 部分,介紹了用於 Spring Cloud Stream 應用程式的 Java 函式。

本系列的其他部分。

第 1 部分 - 一般介紹

第 2 部分 - 函式組合

在本系列的最後兩篇部落格中,我們提供了對將所有現有 Spring Cloud Stream App Starters 遷移到函式的新舉措的一般介紹,以及組合它們的各種方式。在本部落格中,我們將繼續本系列,展示如何開發、測試這些函式以及如何使用它們生成 Spring Cloud Stream 應用程式。特別地,本文重點介紹如何編寫 supplier 函式(實現 java.util.function.Supplier),然後生成 Spring Cloud Stream 的相應 source 應用程式。

編寫新的 supplier

為了深入理解這個概念,我們將採用一個用例並實現一個解決方案來滿足它。

用例

我們需要一個函式,當使用正確的配置呼叫時,它會以 atom、rss 等格式提供部落格訂閱源的內容。我們需要支援兩種 supplier 呼叫模型 - 一種是我們在 FaaS 環境中透過程式設計方式呼叫函式(例如,一個 REST 端點),另一種是 streaming supplier,一旦訂閱源可用,我們就能獲得持續的資料流。我們希望基於 ROME 庫構建這些 suppliers,ROME 庫是一個流行的訂閱源聚合庫。我們將從非 Spring 開發者和 Spring 開發者兩種視角來看待這個問題。

非 Spring 開發者

假設你不是 Spring 開發者,並且不熟悉已經為 ROME 提供抽象的 Spring Integration。在這種情況下,我們當然可以直接使用 ROME 來生成訂閱源記錄。例如,這是一個適用於此場景的有效 Supplier。

public Supplier<SyndEntry> feedSupplier()
{
	return () -> {
		//Use the ROME framework directly to produce syndicated entries.
	}
}

這樣做的好處是,我們可以在沒有任何 Spring 知識的情況下開發 supplier,並且可以使用該環境提供的抽象或依賴 Spring Cloud Function 等框架直接部署到無伺服器環境。

這本質上意味著,如果你是一名不熟悉 Spring Framework 的 Java 開發者,你仍然可以使用 java.util.function 包中定義的介面(例如 FunctionSupplierConsumer)來編寫函式,只需提供業務邏輯即可。然後,我們可以將開發的這個 artifact 轉換為 Spring Cloud Stream 應用程式,方法是新增 Spring Cloud Stream binder 依賴並使其成為一個 SpringBootApplication。透過提供一些配置屬性,例如中介軟體目的地,我們可以立即獲得將應用程式部署到 Spring Cloud Data Flow 等平臺上的價值,Spring Cloud Data Flow 將應用程式作為資料管道的一部分進行編排。透過這種方式,我們編寫的函式完全獨立於任何 Spring 依賴,並且只在部署過程的最後階段引入 Spring Cloud Stream、Spring Cloud Function 和 Spring Cloud Data Flow 等 Spring 元件。下圖展示了這一思路。

Stream Applications Layered Architecture for Functions

正如我們所見,函式元件可以獨立呼叫,也可以在將其轉換為 Spring Cloud Stream 應用程式後,作為 Spring Cloud Data Flow 管道的一部分進行呼叫。

Spring 開發者

雖然上述模型可能是一個很好的起點,但當我們開始直接使用 ROME 框架時,可能會很快意識到它涉及大量繁重的工作和更深入的庫知識。出錯的可能性很高,所以我們需要編寫大量測試來驗證我們的自定義實現按預期工作,並且所有邊界情況都已覆蓋。我們開始懷疑是否已經存在一些更簡單的抽象。這樣我們就無需編寫任何 ROME 特定的程式碼,因為抽象層會處理所有的複雜細節。幸運的是,我們有一個解決方案。Spring Integration 為許多企業技術提供了大量 inbound 和 outbound 介面卡。feed 介面卡就是其中之一,其實現基於 ROME。事實上,我們在預打包的 stream-applications 中提供的許多函式元件都基於 Spring Integration 介面卡。這些介面卡已被廣泛用於大量企業用例,並經過了嚴格的實戰測試。但是,我們想要編寫 supplier 的技術可能在 Spring Integration 中不可用。在這種情況下,正如我們上面看到的,我們當然可以自己編寫程式碼,並從 supplier 中呼叫它。

在 supplier 中使用 Spring Integration Feed 介面卡

如果你還沒這樣做,請 fork 並 clone stream applications 倉庫。然後在 functions/supplier 下建立一個新的 feed-supplier 模組。使用現有的 supplier 之一作為模板進行指導。

在專案中新增以下 Spring Integration Feed 介面卡依賴。這將引入 Spring Integration 的 feed 介面卡以及任何其他傳遞依賴。

<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-feed</artifactId>
</dependency>

新增基本配置屬性

現在我們已經引入了核心依賴,接下來開始編寫一些程式碼。由於函式預計在 Spring Boot 環境中使用,我們需要建立一個 ConfigurationProperties 類來驅動 supplier 函式的配置。它可能看起來像這樣。

package org.springframework.cloud.fn.supplier.feed;

@ConfigurationProperties("feed.supplier")
public class FeedSupplierProperties {

/**
* Key used in metadata store to avoid duplicate read from the feed
*/
private String metadataKey;

/**
* Feed url.
*/
private URL feedUrl;

// rest is omitted
}

如我們所見,我們在所有屬性上都使用了 feed.supplier 字首。

新增配置類

接下來,我們建立一個基於 Spring 的配置類,在其中提供所有必需的元件。我們將逐步構建它。下面是該類的基本結構。

package org.springframework.cloud.fn.supplier.feed;
...
@Configuration
@EnableConfigurationProperties(FeedSupplierProperties.class)
public class FeedSupplierConfiguration {

}

將這些欄位新增到類中。

private final ConcurrentMetadataStore metadataStore;

private final Resource resource;

private final FeedSupplierProperties feedSuppplierProperties;

關於這些欄位的快速說明。Spring Integration 中的 Feed 介面卡提供了一種功能,可以避免重複讀取已經從訂閱源中讀取過的條目。我們上面定義的 metadataKey 屬性用於此目的。它透過使用元資料儲存來實現這一點。對於流行的資料庫,有各種元資料儲存可用。請包含以下依賴以使用記憶體中的簡單元資料儲存。

<dependency>
   <groupId>org.springframework.cloud.fn</groupId>
   <artifactId>metadata-store-common</artifactId>
   <version>${project.version}</version>
</dependency>

請注意,此要求特定於此 supplier,並非所有 suppliers 都需要它。

如果沒有基於 HTTP(或 HTTPS)的 URL 可用(我們可以透過配置屬性設定),使用者可以提供一個 Resource bean 來讀取訂閱源。

讓我們新增一個建構函式來使用這些欄位。

FeedSupplierConfiguration(FeedSupplierProperties feedSupplierProperties,
                   ConcurrentMetadataStore metadataStore,
                   @Nullable Resource resource) {
  this.feedSuppplierProperties = feedSupplierProperties;
  this.metadataStore = metadataStore;
  this.resource = resource;
}

Resource 可以為空,因為大多數情況下我們可以直接將 URL 字串作為配置屬性傳遞,而無需提供 Resource bean。

Spring Integration Feed 介面卡提供了 FeedEntryMessageSource,它是一個 MessageSource 實現。我們將在 supplier 中使用此訊息源。讓我們將其設定為 Spring Bean。下面的程式碼非常容易理解。

@Bean
public FeedEntryMessageSource feedEntryMessageSource() {
  final FeedEntryMessageSource feedEntryMessageSource = this.resource == null ? new FeedEntryMessageSource(this.feedSuppplierProperties.getFeedUrl(),
        this.feedSuppplierProperties.getMetadataKey()) :
       ...
  return feedEntryMessageSource;
}

非響應式 Supplier

現在我們已經準備好了 MessageSource bean,編寫一個簡單的 Supplier 並透過呼叫其 get 方法來以程式設計方式呼叫它相對容易。程式碼如下。

@Bean
public Supplier<Message<SyndEntry>> feedSupplier() {
  return () -> feedEntryMessageSource().receive();
}

我們可以將此 Supplier bean 注入到應用程式中,並透過程式設計方式呼叫 get 方法。當此 Supplier 在 Spring Cloud Stream 應用程式中使用時(我們稍後會看到),它將使用 Spring Cloud Stream 提供的預設輪詢器,該輪詢器預設每秒觸發一次 supplier。此計劃可以在輪詢器中更改。

響應式 Supplier

非響應式輪詢方案看起來不錯,但我們可能會問,如果我不想每隔一段時間就顯式輪詢,而是想以流式方式在訊息源中資料可用時立即獲取資料怎麼辦?嗯,我們有一個解決方案 - 開發一個響應式 supplier,一旦接收到資料,就立即傳送。讓我們看看詳細資訊。

在這裡,Spring Integration 再次提供了一些我們可以用來將 FeedEntryMessageSource 轉換為響應式釋出者的抽象,如下所示。

@Bean
public Supplier<Flux<Message<SyndEntry>>> feedSupplier() {
  return () -> IntegrationReactiveUtils.messageSourceToFlux(feedEntryMessageSource());
}

您可能會注意到,此 supplier 返回的是 Flux<Message<SyndEntry>>,而不是像我們在最初的非響應式 supplier 中那樣返回 Message<SyndEntry>,在非響應式 supplier 中,我們依賴於 supplier 的程式設計呼叫或某種其他輪詢機制。

其他響應式解決方案

好的,很高興我們有一個來自 Spring Integration 的 MessageSource,並且可以使用那個工具方法將其轉換為 Flux。如果不存在這樣的 MessageSource,而我們必須手動編寫用於要為其編寫響應式 style supplier 的系統的資料基本檢索程式碼怎麼辦?對於這些情況,我們可以使用 Project Reactor 提供的各種工具,然後透過程式設計方式將資料提供給它們。總而言之,當我們編寫響應式 streaming supplier 時,我們必須將資料作為 Flux 返回。

單元測試響應式 Supplier

讓我們為這個響應式 supplier 新增一個單元測試。我們可以使用RFC 4287 - Atom 聯合格式中描述的 Atom 訂閱源示例作為測試資料。將其包含在 src/test/resources 中。

這是測試類。

@SpringBootTest(properties = {"feed.supplier.feedUrl=classpath:atom.xml",
     "feed.supplier.metadataKey=feedTest" })
@DirtiesContext
public class FeedSupplierTests {

  @Autowired
  Supplier<Flux<Message<SyndEntry>>> feedSupplier;

  @Test
  public void testFromSampleRssFile() {
     final Flux<Message<SyndEntry>> messageFlux = feedSupplier.get();

     StepVerifier.create(messageFlux)
           .assertNext((message) -> {
              assertThat(message.getPayload().getTitle().trim()).isEqualTo("Atom draft-07 snapshot");
              assertThat(message.getPayload().getContents().size()).isEqualTo(1);
              assertThat(message.getPayload().getContents().get(0).getValue().contains("The Atom draft is finished.")).isTrue();
           })
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  static class FeedSupplierTestApplication {

  }

}

將 Supplier 函式新增到 Maven 函式 BOM 中

functions 專案在一個 Maven BOM 中聚合了所有可用的函式。將 feed-supplier 新增到此 BOM 中。如果你基於此函式生成 Spring Cloud Stream 應用程式,則主要需要這樣做。

從 Supplier 生成 Spring Cloud Stream 應用程式

在這個過程的當前階段,我們可以向倉庫提交包含我們 supplier 的 pull request,但如果想從 supplier 生成基於 Spring Cloud Stream binder 的應用程式,請繼續閱讀。生成後,這些應用程式可以獨立執行,也可以作為 Spring Cloud Data Flow 中資料編排管道的一部分執行。

請在 applications/source 下建立一個名為 feed-source 的新模組。正如我們在之前的部落格中提到的,java.util.function.Supplier 被對映為 Spring Cloud Stream Source。

我們不需要在我們的 feed supplier 之上新增任何自定義程式碼,因為它本身就可以使用。然而,既然我們正在討論 Spring Cloud Stream 應用程式,我們需要結合 supplier 函式使用測試 binder,以瞭解 supplier 如何與 Spring Cloud Stream 一起工作。

我們可以使用現有的 sources 之一作為模板來指導我們完成整個過程。我們甚至可以複製其中一個並逐步進行更改。

所有應用程式都使用了父 pom stream-applications-core,它引入了所有必要的測試依賴,例如上面提到的測試 binder。它還提供了負責生成基於 binder 應用程式的應用程式生成器外掛的基礎設施。

我們想強調的一點是,除非應用程式模組包含自定義程式碼,否則此模組僅成為一個生成基於 binder 的應用程式的應用程式生成器。換句話說,我們不會向其中新增一個帶有 @SpringBootApplication 的類,而是為我們生成的。

使用測試 Binder 測試 supplier

新增以下依賴以使用測試 binder 進行測試

<dependencies>
   <dependency>
       <groupId>org.springframework.cloud.fn</groupId>
       <artifactId>feed-supplier</artifactId>
       <scope>test</scope>
   </dependency>
</dependencies>

現在我們可以新增一個測試來驗證 feed-supplier 在 Spring Cloud Stream 中是否與測試 binder 一起工作。基本上,我們需要確保 supplier 透過測試 binder 生成資料,並將資料傳送到測試 binder 上的目的地。

這是測試背後的基本思路

public class FeedSourceTests {

  @Test
  public void testFileSource() throws Exception {
     try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
           TestChannelBinderConfiguration
                 .getCompleteConfiguration(FeedSourceTestApplication.class))
           .web(WebApplicationType.NONE)
           .run("--spring.cloud.function.definition=feedSupplier", "--feed.supplier.feedUrl=classpath:atom.xml", "--feed.supplier.metadataKey=feedTest")) {

        OutputDestination target = context.getBean(OutputDestination.class);
        Message<byte[]> sourceMessage = target.receive(10000);
        Object title = JsonPath.parse(new String(sourceMessage.getPayload())).read("$.title");
        assertThat(title).isEqualTo("Atom draft-07 snapshot");
     }
  }

  @SpringBootApplication
  @Import(FeedSupplierConfiguration.class)
  public static class FeedSourceTestApplication {

  }
}

該測試與我們為 supplier 新增的單元測試大致相似,但存在一個很大的區別。在 supplier 中,我們是直接呼叫它並驗證生成的資料。在這裡,我們不直接呼叫 supplier,而是由 Spring Cloud Stream 中的繫結機制自動為我們完成。我們從 outbound 目的地接收資料,然後進行驗證。

測試通過後,就可以生成應用程式了。

生成基於 Binder 的應用程式

預設情況下,該外掛為 Spring Cloud Stream 中的 Kafka 和 Rabbit binder 生成應用程式。這在 stream-applications-core 的父 pom 中配置。如果需要為不同的 binder 定製生成,我們需要在那裡進行更改。下面是應用程式生成器外掛的配置。

<plugin>
   <groupId>org.springframework.cloud.stream.app.plugin</groupId>
   <artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
   <configuration>
       <generatedApp>
           <name>feed</name>
           <type>source</type>
           <version>${project.version}</version>
           <configClass>org.springframework.cloud.fn.supplier.feed.FeedSupplierConfiguration.class</configClass>
       </generatedApp>
       <dependencies>
           <dependency>
               <groupId>org.springframework.cloud.fn</groupId>
               <artifactId>feed-supplier</artifactId>
           </dependency>
           <dependency>
               <groupId>org.springframework.cloud.stream.app</groupId>
               <artifactId>stream-applications-composite-function-support</artifactId>
               <version>${stream-apps-core.version}</version>
           </dependency>
       </dependencies>
   </configuration>
</plugin>

讓我們快速回顧一些細節。我們要求外掛建立一個名為 feed-source 的應用程式,並希望它使用我們上面開發的 Supplier 作為主配置類。在外掛的 dependencies 部分中,我們還需要新增應用程式所需的任何依賴,在本例中是 feed-supplier。我們需要將所有 processor 函式新增到所有生成的 source 應用程式中。這是因為我們可以將 source 與其他 processors 組合,而無需將它們作為單獨的微服務執行,正如我們在之前的部落格中看到的那樣。關於 source 與 processors 的函式組合的更多詳細資訊也請參見此處。這就是為什麼我們在外掛的 dependencies 部分中新增 stream-applications-composite-function-support 依賴的原因。

構建應用程式模組,我們將在 apps 資料夾中看到基於 binder 的應用程式。它們將命名為 feed-source-kafkafeed-source-rabbit。我們可以進入其中任何一個應用程式並構建它,然後將其用作獨立應用程式或作為 Spring Cloud Data Flow 管道的一部分。

結論。

在這篇部落格文章中,我們看到了開發、測試和貢獻 supplier/Spring Cloud Stream 應用程式組合的整個過程。請按照本文概述的步驟編寫您自己的 suppliers 和 sources。如果您已經完成了,請考慮將其貢獻回倉庫。

敬請期待…​

本文是本系列部落格的第三篇,該系列將涵蓋許多相關主題。在接下來的幾周內,敬請期待更多深入探討和重點主題。在本系列的下一篇部落格中,我們將像本文介紹如何編寫新的 Supplier 和 Source 一樣,編寫一個 Consumer 函式,然後從中生成一個 Sink 應用程式。

獲取 Spring 郵件訂閱

透過 Spring 郵件訂閱保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您快速發展。

瞭解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部