RSocket 入門:伺服器呼叫客戶端

工程 | Ben Wilcock | 2020 年 5 月 12 日 | ...

閱讀時間:約 7 分鐘。 編碼時間:約 20 分鐘。

如果您一直在關注我的關於 RSocket 的系列文章,您會多次聽到我提到“客戶端和伺服器”。但是,對於 RSocket,客戶端和伺服器之間的界限是模糊的。使用 RSocket,伺服器可以向客戶端傳送訊息,客戶端也可以像伺服器一樣響應這些請求。

事實上,RSocket 文件中並沒有使用“客戶端”或“伺服器”的術語。文件中使用的是“請求者”和“響應者”。在 RSocket 中,任何元件都可以充當請求者,任何元件都可以充當響應者,甚至可以同時充當兩者。在 RSocket 中,請求者和響應者之間的所有這些來回通訊都透過一個“雙向”連線進行。

今天,你將利用這些特性,程式設計你的 rsocket-client 來響應來自伺服器的請求。在伺服器端程式碼中,你將監聽客戶端連線事件,當連線事件發生時,你將把新的客戶端儲存在已連線客戶端列表中。你還將要求每個客戶端在連線存活期間,向伺服器回傳遙測訊息流。

如果你一直關注這個系列,你可以按照下面的說明進行編碼。程式碼也可以從GitHub下載。

步驟 1:更新 Spring Boot 和 RSocket

首先,做一些清理工作。Spring BootRSocket Java 庫最近都進行了更新。這些更新包括通常的錯誤修復、增強和棄用,因此升級符合我們的利益。

在 Maven pom.xml<parent> 部分,將 spring-boot-starter-parent 更改為 2.3.0.RELEASE 版本,如下所示。你需要執行兩次此操作,因為你有兩個獨立的專案 — rsocket-clientrsocket-server

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.3.0.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

要重新整理程式碼,請在兩個專案的根資料夾中執行以下命令

./mvnw clean compile

現在你可以繼續進行編碼任務了。

步驟 2:向客戶端新增請求-響應訊息處理器

客戶端需要能夠響應來自伺服器的訊息。在 rsocket-client 專案的 RSocketShellClient.java 中,新增一個新的內部類 ClientHandler,如下所示

@Slf4j
class ClientHandler {
 
 @MessageMapping("client-status")
 public Flux<String> statusUpdate(String status) {
   log.info("Connection {}", status);
   return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
 }
}

這個類包含一個名為 statusUpdate() 的方法,它用 @MessageMapping 註解修飾,就像 rsocket-server 專案中的一樣。客戶端使用這個類和這個方法來捕獲和響應來自伺服器的請求。響應本身是一個訊息流。每隔 5 秒,客戶端就會告訴伺服器其當前的可用記憶體。可以將其視為客戶端遙測資料。

為此,你必須將這個類“註冊”到你的 RSocket 連線中。你將在下一節中進行此操作。

步驟 3:在客戶端的建構函式中註冊 ClientHandler

在客戶端能夠響應來自伺服器的訊息之前,它必須向 RSocket 連線註冊 ClientHandler。修改後的建構函式程式碼如下。請注意建構函式方法簽名的更改,以新增 RSocketStrategies 變數。Spring 將此變數提供給你的建構函式。將舊的建構函式程式碼替換為下面列出的新程式碼。

public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
 
 // (1)
 String client = UUID.randomUUID().toString();
 log.info("Connecting using client ID: {}", client);
  
 // (2)
 SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
  
 // (3) 
 this.rsocketRequester = rsocketRequesterBuilder
 .setupRoute("shell-client")
 .setupData(client)
 .rsocketStrategies(strategies)
 .rsocketConnector(connector -> connector.acceptor(responder))
 .connectTcp("localhost", 7000)
 .block();
  
 // (4)
 this.rsocketRequester.rsocket()
 .onClose()
 .doOnError(error -> log.warn("Connection CLOSED"))
 .doFinally(consumer -> log.info("Client DISCONNECTED"))
 .subscribe();
 }

在上面的程式碼中,你首先生成並存儲一個標識此客戶端例項的唯一 ID (1)。接下來,你使用 RSocket strategies 和一個新的 ClientHandler 例項建立一個新的 SocketAcceptor (2)。然後使用 RSocketRequesterBuilder 註冊新的 SocketAcceptor (3)。最後,透過處理 RSocket onClose() 事件 (4) 來確保優雅地處理斷開連線。

客戶端程式碼就到此為止。讓我們繼續進行伺服器端更改。

步驟 4:在伺服器上記住客戶端

rsocket-server 專案中要做的第一件事是,透過向 RSocketController.java 類新增一個類級別變數,來建立 RSocketRequester 客戶端的集合,如下所示

private final List<RSocketRequester> CLIENTS = new ArrayList<>();

接下來,新增一個連線處理程式,透過新增一個新方法,如下所示

  @ConnectMapping("shell-client")
 void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
 // The code for the method will go HERE
 }

