在 Spring Cloud Stream Kafka 應用中與外部事務管理器同步

工程 | Soby Chacko | 2023 年 10 月 04 日 | ...

本部落格系列的其餘部分

第一部分:Spring Cloud Stream Kafka 應用中的事務介紹

第二部分:Spring Cloud Stream Kafka 應用中的生產者啟動事務

在本部落格系列的上一部分中,我們瞭解了事務管理的基礎知識,主要是在使用生產者啟動的 Spring Cloud Stream Kafka 應用時。在那次討論中,我們還簡要介紹了 Spring Cloud Stream Kafka 消費者應用如何以適當的隔離級別消費以事務方式產生的記錄。當您與外部事務管理器(例如關係資料庫的事務管理器)同步時,我們提到必須使用事務來確保資料完整性。在本部分中,我們將瞭解在使用外部事務管理器時,如何在 Spring Cloud Stream 中實現事務保證。

在開始探討之前,務必記住,在實踐中實現分散式事務是極其困難的。必須依賴兩階段提交 (2PC) 策略和適當的分散式事務管理器(例如相容 JTA 的事務管理器)才能正確執行此操作。然而,大多數企業用例可能不需要這種複雜程度,並且我們考慮和在實踐中看到的大多數用例可能最好堅持使用非分散式事務方法,正如我們在本部落格中描述的那樣。Spring 工程團隊的Dave Syer 博士於 2009 年發表的這篇文章,對於理解分散式事務的挑戰以及 Spring 中推薦的替代方法仍然具有現實意義(即使在 14 年後)。

讓我們回到我們的討論:在使用外部事務管理器時,如何在生產者啟動和消費-處理-生產(讀-處理-寫)應用中實現 Spring Cloud Stream Kafka 應用的事務性。

現在,我們可以透過勾勒出一些程式碼示例來為我們的討論奠定基礎,這些程式碼示例將貫穿討論過程。我們將使用一些領域物件來驅動演示,併為其建立了虛擬碼。

假設訊息系統處理“事件”領域型別 - 讓我們使用一個 PersonEvent

class PersonEvent {

   String name;
   String type;

   //Rest omitted for brevity
}

我們還需要一個用於 Person 物件的領域實體

@Entity
@Table(name = "person")
public class Person {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;

   private String name;

   // Rest omitted for brevity
}

最後,我們需要一個用於 Person 領域物件的 CrudRepository

public interface PersonRepository extends CrudRepository<Person, String> {}

在生產者啟動的場景中,假設當呼叫一個方法時(例如透過 REST),會建立一個 Person 領域物件,將其持久化到資料庫,並透過 StreamBridge 作為 PersonEvent 傳送到一個出站 Kafka 主題。

消費-處理-生產的場景中,假設輸入主題接收到一個 PersonEvent,處理器由此生成一個 Person 領域物件並將其持久化到資料庫。最後,它會生成另一個 PersonEvent 傳送到一個出站 Kafka 主題。

在這裡,我們也將使用 JPA 進行討論。Spring Cloud Stream 應用是 Boot 應用,您可以在應用中包含 spring-boot-starter-jpa 依賴,幷包含適當的 spring.jpa.* 屬性以驅動必要的自動配置。假設 Spring Boot 會為我們自動配置一個 JPATransactionManager

讓我們將用例分解為各種場景。

場景 1:生產者啟動事務

在生產者啟動的場景中,我們需要進行兩個事務性操作:一個數據庫操作,緊接著一個 Kafka 釋出操作。這是基本思路。請記住,這段程式碼只展示了關鍵部分。在實際應用中,程式碼幾乎肯定會比這複雜得多。

@Autowired
Sender sender;

@PostMapping("/send-data")
public void sendData() {
   sender.send(streamBridge, repository);
}

@Component
static class Sender {

