領先一步
VMware 提供培訓和認證,助力你快速提升。
瞭解更多閱讀時間:約6分鐘。實踐時間:約20分鐘。
如果你和我一樣,剛剛開始你的 RSocket 之旅,不妨瞭解一下 RSocket 協議背後的動機。這份簡短但富有洞察力的文件中有一句話讓我深有感觸——“抽象不匹配會增加系統開發成本”。
從軟體設計的角度來看,RSocket 的四種互動模型提供了顯著優勢。這意味著我們可以針對每個用例,使用正確的互動模型來建模元件間的通訊。這種更高效的模型可以為你節省大量的編碼時間和精力!
到目前為止,在本系列中,我們已經探討了請求-響應、即發即忘以及請求-流訊息傳遞。今天你將為客戶端和伺服器程式碼新增通道。
通道是雙向管道,允許資料流在兩個方向流動。使用通道,客戶端到伺服器的資料流可以與伺服器到客戶端的資料流共存。通道在現實世界中有許多用途。通道可以承載視訊會議流、傳送和接收雙向聊天訊息、使用增量和差異同步資料,或者提供報告、觀察和監控系統的方式。
RSocket 中的通道並不比流或請求-響應複雜。話雖如此,下面將要實現的場景比你之前嘗試的要稍微複雜一些,所以在開始之前最好先理解它。
在接下來的練習中,伺服器將訊息流式傳輸到客戶端。客戶端控制伺服器流中訊息的頻率。它透過使用“延遲”設定流來實現這一點。客戶端流中的設定告訴伺服器傳送每條訊息之間應該暫停多長時間。可以將其視為一個訊息頻率調節盤。頻率設定高時,暫停時間短,你會看到很多伺服器傳送的訊息。頻率設定低時,暫停時間長,你會看到較少的伺服器傳送的訊息。考慮到這個結果,我們開始編碼吧。
完整的程式碼示例可在 GitHub 上獲取。
RSocketController
位於rsocket-server
資料夾的io.pivotal.rsocketserver
包中。RSocketShellClient
位於rsocket-client
資料夾的io.pivotal.rsocketclient
包中。
RSocketController
在伺服器端的 RSocketController
類中,新增一個名為 channel()
的方法,該方法接受一個 Flux<Duration>
並返回一個 Flux<Message>
。這種方法簽名——Flux 輸入,Flux 輸出——將其標識為 RSocket 通道方法。使用值 "channel"
為該方法新增 @MessageMapping()
註解。此方法的程式碼如下。
@MessageMapping("channel")
Flux<Message> channel(final Flux<Duration> settings) {
return settings
.doOnNext(setting -> log.info("\nFrequency setting is {} second(s).\n", setting.getSeconds()))
.switchMap(setting -> Flux.interval(setting)
.map(index -> new Message(SERVER, CHANNEL, index)))
.log();
}
在程式碼中,.doOnNext()
監聽來自客戶端的設定流。每當新的 delay
設定到達時,它會將訊息寫入日誌。.switchMap()
為每個新設定建立一個新的 Flux。這個新的 Flux 基於 delay
設定中包含的 .interval()
延遲發出新的 Message
物件。伺服器將這些新訊息透過流傳送回客戶端。
RSocketShellClient
在客戶端的 RSocketShellClient
類中,新增一個新的 channel()
方法,並使用 @ShellMethod()
註解對其進行標註。將方法用途的描述作為註解的值,如下例所示。
@ShellMethod("Stream some settings to the server. Stream of responses will be printed.")
public void channel(){
// Code goes here
}
接下來,在該方法中,新增建立設定流的程式碼。程式碼如下:
Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));
Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
.doOnNext(d -> log.info("\nSending setting for {}-second interval.\n", d.getSeconds()));
每個 Mono
包含一個 Duration
設定。每個持續時間控制來自伺服器的每條訊息之間的暫停。總共有 3 個 Mono。第一個包含 1 秒的短暫停設定。第二個有更寬鬆的 3 秒暫停設定,但使用 .delayElement()
方法指示此 Mono 延遲 5 秒再產生此持續時間。第三個 Mono 包含 5 秒的暫停設定,但在 15 秒過去之前不會發出其持續時間。這 3 個 Mono 使用 .concat()
方法連線成一個 Flux
。使用 .doOnNext()
添加了日誌記錄語句,以便在程式碼執行時可以看到發生了什麼。
注意:構建基於
Flux
的流有很多方法,但對於本教程來說,只是一個簡單的示例。
現在,Flux 中有了設定流,在該方法中新增與伺服器通訊所需的程式碼。
disposable = this.rsocketRequester
.route("channel")
.data(settings)
.retrieveFlux(Message.class)
.subscribe(message -> log.info("Received: {} \n(Type 's' to stop.)", message));
如果你一直跟著本系列,現在應該對這段程式碼很熟悉了。rsocketRequester
是你在建構函式中建立的全域性變數。它提供了你與伺服器的 RSocket 通訊連結。.route()
設定為 "channel"
,以匹配伺服器端程式碼中的訊息對映。.data()
是你在上面建立的 mono 流。.retrieveFlux()
指定你期望收到 Message
物件的流,而 .subscribe()
開始你的訂閱,並確保收到的每條訊息都列印到日誌中,以便你看到發生了什麼。訂閱建立的 Disposable
物件被保留並用於控制通道。
你可以在這裡檢視該方法的完整程式碼。這就是我們需要的所有程式碼。讓我們執行它!
開啟一個終端視窗,進入 rsocket-server
目錄,然後按如下方式執行伺服器:
cd rsocket-server
./mvnw clean package spring-boot:run -DskipTests=true
伺服器將在 localhost
的 7000
埠啟動。
開啟一個第二個終端視窗,進入 rsocket-client
目錄。然後按如下方式構建並執行客戶端:
cd rsocket-client
./mvnw clean package spring-boot:run -DskipTests=true
啟動後,Spring Shell 將為你提供一個新的提示符:
shell:>
在提示符下輸入 channel
來向伺服器請求一個通道。
客戶端建立延遲計時器設定流並開始將其傳送到伺服器。出站流中的每個持續時間都會被客戶端和伺服器打印出來。伺服器傳送回訊息流,客戶端將其列印到日誌中。客戶端的終端看起來像這樣:
shell:>channel
i.p.rsocketclient.RSocketShellClient :
Sending setting for 1-second interval.
i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304561)
(Type 's' to stop.)
# removed log-lines
i.p.rsocketclient.RSocketShellClient :
Sending setting for 3-second interval.
i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=0, created=1585304568)
(Type 's' to stop.)
# removed log-lines
i.p.rsocketclient.RSocketShellClient :
Sending setting for 5-second interval.
2020-03-27 10:23:00.243 INFO 5680 --- [tor-tcp-epoll-1] i.p.rsocketclient.RSocketShellClient : Received: Message(origin=Server, interaction=Channel, index=4, created=1585304580)
(Type 's' to stop.)
# removed log-lines
要停止通道傳輸,輸入 s
然後按 Enter
鍵。
你可以在 shell:>
提示符下輸入 exit
來退出 rsocket-client
,像這樣:
shell:>exit
你可以在 rsocket-server
的終端視窗中按下 Ctrl-C
來停止其程序。
客戶端建立一個包含 3 個持續時間元素的序列。第一個持續時間設定立即發出,第二個在 5 秒後發出,第三個在 15 秒後發出。每次發出新的持續時間時,都會記錄到控制檯。這個設定流被髮送到伺服器,並控制伺服器生成的訊息流。
在伺服器端,每當從客戶端流中提取新的持續時間設定時,就會建立一個新的訊息流並返回。客戶端傳送的最新的設定控制著此伺服器傳送流中每條訊息之間的時間延遲。
當客戶端釋放訂閱的 disposable 物件時,通道傳輸停止。
如果你完整地閱讀了本系列,你現在已經看到了 RSocket 的所有四種互動模型的實際應用:請求-響應、即發即忘、請求-流,以及現在的通道。
擁有這四種通訊風格,你將大大降低遇到我們在開頭討論的那些惱人的“抽象不匹配”場景的可能性。將 RSocket 納入你的工具箱,你將擁有一種靈活、低開銷、高效能的訊息協議,可以在你的軟體中使用——一個專為微服務架構而構建的協議。