測試 Spring Cloud Stream 應用程式 - 第 1 部分

工程 | David Turanski | 2020 年 12 月 15 日 | ...

這篇博文是一個系列的一部分,該系列探討了基於 Java 函式的全新重新設計的 Spring Cloud Stream 應用程式。本期分為兩部分,探討了用於實現流應用程式的函式測試策略。我們將特別關注與外部資源整合的函式,這帶來了額外的測試挑戰。大多數預打包的源和接收器應用程式都屬於這種情況。為了說明這一點,我們將透過一個示例 couchbase-sink 應用程式進行演示。在第 1 部分中,我們將重點介紹接收器所基於的核心函式。在第 2 部分中,我們將討論如何為應用程式編寫測試。

以下是此部落格系列的所有先前條目。

測試注意事項

函式和應用程式

對於基於函式的流應用程式,核心功能以函式的形式暴露。預構建的 Spring Cloud Stream 應用程式的核心功能被打包為單獨的元件,以便它們可以被任何應用程式使用,而與 Spring Cloud Stream 無關。Spring Cloud Stream 本地支援 Java 函式,並將繫結到實現核心 java.util.function 型別之一的任何 bean:ConsumerSupplierFunction。作為一個單獨的元件,函式不需要依賴 Spring 或任何其他東西。如果您在任何包含 Spring Cloud Stream 繫結器作為依賴項的應用程式中將任何函式註冊為 bean,Spring Cloud Stream 將其繫結到配置的訊息目的地。

在資料管道中,資料流源自 Source 並流入 Sink,其間有零個或多個處理步驟。實際上,Source 充當來自某些外部資源(例如資料儲存、支援標準協議的任何服務或訊息代理)的資料 Supplier。Sink 充當用於其他外部資料的 Consumer。由於 Spring 為大多數常用外部資源提供一流支援,因此大多數預打包的 Source 和 Sink 都依賴於 Spring Integration、Spring Data 和 Spring Boot 的某種組合也就不足為奇了。此外,它們旨在透過 @ConfigurationProperties 配置用於許多環境、領域和用例。儘管這些函式本身不是 Spring Boot 應用程式,但它們必須匯入到 Spring Boot 應用程式中才能執行。

由於所有核心功能都由函式實現,我們希望將大部分測試工作集中在這個層面。為了確保我們的函式在所有預期成功和錯誤條件下都能正常執行,我們需要編寫測試來涵蓋這些場景。這些測試需要建立自動配置的應用程式上下文並提供或模擬所需的外部資源。

如果該函式可以透過 @ConfigurationProperties 配置,那麼我們可以將每個屬性組合視為一個不同的測試用例。有些屬性是必需的,有些是可選的。由於使用該函式需要終端使用者提供這些屬性,因此預期場景包括有效和無效配置,例如缺少必需屬性、無效值或無效組合(互斥屬性)。

單元測試與整合測試

這裡沒有廣泛接受的定義可以幫助我們。尤其是在源和接收器中,其核心功能*就是*整合,很難知道在單元測試和整合測試之間劃清界限。一方面,Java 函式是一個*單元*,因為它是一個單一的介面。但是,如果其唯一目的是與遠端系統整合,那麼獨立測試它就很難,甚至不可能。但是,我認為我們可以在一些一般特徵上達成一致

單元測試

  • 作為任何開發人員或 CI 環境中構建的一部分自動執行,無需任何外部配置

  • 速度合理

  • 由開發人員編寫並頻繁執行

整合測試

  • 在整合環境中自動執行

  • 需要部署被測試的元件以及外部依賴項

  • 可能很慢

  • 執行頻率較低

根據單元測試的這個定義,第 1 部分是關於單元測試函式的。

測試容器

Testcontainers 是一個近期流行且流行的 Java 庫,它允許您以程式設計方式啟動和銷燬任何可以在 Docker 容器中執行的外部資源。它包含數十個用於常用資源的開箱即用模組。您還可以使用該庫透過 Dockerfiles 或 docker-compose yaml 以程式設計方式建立自定義容器。雖然主要用於整合測試,但在模擬需要大量精力時,它對於編寫單元測試非常有用。當然,我們必須犧牲一些速度並放寬“無外部依賴”規則,以允許在主機上安裝和執行 Docker 守護程式。由於當今許多開發和 CI 環境已經需要使用和構建映象,這是一個合理的假設。

示例

Couchbase 消費者函式

為了說明,我們將編寫一個 Couchbase 消費者函式,使用 *upsert* 操作向 Couchbase 鍵值儲存新增一些資料。

