領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這是部落格系列文章的第三部分,我們將介紹用於 Spring Cloud Stream 應用程式的 Java 函式。
本系列的其他部分。
在本系列的最後兩篇部落格中,我們對將所有現有 Spring Cloud Stream App Starters 遷移到函式以及如何 組合它們 這一新舉措進行了 概覽。在本部落格中,我們繼續該系列,展示如何開發、測試這些函式並將其用於生成 Spring Cloud Stream 應用程式。特別是,我們在此重點介紹如何編寫一個 supplier 函式(實現 java.util.function.Supplier),然後為 Spring Cloud Stream 生成相應的源應用程式。
為了讓這個概念更深入人心,我們將採用一個用例並實現一個解決方案來滿足它。
我們需要一個函式,當使用正確的配置呼叫時,它會以 atom、rss 等格式提供部落格訂閱源的內容。我們需要支援兩種 supplier 呼叫模型——一種是我們以程式設計方式呼叫函式(例如,在 FaaS 環境中呼叫的 REST 端點),另一種是流式 supplier,一旦有訂閱源可用,我們就會獲得持續的訂閱流。我們希望基於 ROME 庫 構建這些 supplier,這是一個流行的訂閱聚合庫。我們將從非 Spring 開發者和 Spring 開發者兩個角度來看待這個問題。
讓我們假設您不是 Spring 開發者,也不熟悉 Spring Integration,它已經提供了 ROME 的抽象。在這種情況下,我們當然可以直接使用 ROME 來生成訂閱記錄。例如,這是一個有效的此場景的 Supplier。
public Supplier<SyndEntry> feedSupplier()
{
return () -> {
//Use the ROME framework directly to produce syndicated entries.
}
}
這樣做的好處是,我們可以在不瞭解 Spring 的情況下開發 supplier,並且可以直接部署到無伺服器環境,使用該環境提供的抽象或依賴 Spring Cloud Function 等框架。
這基本上意味著,如果您是一位 Java 開發者,但 Spring Framework 技能不多,您仍然可以使用 java.util.function 包中定義的介面(例如 Function、Supplier 和 Consumer)來編寫函式,只需提供業務邏輯即可。然後,我們可以將我們開發的這個 artifact 轉換成一個 Spring Cloud Stream 應用程式,方法是新增一個 Spring Cloud Stream binder 依賴並將其轉換為 SpringBootApplication。透過提供一些配置屬性,如中介軟體目標,我們立即獲得了將應用程式部署到 Spring Cloud Data Flow 等平臺的附加價值,該平臺將應用程式作為資料管道的一部分進行編排。這樣,我們編寫的函式完全獨立於任何 Spring 依賴,並且僅在部署過程的最後才引入 Spring Cloud Stream、Spring Cloud Function 和 Spring Cloud Data Flow 等 Spring 元件。下圖捕捉了這一理念。

