領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這是本系列部落格的第4部分,我們將介紹用於 Spring Cloud Stream 應用程式的 Java 函式。
本系列的其他部分。
在本系列的上一篇部落格中,我們看到了如何使用 java.util.function.Supplier 來生成 Spring Cloud Stream 源。在這個新版本中,我們將看到如何使用 java.util.function.Consumer 和 java.util.function.Function 開發和測試消費函式。稍後,我們將簡要解釋如何從該消費者生成 Spring Cloud Stream 接收器應用程式。
編寫消費者的思想相對簡單。我們從某個外部源消費資料,並將其交給消費者中的業務邏輯。正如我們在上一篇部落格中看到的 Supplier 一樣,動作發生在業務邏輯實現內部。如果我們使用諸如 Spring Integration 等庫來幫助我們完成所有繁重的工作,那麼就變成了簡單地透過適當的 API 將接收到的資料委託給庫。但是,如果沒有可用的此類庫,我們需要自己編寫所有這些程式碼。讓我們舉一個具體的例子來演示這一點。
Apache Pulsar 是一個流行的訊息中介軟體系統。讓我們假設我們想編寫一個通用的 Java Consumer,它從某個地方接收資料,然後將其轉發到 Pulsar。不必深入細節,這裡有一個簡單的 Consumer 可以實現這一點。基本實現程式碼取自此處。
@Bean
public org.apache.pulsar.client.api.Producer producer() {
String pulsarBrokerRootUrl = "pulsar://:6650";
PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
String topic = "persistent://sample/standalone/ns1/my-topic";
return client.createProducer(topic);
}
@Bean
public Consumer<byte[]> pulsarConsumer(Producer producer) {
return payload -> {
producer.send(payload);
};
}
再次宣告,這僅用於說明目的,可能不是將資料傳送到 Apache Pulsar 的完整實現。儘管如此,這演示了我們想要傳達的概念。檢視消費者,我們可以看到程式碼很簡單;我們在 lambda 表示式中所做的只是呼叫 Apache Pulsar Producer 上的 send 方法。
我們可以將上述消費者注入到應用程式中,並透過程式設計方式呼叫其 accept 方法,提供資料。正如我們在上一篇部落格中看到的,下圖表達了獨立執行函式或作為 Spring Cloud Data Flow 等平臺上資料編排管道一部分的想法。