為了提高效率,我們將使用 Couchbase Java 客戶端的響應式 API 實現該函式。此 API 返回 MutationResult 的釋出者,因此我們的核心介面是 Function<Flux<Message<?>>, Flux<MutationResult>>。此函式將使用 Spring 進行配置,並且可以嵌入到任何 Spring Boot 應用程式中。為了支援 couchbase-sink,我們將函式包裝在 Consumer<Flux<Message<?>>> 中。

upsert 操作在 Bucket 中插入或更新資料,Bucket 是 Couchbase 的主要資料儲存抽象。在我們的案例中,是一個 ReactiveBucket。Bucket 透過名稱指定,並且必須事先存在於 Couchbase 叢集中。從 v6.5 開始,Couchbase 支援 Collections。因此,Bucket 可以分割槽為許多集合,但這是一個可選功能,必須在叢集中啟用。upsert 方法針對命名集合或 defaultCollection

我們將鍵和值透過 Spring Message 傳遞給我們的函式,它由一個有效載荷和頭部組成。有效載荷可以是任何物件,頭部本質上是一個 Map。為了使此函式通用,我們可以使用 SpEL 表示式來指定鍵。鍵表示式針對 Message 進行評估,可以引用有效載荷中的欄位或方法,或者引用一個頭部。值是有效載荷。該函式還要求使用者指定一個桶和集合名稱。為了最大限度地提高靈活性,讓我們在 SpEL 上加倍努力,將所有內容都設為表示式。現在,如果需要,該函式可以在執行時從訊息中提取所有輸入值,以在任何桶中的任何集合中 upsert 任何資料。在最簡單的情況下,桶和集合可以靜態定義。

所以函式需要一些配置屬性

@ConfigurationProperties("couchbase.consumer")
@Validated
public class CouchbaseConsumerProperties {
    private static final String DEFAULT_VALUE_EXPRESSION = "payload";
    private final SpelExpressionParser parser = new SpelExpressionParser();

   /**
    * A SpEL expression to specify the bucket.
    */
    private Expression bucketExpression;

   /**
      * A SpEL expression to specify the key.
     */
    private Expression keyExpression;

  /**
    * A SpEL expression to specify the collection.
    */
    private Expression collectionExpression;

  /**
    * A SpEL expression to specify the value (default is payload).
    */
    private Expression valueExpression =
                parser.parseExpression(DEFAULT_VALUE_EXPRESSION);
    ...

提示

要靜態配置這些值中的一些,請使用文字表達式,將值用單引號括起來,例如 couchbase.consumer.bucketExpression='mybucket'。通常,您會從訊息內容中提取鍵和值。

我們用 Spring 配置響應式函式和相應的消費者

@Configuration
@EnableConfigurationProperties(CouchbaseConsumerProperties.class)
public class CouchbaseConsumerConfiguration {

    private static Logger logger =
            LoggerFactory.getLogger(CouchbaseConsumerConfiguration.class);

    @Bean
    public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
                Flux<MutationResult>> couchbaseConsumerFunction) {
        return message -> couchbaseConsumerFunction.apply(message)
               .subscribe(mutationResult -> logger.debug("Processed " + message));
    }

    @Bean
    public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
          Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
        return flux -> flux.flatMap(message -> {
            logger.debug("Processing message " + message);
             String bucketName = bucket(message,
                          consumerProperties.getBucketExpression());
            String key = key(message, consumerProperties.getKeyExpression());
            ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
             ReactiveCollection collection = collection(message,
                            consumerProperties.getCollectionExpression())
				  .map(name -> bucket.collection(name))
                                  .orElse(bucket.defaultCollection());
            return collection.upsert(key,
                              value(message, consumerProperties.getValueExpression()));
        });
    }

    private String bucket(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);
    }

    private String key(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);
    }

    private Object value(Message<?> message, Expression expression) {
        return expression.getValue(message);
    }

    private Optional<String> collection(Message<?> message,
                                             @Nullable Expression expression) {
        return expression == null ? Optional.empty() :
                Optional.of(expression.getValue(message, String.class));
    }
}

這兩個類就是我們實現該函式所需的全部。所需的依賴項是

<dependency>
    <groupId>com.couchbase.client</groupId>
    <artifactId>java-client</artifactId>
</dependency>
<!-- Enable configuration properties metadata to be added to the jar -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->
<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>config-common</artifactId>
</dependency>

如前所述,這不是一個 Spring Boot 應用程式,而是一個必須嵌入到 Spring Boot 應用程式中才能執行的元件。Spring Boot 綁定了 @ConfigurationPropeties,還提供了 CouchbaseAutoConfiguration

注意

本例不使用 spring-data-couchbase,因為它旨在用於使用 Spring Data 儲存庫和自動對映特定域物件。由於我們的函式旨在處理任何有效載荷型別,我們使用 boot 來自動配置 Cluster 以及 Couchbase Java SDK。