正如我們所觀察到的,函式元件可以獨立呼叫,也可以在將其轉換為 Spring Cloud Stream 應用程式後作為 Spring Cloud Data Flow 管道的一部分進行呼叫。
雖然上述模型可能是一個很好的起點,但當我們開始直接使用 ROME 框架時,我們可能會很快意識到它涉及到大量的繁重工作和更深入的庫知識。出錯的可能性很高,因此我們需要編寫大量的測試來驗證我們的自定義實現是否按預期工作,並且所有邊緣情況都已涵蓋。我們開始懷疑是否已經有更簡單的抽象可用。這樣我們就無需編寫任何 ROME 特定程式碼,因為抽象層會處理所有複雜性。幸運的是,我們有一個解決方案。Spring Integration 為許多企業技術提供了許多入站和出站介面卡。feed 介面卡 就是其中之一,其實現基於 ROME。事實上,我們預打包的 stream-applications 中提供的許多函式元件都基於 Spring Integration 介面卡。這些介面卡已被廣泛使用並經過大量實戰測試,適用於大量的企業用例。但是,我們想要編寫 supplier 的技術可能在 Spring Integration 中不可用。在這種情況下,如上所述,我們當然可以自己編寫程式碼並從 supplier 中呼叫它。
如果您還沒有這樣做,請分叉並克隆 流應用程式倉庫。然後在 functions/supplier 下建立一個新的 feed-supplier 模組。使用現有 supplier 之一作為模板進行指導。
在專案中新增以下 Spring Integration Feed 介面卡依賴。這會引入 Spring Integration 中的 feed 介面卡以及其他傳遞性依賴。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
現在我們有了核心依賴,讓我們開始編寫一些程式碼。由於這些函式預計將在 Spring Boot 環境中使用,我們需要建立一個 ConfigurationProperties 類來驅動 supplier 函式的配置。它可能看起來像這樣。
package org.springframework.cloud.fn.supplier.feed;
@ConfigurationProperties("feed.supplier")
public class FeedSupplierProperties {
/**
* Key used in metadata store to avoid duplicate read from the feed
*/
private String metadataKey;
/**
* Feed url.
*/
private URL feedUrl;
// rest is omitted
}
正如我們所看到的,我們在所有屬性上都使用了 feed.supplier 字首。
接下來,讓我們建立一個基於 Spring 的配置類,其中提供所有必要的元件。我們將逐步構建它。下面是該類的基本結構。
package org.springframework.cloud.fn.supplier.feed;
...
@Configuration
@EnableConfigurationProperties(FeedSupplierProperties.class)
public class FeedSupplierConfiguration {
}
將這些欄位新增到類中。
private final ConcurrentMetadataStore metadataStore;
private final Resource resource;
private final FeedSupplierProperties feedSuppplierProperties;
關於這些欄位的快速說明。Spring Integration 中的 Feed 介面卡提供了一種不讀取我們已經從 Feed 中讀取的相同條目的功能。我們上面定義的 metadataKey 屬性就是用於此目的的。它的實現方式是使用元資料儲存。有 各種元資料儲存可用 於流行的資料庫。包含以下依賴以獲取記憶體中的簡單元資料儲存。
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>metadata-store-common</artifactId>
<version>${project.version}</version>
</dependency>
請注意,此要求特定於此 supplier,並非所有 supplier 都可能需要它。
如果無法透過 HTTP(或 HTTPS)URL 訪問(我們可以透過配置屬性設定),使用者可以提供 Resource bean 來讀取訂閱源。
讓我們新增一個建構函式來使用這些欄位。
FeedSupplierConfiguration(FeedSupplierProperties feedSupplierProperties,
ConcurrentMetadataStore metadataStore,
@Nullable Resource resource) {
this.feedSuppplierProperties = feedSupplierProperties;
this.metadataStore = metadataStore;
this.resource = resource;
}
Resource 可以為空,因為我們通常可以直接將 URL 字串作為配置屬性傳遞,而無需提供 Resource bean。
Spring Integration Feed 介面卡提供了 FeedEntryMessageSource,它是一個 MessageSource 實現。我們將在我們的 supplier 中使用這個訊息源。讓我們將其設定為 Spring Bean。下面的程式碼非常直觀。
@Bean
public FeedEntryMessageSource feedEntryMessageSource() {
final FeedEntryMessageSource feedEntryMessageSource = this.resource == null ? new FeedEntryMessageSource(this.feedSuppplierProperties.getFeedUrl(),
this.feedSuppplierProperties.getMetadataKey()) :
...
return feedEntryMessageSource;
}
現在我們已經準備好 MessageSource bean,編寫一個簡單的 Supplier 並透過呼叫其 get 方法以程式設計方式呼叫它相對來說是微不足道的。就是這樣。
@Bean
public Supplier<Message<SyndEntry>> feedSupplier() {
return () -> feedEntryMessageSource().receive();
}
我們可以將這個 Supplier bean 注入到我們的應用程式中,並以程式設計方式呼叫 get 方法。當這個 Supplier 在 Spring Cloud Stream 應用程式中使用時(我們稍後會看到),它將使用 Spring Cloud Stream 提供的 預設輪詢器,該輪詢器預設每秒觸發一次 supplier。此排程可以在輪詢器中更改。
非響應式輪詢解決方案看起來不錯,但我們可能會問,如果我不想每隔一段時間顯式輪詢,而是希望資料在訊息源中可用時立即以流式方式獲取,該怎麼辦?嗯,我們有一個解決方案——開發一個響應式 supplier,一旦資料可用,就立即傳遞接收到的資料。讓我們看看詳細資訊。
在這裡,Spring Integration 再次提供了一些抽象,我們可以使用它們將我們的 FeedEntryMessageSource 轉換為響應式釋出器,如下所示。
@Bean
public Supplier<Flux<Message<SyndEntry>>> feedSupplier() {
return () -> IntegrationReactiveUtils.messageSourceToFlux(feedEntryMessageSource());
}
您可能會注意到,這個 supplier 返回的是 Flux<Message<SyndEntry>>,而不是初始非響應式 supplier 中所示的 Message<SyndEntry>,在初始非響應式 supplier 中,我們依賴於對 supplier 的程式設計呼叫或其他輪詢機制。
好的,我們有一個來自 Spring Integration 的 MessageSource,並且我們可以使用該實用方法將其轉換為 Flux,這很好。如果沒有這樣的 MessageSource,並且我們必須為我們想要編寫響應式風格 supplier 的系統手工製作基本資料檢索,那該怎麼辦?對於這些情況,我們可以使用 Project Reactor 提供的各種設施,然後以程式設計方式向它們提供資料。底線是,當我們編寫一個響應式流式 supplier 時,我們必須將資料作為 Flux 返回。
讓我們為這個響應式 supplier 新增一個單元測試。我們可以使用 RFC 4287 - Atom 聚合格式 中描述的 atom 訂閱源示例作為我們的測試資料。將其包含在 src/test/resources 中。
這是測試類。
@SpringBootTest(properties = {"feed.supplier.feedUrl=classpath:atom.xml",
"feed.supplier.metadataKey=feedTest" })
@DirtiesContext
public class FeedSupplierTests {
@Autowired
Supplier<Flux<Message<SyndEntry>>> feedSupplier;
@Test
public void testFromSampleRssFile() {
final Flux<Message<SyndEntry>> messageFlux = feedSupplier.get();
StepVerifier.create(messageFlux)
.assertNext((message) -> {
assertThat(message.getPayload().getTitle().trim()).isEqualTo("Atom draft-07 snapshot");
assertThat(message.getPayload().getContents().size()).isEqualTo(1);
assertThat(message.getPayload().getContents().get(0).getValue().contains("The Atom draft is finished.")).isTrue();
})
.thenCancel()
.verify();
}
@SpringBootApplication
static class FeedSupplierTestApplication {
}
}
函式專案將所有可用函式聚合到 Maven BOM 中。將 feed-supplier 新增到此 BOM 中。如果您基於此函式生成 Spring Cloud Stream 應用程式,則主要需要此功能。
在此過程的這一點上,我們可以向倉庫提交一個包含我們的 supplier 的拉取請求,但如果我們要從 supplier 生成基於 Spring Cloud Stream binder 的應用程式,請繼續閱讀。生成後,這些應用程式可以獨立執行,也可以作為 Spring Cloud Data Flow 中資料編排管道的一部分執行。
繼續在 applications/source 下建立一個名為 feed-source 的新模組。正如我們在之前的部落格中提到的,java.util.function.Supplier 被對映為 Spring Cloud Stream Source。
我們不需要在 feed supplier 上新增任何自定義程式碼,因為它可以按原樣使用。但是,既然我們正在討論 Spring Cloud Stream 應用程式,我們需要使用 測試繫結器 和 supplier 函式來檢視 supplier 如何與 Spring Cloud Stream 配合使用。
我們可以使用 現有源 之一作為模板來指導我們完成整個過程。我們甚至可以複製其中一個並逐步進行更改。
所有應用程式都使用父 pom stream-applications-core,它帶來了所有必要的測試依賴,例如上面提到的測試繫結器。它還提供了應用程式生成器外掛的基礎設施,該外掛負責生成基於繫結器的應用程式。
我們想強調的一點是,除非應用程式模組包含自定義程式碼,否則該模組只會成為一個應用程式生成器,用於生成基於繫結器的應用程式。換句話說,我們不會向其新增帶有 @SpringBootApplication 的類,而是由它為我們生成。
新增以下依賴項以使用測試繫結器進行測試
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
現在我們可以新增一個測試,以驗證 feed-supplier 是否與 Spring Cloud Stream 中的測試繫結器一起工作。基本上,我們需要確保 supplier 透過測試繫結器生成資料,並將其傳遞到測試繫結器上的目標。
這是測試背後的總體思路
public class FeedSourceTests {
@Test
public void testFileSource() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(FeedSourceTestApplication.class))
.web(WebApplicationType.NONE)
.run("--spring.cloud.function.definition=feedSupplier", "--feed.supplier.feedUrl=classpath:atom.xml", "--feed.supplier.metadataKey=feedTest")) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
Object title = JsonPath.parse(new String(sourceMessage.getPayload())).read("$.title");
assertThat(title).isEqualTo("Atom draft-07 snapshot");
}
}
@SpringBootApplication
@Import(FeedSupplierConfiguration.class)
public static class FeedSourceTestApplication {
}
}
該測試與我們為 supplier 新增的單元測試大致相似,但有一個很大的不同。在 supplier 中,我們直接呼叫它並驗證生成的資料。在這裡,我們不直接呼叫 supplier,而是由 Spring Cloud Stream 中的繫結機制自動為我們完成。我們從出站目的地接收資料,然後進行驗證。
一旦測試透過,就該我們生成應用程式了。
預設情況下,該外掛會為 Spring Cloud Stream 中的 Kafka 和 Rabbit 繫結器生成應用程式。這在 stream-applications-core 中的父 pom 中配置。如果我們需要為不同的繫結器定製生成,我們需要在那裡進行這些更改。下面是應用程式生成器外掛的配置。
<plugin>
<groupId>org.springframework.cloud.stream.app.plugin</groupId>
<artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
<configuration>
<generatedApp>
<name>feed</name>
<type>source</type>
<version>${project.version}</version>
<configClass>org.springframework.cloud.fn.supplier.feed.FeedSupplierConfiguration.class</configClass>
</generatedApp>
<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>feed-supplier</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-composite-function-support</artifactId>
<version>${stream-apps-core.version}</version>
</dependency>
</dependencies>
</configuration>
</plugin>
讓我們快速回顧一下這裡的細節。我們要求外掛建立一個名為 feed-source 的應用程式,並希望它使用我們上面開發的 Supplier 作為主要配置類。在外掛的依賴項部分,我們還需要新增應用程式所需的任何依賴項,在本例中是 feed-supplier。我們需要在所有生成的源應用程式中新增所有處理器函式。這是因為我們可以將源與其他處理器組合,而無需它們作為獨立的微服務執行,正如我們在 之前的部落格 中看到的那樣。有關使用處理器進行函式組合的更多詳細資訊,也可以在 這裡 找到。這就是為什麼我們在外掛的依賴項部分添加了依賴項 stream-applications-composite-function-support。
構建應用程式模組,我們將在 apps 資料夾中看到基於繫結器的應用程式。它們將被命名為 feed-source-kafka 和 feed-source-rabbit。我們可以轉到其中任何一個應用程式並構建它,然後將其用作獨立應用程式,或作為 Spring Cloud Data Flow 上管道的一部分。
在這篇部落格文章中,我們看到了開發、測試和貢獻 supplier/Spring Cloud Stream 應用程式組合的整個過程。請遵循此處所述的步驟來編寫您自己的 supplier 和 source。如果您已經完成了,請考慮將它們貢獻回倉庫。
這篇部落格是系列文章中的第三篇,將涵蓋許多相關主題。在接下來的幾周裡,請期待更多深入的、專注的主題。在本系列的下一篇部落格中,與我們在這篇文章中編寫新 Supplier 和 Source 所做類似,我們將編寫一個 Consumer 函式,然後從中生成一個 Sink 應用程式。