使用 Spring、協程和 Kotlin Flow 走向響應式

工程 | Sébastien Deleuze | 2019年4月12日 | ...

自從我們在2017年1月宣佈 Spring Framework 官方支援 Kotlin 以來,發生了很多事情。在 Google I/O 2017 大會上,Kotlin 被宣佈成為官方 Android 開發語言,我們繼續改進 Spring 生態系統中對 Kotlin 的支援,Kotlin 本身也持續發展,加入了 協程 等關鍵新特性。

我想借著 Spring Framework 5.2 的第一個里程碑版本 釋出的機會,概述一下 Spring 和 Kotlin 的現狀。我將盡力關注具體的改進,因為我相信 Spring 和 Kotlin 擁有相同的實用主義思維。

我認為這一切都與選擇有關。我們(Spring 團隊)提供了多種選擇,但同時作為應用開發者,你在啟動新的 Spring Boot 應用時也需要做出選擇。例如:

  • 我應該使用哪種語言?

  • 註解式 @Controller 還是函式式風格?

  • Spring MVC 還是 WebFlux?

這些問題顯然是高度主觀的,通常取決於專案背景,但我將分享我的個人觀點。

Java 還是 Kotlin?

Java 是顯而易見的預設選擇,但 Kotlin 是一個越來越受歡迎的替代方案。開發者從 Java 轉向 Kotlin 的原因是什麼?當人們問我時,我通常會說 Kotlin 讓 Java 開發者能夠利用現有技能編寫更短、更安全、表達力更強的程式碼。但要做出明智的選擇,我們需要確定更具體的要點。

我最喜歡的 Kotlin 特性是它將 null,這個所謂的(多次)“十億美元的錯誤”,變成了一項安全特性。Java 的錯誤不在於 null 本身,而在於其型別系統中沒有明確管理 null,導致了類似於動態語言中觀察到的問題。Kotlin 在其型別系統中擁抱 null,並用它來處理值的缺失。在 Kotlin 中,像 String 這樣的型別是不可空的,因此可以安全地使用,而像 String? 這樣的型別是可空的,應該謹慎使用。好訊息是 Kotlin 編譯器會在編譯時報告潛在的錯誤,你可以使用安全呼叫Elvis 運算子null 執行塊來優雅地處理它們。與 Java 的 Optional 不同,Kotlin 的空安全也適用於輸入引數,並且不會強制你使用影響程式碼效能和可讀性的包裝器。

DSL(領域特定語言)也是 Kotlin 的另一大亮點。Gradle Kotlin DSL(對 start.spring.io 的支援即將到來)就是一個很好的例子,它利用 Kotlin 的靜態型別特性,提供了一個非常豐富靈活的 API,具有出色的可發現性和信心。Spring Framework 為 bean 定義函式式路由甚至 MockMvc 提供了 Kotlin DSL。

我還可以詳細介紹很多其他的好理由,比如帶預設值的可選引數、與 Java API(如 Spring)的出色互操作性擴充套件函式、為了避免型別擦除而使用的具體化型別引數資料類或預設鼓勵的不可變性,但我認為你應該透過示例學習 Kotlin,最終藉助參考文件,並自己做出判斷。你也可以按照這個使用 Kotlin 的 Spring Boot 循序漸進教程進行學習。

所以,就說我會在下一個 Spring Boot 專案中選擇 Kotlin 吧 ;-)

註解式 @Controller 還是函式式風格?

正如我在引言中所說,選擇取決於上下文和個人品味。考慮到 Kotlin 優秀的 DSL 和函數語言程式設計能力,我非常喜歡使用 Kotlin 的函式式路由。我甚至正在探索如何透過實驗性的 Kofu DSL for Spring Boot 以函式式方式定義 Spring Boot 應用配置,該專案目前正在 Spring Fu 倉庫中孵化。

但今天,假設我的團隊由多年來習慣使用 @Controller 程式設計模型的開發者組成,而且我不想同時改變所有東西,所以讓我們保留 @Controller

Spring MVC 還是 WebFlux?

我們在 Web 框架方面提出的選擇如下。

你可以繼續使用 Spring MVC 和所有我們持續改進的相關成熟技術:Tomcat、JPA 等。你甚至可以透過使用現代的 WebClient API 替代 RestTemplate 來利用一些響應式功能。

