取得進步
VMware 提供培訓和認證,助您突飛猛進。
瞭解更多本文是探討基於 Java Function 的全新設計的 Spring Cloud Stream 應用的系列部落格的一部分。本期文章分為兩部分,探討用於實現流應用的 Function 的測試策略。我們將特別關注與外部資源整合的 Function,這會帶來額外的測試挑戰。大多數預打包的 Source 和 Sink 應用都屬於這種情況。為了說明這一點,我們將講解一個示例 couchbase-sink 應用。在第 1 部分中,我們將重點介紹 Sink 所基於的核心 Function。在第 2 部分中,我們將探討如何為該應用編寫測試。
以下是本部落格系列中的所有先前文章。
對於基於 Function 的流應用,核心功能會暴露為一個 Function。預構建的 Spring Cloud Stream 應用的核心 Function 被打包為獨立的元件,以便它們可以被任何應用使用,而無需依賴 Spring Cloud Stream。Spring Cloud Stream 原生支援 Java Function,並將繫結到任何實現了核心 java.util.function
型別之一的 Bean:Consumer
、Supplier
或 Function
。作為一個獨立元件,該 Function 不需要依賴 Spring 或任何其他東西。如果你在任何包含 Spring Cloud Stream 繫結器(binder)依賴的應用中將任何 Function 註冊為 Bean,Spring Cloud Stream 都會將其繫結到一個配置的訊息目的地(message destination)。
在資料管道中,資料流起源於 Source 並流入 Sink,中間可能包含零個或多個處理步驟。實際上,Source 充當從某些外部資源(如資料儲存、支援標準協議的任何服務或訊息代理)提供資料的 Supplier。Sink 充當消費資料到其他外部資源的 Consumer。由於 Spring 為大多數常用外部資源提供了首屈一指的支援,因此預打包的 Source 和 Sink 大多數依賴於 Spring Integration、Spring Data 和 Spring Boot 的某種組合也就不足為奇了。此外,它們被設計為透過 @ConfigurationProperties
配置,以適應許多環境、領域和用例。雖然這些 Function 本身不是 Spring Boot 應用,但它們必須被匯入到 Spring Boot 應用中才能執行。
由於所有核心功能都由 Function 實現,我們希望將大部分測試工作集中在此層面。為了確保我們的 Function 在所有預期的成功和錯誤條件下都能正常工作,我們需要編寫測試來覆蓋這些場景。這些測試需要建立一個自動配置的應用上下文,並準備或模擬所需的外部資源。
如果 Function 可以透過 @ConfigurationProperties
進行配置,那麼我們可以將每種屬性組合視為一個不同的測試用例。有些屬性是必需的,有些是可選的。由於使用該 Function 需要終端使用者提供這些屬性,因此預期的場景包括有效和無效配置,例如缺少必需的屬性、無效值或無效組合(互斥屬性)。
這裡沒有廣泛接受的定義來幫助我們。特別是對於 Source 和 Sink 來說,其核心功能就是整合,因此很難區分單元測試和整合測試的界限。一方面,Java Function 是一個單元,因為它是一個單一的介面。然而,如果它的唯一目的是與遠端系統整合,那麼就很難(如果不是不可能)進行獨立測試。不過,我認為我們可以就一些通用特徵達成一致。
單元測試
作為構建的一部分,在任何開發者或 CI 環境中自動執行,無需任何外部配置
速度合理
由開發者編寫並頻繁執行
整合測試
在整合環境中自動執行
需要部署被測元件以及外部依賴
可能速度較慢
執行頻率較低
根據這個單元測試的定義,第 1 部分是關於 Function 的單元測試。
Testcontainers 是一個近期流行起來的 Java 庫,它允許你以程式設計方式啟動和銷燬任何可在 Docker 容器中執行的外部資源。它包含了數十個針對常用資源的開箱即用模組。你還可以使用該庫透過程式設計、Dockerfile 或 docker-compose yaml 建立自定義容器。雖然它主要用於整合測試,但在模擬需要花費大量精力時,它對於編寫單元測試也非常有用。當然,我們不得不犧牲一些速度,並放寬“無外部依賴”的規則,以允許主機上安裝並執行 Docker 守護程序。考慮到許多開發和 CI 環境現在已經要求使用和構建映象,這是一個合理的假設。
為了說明問題,我們將編寫一個 Couchbase consumer Function,使用 upsert 操作向 Couchbase 鍵值儲存新增一些資料。
為了提高效率,我們將使用 Couchbase Java 客戶端的響應式 API 來實現該 Function。這個 API 返回一個 MutationResult 的 Publisher,因此我們的核心介面是 Function<Flux<Message<?>>, Flux<MutationResult>>
。這個 Function 將使用 Spring 進行配置,並且可以嵌入到任何 Spring Boot 應用中。為了支援 couchbase-sink
,我們將把這個 Function 包裝到一個 Consumer<Flux<Message<?>>>
中。
upsert
操作在 Bucket 中插入或更新資料,Bucket 是 Couchbase 主要的資料儲存抽象。在我們的例子中,是一個 ReactiveBucket
。Bucket 是按名稱指定的,並且必須事先存在於 Couchbase 叢集中。從 v6.5 開始,Couchbase 支援 Collections。因此 Bucket 可以被劃分為多個 Collection,但這一個可選功能,必須在叢集中啟用。upsert
方法針對的是一個命名的 Collection 或 defaultCollection。
我們將 key 和 value 透過 Spring Message 傳遞給 Function,Message 由 payload 和 headers 組成。payload 可以是任何物件,而 headers 本質上是一個 Map。為了使此 Function 具有通用性,我們可以使用 SpEL 表示式來指定 key。key 表示式會針對 Message 進行求值,可以引用 payload 中的欄位或方法,或者 header 中的值。value 就是 payload。Function 還要求使用者指定 bucket 和 collection 的名稱。為了最大限度地提高靈活性,我們進一步使用 SpEL,將所有內容都變為表示式。現在,如果需要,該 Function 可以在執行時從 message 中提取所有輸入值,以便在任何 bucket 的任何 collection 中 upsert 任何資料。在最簡單的情況下,bucket 和 collection 可以靜態定義。
因此 Function 需要一些配置屬性
@ConfigurationProperties("couchbase.consumer")
@Validated
public class CouchbaseConsumerProperties {
private static final String DEFAULT_VALUE_EXPRESSION = "payload";
private final SpelExpressionParser parser = new SpelExpressionParser();
/**
* A SpEL expression to specify the bucket.
*/
private Expression bucketExpression;
/**
* A SpEL expression to specify the key.
*/
private Expression keyExpression;
/**
* A SpEL expression to specify the collection.
*/
private Expression collectionExpression;
/**
* A SpEL expression to specify the value (default is payload).
*/
private Expression valueExpression =
parser.parseExpression(DEFAULT_VALUE_EXPRESSION);
...
提示
要靜態配置其中一些值,請使用字面量表達式,將值用單引號括起來,例如 couchbase.consumer.bucketExpression='mybucket'
。通常,你會從 message 內容中提取 key 和 value。
我們使用 Spring 配置響應式 Function 和相應的 Consumer
@Configuration
@EnableConfigurationProperties(CouchbaseConsumerProperties.class)
public class CouchbaseConsumerConfiguration {
private static Logger logger =
LoggerFactory.getLogger(CouchbaseConsumerConfiguration.class);
@Bean
public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
Flux<MutationResult>> couchbaseConsumerFunction) {
return message -> couchbaseConsumerFunction.apply(message)
.subscribe(mutationResult -> logger.debug("Processed " + message));
}
@Bean
public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
return flux -> flux.flatMap(message -> {
logger.debug("Processing message " + message);
String bucketName = bucket(message,
consumerProperties.getBucketExpression());
String key = key(message, consumerProperties.getKeyExpression());
ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
ReactiveCollection collection = collection(message,
consumerProperties.getCollectionExpression())
.map(name -> bucket.collection(name))
.orElse(bucket.defaultCollection());
return collection.upsert(key,
value(message, consumerProperties.getValueExpression()));
});
}
private String bucket(Message<?> message, Expression expression) {
return expression.getValue(message, String.class);
}
private String key(Message<?> message, Expression expression) {
return expression.getValue(message, String.class);
}
private Object value(Message<?> message, Expression expression) {
return expression.getValue(message);
}
private Optional<String> collection(Message<?> message,
@Nullable Expression expression) {
return expression == null ? Optional.empty() :
Optional.of(expression.getValue(message, String.class));
}
}
這兩個類是實現 Function 所需的全部內容。所需的依賴項是
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
</dependency>
<!-- Enable configuration properties metadata to be added to the jar -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>config-common</artifactId>
</dependency>
如前所述,這不是一個 Spring Boot 應用,而是一個必須嵌入到 Spring Boot 應用中才能執行的元件。Spring Boot 負責繫結 @ConfigurationPropeties
,並提供 CouchbaseAutoConfiguration。
注意
本例未使用 spring-data-couchbase,因為它旨在用於 Spring Data Repository 和自動對映特定領域物件。由於我們的 Function 旨在處理任何 payload 型別,我們使用 Boot 來自動配置 Cluster 以及 Couchbase Java SDK。
那麼我們是如何得到一個真正可以工作的 Function 的呢?上面的示例程式碼是測試驅動開發的成果,經過多次迭代 refinement。由於 Function 依賴於執行所有工作的 Couchbase SDK Cluster
物件,因此在做任何事情之前,我們需要建立一個 Cluster 例項。Cluster 需要連線到 Couchbase 伺服器。如果碰巧我們的網路上已經有一個正在執行的 Couchbase 叢集,並且有一個可用於測試的 bucket,那麼我們最初可能會使用它。但是,即使我們假設 Couchbase 可以從我們的開發和 CI 環境訪問,如果由於某種原因無法連線到 Couchbase 會發生什麼情況——叢集宕機、憑證過期、許可權更改或其他原因?我們想讓這些問題破壞我們的 CI/CD 流水線或阻止我們的進展嗎?
幸運的是,我們可以使用 Testcontainers 的 couchbase 模組來啟動我們自己的 Couchbase 環境。
注意
完全披露:我也嘗試過 CouchbaseMock,但它似乎與當前的 couchbase Java 客戶端不相容。
Junit 5 所需的測試庫是
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>couchbase</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
為了在我們的 Junit 5 測試類中使用 Testcontainers,我們首先啟動一個配置了名為 test
的 bucket 的 Couchbase 容器。
@Testcontainers
public class CouchbaseConsumerTests {
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
@Testcontainers
註解為使用 @Container
註解的欄位啟用生命週期管理。這裡,我們將 CouchbaseContainer
宣告為 static
,這樣 TestContainers 將在測試執行前啟動容器一次,並在測試後移除它。這是一件好事,因為啟動容器需要幾秒鐘。
注意
還可以看看 Playtika Testcontainers for Spring Boot。這是一個有趣的專案,它使用 Spring Boot 抽象化“嵌入式”服務以自動配置 Testcontainer。這需要你選擇的 org.springframework.cloud:spring-cloud-starter
版本。如果你使用的 Spring Cloud 版本與 Spring Boot 2.4+ 相容,你需要設定 "spring.cloud.bootstrap.enabled=true"
。示例未使用此庫,因為 Spring bean 不能宣告為 static
,所以我們必須為每個測試啟動一個新的容器例項。無論如何,Testcontainers 非常易於使用。
如上所述,不同的屬性配置代表不同的測試用例。Spring Boot 在應用啟動時會繫結屬性源中的屬性。因此,我們需要為我們想要測試的每種屬性組合建立一個新的應用上下文。我們在 stream-applications 倉庫中看到幾種不同的策略:
建立一個抽象的 @SpringBootTest
來配置 @SpringBootApplication
測試上下文和共享配置屬性。為每個測試用例建立一個子類,用 @TestPropertySource
進行註解,如此處所示。
使用 ApplicationContextRunner 為每個測試用例建立一個新的 ApplicationContext
,如此處所示。
使用 SpringApplicationBuilder 為每個測試用例建立一個新的 ApplicationContext
,如此處所示。
你使用哪種方法很大程度上取決於個人選擇。示例 Function 的測試使用了 ApplicationContextRunner
,預先配置了測試容器提供的必需的 Boot Couchbase 連線屬性。Testcontainers 的一個不錯的功能是它按預期暴露標準埠,並將每個暴露的埠對映到隨機的可用埠。Couchbase testContainer 包含 getConnectionString()
方法,它是 Couchbase 特有的。通常,你可以根據需要使用 container.getMappedPort(int originalPort)
。
提示
使用隨機 TCP 埠對於自動化測試至關重要,因為 1) 你不知道給定環境中可能使用了哪些埠 2) 構建工具通常並行執行測試。當靜態定義埠時,這經常會導致埠不可用錯誤。
@Testcontainers
public class CouchbaseConsumerTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
private ApplicationContextRunner applicationContextRunner;
@BeforeEach
void setup() {
applicationContextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestConfig.class)
.withPropertyValues(
"spring.couchbase.connection-string=" +
container.getConnectionString(),
"spring.couchbase.username=" + container.getUsername(),
"spring.couchbase.password=" + container.getPassword());
}
我們使用 TestConfig.class
啟動應用上下文,我們將其作為內部類提供
@SpringBootApplication
static class TestConfig {
@Autowired
Cluster cluster;
@PreDestroy
public void destroy() {
cluster.disconnect();
}
}
在許多情況下,這可以是一個空類,只需使用 @SpringBootApplication
進行註解,以觸發屬性繫結和任何必需的自動配置——在本例中是 CouchbaseAutoConfiguration
。在這裡,我們斷開與叢集的連線,以防止在上下文關閉時出現多餘的堆疊跟蹤。
對於這些測試,我們將建立一個簡單的 User
型別,包含 name 和 email 地址,我們可以使用 email 地址作為 key。
@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
private String name;
private String email;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public User() {
}
public User(String name, String email) {
this.name = name;
this.email = email;
}
...
現在我們準備測試我們的 Function。由於 Function 是響應式的,我們將使用 reactor-test
庫中的 StepVerifier
來驗證返回的 Flux 的內容。我們從最簡單的 happy path 場景開始:upsert 一個 User,提供最少必需的配置:Bucket 名稱和 key 表示式。我們將構建一個包含 User
payload 的 Message
。要將 User 儲存到 test
Bucket 的預設 Collection 中,使用 User 的 email 作為 key,我們只需要將 Bucket 名稱作為字面量提供,並將 key 表示式設定為 payload.email
。這些屬性需要使用在 CouchbaseConsumerProperties
中配置的 couchbase.consumer
字首。至少,這是預期的行為。在我們能夠驗證呼叫 Function 後資料是否存在於資料儲存中之前,我們無法確定所有這些是否都能正常工作。我們直接使用 Couchbase API 來檢索資料,並斷言其內容符合我們的預期。
@Test
void singleUpsert() {
applicationContextRunner.withPropertyValues(
"couchbase.consumer.bucketExpression='test'",
"couchbase.consumer.keyExpression=payload.email")
.run(context -> {
CouchbaseConsumerProperties properties =
context.getBean(CouchbaseConsumerProperties.class);
String bucketName = properties.getBucketExpression().getValue(String.class);
Cluster cluster = context.getBean(Cluster.class);
Function<Flux<Message<?>>, Flux<MutationResult>>
couchbaseConsumerFunction =
context.getBean("couchbaseConsumerFunction", Function.class);
StepVerifier.create(couchbaseConsumerFunction.apply(
Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
.expectNextMatches(mutationResult ->
mutationResult.mutationToken().get().bucketName().equals(bucketName))
.verifyComplete();
User saved = cluster.bucket(bucketName).defaultCollection()
.get("[email protected]").contentAs(User.class);
assertThat(saved.getName()).isEqualTo("David");
});
}
當我們在 IDE 中執行測試時,看到綠色的結果,我們欣喜若狂,Function 也按照之前所示的方式實現了。實際上,我們首先就需要這樣的測試來編寫 Function。這就是為什麼我們在這個簡單的測試中投入了大量的思考和努力。我們還想測試應用多個物件,併為 value 和 bucket 設定自定義表示式。我們可能還想檢查屬性類中的 Java 驗證註解。
@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
return keyExpression;
}
我忘了,註解是放在 getter 上還是 setter 上?我們真的需要 @Validated
類註解嗎?讓我們來查一下。如果我們忘記設定 couchbase.consumer.keyExpression
,應該會在堆疊跟蹤中的某個地方看到異常訊息 'keyExpression is required'
。如果沒有,那我們就做錯了。幸運的是,spring-boot-starter-test
提供了我們測試所需的一切,包括用於斷言的流式 DSL Assertj、Mockito 和 Junit 5。
@Test
void keyExpressionRequired() {
assertThatExceptionOfType(RuntimeException.class).isThrownBy(
() -> applicationContextRunner.withPropertyValues(
"couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
.havingRootCause()
.withMessageContaining("'keyExpression' is required");
}
等我們完成時,編寫的程式碼行數將是實現 Function 所需的兩倍多,耗時也可能超過兩倍。但這些努力非常值得,因為它們證明了 Function 在常見場景下的行為符合預期,並且可以在重構或新增新功能時防止引入迴歸問題。完整的測試程式碼在此處。我很高興地告訴大家,我的 IDE 報告測試覆蓋率超過 90%。
本文結束了測試主題的第 1 部分。在本文中,我們探討了測試與外部資源(如 Couchbase)整合的 Function 的策略。我們還展示了 TestContainers 庫在測試分散式系統元件方面的巨大作用,尤其是在使用 mock、stub 或嵌入式伺服器不切實際的情況下。第 2 部分將涵蓋基於 Function 的流應用的單元測試和整合測試。
感謝您的閱讀!希望您覺得這篇文章對您有所幫助。在本系列結束前,我們還有幾篇文章。