測試 Spring Cloud Stream 應用 - 第 2 部分

工程 | David Turanski | 2020 年 12 月 15 日 | ...

這是測試 Stream 應用的第 2 部分。在第 1 部分中,我們為示例 couchbase-sink 應用實現了核心函式並進行了測試。函式級別的測試涵蓋了預期的成功和錯誤場景,並依賴於 Testcontainers 來提供 Couchbase 叢集。本文假設您已閱讀第 1 部分,並將從上次中斷的地方繼續。

Couchbase Sink

在第 1 部分中,我們驗證了為將資料插入(upsert)到 Couchbase 而編寫的函式按預期工作。現在我們可以使用該函式(公開為 java.util.Consumer)來實現一個 sink,用於構建 Spring Cloud Stream 資料管道。與大多數預打包的 Stream 應用一樣,我們只需將函式配置嵌入到 Spring Boot 應用中。與預打包的應用不同的是,預打包應用會生成針對 Kafka 和 RabbitMQ 配置的相同應用,而我們將自行建立使用 Kafka binder 的應用。

這是主要應用類

@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {
	public static void main(String... args) {
		new SpringApplication(CouchbaseSinkApplication.class).run(args);
	}
}

我們還需要新增一些依賴項:函式、Spring Cloud Stream 和 Kafka binder。

<dependency>
        <groupId>io.spring.example</groupId>
        <artifactId>couchbase-consumer</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

而且,既然我們是自行建立的,我們可以在 application.properties 中設定一些必需的屬性。由於 couchbase-consumer 包含 2 個候選函式,我們需要告訴 Spring Cloud Stream 使用 Consumer 包裝器。此外,我們將預設 consumer input binding 名稱 couchbaseConsumer-in-0 別名為 input,以便該 sink 可以與 Spring Cloud Data Flow 一起使用。

spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input

就這樣!至少我們是這麼認為的。我們怎麼能確定呢?毫不奇怪,我們需要的測試型別類似於函式級別的測試。但我們並非真的需要執行每個測試用例,因為我們已經知道函式在具有各種屬性設定的 Boot 應用中將如何表現。但是,我們尚未實際透過 Spring Cloud Stream 呼叫該函式。而且,成本也不是很高,因為我們可以重用為函式編寫的大部分測試程式碼。所以我們只需要一個“冒煙測試”來執行正常路徑,以確保我們沒有遺漏某些必需的依賴項,或者我們的配置屬性沒有拼寫錯誤,或者現在或將來升級某些依賴項時沒有意外。這裡我們像測試函式時一樣,配置了一個 Couchbase TestContainer。但我們不是直接呼叫函式,而是透過將訊息傳送到為 sink 配置的輸入目標,讓 Spring Cloud Stream 來完成。對於此測試,我們使用 TestChannelBinder,這是由以下依賴項提供的記憶體中 binder。

<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <type>test-jar</type>
        <classifier>test-binder</classifier>
        <scope>test</scope>
</dependency>

我們使用 TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class) 將 TestChannelBinder 新增到測試的應用上下文中。這為我們提供了一個 InputDestination bean,用於向 sink 傳送訊息。與函式測試一樣,我們使用 Cluster 物件來驗證 Couchbase 中是否存在資料。由於 upsert 操作是非同步的,我們需要輪詢資料儲存一段時間,直到資料存在。 awaitility 庫非常適合測試非同步系統。在這種情況下,我們將等待 10 秒,然後假設操作失敗。

@Testcontainers
public class CouchbaseSinkApplicationTests {
  @Container
  static CouchbaseContainer container =
          new CouchbaseContainer("couchbase/server:6.6.0")
             .withBucket(new BucketDefinition("test"));

  static Map<String, Object> connectProperties = new HashMap<>();

  @BeforeAll
  static void initialize() {
    connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
    connectProperties.put("spring.couchbase.username", container.getUsername());
    connectProperties.put("spring.couchbase.password", container.getPassword());
  }

