Spring、協程和 Kotlin Flow 的響應式開發

工程 | Sébastien Deleuze | 2019年4月12日 | ...

自2017年1月宣佈 Spring Framework 正式支援 Kotlin 以來,已經發生了許多事情。Kotlin 在 Google I/O 2017 上被宣佈為官方 Android 開發語言,我們繼續改進 Spring 產品組合中的 Kotlin 支援,Kotlin 本身也隨著 協程 等關鍵新功能不斷發展。

我想借 Spring Framework 5.2 的第一個里程碑 的機會,概述一下我們在 Spring 和 Kotlin 方面的現狀。我會盡力專注於具體的改進,因為我相信 Spring 和 Kotlin 擁有相同的務實心態。

我認為這歸結為選擇。我們(Spring 團隊)提供的選擇,以及你們作為應用程式開發者在啟動新的 Spring Boot 應用程式時必須做出的選擇。例如

  • 應該使用什麼語言?

  • 註解的 @Controller 還是函式式風格?

  • Spring MVC 還是 WebFlux?

這些問題顯然是非常主觀的,並且通常取決於專案上下文,但我將分享我個人觀點。

Java 還是 Kotlin?

Java 是顯而易見的預設選擇,但 Kotlin 是一個越來越受歡迎的替代方案。有什麼理由可以讓開發人員從 Java 轉向 Kotlin?當人們問我時,我通常會說 Kotlin 允許 Java 開發人員利用他們現有的技能編寫更簡潔、更安全、更具表現力的程式碼。但要做出明智的選擇,我們應該確定更具體的要點。

我最喜歡的 Kotlin 功能是它將 `null`,即所謂的(多重)“十億美元的錯誤”,變成了一個安全功能。Java 的錯誤不是 `null` 本身,而是沒有在其型別系統中顯式管理 `null`,導致出現接近動態語言中可觀察到的問題。Kotlin 透過在其型別系統中利用 `null` 來 處理值的缺失。在 Kotlin 中,像 `String` 這樣的型別不是可空的,因此可以安全地使用而無需小心;而像 `String?` 這樣的型別是可空的,應該謹慎使用。好訊息是 Kotlin 編譯器會在編譯時報告潛在錯誤,您可以透過 安全呼叫Elvis 運算子非空執行 塊來優雅地處理它們。與 Java 的 `Optional` 不同,Kotlin 的空安全也適用於輸入引數,並且不會強制您使用影響效能和可讀性的包裝器。

DSL 也是 Kotlin 閃耀的另一個領域。 Gradle Kotlin DSL(在 start.spring.io 上的支援 即將推出)是一個很好的例子,它允許使用非常豐富且靈活的 API,並且由於 Kotlin 的靜態型別特性,具有出色的可發現性和信心。Spring Framework 為 Bean 定義函式式路由 甚至 MockMvc 提供了 Kotlin DSL。

我可以詳細說明許多其他切換的好處,例如 帶預設值的可選引數與 Java API(如 Spring)的良好互操作性擴充套件函式用於避免型別擦除的 reified 型別引數資料類 或預設鼓勵的不可變性,但我認為您最終應該透過 邊學邊練 Kotlin,並在 參考文件 的幫助下做出自己的判斷。您還可以遵循這個一步一步的 Spring Boot Kotlin 教程

所以,我將在我的下一個 Spring Boot 專案中選擇 Kotlin ;-)

註解的 @Controller 還是函式式風格?

正如我在引言中所說,選擇取決於上下文,並且是品味問題。我非常喜歡 使用 Kotlin 進行函式式路由,考慮到該語言非常好的 DSL 和函數語言程式設計能力。我甚至正在探索如何透過實驗性的 Kofu Spring Boot DSL 以函式式方式定義 Spring Boot 應用程式配置,該 DSL 正在 Spring Fu 儲存庫中孵化。

但今天,假設我的團隊由多年來習慣於 `@Controller` 程式設計模型的開發人員組成,並且我不想一次性改變所有東西,所以我們保留 `@Controller`。

Spring MVC 還是 WebFlux?

我們在 Web 框架方面提供的選擇如下。

您可以繼續使用 Spring MVC 和所有相關的知名技術,我們將繼續改進它們:Tomcat、JPA 等。您甚至可以透過使用現代的 `WebClient` API 而不是 `RestTemplate` 來利用一些響應式部分。

