使用 Spring 進行響應式事務

工程 | Mark Paluch | 2019 年 5 月 16 日 | ...

回溯到 2016 年,我們的響應式之旅始於 Spring Framework 5,同時伴隨著一些響應式整合。在我們的整個旅程中,其他專案也加入了響應式浪潮。透過 R2DBC,我們現在還為 SQL 資料庫提供了響應式整合。隨著支援事務的整合的增加,我們不斷被問及

Spring Framework 是否支援響應式 @Transaction?

在我們的旅程開始時,我們還沒有響應式形式的事務性整合,所以這個問題很容易回答:不需要響應式事務管理。

隨著時間的推移,MongoDB 開始支援 MongoDB Server 4.0 中的多文件事務。R2DBC(響應式 SQL 資料庫驅動程式的規範)開始興起,我們決定透過 Spring Data R2DBC 來支援 R2DBC。這兩個專案都希望公開事務行為,所以它們最終在其模板 API 上提供了 inTransaction(…) 方法來執行由原生事務保護的工作單元。

雖然使用 inTransaction(…) 方法處理較小的工作塊很方便,但這並不反映 Spring 支援事務的方式。在使用指令式程式設計模型時,Spring Framework 允許兩種事務管理方式:@TransactionalTransactionTemplate(分別是宣告式和程式設計式事務管理)。

這兩種事務管理方法都建立在 PlatformTransactionManager 之上,它管理事務性資源的事務。PlatformTransactionManager 可以是 Spring 提供的事務管理器實現,也可以是基於 JTA 的 Java EE 實現。

這兩種方法都有一個共同點,即將事務狀態繫結到 ThreadLocal 儲存,這允許在不傳遞 TransactionStatus 物件的情況下管理事務狀態。事務管理應該在後臺以非侵入的方式進行。ThreadLocal 在指令式程式設計安排中之所以有效,是因為底層假設我們不會讓執行緒在事務內部繼續工作。

命令式事務管理如何工作

事務管理需要將其事務狀態與執行關聯起來。在指令式程式設計中,這通常是 ThreadLocal 儲存——事務狀態繫結到 Thread。底層假設是事務程式碼在容器呼叫它的同一個執行緒上執行。

響應式程式設計模型消除了命令式(同步/阻塞)程式設計模型的這一基本假設。仔細觀察響應式執行,我們可以發現程式碼在不同的執行緒上執行。在使用程序間通訊時,這一點更加明顯。我們不再能安全地假設我們的程式碼完全在同一個執行緒上執行。

這種假設的變化使得依賴於 ThreadLocal 的事務管理實現失效。

由於整合和最佳化(例如運算子融合),執行緒切換可能在任意時間發生。這種變化打破了所有依賴於 ThreadLocal 的程式碼。結果是我們需要一種不同的安排來反映事務狀態,而無需始終傳遞 TransactionStatus 物件。

在響應式領域,關聯帶外資料並不是一個新要求。我們在其他領域也面臨過這個要求,例如在 Spring Security 中使用 SecurityContext 實現響應式方法安全(僅舉一例)。Project Reactor 是 Spring 構建其響應式支援的基礎響應式庫,自 3.1 版本以來就提供了對訂閱者上下文的支援。

Reactor Context 對於響應式程式設計的作用,就像 ThreadLocal 對於指令式程式設計的作用一樣:Contexts 允許將上下文資料繫結到特定的執行。對於響應式程式設計,這指的是 Subscription。Reactor 的 Context 讓 Spring 可以將事務狀態以及所有資源和同步繫結到特定的 Subscription。所有使用 Project Reactor 的響應式程式碼現在都可以參與響應式事務。返回標量值並希望訪問事務細節的程式碼必須重寫以使用響應式型別才能參與事務。否則,Context 不可用。

響應式事務管理

從 Spring Framework 5.2 M2 開始,Spring 透過 ReactiveTransactionManager SPI 支援響應式事務管理。

ReactiveTransactionManager 是一種用於使用事務性資源的響應式和非阻塞整合的事務管理抽象。它是返回 Publisher 型別的響應式 @Transactional 方法以及使用 TransactionalOperator 進行程式設計式事務管理的基礎。

首批兩個響應式事務管理器實現是

  • 透過 Spring Data R2DBC 1.0 M2 實現的 R2DBC
  • 透過 Spring Data MongoDB 2.2 M4 實現的 MongoDB