  @Test
  void test() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
      TestChannelBinderConfiguration
        .getCompleteConfiguration(CouchbaseSinkApplication.class))
	.web(WebApplicationType.NONE)
        .properties(connectProperties)
        .run("--couchbase.consumer.bucketExpression='test'",
               "--couchbase.consumer.keyExpression=payload.email")) {
        InputDestination inputDestination = context.getBean(InputDestination.class);
        Cluster cluster = context.getBean(Cluster.class);
        inputDestination.send(new GenericMessage<>(
           new User("Bart Simpson", "[email protected]")));

       await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
          User user = cluster.bucket("test")
                .defaultCollection().get("[email protected]")
                .contentAs(User.class);
         assertThat(user).isNotNull();
         assertThat(user.getName()).isEqualTo("Bart Simpson");
       });
     }
  }
}

整合測試

至此,我們在應用和函式測試之間有了良好的測試覆蓋率。但我們尚未驗證我們想要構建和部署的應用二進位制檔案在真正的整合環境中是否工作。由於 sink 應用使用 Kafka binder,整合測試環境需要一個 Kafka broker、一個 Couchbase 叢集和我們部署的應用。我們可以直接部署和執行 Spring Boot 可執行 jar。現在,它更常見的是容器映象。

一般來說,假設構建為容器的 sink 會工作風險不大,但我們至少要確保我們知道如何配置應用以使用外部 Kafka broker 和 Couchbase 叢集,以及我們正確地構建了映象。

對於預構建的 Spring Cloud Stream 應用,我們有進一步的理由測試構建的 artifact。核心應用不提供任何附加程式碼。相反,我們使用 spring-cloud-dataflow-apps-generator-plugin 自動生成可以與 Kafka 或 RabbitMQ 一起執行的相同應用。該外掛需要 Maven 配置,我們手動為每個應用新增。僅僅因為我們的函式與 TestChannelBinder 一起工作,我們不能確定構建的 artifact 工作正常,直到我們執行它。 apps generator 外掛配置錯誤、外掛本身的更改、基礎映象或任何依賴項的更改都可能導致問題。Testcontainers 和 Junit 5 為我們提供了一種相對直接的方式來整合測試預構建的應用與 Kafka 和 RabbitMQ。為了幫助我們編寫整合測試,我們在 stream-applications-test-suport 中提供了額外的支援。透過新增以下依賴項,社群可以使用此庫。

<dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>stream-applications-test-support</artifactId>
    <scope>test</scope>
</dependency>

該示例包含一個整合測試,用於測試構建的映象,在本例中是使用 Spring Boot Maven plugin 構建的。就像應用測試一樣,我們只需插入 Kafka、Couchbase 和我們的映象,通電,並確保我們沒有看到或聞到任何煙味。

完整的整合測試是

@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {

  static StreamAppContainer sink =
        new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");

  @Container
  static CouchbaseContainer container =
      new CouchbaseContainer("couchbase/server:6.6.0")
          .withNetwork(KafkaConfig.kafka.getNetwork())
          .withNetworkAliases("couchbase-server")
          .withBucket(new BucketDefinition("test"));

  static Cluster cluster;

  @Autowired
  TestTopicSender testTopicSender;

  @BeforeAll
  static void initialize() {
    await().until(() -> container.isRunning());
    String connectionString = "couchbase://couchbase-server";
    sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
          .withLogConsumer(appLog("couchbase-sink"))
          .withCommand(
            "--spring.couchbase.connection-string=couchbase://couchbase-server",
            "--spring.couchbase.username=" + container.getUsername(),
            "--spring.couchbase.password=" + container.getPassword(),
            "--couchbase.consumer.bucket-expression='test'",
            "--couchbase.consumer.key-expression=payload.email")
          .start();

    cluster = Cluster.connect(container.getConnectionString(),
    ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
  }
  @AfterAll
  static void stop() {
    sink.stop();
  }

  @Test
  void test() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    testTopicSender.send(sink.getInputDestination(),
    objectMapper.writeValueAsString(
       new User("Bart Simpson", "[email protected]")));

    await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
       ExistsResult result = cluster.bucket("test")
         .defaultCollection().exists("[email protected]");
      assertThat(result.exists()).isTrue();
    });

    User user = objectMapper.readValue(
    cluster.bucket("test").defaultCollection().get("[email protected]")
   .contentAs(String.class), User.class);

    assertThat(user.getName()).isEqualTo("Bart Simpson");
  }
}