   @Transactional
   public void send(StreamBridge streamBridge, PersonRepository repository) {
       Person person = new Person();
       person.setName("Some Person");

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

上述生產者啟動的程式碼是完全事務性的。在本部落格的上一部分中,我們看到如果只有 Kafka 事務,僅新增 Transactional 註解是不夠的。正如討論過的,Transactional 註解沒有關聯事務管理器,我們需要一個使用相同底層事務資源的自定義事務管理器來實現事務性。然而,在這裡情況不同。我們有 Spring Boot 自動配置的 JpaTransactionManager,並且事務攔截器使用它來啟動一個事務。由於我們配置了 transaction-id-prefixStreamBridge 的傳送操作可以事務性地完成。此外,KafkaTemplate 透過 TransactionSynchronizationManager 將 Kafka 事務與已有的 JPA 事務同步。方法退出時,主事務首先提交,然後是同步的事務,在本例中是 Kafka 事務。

此流程中的順序如下。
  1. JPA 事務管理器啟動一個新的 JPA 事務。
  2. 資料庫操作開始,但由於我們仍在方法執行中,此處不會發生提交。
  3. StreamBridge 傳送操作觸發一個新的 Kafka 事務,透過事務同步管理器與 JPA 事務同步。
  4. 方法退出時,JPA 事務首先提交,然後是 Kafka 事務。

關於 Spring 中事務同步的一般說明:這聽起來好像在後臺進行復雜的事務同步。然而,正如我們在本文開頭所暗示的,這裡並沒有進行分散式事務同步,更不用說在各種事務之間進行智慧同步的方式了。事務本身對同步一無所知。Spring 的 TransactionSynchronizatonManager 只是協調多個事務的提交和回滾。在此上下文中同步事務的功能類似於巢狀兩個或更多 @Transactional 方法或 TransactionTempate 物件。配置量更少,因為 Spring 為您完成了巢狀。

場景 2:反轉提交順序

假設由於流程中的一些新需求,我們需要反轉提交順序,讓 Kafka 事務先於 JPA 事務提交。我們如何做到這一點?一個直觀的想法是顯式地為 @Transactional 註解提供一個 Kafka 事務管理器,並讓 JPA 事務與作為主事務的 Kafka 事務同步。程式碼如下所示:

@Transactional(“customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
    Person person = new Person();
    person.setName("Some Person");

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    streamBridge.send("process-out-0", event);
}

我們需要提供一個自定義的 Kafka 事務管理器

@Bean
KafkaTransactionManager customKafkaTransactionManager() {
   KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
   ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
   KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
   return kafkaTransactionManager;
}

由於 Spring Boot 在檢測到已存在事務管理器時不會自動配置事務管理器,我們必須自己配置 JPA 事務管理器

@Bean
public PlatformTransactionManager transactionManager(
       ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
   JpaTransactionManager transactionManager = new JpaTransactionManager();
   transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
   return transactionManager;
}

我們的直覺在這裡奏效了嗎?我們成功改變了事務應用的順序了嗎?很遺憾,沒有。它不起作用,因為 JPA 事務管理器不允許其事務與其他事務同步,例如本例中主事務管理器(自定義 Kafka 事務管理器)的事務。在我們的情況下,儘管我們將自定義 Kafka 事務管理器設定為主事務管理器,但 JPA 事務在執行 repository save 方法時會自行啟動和提交,而不會與主事務同步。

此流程中的事件順序如下:
  1. Kafka 事務管理器啟動一個攔截器使用的新事務。
  2. 執行 repository save 方法時,JpaTransactionManager 會建立一個 JPA 事務,而不會與主事務同步。
  3. JPA 事務在方法執行過程中提交。
  4. 攔截器將在方法退出時提交 Kafka 事務。

那麼,我們如何反轉事務呢?有兩種方法可以做到這一點。

首先,我們可以嘗試鏈式事務管理器。ChainedTransactionManagerSpring Data 專案中的一個事務管理器實現。您可以向 ChainedTransactionManager 指定事務管理器列表,它會按照列表中的順序啟動事務。方法退出時,事務將按照事務管理器列表的相反順序提交。

雖然這聽起來是一個合理的策略,但需要記住的一個重要注意事項是,ChainedTransactionManager 目前已被棄用,不建議使用。棄用的原因可以在 Javadoc 中找到。核心思想是,人們常常期望 ChainedTransactionManager 是一款神奇的靈丹妙藥,能夠解決所有事務問題,包括帶有兩階段提交的分散式事務以及其他問題,而這與事實相去甚遠。ChainedTransactionManager 僅確保事務按照特定順序啟動和提交。它不保證任何事務同步,更不用說任何分散式事務協調了。如果您對 ChainedTransactionManager 的侷限性感到滿意,並且像我們的用例一樣需要特定的順序,那麼使用這個事務管理器是合理的,只要您記住正在使用框架中已棄用的類。

讓我們在我們的場景中嘗試使用 ChainedTransactionManager,看看效果如何。Spring for Apache Kafka 提供了一個名為 ChainedKafkaTransactionManager 的子類,該子類也已被棄用,因為其父類已被棄用。

我們使用之前在鏈式事務中看到的相同的自定義 KafkaTransactionManager bean。

我們還需要像之前一樣建立 JpaTransactionManager bean,因為 Spring Boot 不會自動配置它,因為它已經檢測到自定義的 KafkaTransactionManager bean。

新增這兩個 bean 後,讓我們建立 ChainedKafkaTransactionManager bean

@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
   return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}

完成這些配置後,讓我們修改 Transactional 註解

@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}

