Spring 中的響應式事務

工程 | Mark Paluch | 2019年5月16日 | ...

早在2016年,我們就以Spring Framework 5為起點,開始了響應式之旅,並伴隨著幾個響應式整合。在我們的旅程中,其他專案也加入了響應式運動。隨著R2DBC的出現,我們現在也為SQL資料庫提供了響應式整合。隨著支援事務的整合不斷增長,我們不斷被問到

Spring Framework 是否支援響應式 @Transaction?

在我們旅程開始的時候,我們沒有響應式的事務整合形式,所以這個問題很簡單:不需要響應式事務管理。

隨著時間的推移,MongoDB 從 4.0 版本開始支援多文件事務。R2DBC(響應式 SQL 資料庫驅動程式的規範)開始出現,我們決定透過 Spring Data R2DBC 來支援 R2DBC。這兩個專案都希望暴露事務行為,因此它們最終在其 Template API 上提供了 inTransaction(…) 方法,以執行受原生事務保護的工作單元。

雖然對於較小的工作塊使用 inTransaction(…) 方法很方便,但它並不反映 Spring 支援事務的方式。在處理指令式程式設計模型時,Spring Framework 允許兩種事務管理安排:@TransactionalTransactionTemplate(宣告式和程式設計式事務管理)。

這兩種事務管理方法都構建在 PlatformTransactionManager 之上,該管理器負責事務性資源的事務管理。PlatformTransactionManager 可以是 Spring 提供的事務管理器實現,也可以是基於 JTA 的 Java EE 實現。

這兩種方法都有一個共同點,那就是將事務狀態繫結到 ThreadLocal 儲存,這使得在不傳遞 TransactionStatus 物件的情況下進行事務狀態管理成為可能。事務管理應該在後臺以非侵入性的方式進行。ThreadLocal 在指令式程式設計安排中有效,因為它基於一個基本假設,即我們不會在事務內繼續執行工作來佔用執行緒。

命令式事務管理的工作原理

事務管理需要將其事務狀態與執行關聯起來。在指令式程式設計中,這通常是 ThreadLocal 儲存 - 事務狀態繫結到 Thread。基本假設是,事務性程式碼在容器呼叫它的同一執行緒上執行。

響應式程式設計模型消除了命令式(同步/阻塞)程式設計模型的這一基本假設。仔細檢視響應式執行,我們可以發現程式碼在不同的執行緒上執行。當使用程序間通訊時,這一點會更加明顯。我們不再能安全地假設我們的程式碼在同一執行緒上完全執行。

這種假設的變化使依賴於 ThreadLocal 的事務管理實現失效。

由於整合和最佳化(如運算子融合),執行緒切換會隨時發生。這種變化打破了所有依賴於 ThreadLocal 的程式碼。結果是,我們需要一種不同的安排來反映事務狀態,而無需一直傳遞 TransactionStatus 物件。

在響應式領域,關聯帶外資料並不是一個新需求。我們在其他領域也面臨這個需求,例如 Spring Security 中的 SecurityContext 用於響應式方法安全(舉個例子)。Spring 構建其響應式支援的響應式庫 Project Reactor 自 3.1 版本以來就提供了對訂閱者上下文的支援。

Reactor Context 對於響應式程式設計來說,就像 ThreadLocal 對於指令式程式設計一樣:Contexts 允許將上下文資料繫結到特定的執行。對於響應式程式設計,這是一個 Subscription。Reactor 的 Context 允許 Spring 將事務狀態以及所有資源和同步繫結到特定的 Subscription。所有使用 Project Reactor 的響應式程式碼現在都可以參與響應式事務。想要訪問事務性細節的程式碼,如果返回標量值,則必須重寫以使用響應式型別來參與事務。否則,將無法使用 Context

響應式事務管理

從 Spring Framework 5.2 M2 開始,Spring 透過 ReactiveTransactionManager SPI 支援響應式事務管理。

ReactiveTransactionManager 是一個用於響應式和非阻塞整合(使用事務性資源)的事務管理抽象。它是返回 Publisher 型別的響應式 @Transactional 方法以及使用 TransactionalOperator 的程式設計事務管理的基礎。