但我們也提供了一個響應式棧,其中包括 WebFlux,這是一個基於 Reactive Streams 的 Web 框架,適用於那些需要更高可伸縮性、對延遲不敏感(對微服務架構有用)以及更好的流處理能力的開發者。生態系統的其他部分,如 Spring Data 和 Spring Security 也提供了響應式支援。

在 Java 中使用 Reactor API 的 WebFlux

到目前為止,使用基於 WebFlux 的 Spring 響應式棧需要一個相當大的轉變,即透過使用 Reactor 的 MonoFlux 或 RxJava 類似型別等 API,將 IO 相關功能(Web、持久化)從命令式風格切換到宣告式/函式式風格。這種顛覆性的方法與指令式程式設計相比提供了實際優勢,但它也非常不同,需要不低的學習曲線。

讓我們透過具體的程式碼看看這意味著什麼,並藉此機會向你展示如何使用 R2DBC(基於 Reactive Streams 的 JDBC 替代方案)和 Spring Data R2DBC 以響應式方式訪問 SQL 資料庫。

如果選擇 Java,我們會編寫如下所示的 UserRepository 類,它使用 Spring Data R2DBC 提供的 DatabaseClient API 暴露一個響應式 API 來訪問 SQL 資料庫。

class UserRepository {

	private final DatabaseClient client;

	public UserRepository(DatabaseClient client) {
		this.client = client;
	}

	public Mono<Long> count() {
		return client.execute().sql("SELECT COUNT(*) FROM users")
			.as(Long.class).fetch().one();
	}

	public Flux<User> findAll() {
		return client.select().from("users").as(User.class).fetch().all();
	}

	public Mono<User> findOne(String id) {
		return client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).as(User.class).fetch().one();
	}

	public Mono<Void> deleteAll() {
		return client.execute().sql("DELETE FROM users").then();
	}

	public Mono<Void> save(User user) {
		return client.insert().into(User.class).table("users")
			.using(user).then();
	}

	public Mono<Void> init() {
		return client.execute().sql("CREATE TABLE ...").then()
			.then(deleteAll())
			.then(save(new User("smaldini", "Stéphane", "Maldini")))
			.then(save(new User("sdeleuze", "Sébastien", "Deleuze")))
			.then(save(new User("bclozel", "Brian", "Clozel")));
	}
}

注意

儲存使用者操作可以採用 fork-join 方式完成,因為這些操作彼此不依賴,但為了比較,我使用了透過 then() 串聯的順序操作。

你可以看到,在這種 API 中,void 變成了 Mono<Void>User 變成了 Mono<User>。這允許以非阻塞方式使用它們,並提供了豐富的運算子。但這同時也強制要求使用 Mono 包裝器,並顯著改變了這些 API 的使用方式。例如,如果某些操作需要按順序執行,就像在 init() 方法中那樣(命令式程式碼很簡單),在這裡我們必須使用 then 運算子構建一個宣告式管道。

Flux<User> 提供了更多的附加價值,因為它允許將傳入的使用者作為流進行處理,而阻塞式棧中常用的 List<User> 的使用意味著在處理之前將所有資料載入到記憶體中。注意,這裡我們也可以使用 Mono<List<User>>

在控制器端,你可以看到 Spring WebFlux 原生地支援這些響應式型別,你還可以看到基於 Reactive Streams 的 API 的另一個特點,即異常主要被用作由響應式型別攜帶的錯誤訊號,而不是像常規命令式程式碼那樣被丟擲。

@RestController
public class UserController {

	private final UserRepository userRepository;