上述配置實現了我們想要的結果。執行此應用時,事務會如預期般反轉 - 即 Kafka 先提交,然後是 JPA。

以下是流程中的步驟:
  1. TransactionInterceptor 使用自定義的 ChainedKafkaTransactionManager 來啟動事務。它使用 JpaTransactionManager 啟動 JPA 事務,並對 KafkaTransactionManager 執行相同的操作。
  2. 當方法呼叫資料庫操作時,由於它已經在 JPA 事務中執行,因此不會啟動另一個事務。此處不會發生提交或回滾,因為這不是一個新事務。
  3. 接下來,方法透過 StreamBridge 執行 Kafka 釋出。我們在這裡看到的情況與上面 JPA 的情況相同。由於存在一個已有的 Kafka 事務,它不會啟動新的 Kafka 事務。StreamBridge 的傳送操作會使用與初始 Kafka 事務相同的事務性生產者工廠進行。此處不會發生提交或回滾。
  4. 方法退出時,鏈式事務管理器會以相反的順序進行,先是 Kafka 事務提交(或回滾),然後是 JPA 事務。

如果您對鏈式事務管理器的侷限性感到滿意,這種方法是可行的。請記住,這裡沒有事務同步。事務管理器在事務開始時按給定順序應用,並在提交或回滾時以相反順序應用。如果您選擇此路線,由於您使用的是框架中已棄用的類,最好將它們複製到您的專案中而不是依賴框架。因為它們已被棄用,不保證新的功能和錯誤修復。未來的版本可能會完全刪除它們。也可能永遠不會刪除它們,並且棄用狀態的存在是為了阻止其使用(因為人們認為它具有比實際更多的功能)。

如果您不想依賴框架中已棄用的類,也不想在您的專案中複製和維護它們,您還有另一種選擇可以嘗試。您可以建立兩個事務性方法並巢狀呼叫。這是一個實現該想法的藍圖:

@Component
static class Sender {

       @Transactional("jpaTransactionManager")
       public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
           Person person = new Person();
           person.setName("Some Person");

           Person savedPerson = repository.save(person);

           PersonEvent event = new PersonEvent();
           event.setName(savedPerson.getName());
           event.setType("PersonSaved");
           kafkaSender.send(streamBridge, event);
       }
}

@Component
static class KafkaSender {
       @Transactional("customKafkaTransactionManager")
       public void send(StreamBridge streamBridge, PersonEvent event) {
           streamBridge.send("process-out-0", event);
       }
}

確保巢狀呼叫位於不同的類中,原因我們在本部落格系列的第二部分中已經詳細闡述過,這是由於 Spring 中 AOP 代理的工作方式所致。

在這種情況下,這兩個方法都是事務性的,並且是巢狀的。當事務攔截器攔截到第一個方法呼叫時,它會啟動 JPA 事務。在執行過程中,巢狀呼叫(其方法也帶有 @Transactional 註解)進入。由於此 bean 方法具有 @Transactional 註解,Spring AOP 會將該 bean 包裝在一個 AOP advice 中。因為我們從另一個不同類中的 bean 呼叫這個被代理的 bean,所以代理機制會正確地呼叫被代理的 bean。另一個事務攔截器會使用不同的事務管理器(即 KafkaTransactionManager)啟動一個新的事務。當進行 Kafka 釋出時,事務不會立即提交或回滾,因為事務是作為方法的一部分啟動的,提交或回滾發生在方法退出時。此時,控制權返回到第一個方法並繼續執行。一旦原始方法退出,JPA 事務就會透過攔截器提交。如果釋出到 Kafka 的方法丟擲異常,它會回滾該事務。在這種情況下,回滾後,異常會傳播回第一個事務性方法(JPA 方法),該方法也會因為異常而回滾其事務。

