RSocket 入門:Spring Boot 請求流

工程 | Ben Wilcock | 2020 年 3 月 23 日 | ...

時間:約 15 分鐘。

在本系列的上一篇文章中,您嘗試了使用 RSocket 在 Spring Boot 中進行請求-響應即發即棄訊息傳遞。這次您將嘗試 RSocket 的另一種全新訊息傳遞模型——請求流。

在此練習中,您將學習如何使用傳統的“客戶端請求伺服器流”方法傳輸資料。

到目前為止,我還沒有提到的一點是 RSocket 允許您在兩個方向上使用其訊息模型。因此,如果您想使用不太常見的“伺服器請求客戶端流”模型,RSocket 也沒問題。此外,還有許多非 Java 的 RSocket 實現可供選擇,包括 Go、Javascript 和 .Net——如果您的架構包含 Java 可能不是最佳選擇的平臺,這會非常理想。

澄清了這一點後,請按照以下步驟將流資料功能新增到您現有來自之前帖子的 RSocket 客戶端和伺服器程式碼中。

如果您沒有閱讀之前關於伺服器端客戶端請求-響應訊息傳遞或即發即棄的帖子,現在就是您的機會!程式碼示例在 GitHub 上

步驟 1:新增伺服器端流方法

再次在您的伺服器端 RSocketController 類中工作,新增一個名為 .stream() 的新方法,其簽名是 — '接受一個物件,返回一個 Flux' — 這是 RSocket 對此訊息模型所期望的。使用 @MessageMapping 註解此新方法,指定一個合適的對映名稱 — 例如 "stream"。該方法的示例程式碼如下

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("Received stream request: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(SERVER, STREAM, index))
                .log();
    }

RSocketController 位於 rsocket-server 資料夾中的 io.pivotal.rsocketserver 包中。

.stream() 方法的唯一引數 Message 來自之前討論過的 io.pivotal.rsocketserver.data 包。此訊息構成了客戶端請求資料流的基礎。一旦收到,上面的程式碼會將客戶端的請求記錄到控制檯。

該方法返回的 Flux 物件是 Project Reactor 的一部分,也用於 Spring Framework 的響應式支援

RSocket 使用 Flux 是因為它極大地簡化了響應式資料流的處理。Flux 是資料的“釋出者”。它描述了 0 到 N 個元素的流,並提供了大量用於處理流資料的運算子 — 類似於 Java 8 的流 API

在上面的程式碼中,每秒都會向 Flux 新增一個新的 Long 元素 — 透過 .interval() 呼叫設定 — 基本上提供了一個持續的資料流。.map() 函式使用 Long 作為索引值建立一個新的訊息物件,並且在最後一行,呼叫 .log() 方法會將流經 Flux 的所有元素(包括錯誤等)列印到控制檯。

步驟 2:新增客戶端流方法

在客戶端專案的 RSocketShellClient 類中,首先新增一個全域性的 Disposable 物件引用,如下所示

private static Disposable disposable;

RSocketShellClient 位於 rsocket-client 資料夾中的 io.pivotal.rsocketclient 包中。

這個 Disposable 物件允許您在資料流啟動後控制它。

接下來,向您的 RSocketShellClient 新增一個 .stream() 方法。使用 @ShellMethod 註解此方法。其示例程式碼如下

    @ShellMethod("Send one request. Many responses (stream) will be printed.")
    public void stream() {
        log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
        this.disposable = this.rsocketRequester
                .route("stream")
                .data(new Message(CLIENT, STREAM))
                .retrieveFlux(Message.class)
                .subscribe(er -> log.info("Response received: {}", er));
    }

在上面的程式碼中,透過將 "stream" 指定為 .route()rsocketRequester 被告知將請求路由到伺服器的 .stream() 方法。一個新的訊息物件為您的請求提供了 .data()。因為您希望伺服器返回一個流,所以您在 rsocketRequester 上使用了 .requestFlux() 方法,指定返回的 Flux 包含 Message 型別的元素。最後,您在 .subscribe() 方法中設定了一個日誌函式作為您流的訂閱者。

請注意,rsocketRequester 生成的 Disposable 正在被保留。您將需要它來停止流。

步驟 3:新增客戶端停止流方法

透過保留對流的引用,您可以在需要停止流時將其丟棄。要將流取消功能新增到您的 RSocketShellClient,請新增一個名為 .s() 的新方法,並使用 @ShellMethod 對其進行註解,如下所示

    @ShellMethod("Stop streaming messages from the server.")
    public void s(){
        if(null != disposable){
            disposable.dispose();
        }
    }

在方法內部,呼叫 disposable.dispose() 會取消流。有了這個方法,要停止流,請在 shell:> 提示符處鍵入 s,然後按 Enter。流將停止。您的編碼任務現已完成。接下來,測試客戶端和伺服器是否協同工作。

步驟 4:構建並執行 RSocket 伺服器

開啟一個終端視窗並移動到 rsocket-server 目錄。使用 Maven 和 Spring Boot 外掛執行伺服器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost7000 上啟動。

步驟 5:構建並執行 RSocket 客戶端

開啟第二個終端視窗並移動到 rsocket-client 目錄。從那裡,構建並執行 RSocket 客戶端應用程式,如下所示

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

啟動後,Spring Shell 會為您顯示一個新的提示符

shell:>

您可以透過在提示符處鍵入 stream 來請求伺服器的流。客戶端傳送一個 Message 作為其對流的請求。流中的每個 Message 都會在伺服器傳送時和客戶端接收時打印出來。客戶端上的控制檯日誌看起來像這樣

shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)

要停止流,請在 shell:> 提示符處鍵入 s,然後按 Enter

步驟 5:清理

您可以透過在 shell:> 提示符處鍵入 exit 來停止 rsocket-client,如下所示。

shell:>exit

你可以透過在其終端視窗中按 Ctrl-C 來停止 rsocket-server 程序。

工作原理

RSocketShellClient 中的 .stream() 方法使用 RSocketRequester 向伺服器傳送單個請求訊息。此請求會啟動從伺服器到客戶端的資料流。然後客戶端會將收到的每條訊息記錄到控制檯。

伺服器端的 RSocketController 檢查請求訊息的元資料以獲取 route。此訊息的路由設定為 "stream",因此伺服器將訊息傳遞給相應的 .stream(Message request) 方法。然後伺服器會每秒向客戶端傳送訊息流,直到客戶端要求其停止。

客戶端可以隨時停止流。在 .s() 方法中,這是透過在原始流訂閱返回的 Disposable 上呼叫 .dispose() 來完成的。

最後思考

在這篇文章中,您學習瞭如何使用 Spring Boot 中的 RSocket 構建請求流功能。在下一篇文章中,我們將新增通道訊息傳遞。下次再見!

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有