RSocket入門:伺服器呼叫客戶端

工程 | Ben Wilcock | 2020年5月12日 | ...

閱讀時間:約7分鐘。 編碼時間:約20分鐘。

如果您一直在關注我關於 RSocket系列文章,您會多次聽到我提及“客戶端和伺服器”。但是,對於 RSocket,客戶端和伺服器之間的界限是模糊的。使用 RSocket,伺服器可以向客戶端傳送訊息,並且客戶端可以像伺服器一樣響應這些請求。

事實上,RSocket 文件不使用“客戶端”或“伺服器”這兩個術語。文件使用的是“請求者”和“響應者”這兩個術語。在 RSocket 中,任何元件都可以充當請求者,任何元件都可以充當響應者,甚至可以同時充當兩者。在 RSocket 中,請求者和響應者之間所有這些雙向通訊都透過一個“雙向”連線進行。

今天,您將利用這些特性,透過程式設計讓您的 rsocket-client 響應來自伺服器的請求。在伺服器端程式碼中,您將監聽客戶端連線事件,當連線事件發生時,會將新客戶端儲存在已連線客戶端列表中。您還將要求每個客戶端在其連線處於活動狀態期間,向伺服器流式傳輸遙測訊息。

如果您一直在關注這個系列文章,您可以按照下面的說明一起編碼。程式碼也可以從 GitHub 下載。

步驟 1:更新 Spring Boot 和 RSocket

首先,做一些整理工作。Spring BootRSocket Java 庫最近都進行了更新。這些更新包括常規的錯誤修復、增強功能和棄用項,因此升級對我們有利。

在 Maven 的 pom.xml 檔案的 <parent> 部分,將 spring-boot-starter-parent 的版本更改為 2.3.0.RELEASE,如下所示。您需要這樣做兩次,因為您有兩個單獨的專案 — rsocket-clientrsocket-server

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

要重新整理您的程式碼,請在兩個專案的根資料夾中執行以下命令

./mvnw clean compile

現在您可以繼續編碼任務了。

步驟 2:向客戶端新增一個請求-響應訊息處理器

客戶端需要能夠響應來自伺服器的訊息。在 rsocket-client 專案的 RSocketShellClient.java 中,新增一個名為 ClientHandler 的新內部類,如下所示

@Slf4j
class ClientHandler {
 
 @MessageMapping("client-status")
 public Flux<String> statusUpdate(String status) {
   log.info("Connection {}", status);
   return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
 }
}

這個類包含一個名為 statusUpdate() 的方法,該方法使用 @MessageMapping 註解修飾,就像 rsocket-server 專案中的註解一樣。客戶端使用這個類和這個方法來捕獲和響應來自伺服器的請求。響應本身是一個訊息流。每隔 5 秒,客戶端就會告訴伺服器當前的空閒記憶體。可以將其視為客戶端遙測資料。

為了使其工作,您必須將這個類“註冊”到您的 RSocket 連線中。您將在下一節中進行此操作。

步驟 3:在客戶端的建構函式中註冊 ClientHandler

在客戶端能夠響應來自伺服器的訊息之前,它必須將 ClientHandler 註冊到 RSocket 連線中。修訂後的建構函式程式碼如下所示。請注意建構函式的方法簽名發生了變化,添加了 RSocketStrategies 變數。Spring 會將此變數提供給您的建構函式。用下面列出的新程式碼替換您舊的建構函式程式碼。

public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
 
 // (1)
 String client = UUID.randomUUID().toString();
 log.info("Connecting using client ID: {}", client);
  
 // (2)
 SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
  
 // (3) 
 this.rsocketRequester = rsocketRequesterBuilder
 .setupRoute("shell-client")
 .setupData(client)
 .rsocketStrategies(strategies)
 .rsocketConnector(connector -> connector.acceptor(responder))
 .connectTcp("localhost", 7000)
 .block();
  
 // (4)
 this.rsocketRequester.rsocket()
 .onClose()
 .doOnError(error -> log.warn("Connection CLOSED"))
 .doFinally(consumer -> log.info("Client DISCONNECTED"))
 .subscribe();
 }

在上面的程式碼中,您首先生成並存儲一個唯一 ID,用於標識此客戶端例項 (1)。接下來,使用 RSocket strategies 和一個新的 ClientHandler 例項建立一個新的 SocketAcceptor (2)。然後使用 RSocketRequesterBuilder 註冊新的 SocketAcceptor (3)。最後,透過處理 RSocket 的 onClose() 事件來確保優雅地處理斷開連線 (4)。

客戶端程式碼就這些了。讓我們繼續伺服器端的修改。

步驟 4:在伺服器端記住客戶端

rsocket-server 專案中要做的第一件事是透過向 RSocketController.java 類新增一個類級別變數來建立 RSocketRequester 客戶端集合,如下所示

private final List<RSocketRequester> CLIENTS = new ArrayList<>();

接下來,透過新增一個新方法來新增一個連線處理器,如下所示

  @ConnectMapping("shell-client")
 void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
 // The code for the method will go HERE
 }