好吧,那個消費者相當簡單,我們可能會這樣想。如果我們想做一些更復雜的事情呢?下面,我們將準確地做這件事。
RSocket 是一種雙向二進位制協議,Spring Framework 為其提供了出色的支援。RSocket 提供了一種“即發即忘”模型,允許我們向 RSocket 伺服器傳送訊息而無需接收響應。我們希望使用 TCP 為此模型編寫一個消費者,其中消費者接收外部資料然後將其推送到 RSocket 伺服器。RSocket 的 Java 實現基於Project Reactor。因此,當我們編寫消費者時,我們需要使用響應式型別和模式(類似於上一篇部落格中的響應式資料來源)。
當使用“即發即忘”策略時,RSocket 返回一個 Mono<Void>,我們的消費者需要從函式中返回它。然而,在 java.util.function.Consumer 的情況下,我們不能返回任何東西。因此,我們必須編寫一個簽名為 Function<String, Mono<Void>> rsocketConsumer() 的函式。由於該函式返回一個 Mono<Void>,這在語義上等同於編寫一個消費者。函式的使用者需要獲取 Mono 的引用並訂閱它。我們為 MongoDB 和 Cassandra 提供的開箱即用消費者中也使用了類似的模式。
設定專案時,請包含以下 Maven 依賴項。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
這個來自 Spring Boot 的 starter 依賴將傳遞地將所有 RSocket 依賴項帶入我們的專案。
在編寫函式程式碼之前,讓我們編寫一個 ConfigurationProperties 類來定義函式所需的一些核心屬性。
@ConfigurationProperties("rsocket.consumer")
public class RsocketConsumerProperties {
private String host = "localhost";
private int port = 7000;
private String route;
…
}
正如我們所看到的,使用字首 rsocket.consumer,我們定義了三個屬性——host 和 port 用於 RSocket 伺服器,route 是伺服器上的一個端點。
現在我們有了配置屬性,讓我們建立一個 Configuration 類來配置我們的函式 Bean。
@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {
@Bean
public Function<String, Mono<Void>> rsocketConsumer(RSocketRequester.Builder builder,
RsocketConsumerProperties rsocketConsumerProperties) {
final Mono<RSocketRequester> rSocketRequester = builder.connectTcp(rsocketConsumerProperties.getHost(),
rsocketConsumerProperties.getPort());
return input -> rSocketRequester
.flatMap(requester -> requester.route(rsocketConsumerProperties.getRoute())
.data(input)
.send());
}
}
我們將來自 Spring Boot 自動配置的構建器注入到函式中,該構建器幫助我們建立 RSocketRequester。使用此構建器,我們建立了一個具有 TCP 連線的 Mono<RSocketRequester>。connectTcp API 方法接收 RSocket 主機和埠資訊。一旦我們獲得了 RSocketRequester 的控制代碼,我們就會在函式中提供的 lambda 表示式中使用它。
我們對 Mono<RSocketRequester> 呼叫 flatMap,對於每個傳入訊息,我們指定 route 和需要傳送的資料,然後呼叫 send 方法,最終將資料推送到 RSocket 伺服器。
這就是編寫一個消費資料然後使用“即發即忘”互動模型將其傳送到 RSocket 伺服器的函式所需的全部內容。請記住,由於 Spring Framework 在底層提供了各種 RSocket 支援和抽象,此程式碼看起來非常簡單。
讓我們快速編寫一個測試,以驗證函式是否按預期工作。
正如我們在上一篇部落格中對響應式資料來源所做的那樣,將以下依賴項新增到專案中。這有助於我們測試響應式元件。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
以下是帶有其他必要元件的測試。
@SpringBootTest(properties = {"spring.rsocket.server.port=7000", "rsocket.consumer.route=test-route"})
public class RsocketConsumerTests {
@Autowired
Function<Message<?>, Mono<Void>> rsocketConsumer;
@Autowired
TestController controller;
@Test
void testRsocketConsumer() {
rsocketConsumer.apply(new GenericMessage<>("Hello RSocket"))
.subscribe();
StepVerifier.create(this.controller.fireForgetPayloads)
.expectNext("Hello RSocket")
.thenCancel()
.verify();
}
@SpringBootApplication
@ComponentScan
static class RSocketConsumerTestApplication{}
@Controller
static class TestController {
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
@MessageMapping("test-route")
void someMethod(String payload) {
this.fireForgetPayloads.onNext(payload);
}
}
}
對測試元件的快速解釋。
我們在 SpringBootApplication 上提供了屬性 spring.rsocket.server.port。這允許 Spring Boot 為測試自動配置一個預設的 RSocket 伺服器。這裡將埠硬編碼為 7000,因為這是 Spring Boot 在自動配置元件時使用的預設埠。這與我們上面屬性中使用的預設值相同。我們還指定了我們想要在測試中使用的 route。
提供了一個帶有 MessageMapping 註解方法的 Controller,它攔截到達我們在測試中指定路由的訊息。伺服器上路由上的每個傳入記錄都會傳遞到一個 Flux 中,以便稍後在測試斷言期間進行重播。
在測試中,我們呼叫了注入的 RSocket 消費者(我們之前編寫的)上的 apply 方法,併為其提供了一個測試訊息。
最後,我們使用 StepVerifier 來驗證訊息是否已成功傳送到 RSocket 伺服器。
在上一篇部落格中,我們詳細介紹瞭如何從 Supplier 函式生成 Spring Cloud Stream 源應用程式。您可以遵循我們那裡使用的相同模式,從我們上面編寫的 RSocket 函式生成一個 sink 應用程式。我們在這裡不再重複所有細節。使用此處提供的許多不同的 sink 應用程式作為模板。當我們在 Spring Cloud Stream 中使用測試繫結器測試函式時,將訊息傳送到 InputDestination。Spring Cloud Stream 會將其傳送到下游的 RSocket 伺服器。然後我們可以使用與上面單元測試中相同的驗證策略。有關使用測試繫結器測試 Spring Cloud Stream 元件的更多資訊,請參閱此處。
在這篇部落格文章中,我們看到了如何編寫一個簡單的消費者,該消費者消費資料並對其進行操作,以 Apache Pulsar 為例。然後我們探討了如何以 Function<String, Mono<Void>> 的形式開發一個響應式消費者,並以 RSocket 的“即發即忘”策略作為指導。我們還演示瞭如何對這個響應式消費者進行單元測試。請遵循本文中列出的步驟來編寫您自己的資料消費者,如果您這樣做,請考慮貢獻一個拉取請求。
在接下來的幾周裡,請期待更多深入的專題。本系列的下一篇部落格將開始一系列案例研究,我們將探討已有的函式和應用程式。