但我們也提供了一個響應式堆疊,包括 WebFlux,這是一個基於 Reactive Streams 的 Web 框架,適用於那些想要更高可伸縮性、不受延遲影響(適用於面向微服務的架構)以及更好流處理能力的人。生態系統的其他部分,如 Spring Data 和 Spring Security,也提供響應式支援。

Java 中的 Reactor API WebFlux

到目前為止,使用 Spring 響應式堆疊的 WebFlux 需要一個相當大的轉變,透過使用 Reactor MonoFlux 或 RxJava 類似型別的 API,將 IO 相關功能(Web、永續性)從指令式程式設計風格切換到宣告式/函數語言程式設計風格。這種顛覆性的方法比指令式程式設計具有真正的優勢,但它也非常不同,並且需要相當大的學習曲線。

讓我們透過具體的程式碼來看看這意味著什麼,並藉此機會向您展示如何使用 R2DBC(基於 Reactive Streams 的 JDBC 替代方案)和 Spring Data R2DBC 以響應式方式訪問 SQL 資料庫。

如果我們選擇 Java,我們將編寫以下 `UserRepository` 類,該類公開了一個響應式 API,使用 Spring Data R2DBC 提供的 `DatabaseClient` API 來訪問 SQL 資料庫。

class UserRepository {

	private final DatabaseClient client;

	public UserRepository(DatabaseClient client) {
		this.client = client;
	}

	public Mono<Long> count() {
		return client.execute().sql("SELECT COUNT(*) FROM users")
			.as(Long.class).fetch().one();
	}

	public Flux<User> findAll() {
		return client.select().from("users").as(User.class).fetch().all();
	}

	public Mono<User> findOne(String id) {
		return client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).as(User.class).fetch().one();
	}

	public Mono<Void> deleteAll() {
		return client.execute().sql("DELETE FROM users").then();
	}

	public Mono<Void> save(User user) {
		return client.insert().into(User.class).table("users")
			.using(user).then();
	}

	public Mono<Void> init() {
		return client.execute().sql("CREATE TABLE ...").then()
			.then(deleteAll())
			.then(save(new User("smaldini", "Stéphane", "Maldini")))
			.then(save(new User("sdeleuze", "Sébastien", "Deleuze")))
			.then(save(new User("bclozel", "Brian", "Clozel")));
	}
}

注意

儲存使用者可以透過 fork-join 的方式完成,因為這些操作彼此不依賴,但為了比較起見,我使用了透過 `then()` 連結的順序操作。

您可以看到,在這種 API 中,`void` 變成了 `Mono`,`User` 變成了 `Mono`。這使得它們可以以非阻塞的方式使用,並提供對豐富運算子集的訪問。但它也強制使用 `Mono` 包裝器,並顯著改變了您使用這些 API 的方式。例如,如果某些操作需要順序執行,就像在 `init()` 方法中一樣,這在命令式程式碼中很容易實現,而在這裡我們必須使用 `then` 運算子構建宣告式管道。

`Flux` 提供了更多的附加價值,因為它允許在處理傳入使用者時將它們作為流進行處理,而阻塞堆疊中通常使用的 `List` 暗示在處理之前將所有資料載入到記憶體中。請注意,我們也可以在這裡使用 `Mono>`。

在控制器方面,您可以看到 Spring WebFlux 原生支援這些響應式型別,您還可以看到基於 Reactive Streams API 的另一個特性,即異常主要被用作由響應式型別攜帶的錯誤訊號,而不是像常規命令式程式碼那樣被丟擲。

@RestController
public class UserController {

	private final UserRepository userRepository;