讓我們看看響應式事務是什麼樣的

class TransactionalService {

  final DatabaseClient db

  TransactionalService(DatabaseClient db) {
    this.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    return db.execute()
      .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
      .fetch().rowsUpdated()
      .then(db.execute().sql("INSERT INTO contacts (name) VALUES('Joe')")
      .then();
  }
}

在註解驅動的安排中,響應式事務看起來與命令式事務非常相似。但主要區別在於我們使用的是 DatabaseClient,這是一個響應式資源抽象。所有事務管理都在幕後發生,利用了 Spring 的事務攔截器和 ReactiveTransactionManager

Spring (根據方法返回型別) 區分應應用哪種事務管理型別

  • 方法返回 Publisher 型別:響應式事務管理
  • 所有其他返回型別:命令式事務管理

這種區分很重要,因為您仍然可以使用命令式元件,例如 JPA 或 JDBC 查詢。將這些結果包裝到 Publisher 型別中會向 Spring 發出訊號,指示它應用響應式而非命令式事務管理。話雖如此,響應式事務安排並不會開啟一個 ThreadLocal 繫結的事務,而這對於 JPA 或 JDBC 是必需的。

TransactionalOperator

接下來,讓我們看看如何使用 TransactionalOperator 進行程式設計式事務管理

ConnectionFactory factory = …
ReactiveTransactionManager tm = new R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('joe', 'Joe')")
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql("INSERT INTO contacts (name) VALUES('Joe')")
    .then())
  .as(rxtx::transactional);

上面的程式碼包含了一些值得注意的元件

  • R2dbcTransactionManager: 這是用於 R2DBC ConnectionFactory 的響應式事務管理器。
  • DatabaseClient: 該客戶端使用 R2DBC 驅動程式提供對 SQL 資料庫的訪問。
  • TransactionalOperator: 此運算子將所有上游 R2DBC 釋出者與事務上下文關聯起來。您可以使用運算子風格 as(…::transactional) 或回撥風格 execute(txStatus -> …) 來使用它。

響應式事務在訂閱時惰性啟動。該運算子啟動一個事務,設定適當的隔離級別,並將資料庫連線與其訂閱者上下文關聯。所有參與(上游)的 Publisher 例項都使用一個繫結到 Context 的事務連線。

響應式函式式運算子鏈可以是線性的(透過使用單個 Publisher)或非線性的(透過合併多個流)。使用運算子風格時,響應式事務會影響所有上游的 Publisher。要將事務範圍限制在特定的 Publisher 集合,請如下使用回撥風格

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('Jack', 31)")
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  return db.execute()
    .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql("INSERT INTO contacts (name) VALUES('Joe Black')")
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);

在上面的示例中,事務管理僅限於在 execute(…) 內部訂閱的 Publisher 例項。換句話說,事務是作用域化的。execute(…) 內部的 Publisher 例項參與事務,而名為 outsideTransactionPublisher 則在事務外部執行其工作。

R2DBC 是 Spring 與響應式事務整合的其中之一。另一個整合是透過 Spring Data MongoDB 實現的 MongoDB,您可以使用它透過響應式程式設計參與多文件事務。

Spring Data MongoDB 附帶 ReactiveMongoTransactionManager 作為 ReactiveTransactionManager 實現。它建立一個會話並管理事務,以便在受管事務中執行的程式碼參與多文件事務。

以下示例展示了使用 MongoDB 進行程式設計式事務管理

ReactiveTransactionManager tm 
  = new ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.class)
  .apply(Update.set("state", …))
  .then(template.insert(EventLog.class).one(new EventLog(…))
  .as(rxtx::transactional)
  .then();

上面的程式碼設定了一個 ReactiveTransactionManager 並使用 TransactionalOperator 在單個事務中執行多個寫操作。ReactiveMongoTemplate 被配置為參與響應式事務。

下一步

響應式事務管理隨 Spring Framework 5.2 M2、Spring Data MongoDB 2.2 M4 和 Spring Data R2DBC 1.0 M2 里程碑版本一同釋出。您可以獲取這些版本,並開始在您的程式碼中整合響應式事務管理。我們期待社群的反饋,以便在六月初發布候選版本之前磨平所有尖角。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

搶先一步

VMware 提供培訓和認證,助您快速提升。

瞭解更多

獲取支援

Tanzu Spring 透過簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群中所有即將舉行的活動。

檢視全部