為了解釋這一點,讓我們從 @KafkaStreamAppTest 類註解開始。這會啟動一個 Kafka test container,並使用 Spring for Apache Kafka 配置 Kafka 元件,我們可以使用它們與 Kafka 生產和消費訊息。Kafka 容器在靜態初始化器中啟動,這使得它成為真正的單例,允許在 JVM 中執行的每個測試都使用它。除了 Spring 配置之外,該註解還包含 @TestContainers 作為元註解。對於此測試,我們不讓 Testcontainers 管理 StreamAppContainer 的生命週期,因為我們希望在知道 Couchbase 叢集正在執行後才啟動它。Couchbase 容器有一些附加配置。為了方便起見,它與 StreamAppContainer 共享一個虛擬網路(自動配置為使用與 Kafka 容器相同的網路)。這允許 Stream App Container 使用我們選擇的別名 couchbase-server 連線到 Couchbase 伺服器(記住,容器內的 localhost 指的是它自己的 IP 地址)。

@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
        .withNetwork(KafkaConfig.kafka.getNetwork())
        .withNetworkAliases("couchbase-server")
        .withBucket(new BucketDefinition("test"));

StreamAppContainer 是一個 GenericContainer,包含連線到 Kafka 和使用 Kafka binder 的所需配置。Spring 配置還設定了一個監聽器,用於在已知 topic 上消費來自容器的任何輸出。在此情況下未使用,因為我們只有一個 sink 的輸入。輸入目標是隨機生成的,並透過 getInputDestination() 訪問。

static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...

@BeforeAll
static void initialize() {
    await().until(() -> container.isRunning());
    String connectionString = "couchbase://couchbase-server";
    sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
            .withLogConsumer(appLog("couchbase-sink"))
            .withCommand(
                    "--spring.couchbase.connection-string=couchbase://couchbase-server",
                    "--spring.couchbase.username=" + container.getUsername(),
                    "--spring.couchbase.password=" + container.getPassword(),
                    "--couchbase.consumer.bucket-expression='test'",
                    "--couchbase.consumer.key-expression=payload.email")
            .start();

Couchbase 容器執行後,我們將啟動 sink。我們等待標準的 Spring Boot 啟動訊息以確認 sink 已啟動。我們還添加了一個 LogConsumer 來輸出所有日誌訊息,以防出現錯誤。請注意,連線字串僅使用 Couchbase 容器的網路別名。這是可能的,因為 sink 和 Couchbase 使用相同的虛擬網路。這裡,我們在命令列上傳遞所有屬性,但我們也可以透過 withEnvironment() 將它們設定為環境變數。由於我們控制 sink 的生命週期,我們需要在所有測試完成後停止它。

該測試使用自動裝配的 TestTopicSender。這是一個與中介軟體無關的介面,在本例中由 KafkaTemplate 支援。該介面對於執行 Kafka 和 Rabbit 的相同測試用例非常有用。這裡,我們也可以自動裝配 KafkaTemplate。在撰寫本文時,Kafka template 僅配置了 String serdes,因此我們使用 ObjectMapper 來處理 String。

@Test
  void test() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    testTopicSender.send(sink.getInputDestination(),
    objectMapper.writeValueAsString(
       new User("Bart Simpson", "[email protected]")));

    await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
       ExistsResult result = cluster.bucket("test")
         .defaultCollection().exists("[email protected]");
      assertThat(result.exists()).isTrue();
    });

    User user = objectMapper.readValue(
    cluster.bucket("test").defaultCollection().get("[email protected]")
   .contentAs(String.class), User.class);

    assertThat(user.getName()).isEqualTo("Bart Simpson");
  }

由於此測試需要 sink 映象,因此我們使用 Junit 5 的 @Tag 註解將其標記為整合測試。我們還配置了 Maven,將其從正常構建中排除,並且僅在設定了 integration profile 時才構建映象並執行。完整的原始碼位於此處,需要 Java 8+ 和 Docker。

結論

在本文中,我們探討了測試與外部服務(如 Couchbase)整合的 Spring Cloud Stream 應用的策略。大部分測試(在第 1 部分中描述)是在函式級別完成的。應用和整合測試實際上是冒煙測試,用於驗證我們正確構建、配置和集成了所有內容。我們還展示瞭如何使用 TestContainers 測試 Stream 應用。

敬請關注……

感謝閱讀!希望您覺得這些內容有幫助。本系列還有幾篇文章。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助力您的進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部