在 Spring Cloud Stream Kafka 應用中與外部事務管理器同步

工程 | Soby Chacko | 2023 年 10 月 04 日 | ...

本系列部落格的其他部分

第一部分:Spring Cloud Stream Kafka 應用程式中的事務簡介

第二部分:Spring Cloud Stream Kafka 應用程式中的生產者啟動事務

本系列部落格的前一部分 中,我們瞭解了事務管理的基礎知識,主要是在使用生產者發起的 Spring Cloud Stream Kafka 應用程式時。在該討論中,我們還簡要地看到了 Spring Cloud Stream Kafka 消費者應用程式如何以適當的隔離級別消費以事務方式生成的記錄。當您與外部事務管理器(例如關係資料庫的事務管理器)同步時,我們提到您必須使用事務來確保資料完整性。在這一部分中,我們將瞭解在使用外部事務管理器時,如何在 Spring Cloud Stream 中實現事務保證。

在我們開始探索之前,請記住,在實踐中實現分散式事務非常困難。您必須依賴兩階段提交 (2PC) 策略和一個適當的分散式事務管理器,例如相容 JTA 的事務管理器,才能正確完成此操作。儘管如此,大多數企業用例可能不需要這種複雜程度,而且我們在此部落格中描述的大多數我們考慮並在實踐中看到人們使用的用例,最好堅持使用非分散式事務方法。這篇由 Spring 工程團隊的 Dr. Dave Syer 於 2009 年發表的 文章,即使在 14 年後仍然與理解分散式事務的挑戰以及 Spring 中推薦的替代方法相關。

讓我們回到我們的討論:在使用生產者發起的應用程式和消費-處理-生產(讀取-處理-寫入)應用程式中的外部事務管理器時,在 Spring Cloud Stream Kafka 應用程式中實現事務性。

現在,我們可以透過草擬一些程式碼來為我們將在討論中使用的示例領域設定場景。我們使用了一些領域物件來驅動演示,併為它們建立了虛擬碼。

假設訊息傳遞系統處理“事件”域型別——我們使用 PersonEvent

class PersonEvent {

   String name;
   String type;

   //Rest omitted for brevity
}

我們還需要一個 Person 物件的域實體

@Entity
@Table(name = "person")
public class Person {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;

   private String name;

   // Rest omitted for brevity
}

最後,我們需要一個 CrudRepository 來處理 Person 域物件

public interface PersonRepository extends CrudRepository<Person, String> {}

在生產者發起的場景中,假設當一個方法被呼叫時(例如,透過 REST),會建立一個 Person 域物件,將其持久化到資料庫,然後作為 PersonEvent 透過 StreamBridge 傳送到出站 Kafka 主題。

消費-處理-生產 場景中,假設輸入主題接收一個 PersonEvent,處理器從中生成一個 Person 域物件來持久化到資料庫。最後,它將另一個 PersonEvent 釋出到出站 Kafka 主題。

我們這裡也使用 JPA 進行討論。Spring Cloud Stream 應用程式是 Boot 應用程式,您可以在應用程式中包含 spring-boot-starter-jpa 依賴項,幷包含適當的 spring.jpa.* 屬性來驅動必要的自動配置。假設 Spring Boot 將為我們自動配置一個 JPATransactionManager

讓我們將用例分解為各種場景。

場景 1:生產者發起的事務

在生產者發起的場景中,我們必須以事務方式執行兩項操作:資料庫操作,然後是 Kafka 釋出操作。這是基本思路。請記住,此程式碼僅顯示了涉及內容的核心。在實際環境中,程式碼幾乎肯定會比這複雜得多。

@Autowired
Sender sender;

@PostMapping("/send-data")
public void sendData() {
   sender.send(streamBridge, repository);
}

@Component
static class Sender {

