建立用於消費資料和生成 Spring Cloud Stream Sink 應用的函式

工程 | Soby Chacko | 2020 年 8 月 3 日 | ...

這是關於 Spring Cloud Stream 應用的 Java 函式系列部落格的第 4 部分。

系列中的其他部分。

第 1 部分 - 一般介紹

第 2 部分 - 函式組合

第 3 部分 - Supplier 函式和 Source 應用

在本系列的上一篇部落格中,我們探討了如何使用 java.util.function.Supplier 來生成 Spring Cloud Stream source。在本新版中,我們將看到如何使用 java.util.function.Consumerjava.util.function.Function 開發和測試消費函式。稍後,我們將簡要解釋如何從該 consumer 生成 Spring Cloud Stream sink 應用。

編寫 Consumer

編寫 consumer 的思路相對簡單。我們從外部源消費資料,並將其傳遞給 consumer 中的業務邏輯。正如我們在之前的部落格中看到的 Supplier 一樣,操作發生在業務邏輯實現內部。如果我們使用庫來幫助我們完成所有繁重的工作,例如 Spring Integration,那麼就只需透過適當的 API 將接收到的資料委託給庫即可。然而,如果沒有可用的此類庫,我們需要自己編寫所有程式碼。讓我們舉一個具體的例子來演示這一點。

為 Apache Pulsar 編寫 Consumer

Apache Pulsar 是一個流行的訊息中介軟體系統。讓我們假設一下,我們要編寫一個通用的 Java Consumer,它從某個地方接收資料,然後將其轉發到 Pulsar。不去深究細節,下面是一個實現此目的的簡單 Consumer。基本實現程式碼取自這裡

@Bean
public org.apache.pulsar.client.api.Producer producer() {
  String pulsarBrokerRootUrl = "pulsar://:6650";
  PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
  String topic = "persistent://sample/standalone/ns1/my-topic";
  return client.createProducer(topic);
}

@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
  return payload -> {
     producer.send(payload);
  };
}

再次強調,這僅用於說明目的,可能不是將資料傳送到 Apache Pulsar 的完整實現。儘管如此,這演示了我們想要傳達的概念。檢視 consumer,我們可以看到程式碼很簡單;我們在 lambda 表示式中所做的只是呼叫 Apache Pulsar Producer 上的 send 方法。

我們可以將上述 consumer 注入到應用程式中,並透過程式設計方式呼叫其 accept 方法,提供資料。正如我們在上一篇部落格中看到的,下圖展示了獨立執行函式或作為資料編排管道一部分執行在諸如 Spring Cloud Data Flow 等平臺上的想法。

Stream Applications Layered Architecture for Functions

好的,我們可能會認為那個 consumer 相當簡單。那麼如果我們要做的事情有點複雜呢?下面我們將這樣做。

編寫 RSocket 消費函式

RSocket 是一種雙向二進位制協議,Spring Framework 為其提供了出色的支援。RSocket 提供了一種即發即忘(fire and forget)模型,允許我們向 RSocket 伺服器傳送訊息而不接收響應。我們希望使用 TCP 為此模型編寫一個 consumer,該 consumer 接收外部資料,然後將其推送到 RSocket 伺服器。RSocket 的Java 實現基於Project Reactor。因此,當我們編寫 consumer 時,需要使用響應式型別和模式(類似於上一篇部落格中的響應式 feed supplier)。

當使用即發即忘策略時,RSocket 返回 Mono<Void>,我們的 consumer 需要從函式中返回它。然而,對於 java.util.function.Consumer,我們無法返回任何東西。因此,我們必須編寫一個具有簽名 Function<String, Mono<Void>> rsocketConsumer() 的函式。由於該函式返回一個 Mono<Void>,這在語義上等同於編寫一個 consumer。函式的使用者需要獲取 Mono 的引用並訂閱它。開箱即用的 consumer 中也使用了類似模式,我們已經為MongoDBCassandra 提供了這樣的 consumer。

設定專案時,請包含以下 maven 依賴。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

來自 Spring Boot 的此 starter 依賴項會將所有 RSocket 依賴項傳遞地帶到我們的專案中。

在編寫函式程式碼之前,讓我們編寫一個 ConfigurationProperties 類來定義函式所需的一些核心屬性。

@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {

  private String host = "localhost";

  private int port = 7000;

  private String route;
…
}

正如我們所見,使用字首 rsocket.consumer,我們定義了三個屬性 - hostport 用於 RSocket 伺服器,而 route 是伺服器上的一個端點。