使用此技術時的一個重要注意事項 對巢狀方法的呼叫應該是第一個方法所做的最後一步,因為如果第一個方法在成功執行 Kafka 呼叫之後執行某些程式碼時失敗,Kafka 事務已經提交。第一個方法中的失敗不會自動回滾 Kafka 事務。

場景 3:消費-處理-生產

憑藉我們在本系列中迄今為止對事務獲得的核心理解,讓我們來看一下事件驅動和流處理應用中的一個重要模式,稱為消費-處理-生產模式。在 Spring Cloud Stream 中,這種模式的一個實現如下所示:

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我們有一個 Spring Cloud Stream 函式,它從輸入主題消費 PersonEvent,然後在函式 lambda 表示式的主體中呼叫一個函式進行處理。此函式返回另一個 PersonEvent,我們將其釋出到出站 Kafka 主題。如果我們不在事務上下文中,可以將上面的 run 方法內聯為函式 lambda 表示式的一部分。但是,為了實現事務語義,@Transactional 註解必須位於另一個類的方法上。

為了使 binder 具有事務性,請確保為 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 提供一個有效值。

上述程式碼是否完全事務性?然而,實際情況是,它只是端到端部分事務性。讓我們看看事件序列。

Binder 是事務性的,因為我們提供了 transaction-id-prefix。當消費者在訊息監聽器容器中輪詢記錄時,它會在其 TrasactionTemplate#execute 方法內部呼叫內部監聽器方法。因此,執行監聽器方法(該方法會呼叫使用者方法)的整個端到端過程都在由 KafkaTransactionManager 啟動的事務中執行。事務啟動時,TransactionSynchronizationManager 會將資源(生產者)繫結到該事務。當用戶方法(帶有 @Transactional 註解的方法)被呼叫時,事務攔截器會攔截該呼叫,讓包裝的 AOP advice 處理實際的呼叫。因為我們有一個 JpaTransactionManager,事務攔截器會使用該管理器並啟動一個新的事務。每個事務管理器實現會自行決定是否要與現有事務同步。對於 JpaTransactionManager(以及許多其他類似的資料庫事務管理器實現),它不允許與現有事務同步,正如我們上面已經討論過的。因此,JPA 事務是獨立執行的,如上面各節所示。當 run 方法退出時,事務攔截器會使用 JPA 事務管理器執行提交或回滾操作。至此,JPA 事務管理器完成了它的工作。此時,方法呼叫的響應會返回給呼叫者,即 Spring Cloud Stream 基礎設施。Spring Cloud Stream 中的這個機制會獲取此響應並將其傳送到 Kafka 的出站主題。它使用了最初事務開始時繫結的同一個事務性生產者。傳送記錄後,控制權返回給訊息監聽器容器,然後訊息監聽器容器會提交或回滾事務。

以下是此序列中的步驟:
  1. Kafka 消費者接收到記錄。
  2. Spring Kafka 中的容器使用 TransactionTemplateexecute 方法呼叫監聽器。

1. KafkaTransactionManager 啟動一個新的事務。2. Kafka 資源被繫結(生產者)。3. 當到達使用者程式碼時,事務攔截器最終會攔截該呼叫並啟動一個新的 JPA 事務。4. AOP 代理然後呼叫實際方法。方法退出時,JpaTransactionManager 會提交或回滾。5. 方法的輸出返回給 Spring Cloud Stream 中的呼叫者。6. 然後使用步驟 4 中的同一個事務性資源將響應傳送到 Kafka 出站。7. 控制權返回給訊息監聽器容器,並且 KafkaTransactionManager 會提交或回滾。

那麼,這裡的問題是什麼?它看起來是事務性的,但實際上只是部分如此。最初的主要問題是整個端到端過程超出了單個原子事務的範圍,這是一個重要問題。這裡有兩個事務 - Kafka 和 JPA - 並且 JPA 和 Kafka 事務之間沒有同步。如果資料庫事務提交成功而 Kafka 傳送失敗,則無法回滾 JPA 事務。