   @Transactional
   public void send(StreamBridge streamBridge, PersonRepository repository) {
       Person person = new Person();
       person.setName("Some Person");

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

上述生產者發起的程式碼是完全事務性的。在本部落格的前一部分中,我們看到,如果您只有 Kafka 事務,那麼新增 Transactional 註釋是不夠的。如前所述,Transactional 註釋沒有事務管理器,我們需要一個自定義事務管理器來使用相同的底層事務資源來實現事務性。然而,在這裡情況有所不同。我們有 Spring Boot 自動配置的 JpaTransactionManager,事務攔截器使用它來啟動事務。由於我們配置了 transaction-id-prefixStreamBridge 傳送操作可以以事務方式完成。但是,KafkaTemplate 透過 TransactionSynchronizationManager 將 Kafka 事務與已存在的 JPA 事務同步。方法退出時,主事務首先提交,然後是同步事務,在本例中是 Kafka 事務。

以下是此流程中的順序。
  1. JPA 事務管理器啟動新的 JPA 事務。
  2. 資料庫操作開始,但此處不發生提交,因為我們仍在執行方法。
  3. StreamBridge 傳送操作觸發新的 Kafka 事務,透過事務同步管理器與 JPA 事務同步。
  4. 方法退出時,JPA 事務首先提交,然後是 Kafka 事務。

關於在 Spring 中同步事務的一般說明: 聽起來它在後臺進行復雜的事務同步。然而,正如我們在本文開頭所暗示的,這裡沒有進行分散式事務同步,更不用說在各種事務之間進行任何智慧同步的方法了。事務本身對同步一無所知。Spring TransactionSynchronizatonManager 僅協調多個事務的提交和回滾。在此上下文中同步事務在功能上類似於巢狀兩個或多個 @Transactional 方法或 TransactionTempate 物件。配置項更少,因為 Spring 會為您處理巢狀。

場景 2:顛倒提交順序

假設由於流程中的一些新要求,我們需要顛倒提交順序,讓 Kafka 事務先於 JPA 提交。我們該怎麼做?一個可能直觀出現的解決方案是明確地將 Kafka 事務管理器提供給 @Transactional 註釋,讓 JPA 事務與作為主事務的 Kafka 事務同步。程式碼如下所示:

@Transactional(“customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
    Person person = new Person();
    person.setName("Some Person");

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    streamBridge.send("process-out-0", event);
}

我們需要提供一個自定義 Kafka 事務管理器

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
   KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
   ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
   KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
   return kafkaTransactionManager;
}

由於 Spring Boot 在檢測到事務管理器已存在時不會配置事務管理器,因此我們必須自己配置 JPA 事務管理器

@Bean
public PlatformTransactionManager transactionManager(
       ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
   JpaTransactionManager transactionManager = new JpaTransactionManager();
   transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
   return transactionManager;
}

我們的直覺在這裡奏效了嗎?我們成功地改變了事務應用的順序了嗎?不幸的是,沒有。這樣做無效,因為 JPA 事務管理器不允許其事務與其他事務同步,例如在本例中(自定義 Kafka 事務管理器)的主事務。在我們的例子中,儘管我們建立了一個自定義 Kafka 事務管理器作為主管理器,但 JPA 事務在執行儲存庫儲存方法時會自行啟動和提交,而不會與主事務同步。

此流程中的事件順序如下
  1. Kafka 事務管理器啟動攔截器使用的事務。
  2. 當儲存庫儲存方法執行時,JpaTransactionManager 建立一個 JPA 事務,而不同步與主事務。
  3. JPA 事務在方法執行過程中提交。
  4. 攔截器將在退出方法時提交 Kafka 事務。

那麼,我們如何顛倒事務呢?有兩種方法可以做到這一點。

首先,我們可以嘗試連結事務管理器。ChainedTransactionManagerSpring Data 專案中的一個事務管理器實現。您可以將事務管理器的列表指定給 ChainedTransactionManager,它會按照其列表中的事務管理器順序啟動事務。在退出時(即方法退出時),事務將按照事務管理器列表的相反順序提交。

雖然這聽起來是一個合理的策略,但要記住的一個大缺點是 ChainedTransactionManager 目前已被棄用,並且不是推薦的選項。棄用的原因在 Javadoc 中。要點是,人們通常期望 ChainedTransactionManager 是一個神奇的萬能藥,可以解決所有事務問題,包括兩階段提交的分散式事務和其他問題,而事實恰恰相反。ChainedTransactionManager 僅確保事務以特定順序啟動和提交。它不保證任何事務同步,更不用說任何分散式事務協調。如果您對 ChainedTransactionManager 的限制感到滿意,並且需要特定順序,就像我們的用例一樣。在這種情況下,只要您記住您正在使用框架中已棄用的類,就可以合理地使用此事務管理器。

讓我們嘗試在我們的場景中使用 ChainedTransactionManager,看看效果如何。Spring for Apache Kafka 提供了一個名為 ChainedKafkaTransactionManager 的子類,由於父類已被棄用,因此它也被棄用了。

我們使用前面在連結事務中看到的相同的自定義 KafkaTransactionManager bean。

我們還需要建立 JpaTransactionManager bean,如前所述,因為 Spring Boot 不會自動配置它,因為它已經檢測到了自定義 KafkaTransactionManager bean。

新增這兩個 bean 後,讓我們建立 ChainedKafkaTransactionManager bean

@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
   return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}

