領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這是測試流應用程式的第 2 部分。在第 1 部分中,我們實現了並測試了示例 couchbase-sink 應用程式所需的核心功能。函式級別的測試涵蓋了預期的成功和錯誤場景,並依賴 Testcontainers 來配置 Couchbase 叢集。本文假設您已閱讀第 1 部分,並從第 1 部分結束的地方繼續。
在第一部分中,我們驗證了為將資料 upsert 到 Couchbase 中而編寫的函式按預期工作。我們現在可以使用該函式(它以 java.util.Consumer 的形式公開)來實現一個接收器,以在用 Spring Cloud Stream 構建的資料管道中使用。像大多數預打包的流應用程式一樣,我們只需將函式配置嵌入到 Spring Boot 應用程式中。與為 Kafka 和 Rabbit 生成相同配置的預打包應用程式不同,我們將自己編寫一個應用程式來使用 Kafka 繫結器。
這是主應用程式類
@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {
public static void main(String... args) {
new SpringApplication(CouchbaseSinkApplication.class).run(args);
}
}
我們還需要新增一些依賴項:函式、Spring Cloud Stream 和 Kafka 繫結器。
<dependency>
<groupId>io.spring.example</groupId>
<artifactId>couchbase-consumer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
而且,由於我們是自己編寫的,我們可以在 application.properties 中設定一些必需的屬性。由於 couchbase-consumer 包含 2 個候選函式,我們需要告訴 Spring Cloud Stream 使用 Consumer 包裝器。此外,我們將預設的消費者輸入繫結名稱 couchbaseConsumer-in-0 別名為 input,以便接收器與 Spring Cloud Data Flow 協同工作。
spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input
就是這樣!至少我們是這麼認為的。我們怎麼能確定呢?我們需要進行的測試(不出所料)類似於函式級測試。但我們並不真正需要執行每個測試用例,因為我們已經知道函式在具有各種屬性設定的引導應用程式中會如何表現。但我們尚未透過 Spring Cloud Stream 實際呼叫該函式。此外,由於我們可以重用為函式編寫的大部分測試程式碼,因此成本並不高。所以我們只需要一個“冒煙測試”來執行正常路徑,以確保我們沒有遺漏任何必需的依賴項,或者我們的配置屬性中沒有拼寫錯誤,或者現在沒有,或者將來升級某些依賴項時也沒有任何陷阱。在這裡,我們配置了一個 Couchbase TestContainer,就像我們測試函式時所做的那樣。但我們不會直接呼叫函式,而是讓 Spring Cloud Stream 在我們將訊息傳送到為接收器配置的輸入目的地時呼叫它。對於此測試,我們使用 TestChannelBinder,這是一個由以下依賴項提供的記憶體中繫結器
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
我們使用 TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class) 將 TestChannelBinder 新增到我們測試的應用程式上下文中。這為我們提供了一個 InputDestination bean,用於向接收器傳送訊息。與函式測試一樣,我們使用 Cluster 物件來驗證資料是否存在於 Couchbase 中。由於 upsert 操作是非同步的,我們需要輪詢資料儲存一段時間,直到資料存在。 awaitility 庫非常適合測試非同步系統。在這種情況下,我們將等待 10 秒,然後假設操作失敗。
@Testcontainers
public class CouchbaseSinkApplicationTests {
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withBucket(new BucketDefinition("test"));
static Map<String, Object> connectProperties = new HashMap<>();
@BeforeAll
static void initialize() {
connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
connectProperties.put("spring.couchbase.username", container.getUsername());
connectProperties.put("spring.couchbase.password", container.getPassword());
}
@Test
void test() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(CouchbaseSinkApplication.class))
.web(WebApplicationType.NONE)
.properties(connectProperties)
.run("--couchbase.consumer.bucketExpression='test'",
"--couchbase.consumer.keyExpression=payload.email")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
Cluster cluster = context.getBean(Cluster.class);
inputDestination.send(new GenericMessage<>(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
User user = cluster.bucket("test")
.defaultCollection().get("[email protected]")
.contentAs(User.class);
assertThat(user).isNotNull();
assertThat(user.getName()).isEqualTo("Bart Simpson");
});
}
}
}
至此,我們在應用程式和函式測試之間已經有了良好的測試覆蓋率。但我們尚未驗證我們想要構建和部署的應用程式二進位制檔案是否能在真正的整合環境中工作。由於接收器應用程式使用 Kafka 繫結器,因此整合測試環境需要一個 Kafka 代理、一個 Couchbase 叢集和我們部署的應用程式。我們可以直接部署和執行 Spring Boot 可執行 jar。如今,它更常是一個容器映象。
通常,假設作為容器構建的接收器會工作並沒有太大風險,但我們至少希望確保我們知道如何配置應用程式以使用外部 Kafka 代理和 Couchbase 叢集,以及我們是否正確構建了映象。
對於預構建的 Spring Cloud Stream 應用程式,我們有進一步的理由來測試構建的工件。核心應用程式不提供任何額外的程式碼。相反,我們使用 spring-cloud-dataflow-apps-generator-plugin 自動生成相同的應用程式,這些應用程式可以與 Kafka 或 RabbitMQ 一起執行。該外掛需要 Maven 配置,我們為每個應用程式手動新增。僅僅因為我們的函式適用於 TestChannelBinder,我們不能確定構建的工件能夠工作,直到我們執行它。錯誤配置應用程式生成器外掛、外掛本身的更改或基礎映象,或任何依賴項都可能導致問題。Testcontainers 和 Junit 5 為我們提供了一種相對直接的方式來整合測試預構建的應用程式,包括 Kafka 和 RabbitMQ。為了幫助我們編寫整合測試,我們在 stream-applications-test-suport 中提供額外的支援。該庫透過新增依賴項向社群開放
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>stream-applications-test-support</artifactId>
<scope>test</scope>
</dependency>
該示例包含一個整合測試,用於測試構建的映象,在本例中是使用 Spring Boot Maven 外掛 構建的。與應用程式測試一樣,我們將只插入 Kafka、Couchbase 和我們的映象,然後通電,確保我們看不到或聞不到任何煙霧。
完整的整合測試是
@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {
static StreamAppContainer sink =
new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
@Container
static CouchbaseContainer container =
new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
static Cluster cluster;
@Autowired
TestTopicSender testTopicSender;
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
cluster = Cluster.connect(container.getConnectionString(),
ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
}
@AfterAll
static void stop() {
sink.stop();
}
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
}
要解讀這一點,讓我們從 @KafkaStreamAppTest 類註解開始。這將啟動一個 Kafka 測試容器,並使用 Spring for Apache Kafka 配置 Kafka 元件,我們可以使用它們來生成和消費 Kafka 訊息。Kafka 容器在靜態初始化器中啟動,這使其成為一個真正的單例,允許在 JVM 中執行的每個測試都使用它。除了 Spring 配置之外,該註解還包含 @TestContainers 作為元註解。對於此測試,我們不讓 Testcontainers 管理 StreamAppContainer 的生命週期,因為我們希望在知道 Couchbase 叢集正在執行後才啟動它。Couchbase 容器有一些額外的配置。為方便起見,它與 StreamAppContainer 共享一個虛擬網路(自動配置為使用與 Kafka 容器相同的網路)。這允許 Stream App Container 使用我們選擇的別名 couchbase-server 連線到 Couchbase 伺服器(請記住,容器內的 localhost 指的是它自己的 IP 地址)。
@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
.withNetwork(KafkaConfig.kafka.getNetwork())
.withNetworkAliases("couchbase-server")
.withBucket(new BucketDefinition("test"));
StreamAppContainer 是一個 GenericContainer,它具有連線到 Kafka 並使用 Kafka 繫結器所需的配置。Spring 配置還會在已知主題上設定一個監聽器,以消費來自容器的任何輸出。在這種情況下,這並未被使用,因為我們只有一個接收器的輸入。輸入目標是隨機生成的,並透過 getInputDestination() 訪問。
static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...
@BeforeAll
static void initialize() {
await().until(() -> container.isRunning());
String connectionString = "couchbase://couchbase-server";
sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
.withLogConsumer(appLog("couchbase-sink"))
.withCommand(
"--spring.couchbase.connection-string=couchbase://couchbase-server",
"--spring.couchbase.username=" + container.getUsername(),
"--spring.couchbase.password=" + container.getPassword(),
"--couchbase.consumer.bucket-expression='test'",
"--couchbase.consumer.key-expression=payload.email")
.start();
Couchbase 容器執行後,我們將啟動接收器。我們等待標準的 Spring Boot 啟動訊息以確認接收器已啟動。我們還添加了一個 LogConsumer,以便在出現錯誤時輸出所有日誌訊息。請注意,連線字串只是使用 Couchbase 容器的網路別名。這是可能的,因為接收器和 Couchbase 使用相同的虛擬網路。在這裡,我們透過命令列傳遞所有屬性,但我們也可以透過 withEnvironment() 將它們設定為環境變數。由於我們控制接收器生命週期,因此需要在所有測試完成後停止它。
該測試使用自動裝配的 TestTopicSender。這是一個與中介軟體無關的介面,在本例中由 KafkaTemplate 支援。此介面對於為 Kafka 和 Rabbit 執行相同的測試用例非常有用。在這裡,我們也可以自動裝配 KafkaTemplate。在撰寫本文時,Kafka 模板只配置了 String 序列化器/反序列化器,因此我們使用 ObjectMapper 來處理 String。
@Test
void test() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
testTopicSender.send(sink.getInputDestination(),
objectMapper.writeValueAsString(
new User("Bart Simpson", "[email protected]")));
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
ExistsResult result = cluster.bucket("test")
.defaultCollection().exists("[email protected]");
assertThat(result.exists()).isTrue();
});
User user = objectMapper.readValue(
cluster.bucket("test").defaultCollection().get("[email protected]")
.contentAs(String.class), User.class);
assertThat(user.getName()).isEqualTo("Bart Simpson");
}
由於此測試需要接收器映象,因此我們使用 Junit 5 @Tag 註解將其標記為整合測試。我們還將 Maven 配置為將其從正常構建中排除,並且僅當設定了 integration 配置檔案時才構建映象並執行它。完整的原始碼在此處,需要 Java 8+ 和 Docker。
在這篇文章中,我們探討了測試與外部服務(如 Couchbase)整合的 Spring Cloud Stream 應用程式的策略。大部分測試(在第一部分中描述)是在函式級別完成的。應用程式和整合測試實際上是冒煙測試,用於驗證我們已正確構建、配置和整合所有內容。我們還展示瞭如何使用 TestContainers 測試流應用程式。
感謝您的到來!我們希望這些內容對您有所幫助。本系列還有幾篇文章才會結束。