領先一步
VMware 提供培訓和認證,助力您的進步。
瞭解更多閱讀時間:約7分鐘。 編碼時間:約20分鐘。
如果您一直在關注我關於 RSocket 的系列文章,您會多次聽到我提及“客戶端和伺服器”。但是,對於 RSocket,客戶端和伺服器之間的界限是模糊的。使用 RSocket,伺服器可以向客戶端傳送訊息,並且客戶端可以像伺服器一樣響應這些請求。
事實上,RSocket 文件不使用“客戶端”或“伺服器”這兩個術語。文件使用的是“請求者”和“響應者”這兩個術語。在 RSocket 中,任何元件都可以充當請求者,任何元件都可以充當響應者,甚至可以同時充當兩者。在 RSocket 中,請求者和響應者之間所有這些雙向通訊都透過一個“雙向”連線進行。
今天,您將利用這些特性,透過程式設計讓您的 rsocket-client 響應來自伺服器的請求。在伺服器端程式碼中,您將監聽客戶端連線事件,當連線事件發生時,會將新客戶端儲存在已連線客戶端列表中。您還將要求每個客戶端在其連線處於活動狀態期間,向伺服器流式傳輸遙測訊息。
首先,做一些整理工作。Spring Boot 和 RSocket Java 庫最近都進行了更新。這些更新包括常規的錯誤修復、增強功能和棄用項,因此升級對我們有利。
在 Maven 的 pom.xml
檔案的 <parent>
部分,將 spring-boot-starter-parent
的版本更改為 2.3.0.RELEASE
,如下所示。您需要這樣做兩次,因為您有兩個單獨的專案 — rsocket-client
和 rsocket-server
。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
要重新整理您的程式碼,請在兩個專案的根資料夾中執行以下命令
./mvnw clean compile
現在您可以繼續編碼任務了。
客戶端需要能夠響應來自伺服器的訊息。在 rsocket-client
專案的 RSocketShellClient.java
中,新增一個名為 ClientHandler
的新內部類,如下所示
@Slf4j
class ClientHandler {
@MessageMapping("client-status")
public Flux<String> statusUpdate(String status) {
log.info("Connection {}", status);
return Flux.interval(Duration.ofSeconds(5)).map(index -> String.valueOf(Runtime.getRuntime().freeMemory()));
}
}
這個類包含一個名為 statusUpdate()
的方法,該方法使用 @MessageMapping
註解修飾,就像 rsocket-server
專案中的註解一樣。客戶端使用這個類和這個方法來捕獲和響應來自伺服器的請求。響應本身是一個訊息流。每隔 5 秒,客戶端就會告訴伺服器當前的空閒記憶體。可以將其視為客戶端遙測資料。
為了使其工作,您必須將這個類“註冊”到您的 RSocket 連線中。您將在下一節中進行此操作。
在客戶端能夠響應來自伺服器的訊息之前,它必須將 ClientHandler
註冊到 RSocket 連線中。修訂後的建構函式程式碼如下所示。請注意建構函式的方法簽名發生了變化,添加了 RSocketStrategies
變數。Spring 會將此變數提供給您的建構函式。用下面列出的新程式碼替換您舊的建構函式程式碼。
public RSocketShellClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
// (1)
String client = UUID.randomUUID().toString();
log.info("Connecting using client ID: {}", client);
// (2)
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
// (3)
this.rsocketRequester = rsocketRequesterBuilder
.setupRoute("shell-client")
.setupData(client)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", 7000)
.block();
// (4)
this.rsocketRequester.rsocket()
.onClose()
.doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED"))
.subscribe();
}
在上面的程式碼中,您首先生成並存儲一個唯一 ID,用於標識此客戶端例項 (1)。接下來,使用 RSocket strategies
和一個新的 ClientHandler
例項建立一個新的 SocketAcceptor
(2)。然後使用 RSocketRequesterBuilder
註冊新的 SocketAcceptor
(3)。最後,透過處理 RSocket 的 onClose()
事件來確保優雅地處理斷開連線 (4)。
客戶端程式碼就這些了。讓我們繼續伺服器端的修改。
在 rsocket-server
專案中要做的第一件事是透過向 RSocketController.java
類新增一個類級別變數來建立 RSocketRequester
客戶端集合,如下所示
private final List<RSocketRequester> CLIENTS = new ArrayList<>();
接下來,透過新增一個新方法來新增一個連線處理器,如下所示
@ConnectMapping("shell-client")
void connectShellClientAndAskForTelemetry
(RSocketRequester requester, @Payload String client) {
// The code for the method will go HERE
}
@ConnectMapping
註解允許您監聽發生的客戶端連線事件。使用此事件,您可以安排兩項工作。第一項是將每個新客戶端新增到 CLIENTS
列表中。第二項是呼叫每個客戶端並啟動其遙測流。
將以下程式碼新增到您剛剛建立的方法中
requester.rsocket()
.onClose() // (1)
.doFirst(() -> {
log.info("Client: {} CONNECTED.", client);
CLIENTS.add(requester); // (2)
})
.doOnError(error -> {
log.warn("Channel to client {} CLOSED", client); // (3)
})
.doFinally(consumer -> {
CLIENTS.remove(requester);
log.info("Client {} DISCONNECTED", client); // (4)
})
.subscribe();
首先要注意的是對 requester.rsocket().onClose()
方法的呼叫 (1)。此方法返回一個響應式的 Mono
物件,其中包含您需要的所有回撥。
mono 的 doFirst()
方法在任何對 subscribe()
的呼叫之前被呼叫,但在 mono 初始建立之後。使用此方法將客戶端的 requester
物件新增到 CLIENTS
列表 (2)。
這段程式碼可能感覺有點反直覺——在客戶端連線時呼叫
onClose()
,然後使用返回的 mono 來儲存新客戶端的引用。有時,事件驅動的 API 可能會感覺有點奇怪。但可以將其視為此 RSocket 連線的 mono 向您傳送了一個“我還活著”的事件。您正在使用該建立事件來觸發在列表中儲存每個客戶端的引用。
當連接出現問題時,RSocket 會呼叫 mono 的 doOnError()
方法。這包括客戶端選擇關閉連線的情況。您可以使用提供的 error
變數來決定採取什麼操作。在上面的程式碼中,錯誤簡單地被記錄為警告 (3)。
mono 的 doFinally()
方法在 RSocket 連線關閉時觸發。此方法是在列表 CLIENTS
中移除客戶端的理想位置 (4)。
最後,subscribe()
啟用您已新增到 mono 的響應式程式碼,併發出訊號表示您已準備好處理事件。
當每個客戶端連線時,請求一個遙測讀數流。為此,您需要在 connectShellClientAndAskForTelemetry()
方法中再次工作,向您之前新增的 client-status
訊息處理器傳送請求。程式碼如下所示
requester.route("client-status")
.data("OPEN")
.retrieveFlux(String.class)
.doOnNext(s -> log.info("Client: {} Free Memory: {}.",client,s))
.subscribe();
使用 requester
,定位路由 "client-status"
。傳送字串 "OPEN"
作為訊息資料,並檢索一個 String
型別的 Flux
。到達的每個字串都包含客戶端當前的空閒記憶體讀數。將此讀數記錄到控制檯。最後,不要忘記呼叫 subscribe()
來啟用您的響應式程式碼。
是時候測試您的程式碼了。開啟一個終端視窗,切換到 rsocket-server
目錄,並按如下方式執行伺服器
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
伺服器在 localhost
的 7000
埠啟動。
開啟第二個終端視窗,切換到 rsocket-client
目錄。然後,按如下方式構建並執行客戶端
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
啟動後,您會注意到客戶端和伺服器元件在控制檯中都有新的日誌條目。在 rsocket-client
視窗中,您會看到顯示客戶端唯一 ID(UUID 形式)和 "OPEN"
連線狀態的日誌條目,如下所示
Connecting using client ID: 0acc1c60-4bc4-444d-bb82-eb6b510f4168
Connection OPEN
Started RsocketShellClientApplication in 1.516 seconds (JVM running for 1.814)
shell:>
至少等待 10 秒鐘,然後在 shell:>
提示符下鍵入 exit
。rsocket-client 現在會斷開與伺服器的連線並關閉。
現在切換到 rsocket-server
視窗。日誌條目看起來像這樣
Started RsocketServerApplication in 0.945 seconds (JVM running for 1.248)
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CONNECTED.
Client: 0acc1c60-4bc4-444d-bb82-eb6b510f4168 Free Memory: 211317712.
Channel to client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 CLOSED
Client 0acc1c60-4bc4-444d-bb82-eb6b510f4168 DISCONNECTED
客戶端連線時,會被新增到客戶端列表中,控制檯會列印其客戶端 ID 和狀態“CONNECTED”。然後,每隔 5 秒,日誌會顯示客戶端當前的“空閒記憶體”讀數。最後,當客戶端斷開連線時,其 RSocket 通道的狀態變為“CLOSED”,客戶端狀態變為“DISCONNECTED”。
您可以透過在 rsocket-server
的終端視窗中按 Ctrl-C
來停止該程序。
呼叫客戶端的能力非常強大。它適用於各種場景,包括移動裝置、物聯網或微服務。由於所有這些都可以透過 TCP 或 WebSockets 進行,您無需依賴訊息代理等重量級解決方案,就已擁有所需的所有基礎設施。
您在這裡學習了很多內容。您學習瞭如何將伺服器變成“請求者”,將客戶端變成“響應者”。您瞭解瞭如何監聽連線事件。您還學習了一點關於如何處理來自 rsocket 連線的錯誤和事件的知識。並且,儘管在此練習中您選擇了“請求-流”作為您的伺服器-客戶端通訊模式,但您可以根據您的需要使用四種 RSocket 互動模式中的任何一種。