前兩個響應式事務管理器實現是

  • R2DBC 透過 Spring Data R2DBC 1.0 M2
  • MongoDB 透過 Spring Data MongoDB 2.2 M4

讓我們看看響應式事務是什麼樣子的

class TransactionalService {

  final DatabaseClient db

  TransactionalService(DatabaseClient db) {
    this.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    return db.execute()
      .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
      .fetch().rowsUpdated()
      .then(db.execute().sql("INSERT INTO contacts (name) VALUES('Joe')")
      .then();
  }
}

響應式事務在註解驅動的安排中看起來非常類似於命令式事務。但主要區別在於我們使用的是 DatabaseClient,這是一個響應式資源抽象。所有事務管理都在後臺進行,利用 Spring 的事務攔截器和 ReactiveTransactionManager

Spring 根據方法返回型別區分要應用的事務管理型別

  • 方法返回 Publisher 型別:響應式事務管理
  • 所有其他返回型別:命令式事務管理

這種區分很重要,因為您仍然可以使用命令式元件,例如 JPA 或 JDBC 查詢。將這些結果包裝到 Publisher 型別中會指示 Spring 應用響應式而非命令式事務管理。也就是說,響應式事務安排不會開啟 ThreadLocal 繫結的事務,而這對於 JPA 或 JDBC 是必需的。

TransactionalOperator

下一步,讓我們透過使用 TransactionalOperator 來了解程式設計事務管理

ConnectionFactory factory = …
ReactiveTransactionManager tm = new R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('joe', 'Joe')")
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql("INSERT INTO contacts (name) VALUES('Joe')")
    .then())
  .as(rxtx::transactional);

上面的程式碼包含一些值得注意的元件

  • R2dbcTransactionManager:這是 R2DBC ConnectionFactory 的響應式事務管理器。
  • DatabaseClient:該客戶端使用 R2DBC 驅動程式訪問 SQL 資料庫。
  • TransactionalOperator:此運算子將所有上游 R2DBC 釋出者與事務上下文關聯起來。您可以使用運算子樣式 as(…::transactional) 或回撥樣式 execute(txStatus -> …)

響應式事務在訂閱時惰性啟動。該運算子啟動一個事務,設定適當的隔離級別,並將資料庫連線與訂閱者上下文關聯起來。所有參與的(上游)Publisher 例項都使用一個 Context 繫結的事務性連線。

響應式函式式運算子鏈可以是線性的(透過使用單個 Publisher)或非線性的(透過合併多個流)。當使用運算子樣式時,響應式事務會影響所有上游 Publisher。要將事務範圍限制在特定的一組 Publisher 上,請應用回撥樣式,如下所示

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('Jack', 31)")
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  return db.execute()
    .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql("INSERT INTO contacts (name) VALUES('Joe Black')")
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);

在上面的示例中,事務管理僅限於在 execute(…) 中訂閱的 Publisher 例項。換句話說,事務是分範圍的。execute(…) 中的 Publisher 例項參與事務,而名為 outsideTransactionPublisher 在事務外部執行其工作。

R2DBC 是 Spring 與響應式事務的整合之一。另一個整合是 MongoDB(透過 Spring Data MongoDB),您可以使用它透過響應式程式設計參與多文件事務。

Spring Data MongoDB 附帶 ReactiveMongoTransactionManager 作為 ReactiveTransactionManager 實現。它建立一個會話並管理事務,以便在受管理事務中執行的程式碼參與多文件事務。

下面的示例展示了使用 MongoDB 的程式設計事務管理

ReactiveTransactionManager tm 
  = new ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.class)
  .apply(Update.set("state", …))
  .then(template.insert(EventLog.class).one(new EventLog(…))
  .as(rxtx::transactional)
  .then();

上面的程式碼設定了一個 ReactiveTransactionManager,並使用 TransactionalOperator 在單個事務中執行多個寫入操作。ReactiveMongoTemplate 被配置為參與響應式事務。

下一步

響應式事務管理隨 Spring Framework 5.2 M2、Spring Data MongoDB 2.2 M4 和 Spring Data R2DBC 1.0 M2 里程碑版本一起釋出。您可以獲取這些版本並在程式碼中開始整合響應式事務管理。我們期待社群的反饋,以便在六月初發布候選版本之前平滑任何不完善之處。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有