現在我們有了配置屬性,讓我們建立一個 Configuration 類來配置我們的函式 bean。

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

  @Bean
  public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
                                            RsocketConsumerProperties rsocketConsumerProperties) {
     final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
           rsocketConsumerProperties.getPort());

     return input -> rSocketRequester
                 .flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
                       .data(input)
                       .send());
  }
}

我們將一個來自 Spring Boot 自動配置的 builder 注入到函式中,該 builder 幫助我們建立 RSocketRequester。使用此 builder,我們透過 TCP 連線建立一個 Mono<RSocketRequester>connectTcp API 方法接受 RSocket 的主機和埠資訊。一旦我們獲得了 RSocketRequester 的控制代碼,我們就在函式中提供的 lambda 中使用它。

我們在 Mono<RSocketRequester> 上呼叫 flatMap,對於每個傳入訊息,我們在呼叫最終將資料推送到 RSocket 伺服器的 send 方法之前指定需要傳送的 route 和資料。

這就是編寫一個消費資料然後使用即發即忘互動模型將其傳送到 RSocket 伺服器的函式的全部過程。請記住,由於 Spring Framework 在底層提供了各種 RSocket 支援和抽象,這段程式碼看起來非常簡單。

讓我們快速編寫一個測試來驗證該函式是否按預期工作。

正如我們在上一篇部落格中對響應式 supplier 所做的那樣,將以下依賴項新增到專案中。這有助於我們測試響應式元件。

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <scope>test</scope>
</dependency>

以下是帶有其他必要元件的測試。

@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {

  @Autowired
  Function<Message<?>, Mono<Void>> rsocketConsumer;

  @Autowired
  TestController controller;

  @Test
  void testRsocketConsumer() {

     rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
           .subscribe();

     StepVerifier.create(this.controller.fireForgetPayloads)
           .expectNext("Hello RSocket")
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  @ComponentScan
  static class RSocketConsumerTestApplication{}

  @Controller
  static class TestController {
     final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();

     @MessageMapping("test-route")
     void someMethod(String payload) {
        this.fireForgetPayloads.onNext(payload);
     }
  }
}

測試元件的快速說明。

  • 我們在 SpringBootApplication 上提供了屬性 spring.rsocket.server.port。這允許 Spring Boot 為測試自動配置一個預設的 RSocket 伺服器。這裡將埠硬編碼為 7000,因為這是 Spring Boot 在自動配置元件時使用的預設埠。這與我們在上面屬性中使用的預設值相同。我們還指定了我們想在測試中使用的 route

  • 提供了一個帶有 MessageMapping 註解的方法的 Controller,它攔截到達我們在測試中指定的路由的訊息。到達伺服器上該路由的每個傳入記錄都傳遞到一個 Flux 中,稍後可以在測試中斷言期間重放。

  • 在測試中,我們呼叫了注入的 RSocket consumer(我們之前編寫的)上的 apply 方法,併為其提供了一個測試訊息。

  • 最後,我們使用 StepVerifier 來驗證訊息已成功傳送到 RSocket 伺服器。

從 RSocket Consumer 生成 Spring Cloud Stream Sink 應用

上一篇部落格中,我們詳細介紹瞭如何從 Supplier 函式生成 Spring Cloud Stream source 應用。您可以遵循我們那裡使用的相同模式,從上面編寫的 RSocket 函式生成一個 sink 應用。我們在這裡不再重複所有涉及的細節。可以使用這裡提供的許多不同的 sink 應用作為模板。當我們使用 Spring Cloud Stream 中的測試 binder 測試函式時,將訊息傳送到 InputDestination。Spring Cloud Stream 會將其下游傳送到 RSocket 伺服器。然後我們可以使用我們在上面的單元測試中使用的相同驗證策略。有關使用測試 binder 測試 Spring Cloud Stream 元件的更多資訊,請參閱此處

結論

在本篇部落格文章中,我們看到了如何編寫一個簡單的 consumer 來消費資料並對其進行處理,以 Apache Pulsar 為例。然後,我們探討了如何以 Function<String, Mono<Void>> 的形式開發響應式 consumer,並以 RSocket 的即發即忘策略作為指導。我們還演示瞭如何對這個響應式 consumer 進行單元測試。請遵循本文中概述的步驟來編寫您自己的資料 consumer,如果您這樣做,請考慮貢獻一個 pull request。

敬請期待…​

在接下來的幾周裡,我們將帶來更多深入的焦點話題。在本系列的下一篇部落格中,我們將開始一系列案例研究,探索已經存在的函式和應用程式。

獲取 Spring 時事通訊

透過 Spring 時事通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,為您的進步加速。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部