有了這些設定,讓我們修改我們的 Transactional 註釋

@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}

上面的配置實現了我們想要的結果。當您執行此應用程式時,我們將顛倒事務,如預期的那樣——即,Kafka 先提交,然後是 JPA。

以下是流程中的步驟
  1. TransactionInterceptor 使用自定義 ChainedKafkaTransactionManager 來啟動事務。它使用 JpaTransactionManager 啟動 Jpa 事務,併為 KafkaTransactionManager 執行相同的操作。
  2. 當方法呼叫資料庫操作時,由於它已經在 JPA 事務中執行,因此它不會啟動另一個事務。這裡不發生提交或回滾,因為這不是一個新事務。
  3. 接下來,方法透過 StreamBridge 執行 Kafka 釋出。我們在這裡看到了與上面 JPA 相同的情況。由於已經存在 Kafka 事務,因此它不會啟動新的 Kafka 事務。StreamBridge 傳送操作是透過最初的 Kafka 事務所使用的相同的事務性生產者工廠完成的。此處不發生提交或回滾。
  4. 當方法退出時,連結的事務管理器按相反順序進行,首先是 Kafka 事務提交(或回滾),然後是 JPA 事務。

如果您對連結事務管理器的限制感到滿意,則此方法有效。請記住,這裡沒有事務同步。事務管理器在事務開始時按給定順序應用,在提交或回滾時按相反順序應用。如果您選擇此路線,由於您使用的是框架中已棄用的類,最好將它們複製並在您的專案中保留,而不是依賴於框架。由於它們已被棄用,不保證會有新功能和錯誤修復。未來的版本可能會完全刪除它們。也有可能這些類永遠不會被刪除,並且棄用狀態是為了阻止使用它們(因為人們認為它們的功能比實際功能更強大)。

如果您不想依賴框架中已棄用的類,或者不想複製它們並在您的端維護它們,那麼您還有另一個選擇。您可以建立兩個事務方法並巢狀呼叫。這是該想法的藍圖:

@Component
static class Sender {

       @Transactional("jpaTransactionManager")
       public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
           Person person = new Person();
           person.setName("Some Person");

           Person savedPerson = repository.save(person);

           PersonEvent event = new PersonEvent();
           event.setName(savedPerson.getName());
           event.setType("PersonSaved");
           kafkaSender.send(streamBridge, event);
       }
}

@Component
static class KafkaSender {
       @Transactional("customKafkaTransactionManager")
       public void send(StreamBridge streamBridge, PersonEvent event) {
           streamBridge.send("process-out-0", event);
       }
}

請確保巢狀呼叫在不同的類中,原因與我們在本部落格系列的 第二部分 中討論過的 AOP 代理工作方式有關。

在這種情況下,兩個方法都是事務性的,並且它們是巢狀的。當事務攔截器攔截第一個方法呼叫時,它會啟動 JPA 事務。在執行過程中,巢狀呼叫(其方法也帶有 @Transactional 註釋)會進來。由於此 bean 方法帶有 @Transactional 註釋,Spring AOP 會將 bean 包裝在 AOP 通知中。由於我們從不同類中的另一個 bean 呼叫此通知的 bean,因此代理機制會正確呼叫通知的 bean。另一個事務攔截器透過使用不同的事務管理器(即 KafkaTransactionManager)啟動新事務。當發生 Kafka 釋出時,事務不會立即提交或回滾,因為它是作為方法一部分啟動的事務,並且在方法退出時發生提交或回滾。此時,控制權返回到第一個方法並繼續。一旦退出原始方法,JPA 事務就會透過攔截器提交。如果釋出到 Kafka 的方法丟擲異常,它將回滾該事務。在這種情況下,回滾後,異常會傳播回第一個事務方法(JPA 方法),後者由於異常也會回滾其事務。

