RSocket 入門:Spring Boot Request-Stream

工程 | Ben Wilcock | 2020 年 3 月 23 日 | ...

閱讀時間:約 15 分鐘。

在本系列之前的文章中,您嘗試了使用 Spring Boot 和 RSocket 進行請求-響應即發即忘的訊息模式。本次,您將嘗試 RSocket 的另一種全新的訊息模型——request-stream (請求-流)。

在此練習中,您將學習如何使用傳統的 'client-requests-a-server-stream'(客戶端請求伺服器流)方式進行資料流傳輸。

到目前為止我還沒有提到的一點是,RSocket 允許您在兩個方向上使用其訊息模型。因此,如果您想使用不太常見的 'server-requests-a-client-stream'(伺服器請求客戶端流)模型,RSocket 也沒有問題。此外,還有很多非 Java 的RSocket 實現可供選擇,包括 Go、Javascript 和 .Net——如果您的架構包含 Java 可能不是最佳選擇的平臺,這些實現將是理想之選。

澄清這一點後,請按照以下步驟,將流資料功能新增到您之前文章中現有的 RSocket 客戶端和伺服器程式碼中。

如果您還沒有閱讀之前關於伺服器端客戶端請求-響應訊息傳遞或即發即忘的文章,現在是您的機會!程式碼示例在GitHub 上

步驟 1:新增伺服器端流方法

回到您的伺服器端 RSocketController 類,新增一個名為 .stream() 的新方法,其簽名符合 RSocket 對此訊息模型的期望 — '接受一個物件,返回一個 flux'。使用 @MessageMapping 註解標記此新方法,並指定一個合適的對映名稱 — 例如 "stream"。方法的示例程式碼如下

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("Received stream request: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(SERVER, STREAM, index))
                .log();
    }

RSocketController 位於 rsocket-server 資料夾中的 io.pivotal.rsocketserver 包內。

.stream() 方法的唯一引數 Message 來自之前討論過的 io.pivotal.rsocketserver.data 包。此訊息構成了客戶端請求資料流的基礎。上面的程式碼會在收到客戶端請求時立即將其記錄到控制檯。

該方法返回的 Flux 物件是Project Reactor的一部分,並且也在 Spring Framework 的響應式支援中得到使用。

RSocket 使用 Flux,因為它極大地簡化了響應式資料流的處理。Flux 是資料的一個“Publisher”(釋出者)。它描述了 0 到 N 個元素的流,並提供了大量用於處理流資料的運算子 — 類似於Java 8 的 streaming APIs

在上面的程式碼中,每秒會有一個新的 Long 元素新增到 Flux 中 — 透過呼叫 .interval() 設定 — 從而實質上提供了一個持續的資料流。.map() 函式使用 Long 值作為索引建立了一個新的訊息物件,而在最後一行,呼叫 .log() 方法會將流經 Flux 的所有元素(包括錯誤等)列印到控制檯。

步驟 2:新增客戶端流方法

在客戶端專案的 RSocketShellClient 類中,首先新增一個 Disposable 物件的全域性引用,如下所示

private static Disposable disposable;

RSocketShellClient 位於 rsocket-client 資料夾中的 io.pivotal.rsocketclient 包內。

一旦資料流開始,此 Disposable 物件允許您控制它。

接下來,向您的 RSocketShellClient 新增一個 .stream() 方法。使用 @ShellMethod 註解標記此方法。示例程式碼如下所示

    @ShellMethod("Send one request. Many responses (stream) will be printed.")
    public void stream() {
        log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
        this.disposable = this.rsocketRequester
                .route("stream")
                .data(new Message(CLIENT, STREAM))
                .retrieveFlux(Message.class)
                .subscribe(er -> log.info("Response received: {}", er));
    }

在上面的程式碼中,透過指定 "stream" 作為 .route()rsocketRequester 被告知將請求路由到伺服器的 .stream() 方法。一個新的訊息物件提供了請求的 .data()。因為您希望伺服器返回一個流,所以您使用了 rsocketRequester 上的 .requestFlux() 方法,並指定返回的 Flux 包含型別為 Message 的元素。最後,您在 .subscribe() 方法中設定了一個日誌函式作為流的訂閱者。

注意 rsocketRequester 生成的 Disposable 是如何被儲存的。您將需要它來停止流。

步驟 3:新增客戶端停止流方法

透過保留對流的引用,您可以在想要停止流時將其處理掉(dispose)。要將流取消功能新增到您的 RSocketShellClient 中,請新增一個名為 .s() 的新方法,並使用 @ShellMethod 註解標記它,如下所示

    @ShellMethod("Stop streaming messages from the server.")
    public void s(){
        if(null != disposable){
            disposable.dispose();
        }
    }

在該方法內部,呼叫 disposable.dispose() 會取消流。有了此方法後,要停止流,請在 shell:> 提示符下輸入 s,然後按 Enter 鍵。流就會停止。您的編碼任務現已完成。接下來,測試客戶端和伺服器是否正常協同工作。

步驟 4:構建並執行 RSocket 伺服器

開啟終端視窗並進入 rsocket-server 目錄。使用 Maven 和 Spring Boot 外掛執行伺服器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器將在 localhost7000 埠啟動。

步驟 5:構建並執行 RSocket 客戶端

開啟第二個終端視窗並進入 rsocket-client 目錄。然後,如下構建並執行 RSocket 客戶端應用程式

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

啟動後,Spring Shell 會顯示一個新的提示符

shell:>

您可以在提示符下輸入 stream 來請求伺服器的流。客戶端傳送一個 Message 作為其對流的請求。流中的每個 Message 在伺服器傳送時和客戶端接收時都會被打印出來。客戶端的控制檯日誌看起來像這樣

shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)

要停止流,請在 shell:> 提示符下輸入 s,然後按 Enter 鍵。

步驟 5:清理

您可以透過在 shell:> 提示符下輸入 exit 來停止 rsocket-client,如下所示。

shell:>exit

您可以透過在其終端視窗中按下 Ctrl-C 來停止 rsocket-server 程序。

工作原理

RSocketShellClient 中的 .stream() 方法使用 RSocketRequester 向伺服器傳送單個請求訊息。此請求會啟動一個從伺服器到客戶端的資料流。然後客戶端會將收到的每條訊息記錄到控制檯。

伺服器端的 RSocketController 檢查請求訊息的元資料以查詢 route。此訊息的路由設定為 "stream",因此伺服器將訊息傳遞給相應的 .stream(Message request) 方法。然後伺服器開始每秒向客戶端傳送一條訊息流,直到客戶端要求其停止。

客戶端可以隨時停止流。在 .s() 方法中,這是透過呼叫原始流訂閱返回的 Disposable 物件的 .dispose() 方法來實現的。

總結

在本文中,您學習瞭如何在 Spring Boot 中使用 RSocket 構建 request-stream (請求-流) 功能。在下一篇文章中,我們將新增 channel messaging (通道訊息傳遞) 功能。下次見!

訂閱 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持連線

訂閱

領先一步

VMware 提供培訓和認證,助力您的進步。

瞭解更多

獲得支援

Tanzu Spring 透過一份簡單的訂閱,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群中的所有近期活動。

檢視全部