@ConnectMapping 註解讓你可以在客戶端連線事件發生時監聽它們。使用這個事件,你可以安排兩項工作。第一項是將每個新客戶端新增到 CLIENTS 列表中。第二項是呼叫每個客戶端並啟動它們的遙測流。

將以下程式碼新增到你剛剛建立的方法中

requester.rsocket()
        .onClose() // (1)
        .doFirst(() -> { 
            log.info("Client: {} CONNECTED.", client);
            CLIENTS.add(requester); // (2)
        })
        .doOnError(error -> { 
            log.warn("Channel to client {} CLOSED", client); // (3)
        })
        .doFinally(consumer -> { 
            CLIENTS.remove(requester);
            log.info("Client {} DISCONNECTED", client); // (4)
        })
        .subscribe();

首先要注意的是對 requester.rsocket().onClose() 方法的呼叫 (1)。此方法返回一個響應式 Mono 物件,其中包含你需要的所有回撥。

mono 的 doFirst() 方法在任何 subscribe() 呼叫之前被呼叫,但在 mono 初始建立之後。使用此方法將客戶端的 requester 物件新增到 CLIENTS 列表 (2)。

這段程式碼可能會讓人覺得反直覺——在客戶端連線時呼叫 onClose(),然後使用生成的 mono 儲存對新客戶端的引用。有時,事件驅動的 API 可能會讓人感覺有點奇怪。但可以將其視為此 RSocket 連線的 mono 向你傳送一個“我活著”的事件。你正在使用該建立事件來觸發列表中每個客戶端引用的儲存。

當連接出現問題時,RSocket 會呼叫 mono 的 doOnError() 方法。這包括客戶端選擇關閉連線的情況。你可以使用提供的 error 變數來決定採取什麼操作。在上面的程式碼中,錯誤只是被記錄為警告 (3)。

當 RSocket 連線關閉時,mono 的 doFinally() 方法會被觸發。此方法是執行從 CLIENTS 列表中刪除客戶端的程式碼的理想位置 (4)。

最後,subscribe() 啟用你新增到 mono 中的響應式程式碼,並表示你已準備好處理事件。

步驟 5:從客戶端請求可用記憶體讀數

當每個客戶端連線時,請求遙測讀數流。為此,在 connectShellClientAndAskForTelemetry() 方法中再次操作,你需要向之前新增的 client-status 訊息處理程式傳送請求。程式碼如下

requester.route("client-status")
        .data("OPEN")
        .retrieveFlux(String.class)
        .doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
        .subscribe();

使用 requester,目標路由 "client-status"。傳送字串 "OPEN" 作為訊息資料,並檢索型別為 StringFlux。每個到達的字串都包含客戶端當前的可用記憶體讀數。將此讀數記錄到控制檯。最後,不要忘記 subscribe() 來啟用你的響應式程式碼。

步驟 6:構建並執行 RSocket 伺服器

是時候測試你的程式碼了。開啟一個終端視窗,進入 rsocket-server 目錄,然後執行伺服器,如下所示

cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true

伺服器在 localhost7000 上啟動。

步驟 7:構建並執行 RSocket 客戶端

開啟第二個終端視窗,進入 rsocket-client 目錄。從那裡,構建並執行客戶端,如下所示

cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true

工作原理

啟動後,你會注意到客戶端和伺服器元件的控制檯中都有新的日誌條目。在 rsocket-client 視窗中,你將看到顯示客戶端唯一 ID(以 UUID 形式)和 "OPEN" 連線狀態的日誌條目,如下所示

Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>

至少等待 10 秒,然後在 shell:> 提示符下鍵入 exit。rsocket-client 現在斷開與伺服器的連線並關閉。

現在切換到 rsocket-server 視窗。日誌條目看起來像這樣

Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED

客戶端連線後,它會被新增到客戶端列表,控制檯會列印其客戶端 ID 和“CONNECTED”狀態。然後,每 5 秒,日誌會顯示客戶端當前的“可用記憶體”讀數。最後,當客戶端斷開連線時,其 RSocket 通道的狀態變為“CLOSED”,客戶端“DISCONNECTED”。

你可以透過在其終端視窗中按 Ctrl-C 來停止 rsocket-server 程序。

最後思考

能夠呼叫客戶端功能非常強大。它非常適合各種場景,包括移動裝置、物聯網或微服務。而且由於所有這些都可以透過 TCP 或 WebSockets 發生,因此你已經擁有所需的所有基礎設施,而無需訴諸訊息代理等重量級解決方案。

你涵蓋了很多內容。你學會了如何將伺服器變成“請求者”,將客戶端變成“響應者”。你發現瞭如何監聽連線事件。你還學到了一些如何處理來自 rsocket 連線的錯誤和事件的知識。而且,儘管在此練習中,你選擇了“請求-流”作為你的伺服器-客戶端通訊方式,但你可以根據需要使用四種 RSocket 互動模式中的任何一種。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有