領先一步
VMware 提供培訓和認證,助您快速發展。
瞭解更多本文是系列部落格的第 3 部分,介紹了用於 Spring Cloud Stream 應用程式的 Java 函式。
本系列的其他部分。
在本系列的最後兩篇部落格中,我們提供了對將所有現有 Spring Cloud Stream App Starters 遷移到函式的新舉措的一般介紹,以及組合它們的各種方式。在本部落格中,我們將繼續本系列,展示如何開發、測試這些函式以及如何使用它們生成 Spring Cloud Stream 應用程式。特別地,本文重點介紹如何編寫 supplier 函式(實現 java.util.function.Supplier
),然後生成 Spring Cloud Stream 的相應 source 應用程式。
為了深入理解這個概念,我們將採用一個用例並實現一個解決方案來滿足它。
我們需要一個函式,當使用正確的配置呼叫時,它會以 atom、rss 等格式提供部落格訂閱源的內容。我們需要支援兩種 supplier 呼叫模型 - 一種是我們在 FaaS 環境中透過程式設計方式呼叫函式(例如,一個 REST 端點),另一種是 streaming supplier,一旦訂閱源可用,我們就能獲得持續的資料流。我們希望基於 ROME 庫構建這些 suppliers,ROME 庫是一個流行的訂閱源聚合庫。我們將從非 Spring 開發者和 Spring 開發者兩種視角來看待這個問題。
假設你不是 Spring 開發者,並且不熟悉已經為 ROME 提供抽象的 Spring Integration。在這種情況下,我們當然可以直接使用 ROME 來生成訂閱源記錄。例如,這是一個適用於此場景的有效 Supplier。
public Supplier<SyndEntry> feedSupplier()
{
return () -> {
//Use the ROME framework directly to produce syndicated entries.
}
}
這樣做的好處是,我們可以在沒有任何 Spring 知識的情況下開發 supplier,並且可以使用該環境提供的抽象或依賴 Spring Cloud Function 等框架直接部署到無伺服器環境。
這本質上意味著,如果你是一名不熟悉 Spring Framework 的 Java 開發者,你仍然可以使用 java.util.function
包中定義的介面(例如 Function
、Supplier
和 Consumer
)來編寫函式,只需提供業務邏輯即可。然後,我們可以將開發的這個 artifact 轉換為 Spring Cloud Stream 應用程式,方法是新增 Spring Cloud Stream binder 依賴並使其成為一個 SpringBootApplication
。透過提供一些配置屬性,例如中介軟體目的地,我們可以立即獲得將應用程式部署到 Spring Cloud Data Flow 等平臺上的價值,Spring Cloud Data Flow 將應用程式作為資料管道的一部分進行編排。透過這種方式,我們編寫的函式完全獨立於任何 Spring 依賴,並且只在部署過程的最後階段引入 Spring Cloud Stream、Spring Cloud Function 和 Spring Cloud Data Flow 等 Spring 元件。下圖展示了這一思路。
正如我們所見,函式元件可以獨立呼叫,也可以在將其轉換為 Spring Cloud Stream 應用程式後,作為 Spring Cloud Data Flow 管道的一部分進行呼叫。
雖然上述模型可能是一個很好的起點,但當我們開始直接使用 ROME 框架時,可能會很快意識到它涉及大量繁重的工作和更深入的庫知識。出錯的可能性很高,所以我們需要編寫大量測試來驗證我們的自定義實現按預期工作,並且所有邊界情況都已覆蓋。我們開始懷疑是否已經存在一些更簡單的抽象。這樣我們就無需編寫任何 ROME 特定的程式碼,因為抽象層會處理所有的複雜細節。幸運的是,我們有一個解決方案。Spring Integration 為許多企業技術提供了大量 inbound 和 outbound 介面卡。feed 介面卡就是其中之一,其實現基於 ROME。事實上,我們在預打包的 stream-applications 中提供的許多函式元件都基於 Spring Integration 介面卡。這些介面卡已被廣泛用於大量企業用例,並經過了嚴格的實戰測試。但是,我們想要編寫 supplier 的技術可能在 Spring Integration 中不可用。在這種情況下,正如我們上面看到的,我們當然可以自己編寫程式碼,並從 supplier 中呼叫它。
如果你還沒這樣做,請 fork 並 clone stream applications 倉庫。然後在 functions/supplier
下建立一個新的 feed-supplier
模組。使用現有的 supplier 之一作為模板進行指導。
在專案中新增以下 Spring Integration Feed 介面卡依賴。這將引入 Spring Integration 的 feed 介面卡以及任何其他傳遞依賴。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
現在我們已經引入了核心依賴,接下來開始編寫一些程式碼。由於函式預計在 Spring Boot 環境中使用,我們需要建立一個 ConfigurationProperties
類來驅動 supplier 函式的配置。它可能看起來像這樣。
package org.springframework.cloud.fn.supplier.feed;
@ConfigurationProperties("feed.supplier")
public class FeedSupplierProperties {
/**
* Key used in metadata store to avoid duplicate read from the feed
*/
private String metadataKey;
/**
* Feed url.
*/
private URL feedUrl;
// rest is omitted
}
如我們所見,我們在所有屬性上都使用了 feed.supplier
字首。
接下來,我們建立一個基於 Spring 的配置類,在其中提供所有必需的元件。我們將逐步構建它。下面是該類的基本結構。
package org.springframework.cloud.fn.supplier.feed;
...
@Configuration
@EnableConfigurationProperties(FeedSupplierProperties.class)
public class FeedSupplierConfiguration {
}
將這些欄位新增到類中。
private final ConcurrentMetadataStore metadataStore;
private final Resource resource;
private final FeedSupplierProperties feedSuppplierProperties;
關於這些欄位的快速說明。Spring Integration 中的 Feed 介面卡提供了一種功能,可以避免重複讀取已經從訂閱源中讀取過的條目。我們上面定義的 metadataKey
屬性用於此目的。它透過使用元資料儲存來實現這一點。對於流行的資料庫,有各種元資料儲存可用。請包含以下依賴以使用記憶體中的簡單元資料儲存。
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>metadata-store-common</artifactId>
<version>${project.version}</version>
</dependency>
請注意,此要求特定於此 supplier,並非所有 suppliers 都需要它。
如果沒有基於 HTTP(或 HTTPS)的 URL 可用(我們可以透過配置屬性設定),使用者可以提供一個 Resource
bean 來讀取訂閱源。
讓我們新增一個建構函式來使用這些欄位。
FeedSupplierConfiguration(FeedSupplierProperties feedSupplierProperties,
ConcurrentMetadataStore metadataStore,
@Nullable Resource resource) {
this.feedSuppplierProperties = feedSupplierProperties;
this.metadataStore = metadataStore;
this.resource = resource;
}
Resource
可以為空,因為大多數情況下我們可以直接將 URL 字串作為配置屬性傳遞,而無需提供 Resource
bean。
Spring Integration Feed 介面卡提供了 FeedEntryMessageSource,它是一個 MessageSource
實現。我們將在 supplier 中使用此訊息源。讓我們將其設定為 Spring Bean。下面的程式碼非常容易理解。
@Bean
public FeedEntryMessageSource feedEntryMessageSource() {
final FeedEntryMessageSource feedEntryMessageSource = this.resource == null ? new FeedEntryMessageSource(this.feedSuppplierProperties.getFeedUrl(),
this.feedSuppplierProperties.getMetadataKey()) :
...
return feedEntryMessageSource;
}
現在我們已經準備好了 MessageSource bean,編寫一個簡單的 Supplier 並透過呼叫其 get
方法來以程式設計方式呼叫它相對容易。程式碼如下。
@Bean
public Supplier<Message<SyndEntry>> feedSupplier() {
return () -> feedEntryMessageSource().receive();
}
我們可以將此 Supplier bean 注入到應用程式中,並透過程式設計方式呼叫 get
方法。當此 Supplier
在 Spring Cloud Stream 應用程式中使用時(我們稍後會看到),它將使用 Spring Cloud Stream 提供的預設輪詢器,該輪詢器預設每秒觸發一次 supplier。此計劃可以在輪詢器中更改。
非響應式輪詢方案看起來不錯,但我們可能會問,如果我不想每隔一段時間就顯式輪詢,而是想以流式方式在訊息源中資料可用時立即獲取資料怎麼辦?嗯,我們有一個解決方案 - 開發一個響應式 supplier,一旦接收到資料,就立即傳送。讓我們看看詳細資訊。
在這裡,Spring Integration 再次提供了一些我們可以用來將 FeedEntryMessageSource
轉換為響應式釋出者的抽象,如下所示。
@Bean
public Supplier<Flux<Message<SyndEntry>>> feedSupplier() {
return () -> IntegrationReactiveUtils.messageSourceToFlux(feedEntryMessageSource());
}
您可能會注意到,此 supplier 返回的是 Flux<Message<SyndEntry>>
,而不是像我們在最初的非響應式 supplier 中那樣返回 Message<SyndEntry>
,在非響應式 supplier 中,我們依賴於 supplier 的程式設計呼叫或某種其他輪詢機制。
好的,很高興我們有一個來自 Spring Integration 的 MessageSource
,並且可以使用那個工具方法將其轉換為 Flux
。如果不存在這樣的 MessageSource
,而我們必須手動編寫用於要為其編寫響應式 style supplier 的系統的資料基本檢索程式碼怎麼辦?對於這些情況,我們可以使用 Project Reactor 提供的各種工具,然後透過程式設計方式將資料提供給它們。總而言之,當我們編寫響應式 streaming supplier 時,我們必須將資料作為 Flux
返回。
讓我們為這個響應式 supplier 新增一個單元測試。我們可以使用RFC 4287 - Atom 聯合格式中描述的 Atom 訂閱源示例作為測試資料。將其包含在 src/test/resources
中。
這是測試類。
@SpringBootTest(properties = {"feed.supplier.feedUrl=classpath:atom.xml",
"feed.supplier.metadataKey=feedTest" })
@DirtiesContext
public class FeedSupplierTests {
@Autowired
Supplier<Flux<Message<SyndEntry>>> feedSupplier;
@Test
public void testFromSampleRssFile() {
final Flux<Message<SyndEntry>> messageFlux = feedSupplier.get();
StepVerifier.create(messageFlux)
.assertNext((message) -> {
assertThat(message.getPayload().getTitle().trim()).isEqualTo("Atom draft-07 snapshot");
assertThat(message.getPayload().getContents().size()).isEqualTo(1);
assertThat(message.getPayload().getContents().get(0).getValue().contains("The Atom draft is finished.")).isTrue();
})
.thenCancel()
.verify();
}
@SpringBootApplication
static class FeedSupplierTestApplication {
}
}
functions 專案在一個 Maven BOM 中聚合了所有可用的函式。將 feed-supplier
新增到此 BOM 中。如果你基於此函式生成 Spring Cloud Stream 應用程式,則主要需要這樣做。
在這個過程的當前階段,我們可以向倉庫提交包含我們 supplier 的 pull request,但如果想從 supplier 生成基於 Spring Cloud Stream binder 的應用程式,請繼續閱讀。生成後,這些應用程式可以獨立執行,也可以作為 Spring Cloud Data Flow 中資料編排管道的一部分執行。
請在 applications/source
下建立一個名為 feed-source
的新模組。正如我們在之前的部落格中提到的,java.util.function.Supplier
被對映為 Spring Cloud Stream Source。
我們不需要在我們的 feed supplier 之上新增任何自定義程式碼,因為它本身就可以使用。然而,既然我們正在討論 Spring Cloud Stream 應用程式,我們需要結合 supplier 函式使用測試 binder,以瞭解 supplier 如何與 Spring Cloud Stream 一起工作。
我們可以使用現有的 sources 之一作為模板來指導我們完成整個過程。我們甚至可以複製其中一個並逐步進行更改。
所有應用程式都使用了父 pom stream-applications-core,它引入了所有必要的測試依賴,例如上面提到的測試 binder。它還提供了負責生成基於 binder 應用程式的應用程式生成器外掛的基礎設施。
我們想強調的一點是,除非應用程式模組包含自定義程式碼,否則此模組僅成為一個生成基於 binder 的應用程式的應用程式生成器。換句話說,我們不會向其中新增一個帶有 @SpringBootApplication
的類,而是為我們生成的。
新增以下依賴以使用測試 binder 進行測試
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
現在我們可以新增一個測試來驗證 feed-supplier
在 Spring Cloud Stream 中是否與測試 binder 一起工作。基本上,我們需要確保 supplier 透過測試 binder 生成資料,並將資料傳送到測試 binder 上的目的地。
這是測試背後的基本思路
public class FeedSourceTests {
@Test
public void testFileSource() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(FeedSourceTestApplication.class))
.web(WebApplicationType.NONE)
.run("--spring.cloud.function.definition=feedSupplier", "--feed.supplier.feedUrl=classpath:atom.xml", "--feed.supplier.metadataKey=feedTest")) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
Object title = JsonPath.parse(new String(sourceMessage.getPayload())).read("$.title");
assertThat(title).isEqualTo("Atom draft-07 snapshot");
}
}
@SpringBootApplication
@Import(FeedSupplierConfiguration.class)
public static class FeedSourceTestApplication {
}
}
該測試與我們為 supplier 新增的單元測試大致相似,但存在一個很大的區別。在 supplier 中,我們是直接呼叫它並驗證生成的資料。在這裡,我們不直接呼叫 supplier,而是由 Spring Cloud Stream 中的繫結機制自動為我們完成。我們從 outbound 目的地接收資料,然後進行驗證。
測試通過後,就可以生成應用程式了。
預設情況下,該外掛為 Spring Cloud Stream 中的 Kafka 和 Rabbit binder 生成應用程式。這在 stream-applications-core
的父 pom 中配置。如果需要為不同的 binder 定製生成,我們需要在那裡進行更改。下面是應用程式生成器外掛的配置。
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedApp>
<name>feed</name>
<type>source</type>
<version>${project.version}</version>
<configClass>org.springframework.cloud.fn.supplier.feed.FeedSupplierConfiguration.class</configClass>
</generatedApp>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-composite-function-support</artifactId>
<version>${stream-apps-core.version}</version>
</dependency>
</dependencies>
</configuration>
</plugin>
讓我們快速回顧一些細節。我們要求外掛建立一個名為 feed-source
的應用程式,並希望它使用我們上面開發的 Supplier
作為主配置類。在外掛的 dependencies 部分中,我們還需要新增應用程式所需的任何依賴,在本例中是 feed-supplier
。我們需要將所有 processor 函式新增到所有生成的 source 應用程式中。這是因為我們可以將 source 與其他 processors 組合,而無需將它們作為單獨的微服務執行,正如我們在之前的部落格中看到的那樣。關於 source 與 processors 的函式組合的更多詳細資訊也請參見此處。這就是為什麼我們在外掛的 dependencies 部分中新增 stream-applications-composite-function-support
依賴的原因。
構建應用程式模組,我們將在 apps
資料夾中看到基於 binder 的應用程式。它們將命名為 feed-source-kafka
和 feed-source-rabbit
。我們可以進入其中任何一個應用程式並構建它,然後將其用作獨立應用程式或作為 Spring Cloud Data Flow 管道的一部分。
在這篇部落格文章中,我們看到了開發、測試和貢獻 supplier/Spring Cloud Stream 應用程式組合的整個過程。請按照本文概述的步驟編寫您自己的 suppliers 和 sources。如果您已經完成了,請考慮將其貢獻回倉庫。
本文是本系列部落格的第三篇,該系列將涵蓋許多相關主題。在接下來的幾周內,敬請期待更多深入探討和重點主題。在本系列的下一篇部落格中,我們將像本文介紹如何編寫新的 Supplier 和 Source 一樣,編寫一個 Consumer 函式,然後從中生成一個 Sink 應用程式。