使用此技術時的一個重要注意事項 巢狀方法的呼叫應該是第一個方法做的最後一件事,因為如果第一個方法在 Kafka 呼叫成功後未能執行某些程式碼,Kafka 事務已經提交。第一個方法中的失敗不會自動回滾 Kafka 事務。

場景 3:消費-處理-生產

透過我們在本系列中到目前為止獲得的對事務的核心理解,讓我們看一下事件驅動和流應用程式中的一個關鍵模式,稱為 消費-處理-生產 模式。在 Spring Cloud Stream 中,此類模式的實現如下所示:

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我們有一個 Spring Cloud Stream 函式,它從輸入主題消費 PersonEvent,然後在函式 lambda 表示式的主體內呼叫一個函式進行處理。此函式返回另一個 PersonEvent,我們將其釋出到出站 Kafka 主題。如果我們不在事務上下文中,我們可以在函式 lambda 表示式中內聯上面的 run 方法。但是,為了實現事務語義,@Transactional 註釋必須位於不同類中的方法上。

為了使繫結器具有事務性,請確保提供具有有效值的 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix

上面的程式碼是完全事務性的嗎?然而,現實是它在端到端方面僅是部分事務性的。讓我們看一下事件順序。

由於我們提供了 transaction-id-prefix,因此繫結器是事務性的。當消費者在訊息偵聽器容器中輪詢記錄時,它會在 TrasactionTemplate#execute 方法內呼叫內部偵聽器方法。因此,執行偵聽器方法(呼叫使用者方法)的整個端到端過程都在由 KafkaTransactionManager 啟動的事務中執行。當事務啟動時,TransactionSynchronizationManager 將資源(生產者)繫結到事務。當呼叫使用者方法(用 @Transactional 註釋的方法)時,事務攔截器會攔截該呼叫,讓包裝的 AOP 通知處理實際呼叫。由於我們有一個 JpaTransactionManager,事務攔截器會使用該管理器並啟動新事務。由每個事務管理器實現決定它是否希望與現有事務同步。如我們上面已經討論過的,在 JpaTransactionManager(以及許多其他類似的資料庫事務管理器實現)的情況下,它不允許與現有事務同步。因此,JPA 事務獨立執行,如上文各節所示。當 run 方法退出時,事務攔截器會使用 JPA 事務管理器執行提交或回滾操作。至此,JPA 事務管理器完成了其工作。此時,方法呼叫的響應返回給呼叫者,即 Spring Cloud Stream 基礎結構。此機制在 Spring Cloud Stream 中獲取此響應並將其傳送到 Kafka 中的出站主題。它使用與初始事務開始時繫結的相同的事務性生產者。傳送記錄後,控制權返回到訊息偵聽器容器,然後提交或回滾事務。

此序列中的步驟如下
  1. Kafka 消費者接收記錄。
  2. Spring Kafka 中的容器使用 TransactionTemplateexecute 方法呼叫偵聽器。

KafkaTransactionManager 啟動新事務。3. Kafka 資源被繫結(生產者)。4. 當到達使用者程式碼時,事務攔截器最終會攔截該呼叫並使用 JpaTransactionManager 啟動新事務。5. AOP 代理然後呼叫實際方法。當方法退出時,JpaTransactionManager 提交或回滾。6. 方法的輸出返回到 Spring Cloud Stream 中的呼叫者。7. 然後使用步驟 4 中繫結的同一事務資源將響應傳送到 Kafka 出站。8. 控制權返回到訊息偵聽器容器,KafkaTransactionManager 提交或回滾。

那麼,這裡有什麼問題?它看起來像事務性的,但實際上只是一部分事務性的。最根本的問題是,整個端到端過程不在單個原子事務的邊界內,這是一個重大問題。這裡有兩個事務——Kafka 和 JPA——並且 JPA 和 Kafka 事務之間沒有同步。如果資料庫事務已提交而 Kafka 傳送失敗,則無法回滾 JPA 事務。

