RSocket 入門:Spring Boot 通道

工程 | Ben Wilcock | 2020 年 4 月 6 日 | ...

閱讀時間:約 6 分鐘。練習時間:約 20 分鐘。

如果像我一樣,您仍在 RSocket 之旅的起點,請檢視RSocket 協議背後的動機。這份簡短但富有洞察力的文件中有一條資訊讓我深有感觸——“不匹配的抽象增加了開發系統的成本”。

從軟體設計的角度來看,RSocket 的四種互動模型提供了顯著的優勢。這意味著我們可以為每個用例使用正確的互動模型來建模元件之間的通訊。這種更高效的模型可以為您節省大量的編碼時間和精力!

到目前為止,在本系列中,我們已經探索了請求-響應即發即棄請求流訊息傳遞。今天,您將向客戶端和伺服器程式碼新增通道

什麼是通道?

通道是雙向管道,允許資料流在任一方向流動。透過通道,從客戶端到伺服器的資料流可以與從伺服器到客戶端的資料流共存。通道有許多實際用途。通道可以承載視訊會議流、傳送和接收雙向聊天訊息、使用增量和差異同步資料,或者提供報告、觀察和監控系統的方法。

RSocket 中的通道並不比流或請求-響應複雜。也就是說,您將在下面實現的場景比您之前嘗試的要稍微複雜一些,因此最好在開始之前瞭解它。

在接下來的練習中,伺服器將訊息流式傳輸到客戶端。客戶端控制伺服器流中訊息的頻率。它透過一個“延遲”設定流來實現這一點。客戶端流中的設定告訴伺服器它傳送每條訊息之間應該暫停多長時間。可以將其視為訊息頻率撥盤。當頻率設定高時,暫停時間更短,因此您會看到很多伺服器傳送的訊息。當頻率設定低時,暫停時間更長,因此您會看到更少的伺服器傳送的訊息。考慮到這個結果,讓我們開始編碼吧。

完整的程式碼示例可在 GitHub 上找到。RSocketController 位於 rsocket-server 資料夾的 io.pivotal.rsocketserver 包中。RSocketShellClient 位於 rsocket-client 資料夾的 io.pivotal.rsocketclient 包中。

步驟 1:將通道方法新增到 RSocketController

在伺服器端的 RSocketController 類中,新增一個名為 channel() 的方法,該方法接受一個 Flux<Duration> 並返回一個 Flux<Message>。此方法簽名(Flux 輸入,Flux 輸出)將此方法標識為 RSocket 通道方法。使用值 "channel" 為該方法新增 @MessageMapping() 註解。此方法的程式碼如下。

    @MessageMapping("channel")
    Flux<Message> channel(final Flux<Duration> settings) {
        return settings
                    .doOnNext(setting -> log.info("\nFrequency setting is {} second(s).\n", setting.getSeconds()))
                    .switchMap(setting -> Flux.interval(setting)
                                                   .map(index -> new Message(SERVER, CHANNEL, index)))
                                                   .log();
    }

在程式碼中,.doOnNext() 正在監聽來自客戶端的設定流。每次新的 delay 設定到達時,它都會將訊息寫入日誌。.switchMap() 為每個新設定建立一個新的 Flux。這個新的 Flux 根據 delay 設定中包含的 .interval() 延遲發出新的 Message 物件。伺服器將這些新訊息透過流傳送回客戶端。

步驟 2:將通道方法新增到 RSocketShellClient

在客戶端的 RSocketShellClient 類中,新增一個新的 channel() 方法,並用 @ShellMethod() 註解對其進行標註。將方法的用途描述作為註解值,如以下示例所示。

    @ShellMethod("Stream some settings to the server. Stream of responses will be printed.")
    public void channel(){

// Code goes here

}

接下來,在該方法中,新增建立設定流的程式碼。程式碼如下所示:

Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));

Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                                        .doOnNext(d -> log.info("\nSending setting for {}-second interval.\n", d.getSeconds()));