@ConnectMapping 註解允許您監聽發生的客戶端連線事件。使用此事件,您可以安排兩項工作。第一項是將每個新客戶端新增到 CLIENTS 列表中。第二項是呼叫每個客戶端並啟動其遙測流。

將以下程式碼新增到您剛剛建立的方法中

requester.rsocket()
        .onClose() // (1)
        .doFirst(() -> { 
            log.info("Client: {} CONNECTED.", client);
            CLIENTS.add(requester); // (2)
        })
        .doOnError(error -> { 
            log.warn("Channel to client {} CLOSED", client); // (3)
        })
        .doFinally(consumer -> { 
            CLIENTS.remove(requester);
            log.info("Client {} DISCONNECTED", client); // (4)
        })
        .subscribe();

首先要注意的是對 requester.rsocket().onClose() 方法的呼叫 (1)。此方法返回一個響應式的 Mono 物件,其中包含您需要的所有回撥。

mono 的 doFirst() 方法在任何對 subscribe() 的呼叫之前被呼叫,但在 mono 初始建立之後。使用此方法將客戶端的 requester 物件新增到 CLIENTS 列表 (2)。

這段程式碼可能感覺有點反直覺——在客戶端連線時呼叫 onClose(),然後使用返回的 mono 來儲存新客戶端的引用。有時,事件驅動的 API 可能會感覺有點奇怪。但可以將其視為此 RSocket 連線的 mono 向您傳送了一個“我還活著”的事件。您正在使用該建立事件來觸發在列表中儲存每個客戶端的引用。

當連接出現問題時,RSocket 會呼叫 mono 的 doOnError() 方法。這包括客戶端選擇關閉連線的情況。您可以使用提供的 error 變數來決定採取什麼操作。在上面的程式碼中,錯誤簡單地被記錄為警告 (3)。

mono 的 doFinally() 方法在 RSocket 連線關閉時觸發。此方法是在列表 CLIENTS 中移除客戶端的理想位置 (4)。

最後,subscribe() 啟用您已新增到 mono 的響應式程式碼,併發出訊號表示您已準備好處理事件。

步驟 5:從客戶端獲取空閒記憶體讀數

當每個客戶端連線時,請求一個遙測讀數流。為此,您需要在 connectShellClientAndAskForTelemetry() 方法中再次工作,向您之前新增的 client-status 訊息處理器傳送請求。程式碼如下所示

requester.route("client-status")
        .data("OPEN")
        .retrieveFlux(String.class)
        .doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
        .subscribe();

使用 requester,定位路由 "client-status"。傳送字串 "OPEN" 作為訊息資料,並檢索一個 String 型別的 Flux。到達的每個字串都包含客戶端當前的空閒記憶體讀數。將此讀數記錄到控制檯。最後,不要忘記呼叫 subscribe() 來啟用您的響應式程式碼。

步驟 6:構建並執行 RSocket 伺服器

是時候測試您的程式碼了。開啟一個終端視窗,切換到 rsocket-server 目錄,並按如下方式執行伺服器

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost7000 埠啟動。

步驟 7:構建並執行 RSocket 客戶端

開啟第二個終端視窗,切換到 rsocket-client 目錄。然後,按如下方式構建並執行客戶端

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

工作原理

啟動後,您會注意到客戶端和伺服器元件在控制檯中都有新的日誌條目。在 rsocket-client 視窗中,您會看到顯示客戶端唯一 ID(UUID 形式)和 "OPEN" 連線狀態的日誌條目,如下所示

Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>

至少等待 10 秒鐘,然後在 shell:> 提示符下鍵入 exit。rsocket-client 現在會斷開與伺服器的連線並關閉。

現在切換到 rsocket-server 視窗。日誌條目看起來像這樣

Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED

客戶端連線時,會被新增到客戶端列表中,控制檯會列印其客戶端 ID 和狀態“CONNECTED”。然後,每隔 5 秒,日誌會顯示客戶端當前的“空閒記憶體”讀數。最後,當客戶端斷開連線時,其 RSocket 通道的狀態變為“CLOSED”,客戶端狀態變為“DISCONNECTED”。

您可以透過在 rsocket-server 的終端視窗中按 Ctrl-C 來停止該程序。

總結

呼叫客戶端的能力非常強大。它適用於各種場景,包括移動裝置、物聯網或微服務。由於所有這些都可以透過 TCP 或 WebSockets 進行,您無需依賴訊息代理等重量級解決方案,就已擁有所需的所有基礎設施。

您在這裡學習了很多內容。您學習瞭如何將伺服器變成“請求者”,將客戶端變成“響應者”。您瞭解瞭如何監聽連線事件。您還學習了一點關於如何處理來自 rsocket 連線的錯誤和事件的知識。並且,儘管在此練習中您選擇了“請求-流”作為您的伺服器-客戶端通訊模式,但您可以根據您的需要使用四種 RSocket 互動模式中的任何一種。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助力您的進步。

瞭解更多

獲得支援

Tanzu Spring 透過一個簡單的訂閱,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部