領先一步
VMware 提供培訓和認證,助力您的進步。
瞭解更多閱讀時間:約 15 分鐘。
在本系列之前的文章中,您嘗試了使用 Spring Boot 和 RSocket 進行請求-響應和即發即忘的訊息模式。本次,您將嘗試 RSocket 的另一種全新的訊息模型——request-stream (請求-流)。
在此練習中,您將學習如何使用傳統的 'client-requests-a-server-stream'(客戶端請求伺服器流)方式進行資料流傳輸。
到目前為止我還沒有提到的一點是,RSocket 允許您在兩個方向上使用其訊息模型。因此,如果您想使用不太常見的 'server-requests-a-client-stream'(伺服器請求客戶端流)模型,RSocket 也沒有問題。此外,還有很多非 Java 的RSocket 實現可供選擇,包括 Go、Javascript 和 .Net——如果您的架構包含 Java 可能不是最佳選擇的平臺,這些實現將是理想之選。
澄清這一點後,請按照以下步驟,將流資料功能新增到您之前文章中現有的 RSocket 客戶端和伺服器程式碼中。
如果您還沒有閱讀之前關於伺服器端、客戶端請求-響應訊息傳遞或即發即忘的文章,現在是您的機會!程式碼示例在GitHub 上。
回到您的伺服器端 RSocketController
類,新增一個名為 .stream()
的新方法,其簽名符合 RSocket 對此訊息模型的期望 — '接受一個物件,返回一個 flux'。使用 @MessageMapping
註解標記此新方法,並指定一個合適的對映名稱 — 例如 "stream"
。方法的示例程式碼如下
@MessageMapping("stream")
Flux<Message> stream(Message request) {
log.info("Received stream request: {}", request);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message(SERVER, STREAM, index))
.log();
}
RSocketController
位於rsocket-server
資料夾中的io.pivotal.rsocketserver
包內。
.stream()
方法的唯一引數 Message
來自之前討論過的 io.pivotal.rsocketserver.data
包。此訊息構成了客戶端請求資料流的基礎。上面的程式碼會在收到客戶端請求時立即將其記錄到控制檯。
該方法返回的 Flux
物件是Project Reactor的一部分,並且也在 Spring Framework 的響應式支援中得到使用。
RSocket 使用 Flux
,因為它極大地簡化了響應式資料流的處理。Flux
是資料的一個“Publisher”(釋出者)。它描述了 0 到 N 個元素的流,並提供了大量用於處理流資料的運算子 — 類似於Java 8 的 streaming APIs。
在上面的程式碼中,每秒會有一個新的 Long
元素新增到 Flux 中 — 透過呼叫 .interval()
設定 — 從而實質上提供了一個持續的資料流。.map()
函式使用 Long
值作為索引建立了一個新的訊息物件,而在最後一行,呼叫 .log()
方法會將流經 Flux 的所有元素(包括錯誤等)列印到控制檯。
在客戶端專案的 RSocketShellClient
類中,首先新增一個 Disposable
物件的全域性引用,如下所示
private static Disposable disposable;
RSocketShellClient
位於rsocket-client
資料夾中的io.pivotal.rsocketclient
包內。
一旦資料流開始,此 Disposable
物件允許您控制它。
接下來,向您的 RSocketShellClient
新增一個 .stream()
方法。使用 @ShellMethod
註解標記此方法。示例程式碼如下所示
@ShellMethod("Send one request. Many responses (stream) will be printed.")
public void stream() {
log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
this.disposable = this.rsocketRequester
.route("stream")
.data(new Message(CLIENT, STREAM))
.retrieveFlux(Message.class)
.subscribe(er -> log.info("Response received: {}", er));
}
在上面的程式碼中,透過指定 "stream"
作為 .route()
,rsocketRequester
被告知將請求路由到伺服器的 .stream()
方法。一個新的訊息物件提供了請求的 .data()
。因為您希望伺服器返回一個流,所以您使用了 rsocketRequester
上的 .requestFlux()
方法,並指定返回的 Flux
包含型別為 Message
的元素。最後,您在 .subscribe()
方法中設定了一個日誌函式作為流的訂閱者。
注意
rsocketRequester
生成的Disposable
是如何被儲存的。您將需要它來停止流。
透過保留對流的引用,您可以在想要停止流時將其處理掉(dispose)。要將流取消功能新增到您的 RSocketShellClient
中,請新增一個名為 .s()
的新方法,並使用 @ShellMethod
註解標記它,如下所示
@ShellMethod("Stop streaming messages from the server.")
public void s(){
if(null != disposable){
disposable.dispose();
}
}
在該方法內部,呼叫 disposable.dispose()
會取消流。有了此方法後,要停止流,請在 shell:>
提示符下輸入 s
,然後按 Enter
鍵。流就會停止。您的編碼任務現已完成。接下來,測試客戶端和伺服器是否正常協同工作。
開啟終端視窗並進入 rsocket-server
目錄。使用 Maven 和 Spring Boot 外掛執行伺服器,如下所示
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
伺服器將在 localhost
的 7000
埠啟動。
開啟第二個終端視窗並進入 rsocket-client
目錄。然後,如下構建並執行 RSocket 客戶端應用程式
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
啟動後,Spring Shell 會顯示一個新的提示符
shell:>
您可以在提示符下輸入 stream
來請求伺服器的流。客戶端傳送一個 Message
作為其對流的請求。流中的每個 Message
在伺服器傳送時和客戶端接收時都會被打印出來。客戶端的控制檯日誌看起來像這樣
shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)
要停止流,請在 shell:>
提示符下輸入 s
,然後按 Enter
鍵。
您可以透過在 shell:>
提示符下輸入 exit
來停止 rsocket-client
,如下所示。
shell:>exit
您可以透過在其終端視窗中按下 Ctrl-C
來停止 rsocket-server
程序。
RSocketShellClient
中的 .stream()
方法使用 RSocketRequester
向伺服器傳送單個請求訊息。此請求會啟動一個從伺服器到客戶端的資料流。然後客戶端會將收到的每條訊息記錄到控制檯。
伺服器端的 RSocketController
檢查請求訊息的元資料以查詢 route
。此訊息的路由設定為 "stream"
,因此伺服器將訊息傳遞給相應的 .stream(Message request)
方法。然後伺服器開始每秒向客戶端傳送一條訊息流,直到客戶端要求其停止。
客戶端可以隨時停止流。在 .s()
方法中,這是透過呼叫原始流訂閱返回的 Disposable
物件的 .dispose()
方法來實現的。
在本文中,您學習瞭如何在 Spring Boot 中使用 RSocket 構建 request-stream (請求-流) 功能。在下一篇文章中,我們將新增 channel messaging (通道訊息傳遞) 功能。下次見!