領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多時間:約 15 分鐘。
在本系列的上一篇文章中,您嘗試了使用 RSocket 在 Spring Boot 中進行請求-響應和即發即棄訊息傳遞。這次您將嘗試 RSocket 的另一種全新訊息傳遞模型——請求流。
在此練習中,您將學習如何使用傳統的“客戶端請求伺服器流”方法傳輸資料。
到目前為止,我還沒有提到的一點是 RSocket 允許您在兩個方向上使用其訊息模型。因此,如果您想使用不太常見的“伺服器請求客戶端流”模型,RSocket 也沒問題。此外,還有許多非 Java 的 RSocket 實現可供選擇,包括 Go、Javascript 和 .Net——如果您的架構包含 Java 可能不是最佳選擇的平臺,這會非常理想。
澄清了這一點後,請按照以下步驟將流資料功能新增到您現有來自之前帖子的 RSocket 客戶端和伺服器程式碼中。
如果您沒有閱讀之前關於伺服器端、客戶端請求-響應訊息傳遞或即發即棄的帖子,現在就是您的機會!程式碼示例在 GitHub 上。
再次在您的伺服器端 RSocketController 類中工作,新增一個名為 .stream() 的新方法,其簽名是 — '接受一個物件,返回一個 Flux' — 這是 RSocket 對此訊息模型所期望的。使用 @MessageMapping 註解此新方法,指定一個合適的對映名稱 — 例如 "stream"。該方法的示例程式碼如下
@MessageMapping("stream")
Flux<Message> stream(Message request) {
log.info("Received stream request: {}", request);
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message(SERVER, STREAM, index))
.log();
}
RSocketController位於rsocket-server資料夾中的io.pivotal.rsocketserver包中。
.stream() 方法的唯一引數 Message 來自之前討論過的 io.pivotal.rsocketserver.data 包。此訊息構成了客戶端請求資料流的基礎。一旦收到,上面的程式碼會將客戶端的請求記錄到控制檯。
該方法返回的 Flux 物件是 Project Reactor 的一部分,也用於 Spring Framework 的響應式支援。
RSocket 使用 Flux 是因為它極大地簡化了響應式資料流的處理。Flux 是資料的“釋出者”。它描述了 0 到 N 個元素的流,並提供了大量用於處理流資料的運算子 — 類似於 Java 8 的流 API。
在上面的程式碼中,每秒都會向 Flux 新增一個新的 Long 元素 — 透過 .interval() 呼叫設定 — 基本上提供了一個持續的資料流。.map() 函式使用 Long 作為索引值建立一個新的訊息物件,並且在最後一行,呼叫 .log() 方法會將流經 Flux 的所有元素(包括錯誤等)列印到控制檯。
在客戶端專案的 RSocketShellClient 類中,首先新增一個全域性的 Disposable 物件引用,如下所示
private static Disposable disposable;
RSocketShellClient位於rsocket-client資料夾中的io.pivotal.rsocketclient包中。
這個 Disposable 物件允許您在資料流啟動後控制它。
接下來,向您的 RSocketShellClient 新增一個 .stream() 方法。使用 @ShellMethod 註解此方法。其示例程式碼如下
@ShellMethod("Send one request. Many responses (stream) will be printed.")
public void stream() {
log.info("\nRequest-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
this.disposable = this.rsocketRequester
.route("stream")
.data(new Message(CLIENT, STREAM))
.retrieveFlux(Message.class)
.subscribe(er -> log.info("Response received: {}", er));
}
在上面的程式碼中,透過將 "stream" 指定為 .route(),rsocketRequester 被告知將請求路由到伺服器的 .stream() 方法。一個新的訊息物件為您的請求提供了 .data()。因為您希望伺服器返回一個流,所以您在 rsocketRequester 上使用了 .requestFlux() 方法,指定返回的 Flux 包含 Message 型別的元素。最後,您在 .subscribe() 方法中設定了一個日誌函式作為您流的訂閱者。
請注意,
rsocketRequester生成的Disposable正在被保留。您將需要它來停止流。
透過保留對流的引用,您可以在需要停止流時將其丟棄。要將流取消功能新增到您的 RSocketShellClient,請新增一個名為 .s() 的新方法,並使用 @ShellMethod 對其進行註解,如下所示
@ShellMethod("Stop streaming messages from the server.")
public void s(){
if(null != disposable){
disposable.dispose();
}
}
在方法內部,呼叫 disposable.dispose() 會取消流。有了這個方法,要停止流,請在 shell:> 提示符處鍵入 s,然後按 Enter。流將停止。您的編碼任務現已完成。接下來,測試客戶端和伺服器是否協同工作。
開啟一個終端視窗並移動到 rsocket-server 目錄。使用 Maven 和 Spring Boot 外掛執行伺服器,如下所示
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
伺服器在 localhost 埠 7000 上啟動。
開啟第二個終端視窗並移動到 rsocket-client 目錄。從那裡,構建並執行 RSocket 客戶端應用程式,如下所示
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
啟動後,Spring Shell 會為您顯示一個新的提示符
shell:>
您可以透過在提示符處鍵入 stream 來請求伺服器的流。客戶端傳送一個 Message 作為其對流的請求。流中的每個 Message 都會在伺服器傳送時和客戶端接收時打印出來。客戶端上的控制檯日誌看起來像這樣
shell:>stream
Request-Stream. Sending one request. Waiting for responses (Type 's' to stop)...
New Response: Message(origin=Server, interaction=Stream, index=0, created=1583923683) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=1, created=1583923684) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=2, created=1583923685) (Type 's' to stop.)
New Response: Message(origin=Server, interaction=Stream, index=3, created=1583923686) (Type 's' to stop.)
要停止流,請在 shell:> 提示符處鍵入 s,然後按 Enter。
您可以透過在 shell:> 提示符處鍵入 exit 來停止 rsocket-client,如下所示。
shell:>exit
你可以透過在其終端視窗中按 Ctrl-C 來停止 rsocket-server 程序。
RSocketShellClient 中的 .stream() 方法使用 RSocketRequester 向伺服器傳送單個請求訊息。此請求會啟動從伺服器到客戶端的資料流。然後客戶端會將收到的每條訊息記錄到控制檯。
伺服器端的 RSocketController 檢查請求訊息的元資料以獲取 route。此訊息的路由設定為 "stream",因此伺服器將訊息傳遞給相應的 .stream(Message request) 方法。然後伺服器會每秒向客戶端傳送訊息流,直到客戶端要求其停止。
客戶端可以隨時停止流。在 .s() 方法中,這是透過在原始流訂閱返回的 Disposable 上呼叫 .dispose() 來完成的。
在這篇文章中,您學習瞭如何使用 Spring Boot 中的 RSocket 構建請求流功能。在下一篇文章中,我們將新增通道訊息傳遞。下次再見!