我們可能會認為 ChainedTransactionManager 會在這裡有所幫助。雖然這種直覺有一些道理,但它不適用於上面的程式碼。由於在呼叫監聽器方法時容器中建立了 Kafka 事務,ChainedTransactionManager 不會從提供給它的任何 Kafka 事務管理器中建立任何新的 Kafka 事務。當退出使用者方法時,我們仍然只有一個 JPA 事務要提交或回滾。Kafka 事務必須等到呼叫返回容器才能提交或回滾。

問題在於我們在 Spring Cloud Stream 中使用了一個函式,該函式使框架能夠釋出到 Kafka。在我們的例子中,任何使用者指定的事務(例如 JPA 事務)都發生在 Spring Cloud Stream 進行 Kafka 釋出之前。我們需要確保是使用者程式碼來發布到 Kafka,這樣我們才能將整個事務程式碼視為一個單元。為了實現這一點,我們應該切換到 Consumer 而不是 Function,然後使用 StreamBridge API 釋出到 Kafka。看看這段修改後的程式碼:

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

然後我們使用上面相同的 TxCode

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

注意,run 方法不返回任何內容,但我們透過 StreamBridge API 顯式傳送到出站 Kafka 主題。

讓我們看看這些更改後的事件序列:
  1. Kafka 消費者接收到記錄。
  2. Spring Kafka 中的容器使用 TransactionTemplate 的 execute 方法呼叫監聽器。
  3. KafkaTransactionManager 啟動一個新的事務。
  4. Kafka 資源被繫結(生產者)。
  5. 當到達使用者程式碼時,攔截器會攔截該呼叫並使用 JpaTransactionManager 啟動一個新的事務。
  6. 實際的使用者方法被呼叫。
  7. Kafka 傳送操作透過 StreamBridge 作為方法執行的一部分進行。底層的 KafkaTemplate 使用步驟 4 中繫結的同一個事務性生產者工廠。
  8. 方法退出時,JpaTransactionManager 會提交或回滾。
  9. 最後,當 Kafka 事務提交(或回滾)時,控制權返回到 TransactionTemplate#execute 方法。

請特別注意上面的步驟 7。當 KafkaTemplate 檢測到已有 Kafka 事務正在進行中(在步驟 3 中開始)時,它不會與 JPA 事務同步,儘管 KafkaTemplate 具有此能力。現有的 Kafka 事務優先,並加入該事務。

即使我們仍然有兩個獨立的事務,從端到端的事務角度來看,事情是原子性的。如果透過 StreamBridge 進行的 Kafka 釋出操作失敗,JPA 和 Kafka 事務都不會執行提交操作。兩者都會回滾。同樣,如果資料庫操作失敗,兩個事務仍然會回滾。然而,始終存在一個事務提交而另一個回滾的可能性,因此應用程式碼必須處理記錄的去重以實現容錯。

在討論消費-處理-生產模式時,另一個關鍵組成部分是生產者需要將消費記錄的偏移量(除了提交偏移量的消費者之外)傳送給事務。正如我們在本部落格系列的第一部分中看到的那樣,Kafka Producer API 有一個名為 sendOffsetToTransaction 的方法,生產者透過 OffsetMetadataConsumerGroupMetadata 為每個分割槽傳送一個偏移量(當前訊息的偏移量 + 1)。在使用 Spring Cloud StreamSpring for Apache Kafka 時,應用不需要呼叫這個底層操作。Spring for Apache Kafka 中的 Kafka 訊息監聽器容器會代表應用自動處理。儘管框架在事務提交之前會在生產者上呼叫 sendOffsetToTransaction,但在事務協調器提交事務時,將偏移量傳送到事務和實際的消費者偏移量提交是原子性發生的。

透過這次討論,我們探討了編寫事務性 Spring Cloud Stream 應用的各種選項,這些應用必須與外部事務系統(如資料庫)互動,同時消費和生產到 Apache Kafka。

在本系列的下一部分中,我們將探討事務回滾(在編寫事務系統時的另一個關鍵方面)以及在編寫 Spring Cloud Stream Kafka 應用時如何訪問各種 Spring 元件。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您快速提升。

瞭解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群的所有即將到來的活動。

檢視全部