那麼我們是如何得到一個真正有效的函式的呢?上面給出的示例程式碼是測試驅動開發的結果,經過多次迭代改進。由於該函式依賴於執行所有工作的 Couchbase SDK Cluster 物件,因此我們需要在執行任何操作之前建立一個 Cluster 例項。Cluster 需要連線到 Couchbase 伺服器。如果我們的網路上已經有一個 Couchbase 叢集正在執行,並且有一個我們可以用於測試的 Bucket,那麼我們最初可能會使用它。但是,即使我們假設 Couchbase 可以從我們的開發和 CI 環境中訪問,如果由於某種原因我們無法連線到 Couchbase(叢集已關閉、憑據過期、許可權更改或其他原因)會發生什麼?我們是否希望這會破壞我們的 CI/CD 管道或阻止我們的進展?

幸運的是,我們可以使用 Testcontainers couchbase 模組來啟動我們自己的 Couchbase 環境。

注意

完全披露:我也嘗試了CouchbaseMock,但它似乎與當前的couchbase Java 客戶端不相容。

Junit 5 所需的測試庫是

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>couchbase</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

要在我們的 Junit 5 測試類中使用 Testcontainers,我們首先使用配置了名為 test 的桶的 Couchbase 容器。

@Testcontainers
public class CouchbaseConsumerTests {

	@Container
	static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
			.withBucket(new BucketDefinition("test"));

@Testcontainers 註解為帶有 @Container 註解的欄位啟用生命週期管理。在這裡,我們將 CouchbaseContainer 宣告為 static,因此 TestContainers 將在測試執行前啟動一次容器,並在測試後將其移除。這是一件好事,因為啟動容器需要幾秒鐘。

注意

此外,請檢視 Playtika Testcontainers for Spring Boot。這是一個有趣的專案,它使用 Spring Boot 抽象“嵌入式”服務以自動配置 Testcontainer。這需要您首選的 org.springframework.cloud:spring-cloud-starter 版本。如果您使用的 Spring Cloud 版本與 Spring Boot 2.4+ 相容,則需要設定 "spring.cloud.bootstrap.enabled=true"。示例未使用此庫,因為 Spring bean 無法宣告為 static,因此我們必須為每個測試啟動一個新的容器例項。無論如何,Testcontainers 非常易於使用。

如上所述,不同的屬性配置代表不同的測試用例。Spring Boot 在應用程式啟動時從其屬性源繫結屬性。因此,我們需要為要測試的每種屬性組合建立一個新的應用程式上下文。我們在 stream-applications 倉庫中看到了幾種不同的策略

  • 建立一個抽象的 @SpringBootTest 以配置 @SpringBootApplication 測試上下文和共享配置屬性。為每個測試用例建立一個用 @TestPropertySource 註解的子類,如此處所示。

  • 使用 ApplicationContextRunner 為每個測試用例建立一個新的 ApplicationContext,如此處所示。

  • 使用 SpringApplicationBuilder 為每個測試用例建立一個新的 ApplicationContext,如此處所示。

您使用哪一個主要取決於個人選擇。示例函式的測試使用 ApplicationContextRunner,它預先配置了測試容器提供的所需引導 Couchbase 連線屬性。Testcontainers 的一個優點是它按預期暴露標準埠,將每個暴露的埠對映到隨機可用埠。Couchbase testContainer 包含 getConnectionString(),這是 Couchbase 特有的。通常,您可以根據需要使用 container.getMappedPort(int originalPort)

提示

使用隨機 TCP 埠對於自動化測試至關重要,因為 1) 您不知道給定環境中可能正在使用哪些埠 2) 構建工具通常並行執行測試。這通常會導致由於靜態定義而導致的埠不可用錯誤。

@Testcontainers
public class CouchbaseConsumerTests {

    @Container
    static CouchbaseContainer container =
            new CouchbaseContainer("couchbase/server:6.6.0")
                   .withBucket(new BucketDefinition("test"));

	private ApplicationContextRunner applicationContextRunner;

    @BeforeEach
    void setup() {
        applicationContextRunner = new ApplicationContextRunner()
            .withUserConfiguration(TestConfig.class)
            .withPropertyValues(
                 "spring.couchbase.connection-string=" +
                                                container.getConnectionString(),
                 "spring.couchbase.username=" + container.getUsername(),
                 "spring.couchbase.password=" + container.getPassword());
    }

我們使用 TestConfig.class 啟動應用程式上下文,我們將其作為內部類提供

@SpringBootApplication
static class TestConfig {
    @Autowired
    Cluster cluster;