	public UserController(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	@GetMapping("/")
	public Flux<User> findAll() {
		return userRepository.findAll();
	}

	@GetMapping("/{id}")
	public Mono<User> findOne(@PathVariable String id) {
		return userRepository
			.findOne(id)
			.switchIfEmpty(Mono.error(
				new CustomException("This user does not exist");
	}

	@PostMapping("/")
	public Mono<Void> save(User user) {
		return userRepository.save(user);
	}
}

在 Kotlin 中使用協程 API 的 WebFlux

重要的是要理解,Spring 的響應式支援是構建在 Reactive Streams 之上的,並且考慮了互操作性,Reactor 被用於兩個不同的目的

  • 它是我們在 Spring 響應式基礎設施中處處使用的 Reactive Streams 實現

  • 它也是預設暴露的響應式公共 API

但 Spring 的響應式支援從一開始就被設計為易於適應其他非同步或響應式 API,如 CompletableFuture、RxJava 2,以及現在支援的協程。在這種情況下,我們內部仍然利用 Reactor,但在公共 API 層面適應不同的終端使用者響應式 API。

當然,如果你喜歡這種方法,在 Kotlin 中繼續使用 FluxMono 是完全可以的,但 Spring Framework 5.2 引入了一個新的重要特性:我們現在可以使用 Kotlin 協程 以更命令式的方式利用 Spring 響應式棧。

協程是 Kotlin 的輕量級執行緒,允許以命令式方式編寫非阻塞程式碼。在語言層面,由 suspend 關鍵字標識的掛起函式為非同步操作提供了抽象,而在庫層面,kotlinx.coroutines 提供了 async { } 等函式以及 Flow 等型別,Flow 是協程世界中的 Flux 等價物。

當類路徑中包含 kotlinx-coroutines-corekotlinx-coroutines-reactor 依賴時,協程支援將被啟用

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

那麼,用 Kotlin 代替 Java 編寫,並使用協程和 Flow 代替 MonoFluxUserRepositoryUserController 會是什麼樣子呢?

class UserRepository(private val client: DatabaseClient) {

	suspend fun count(): Long =
		client.execute().sql("SELECT COUNT(*) FROM users")
			.asType<Long>().fetch().awaitOne()

	fun findAll(): Flow<User> =
		client.select().from("users").asType<User>().fetch().flow()

	suspend fun findOne(id: String): User? =
		client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).asType<User>()
			.fetch()
			.awaitOneOrNull()

	suspend fun deleteAll() =
		client.execute().sql("DELETE FROM users").await()

	suspend fun save(user: User) =
		client.insert().into<User>().table("users").using(user).await()

	suspend fun init() {
		client.execute().sql("CREATE TABLE IF NOT EXISTS users (login varchar PRIMARY KEY, firstname varchar, lastname varchar);").await()
		deleteAll()
		save(User("smaldini", "Stéphane", "Maldini"))
		save(User("sdeleuze", "Sébastien", "Deleuze"))
		save(User("bclozel", "Brian", "Clozel"))
	}
}

你可以在這裡看到,我們不是返回例如 Mono<User>,而是在一個掛起函式中返回 User(或者更準確地說是它的可空變體 User?),這個掛起函式可以以命令式方式使用。init() 方法實現中的差異很好地說明了這一點,因為我們現在使用的是常規的命令式程式碼,而不是鏈式呼叫 then

但是等等,我怎麼能直接在 DatabaseClient 型別上使用協程呢?它是一個基於 MonoFlux 的響應式 API。這是可能的,因為 Spring Data R2DBC 也提供了 Kotlin 擴充套件(例如請參閱此處),一旦匯入,這些擴充套件允許你在 DatabaseClient 上新增基於協程的方法。按照約定,掛起方法以 await 為字首或以 AndAwait 為字尾,並且名稱與其基於 Mono 的對應方法相似。

現在讓我們更深入地瞭解一下 Flow<User> 返回型別。首先,請注意我們指的是 kotlinx.coroutines.flow.Flow,而不是 Java 9+ 提供的 Reactive Streams 容器型別 java.util.concurrent.Flow

你會像使用 Java 8+ 的 Stream 或其 Kotlin 等價物 Sequence 一樣使用 Flow API,但巨大的區別在於它適用於非同步操作並管理背壓。所以它是協程世界中的 Flux 等價物,適用於熱流或冷流,有限流或無限流,主要區別如下:

  • Flow 是基於推送的,而 Flux 是推拉混合的

  • 背壓透過掛起函式實現

  • Flow 只有一個掛起方法 collect,運算子實現為擴充套件函式

  • 藉助協程,運算子易於實現

  • 擴充套件函式允許向 Flow 新增自定義運算子

  • 收集操作是掛起函式

  • map 運算子支援非同步操作(無需 flatMap),因為它接受一個掛起函式引數

現在讓我們看一下控制器的協程版本

@RestController
class UserController(private val userRepository: UserRepository) {

	@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): User? =
		userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")

	@PostMapping("/")
	suspend fun save(user: User) =
		userRepository.save(user)
}

同樣,你可以看到程式碼與我們在 Spring MVC 中使用的常規命令式程式碼非常接近。

除了為基於 FluxMono 的 API(如 WebClientServerRequestServerResponse)提供協程擴充套件之外,Spring WebFlux 現在還原生支援註解式 @Controller 類中的掛起函式和 Flow 返回型別。

使用命令式程式碼進行非同步操作

讓我們利用 WebClient 協程擴充套件來看看如何串聯非同步呼叫。我們將請求一個遠端 HTTP 端點來獲取額外的 UserDetail1UserDetail2

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails {
		val userDetail1 = client.get().uri("/userdetail1/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail1>()
		val userDetail2 = client.get().uri("/userdetail2/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail2>()
		return UserWithDetails(user, userDetail1, userDetail2)
	}
}

這裡我們使用 WebClient 協程擴充套件,如 awaitExchange()awaitBody(),以純粹命令式的方式執行非同步和非阻塞操作。而且由於 Flowmap 運算子接受一個掛起函式引數,我們可以在其中執行這樣的操作,不像在 Java 中使用響應式 API 那樣需要 flatMap

並行分解

如前所述,協程預設是順序執行的,但它們也可以用於並行執行操作。讓我們重構前面的示例,使其併發執行兩個遠端呼叫。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails = coroutineScope {
		val asyncDetail1 = async {
			client.get().uri("/userdetail1/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail1>()
		}
		val asyncDetail2 = async {
			client.get().uri("/userdetail2/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail2>()
		}
		UserWithDetails(user, asyncDetail1.await(), asyncDetail2.await())
	}
}

這裡我們利用結構化併發,透過 async {} 構建器建立 Deferred<UserDetail1>Deferred<UserDetail2> 例項來觸發兩個使用者詳情的並行檢索,然後透過呼叫兩個 await() 方法等待它們完成,這些方法將在可用時返回 UserDetail1UserDetail2 例項。

結論

我認為將 Spring 響應式棧與協程和 Kotlin Flow API 結合使用,在命令式和宣告式方法之間提供了一個有趣的權衡。它以一種非常易於理解的方式,讓你能夠利用 WebFlux 和 Spring Data 響應式的可伸縮性和特性。

Spring WebFlux 和 Spring Data 中的協程支援將作為即將釋出的 Spring Boot 2.2 版本的一部分提供。你可以閱讀參考文件,並期待進一步的改進,例如對 RSocket @MessageMapping 端點和 RSocketRequester 擴充套件的協程支援。Spring Data Moore 也將在 Spring Data MongoDB、Cassandra 和 Redis 上提供類似的協程擴充套件。並且 Spring Data 可能會在某個時候提供對協程倉庫的支援。我們還將使 Reactor 和協程上下文互操作,以支援安全和響應式事務。

最後,我想感謝許多才華橫溢的工程師,沒有他們,這一切都不可能實現

  • 來自 Kotlin 團隊的 Roman Elizarov 和 Vsevolod Tolstopyatov,感謝他們在協程和 Flow 方面令人難以置信的工作

  • Konrad Kaminski,感謝他最初由社群驅動的 Spring 協程支援工作

  • Jake Wharton,感謝他早期圍繞統一 Rx 和協程進行的原型設計

  • Stéphane Maldini 和 David Karnok,感謝他們鼓舞人心的工作

  • Juergen Hoeller, Rossen Stoyanchev 和 Brian Dussault,感謝他們的信任

  • Mark Paluch 和 Oliver Drotbohm,感謝他們在持久化方面的支援

像往常一樣,我期待收到反饋,以及 Kotlin 團隊關於 Flow API 的反饋,因為它仍處於預覽模式。來參加我在 Devoxx FranceJAXSpring I/OSunny Tech 的即將到來的演講,瞭解更多資訊。

乾杯!

獲取 Spring 通訊

訂閱 Spring 通訊,保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助你加速前進。

瞭解更多

獲得支援

Tanzu Spring 透過一個簡單的訂閱,提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部