	public UserController(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	@GetMapping("/")
	public Flux<User> findAll() {
		return userRepository.findAll();
	}

	@GetMapping("/{id}")
	public Mono<User> findOne(@PathVariable String id) {
		return userRepository
			.findOne(id)
			.switchIfEmpty(Mono.error(
				new CustomException("This user does not exist");
	}

	@PostMapping("/")
	public Mono<Void> save(User user) {
		return userRepository.save(user);
	}
}

Kotlin 協程 API WebFlux

重要的是要理解 Spring 響應式支援是建立在 Reactive Streams 之上的,並考慮到互操作性,Reactor 被用於兩個不同的目的:

  • 它是我們用於 Spring 響應式基礎設施的 Reactive Streams 實現。

  • 它也是預設暴露的響應式公共 API。

但 Spring 響應式支援從一開始就被設計為可以輕鬆適應其他非同步或響應式 API,如 `CompletableFuture`、RxJava 2、以及現在的協程。在這種情況下,我們仍然在內部利用 Reactor,在公共 API 層面適應不同的終端使用者響應式 API。

當然,如果您更喜歡這種方法,在 Kotlin 中繼續使用 `Flux` 和 `Mono` 是完全可以的,但 Spring Framework 5.2 引入了一個新的主要功能:我們現在可以使用 Kotlin 協程 以更命令式的方式利用 Spring 響應式堆疊。

協程是 Kotlin 的輕量級執行緒,允許以命令式的方式編寫非阻塞程式碼。在語言層面,用 `suspend` 關鍵字標識的掛起函式提供了非同步操作的抽象,而在庫層面,kotlinx.coroutines 提供了像 `async {}` 這樣的函式以及像 `Flow` 這樣的型別,它是協程世界中的 `Flux` 等價物。

當 `kotlinx-coroutines-core` 和 `kotlinx-coroutines-reactor` 依賴項在類路徑中時,將啟用協程支援。

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

那麼,用 Kotlin 而不是 Java 編寫的 `UserRepository` 和 `UserController`,並使用協程和 `Flow` 而不是 `Mono` 和 `Flux` 看起來是什麼樣的?

class UserRepository(private val client: DatabaseClient) {

	suspend fun count(): Long =
		client.execute().sql("SELECT COUNT(*) FROM users")
			.asType<Long>().fetch().awaitOne()

	fun findAll(): Flow<User> =
		client.select().from("users").asType<User>().fetch().flow()

	suspend fun findOne(id: String): User? =
		client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).asType<User>()
			.fetch()
			.awaitOneOrNull()

	suspend fun deleteAll() =
		client.execute().sql("DELETE FROM users").await()

	suspend fun save(user: User) =
		client.insert().into<User>().table("users").using(user).await()

	suspend fun init() {
		client.execute().sql("CREATE TABLE IF NOT EXISTS users (login varchar PRIMARY KEY, firstname varchar, lastname varchar);").await()
		deleteAll()
		save(User("smaldini", "Stéphane", "Maldini"))
		save(User("sdeleuze", "Sébastien", "Deleuze"))
		save(User("bclozel", "Brian", "Clozel"))
	}
}

您可以看到,這裡,例如,我們返回 `User`(或者更確切地說,它的可空變體 `User?`)而不是返回 `Mono`,在可以以命令式方式使用的掛起函式中。`init()` 方法實現中的差異很好地說明了這一點,因為我們現在使用的是常規命令式程式碼,而不是鏈式 `then` 呼叫。

但是,等等,我如何直接在基於 `Mono` 和 `Flux` 的響應式 API `DatabaseClient` 上使用協程?這是可能的,因為 Spring Data R2DBC 還提供了 Kotlin 擴充套件(請參閱 這個),一旦匯入,您就可以將基於協程的方法新增到 `DatabaseClient`。按照約定,掛起方法以字首 `await` 或字尾 `AndAwait` 命名,並且與它們的 `Mono` 對等方法名稱相似。

現在讓我們深入瞭解一下 `Flow` 返回型別。首先,請注意我們指的是 `kotlinx.coroutines.flow.Flow`,而不是 `java.util.concurrent.Flow`,後者是 Java 9+ 提供的 Reactive Streams 容器型別。

您將像使用 Java 8+ `Stream` 或其 Kotlin 等價物 `Sequence` 一樣使用 `Flow` API,但巨大的區別在於它適用於非同步操作並管理背壓。因此,它是協程世界中的 `Flux` 等價物,適用於熱流或冷流、有限流或無限流,主要區別如下:

  • `Flow` 是推模式的,而 `Flux` 是推拉混合模式的。

  • 背壓透過掛起函式實現。

  • `Flow` 只有一個 掛起的 `collect` 方法,並且運算子作為 擴充套件 實現。

  • 由於協程,運算子易於實現

  • 擴充套件允許向 `Flow` 新增自定義運算子。

  • 收集操作是掛起函式。

  • `map` 運算子 支援非同步操作(無需 `flatMap`),因為它接受一個掛起函式引數。

現在讓我們看看控制器的協程版本。

@RestController
class UserController(private val userRepository: UserRepository) {

	@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): User? =
		userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")

	@PostMapping("/")
	suspend fun save(user: User) =
		userRepository.save(user)
}

您再次可以看到,程式碼非常接近我們用 Spring MVC 使用的常規命令式程式碼。

除了為 `WebClient`、`ServerRequest` 或 `ServerResponse` 等基於 `Flux` 和 `Mono` 的 API 提供協程擴充套件外,Spring WebFlux 現在還為註解的 `@Controller` 類原生支援掛起函式和 `Flow` 返回型別。

命令式程式碼的非同步操作

讓我們利用 `WebClient` 協程擴充套件來檢視如何連結非同步呼叫。我們將請求遠端 HTTP 端點以獲取額外的 `UserDetail1` 和 `UserDetail2`。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails {
		val userDetail1 = client.get().uri("/userdetail1/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail1>()
		val userDetail2 = client.get().uri("/userdetail2/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail2>()
		return UserWithDetails(user, userDetail1, userDetail2)
	}
}

在這裡,我們使用 `WebClient` 協程擴充套件,如 `awaitExchange()` 和 `awaitBody()`,以純命令式方式執行非同步和非阻塞操作。並且由於 `Flow` `map` 運算子 接受一個掛起函式引數,我們可以在其中執行此類操作,而無需像使用 Java 中的響應式 API 那樣使用 `flatMap`。

並行分解

如前所述,協程預設是順序執行的,但也可以用於並行執行操作。讓我們重構之前的示例,以併發執行兩個遠端呼叫。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails = coroutineScope {
		val asyncDetail1 = async {
			client.get().uri("/userdetail1/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail1>()
		}
		val asyncDetail2 = async {
			client.get().uri("/userdetail2/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail2>()
		}
		UserWithDetails(user, asyncDetail1.await(), asyncDetail2.await())
	}
}

在這裡,我們利用 結構化併發 來觸發兩個使用者詳情的並行檢索,透過建立 `Deferred` 和 `Deferred` 例項(透過 `async {}` 構建器),然後我們透過呼叫兩個 `await()` 方法來等待它們完成,當可用時,它們將返回 `UserDetail1` 和 `UserDetail2` 例項。

結論

我認為將 Spring 響應式堆疊與此類協程和 Kotlin `Flow` API 結合使用,在命令式和宣告式方法之間提供了有趣的權衡。它以一種非常易於理解的方式利用了 WebFlux 和 Spring Data 的響應式可伸縮性和功能。

Spring WebFlux 和 Spring Data 中的協程支援將在即將釋出的 Spring Boot 2.2 版本中提供。您可以閱讀 參考文件,並可以期待進一步的改進,例如對 RSocket `@MessageMapping` 端點和 `RSocketRequester` 擴充套件的協程支援。Spring Data Moore 還將提供 Spring Data MongoDB、Cassandra 和 Redis 的類似協程擴充套件。Spring Data 可能會在某個時候提供對 協程儲存庫 的支援。我們還將使 Reactor 和協程上下文可互操作,以支援安全和響應式事務。

我想最後感謝許多才華橫溢的工程師,沒有他們,這一切都將不可能實現。

  • 來自 Kotlin 團隊的 Roman Elizarov 和 Vsevolod Tolstopyatov,感謝他們為協程和 `Flow` 所做的不可思議的工作。

  • Konrad Kaminski 感謝社群驅動的 Spring 協程的初步支援。

  • Jake Wharton 感謝他早期在統一 Rx 和協程方面的原型工作。

  • Stéphane Maldini 和 David Karnok 感謝他們的啟發性工作。

  • Juergen Hoeller、Rossen Stoyanchev 和 Brian Dussault 感謝他們的信任。

  • Mark Paluch 和 Oliver Drotbohm 感謝他們在持久化方面的支援。

一如既往,我期待反饋,也期待 Kotlin 團隊對 `Flow` API 的反饋,因為它仍處於預覽階段。請來參加我在 Devoxx FranceJAXSpring I/OSunny Tech 的即將舉行的演講,瞭解更多資訊。

乾杯!

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有