我們可能會認為 ChainedTransactionManager 可以提供幫助。雖然這種直覺有一些優點,但它與上述程式碼不相容。由於在呼叫偵聽器方法時在容器中建立的 Kafka 事務,ChainedTransactionManager 不會從提供給它的任何 Kafka 事務管理器建立新的 Kafka 事務。當退出使用者方法時,我們仍然有一個 JPA 事務需要提交或回滾。Kafka 事務必須等到呼叫返回到容器才能提交或回滾。

問題在於我們使用了 Spring Cloud Stream 中的一個函式,該函式允許框架釋出到 Kafka。在我們的例子中,任何使用者指定的事務,如 JPA 事務,都發生在 Spring Cloud Stream 進行 Kafka 釋出之前。我們需要確保使用者程式碼是釋出到 Kafka 的程式碼,以便我們可以將整個事務程式碼視為一個單元。為此,我們應該切換到 Consumer 而不是 Function,然後使用 StreamBridge API 釋出到 Kafka。請看這個修改後的程式碼:

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

然後我們使用相同的 TxCode,如上

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

請注意,run 方法不返回任何內容,但我們透過 StreamBridge API 顯式傳送到出站 Kafka 主題。

讓我們看看這些更改後的事件序列
  1. Kafka 消費者接收記錄。
  2. Spring Kafka 中的容器使用 TransactionTemplate 的 execute 方法呼叫偵聽器。
  3. KafkaTransactionManager 啟動新事務。
  4. Kafka 資源被繫結(生產者)。
  5. 當它到達使用者程式碼時,攔截器會攔截該呼叫,並使用 JpaTransactionManager 啟動新事務。
  6. 呼叫實際使用者方法。
  7. Kafka 傳送操作透過 StreamBridge 作為方法執行的一部分進行。底層 KafkaTemplate 使用在步驟 4 中繫結的相同的事務性生產者工廠。
  8. 當方法退出時,JpaTransactionManager 提交或回滾。
  9. 最後,當 Kafka 事務提交(或回滾)時,控制權返回到 TransactionTemplate#execute 方法。

請特別注意上面第 7 步。當 KafkaTemplate 檢測到已有 Kafka 事務正在進行(在步驟 3 中開始)時,它不會與 JPA 事務同步,儘管 KafkaTemplate 能夠做到這一點。現有的 Kafka 事務將優先,並加入該事務。

儘管我們仍然有兩個獨立的事務,但從端到端事務的角度來看,事情是原子的。如果透過 StreamBridge 的 Kafka 釋出操作失敗,JPA 和 Kafka 事務都不會執行提交操作。兩者都會回滾。同樣,如果資料庫操作失敗,兩個事務仍然會回滾。但是,總有可能一個事務提交而另一個回滾,因此應用程式程式碼必須處理記錄的去重以實現容錯。

在討論 消費-處理-生產 模式時,另一個關鍵元件是生產者需要將消耗的記錄的偏移量(除了消費者提交偏移量)傳送到事務中。正如我們在本部落格系列的 第一部分 中所看到的,有一個 Kafka Producer API 方法稱為 sendOffsetToTransaction,其中生產者透過 OffsetMetadataConsumerGroupMetadata 為每個分割槽傳送一個偏移量(當前訊息的偏移量 + 1)。使用 Spring Cloud StreamSpring for Apache Kafka 時,應用程式不需要呼叫此低階操作。Spring for Apache Kafka 中的 Kafka 訊息偵聽器容器會代表應用程式自動處理它。儘管框架在事務提交前呼叫了生產者上的 sendOffsetToTransaction,但在事務協調器提交事務時,傳送偏移量到事務和實際的消費者偏移量提交是原子發生的。

透過這次討論,我們探討了編寫必須與外部事務系統(如資料庫)互動的事務性 Spring Cloud Stream 應用程式的各種選項,同時消費和生產到 Apache Kafka。

在系列的下一部分中,我們將研究事務回滾(編寫事務系統時的另一個關鍵方面)以及如何在編寫 Spring Cloud Stream Kafka 應用程式時訪問各種 Spring 元件。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有