   @PreDestroy
    public void destroy() {
        cluster.disconnect();
    }
}

在許多情況下,這可以是一個帶有 @SpringBootApplication 註解的空類,以觸發屬性繫結和任何所需的自動配置——本例中是 CouchbaseAutoConfiguration。在這裡,我們斷開與叢集的連線,以防止在上下文關閉時出現多餘的堆疊跟蹤。

對於這些測試,我們將建立一個簡單的 User 型別,包含姓名和電子郵件地址,我們可以將其用作鍵

@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
	private String name;

	private String email;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getEmail() {
		return email;
	}

	public void setEmail(String email) {
		this.email = email;
	}

	public User() {
	}

	public User(String name, String email) {
		this.name = name;
		this.email = email;
	}
    ...

現在我們準備測試我們的函數了。由於該函式是響應式的,我們將使用 reactor-test 庫中的 StepVerifier 來驗證返回的 Flux 的內容。我們從最簡單的成功路徑場景開始:upsert 一個單獨的使用者,提供最少所需的配置:桶名稱和鍵表示式。我們將構建一個帶有 User 有效載荷的 Message。要將使用者儲存到 test 桶的預設集合中,並使用使用者的電子郵件作為鍵,我們只需提供桶名稱作為文字並將鍵表示式設定為 payload.email。這些屬性需要使用 CouchbaseConsumerProperties 中配置的 couchbase.consumer 字首。至少,這是預期的行為。我們無法確定所有這些是否有效,直到我們能夠驗證在呼叫函式後,資料存在於資料儲存中。我們直接使用 Couchbase API 檢索資料並斷言內容是我們預期的。

@Test
void singleUpsert() {
   applicationContextRunner.withPropertyValues(
           "couchbase.consumer.bucketExpression='test'",
            "couchbase.consumer.keyExpression=payload.email")
      .run(context -> {
           CouchbaseConsumerProperties properties =
                    context.getBean(CouchbaseConsumerProperties.class);
           String bucketName = properties.getBucketExpression().getValue(String.class);
           Cluster cluster = context.getBean(Cluster.class);
           Function<Flux<Message<?>>, Flux<MutationResult>>
                 couchbaseConsumerFunction =
                       context.getBean("couchbaseConsumerFunction", Function.class);
           StepVerifier.create(couchbaseConsumerFunction.apply(
               Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
            .expectNextMatches(mutationResult ->
                   mutationResult.mutationToken().get().bucketName().equals(bucketName))
            .verifyComplete();

        User saved = cluster.bucket(bucketName).defaultCollection()
                                   .get("[email protected]").contentAs(User.class);
       assertThat(saved.getName()).isEqualTo("David");
  });
}

有了之前展示的函式實現,當我們在 IDE 中執行測試時,我們高興地看到綠色。實際上,我們首先需要一個這樣的測試來編寫函式。這就是為什麼我們對這個簡單的測試投入了大量思考和精力。我們還想測試應用多個物件,併為值和桶設定自定義表示式。我們可能還想檢查屬性類中的 Java 驗證註解。

@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
    return keyExpression;
}

我忘了,註解是放在 getter 還是 setter 上?我們真的需要 @Validated 類註解嗎?讓我們找出答案。如果忘記設定 couchbase.consumer.keyExpression,我們應該在堆疊跟蹤的某個地方得到一個異常訊息 'keyExpression is required'。如果沒有,那麼我們做錯了。幸運的是,spring-boot-starter-test 為我們提供了測試所需的一切,包括 Assertj(一個用於斷言的流式 DSL)、Mockito 和 Junit 5。

@Test
void keyExpressionRequired() {
  assertThatExceptionOfType(RuntimeException.class).isThrownBy(
   () -> applicationContextRunner.withPropertyValues(
      "couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
    .havingRootCause()
    .withMessageContaining("'keyExpression' is required");
}

到我們完成時,我們將編寫的行數是實現函式所需行數的兩倍以上,並且可能花費的時間也超過兩倍。但是,這項工作非常值得,因為它為我們提供了函式在常見場景下按預期執行的證據,並在重構或新增新功能時提供了防止引入迴歸的保護。完整的測試請參閱此處。我很高興地說,我的 IDE 報告的覆蓋率超過 90%。

結論

測試主題的第 1 部分到此結束。在這篇文章中,我們探討了測試與外部資源(例如 Couchbase)整合的函式的策略。我們還展示了 TestContainers 庫對於測試分散式系統元件的實用性,尤其是在模擬、存根或嵌入式伺服器不切實際時。第 2 部分將涵蓋基於函式的流應用程式的單元測試和整合測試。

敬請期待…​

感謝您的光臨!我們希望您覺得此內容有所幫助。在本系列結束之前,我們還有幾篇文章。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有