每個 Mono 包含一個 Duration 設定。每個持續時間控制著來自伺服器的每條訊息之間的暫停時間。總共有 3 個 mono。第一個包含 1 秒的短暫停設定。第二個包含更長的 3 秒暫停設定,但此 mono 使用 .delayElement() 方法被告知將此持續時間的生成延遲 5 秒。第三個 mono 包含 5 秒的暫停設定,但在 15 秒過去之前不會發出其持續時間。這 3 個 mono 使用 .concat() 方法連線成一個 Flux。使用 .doOnNext() 新增日誌記錄語句,以便您可以在程式碼執行時看到發生了什麼。

注意:有許多方法可以構建基於 Flux 的流,但對於本教程來說,它只是一個簡單的示例。

現在您已經將設定流放入 Flux 中,向該方法新增與伺服器通訊所需的程式碼

disposable = this.rsocketRequester
                    .route("channel")
                    .data(settings)
                    .retrieveFlux(Message.class)
                    .subscribe(message -> log.info("Received: {} \n(Type 's' to stop.)", message));

如果您一直關注本系列,那麼現在這段程式碼應該很熟悉了。rsocketRequester 是您在建構函式中構建的全域性變數。它提供了您與伺服器的 RSocket 通訊連結。.route() 設定為 "channel" 以匹配伺服器端程式碼中的訊息對映。.data() 是您上面建立的 mono 流。.retrieveFlux() 指定您期望一個 Message 物件流,.subscribe() 開始您的訂閱並確保收到的每條訊息都列印到日誌中,以便您可以看到發生了什麼。訂閱建立的 Disposable 物件被保留並用於控制通道。

您可以在此處檢視該方法的完整程式碼。這就是我們需要的所有程式碼。讓我們執行它!

步驟 3:構建並執行 RSocket 伺服器

開啟終端視窗,進入 rsocket-server 目錄,然後按如下方式執行伺服器:

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost7000 上啟動。

步驟 4:構建並執行 RSocket 客戶端

開啟第二個終端視窗並移動到 rsocket-client 目錄。然後,按如下方式構建並執行客戶端:

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

啟動後,Spring Shell 會顯示一個新的提示符

shell:>

您可以在提示符處輸入 channel 向伺服器請求一個通道。

客戶端建立其延遲計時器設定流並開始將它們傳送到伺服器。出站流中的每個持續時間都由客戶端和伺服器列印。伺服器發回訊息流,客戶端將其列印到日誌中。客戶端終端看起來像這樣:

shell:>channel
i.p.rsocketclient.RSocketShellClient :

Sending setting for 1-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304561)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 3-second interval.

i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304568)
(Type 's' to stop.)

# removed log-lines

i.p.rsocketclient.RSocketShellClient :

Sending setting for 5-second interval.

2020-03-27 10:23:00.243 INFO 5680 --- [tor-tcp-epoll-1] i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=4, created=1585304580)
(Type 's' to stop.)

# removed log-lines

要停止通道,請鍵入 s,然後按 Enter

步驟 5:清理

您可以透過在 shell:> 提示符下輸入 exit 來退出 rsocket-client,如下所示。

shell:>exit

你可以透過在其終端視窗中按 Ctrl-C 來停止 rsocket-server 程序。

工作原理

客戶端建立 3 個持續時間元素的序列。第一個持續時間設定立即發出,第二個在 5 秒後發出,第三個在 15 秒後發出。每次發出新的持續時間時,都會將其記錄到控制檯。此設定流傳送到伺服器並控制伺服器生成的訊息流。

在伺服器端,每次從客戶端流中提取新的持續時間設定時,都會建立一個新的訊息流並返回。客戶端傳送的最新設定控制著此伺服器傳送流中每條訊息之間的時間延遲。

當客戶端處置訂閱的可處置物件時,通道停止。

最後思考

如果您遵循了整個系列,那麼您現在已經看到了 RSocket 的所有四種互動模型:請求-響應即發即棄請求流,以及現在的通道。

有了這四種通訊方式,您不太可能遇到我們一開始討論的那些煩人的“抽象不匹配”場景。您的工具箱中有了 RSocket,您將擁有一個靈活、低摩擦、高效能的訊息傳遞協議,可用於您的軟體——一個專為微服務架構而構建的協議。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有