Spring Cloud Stream Kafka 應用中的生產者發起的事務

工程 | Soby Chacko | 2023 年 9 月 28 日 | ...

本系列部落格的其他部分

第一部分:Spring Cloud Stream Kafka 應用程式中的事務簡介

本文是我們研究 Spring Cloud Stream 和 Apache Kafka 中事務的部落格系列中的第 2 部分。在上一部分中,我們對事務進行了概述,並觸及了基本概念。在本部落格系列的這一部分中,我們將深入探討一些實現細節及其實際應用。

在本文中,我們主要關注生產者端,以瞭解事務如何與 Spring Cloud Stream 和 Apache Kafka 一起工作。

Spring Cloud Stream 中的生產者

在我們深入探討生產者發起的事務之前,先回顧一些基礎知識,瞭解一個簡單的生產者。在 Spring Cloud Stream 中,有幾種方法可以編寫生產者(在訊息傳遞領域也稱為釋出者)。如果你有一個需要按計劃生成資料的用例,你可以編寫一個 java.util.function.Supplier 方法,如下所示。

@Bean
public Supplier<Pojo> mySupplier() {
  return () -> {
        new Pojo();
  };
}

當將上述 Supplier 作為 Spring bean 提供時,如程式碼所示,Spring Cloud Stream 會將其視為釋出者,並且,由於我們處於 Apache Kafka 的上下文中,它會將 POJO 記錄傳送到 Kafka 主題。

預設情況下,Spring Cloud Stream 每秒呼叫一次 Supplier,但你可以透過配置更改該計劃。有關更多詳細資訊,請參閱 參考文件

如果你不想輪詢 Supplier,而是想控制釋出頻率,該怎麼辦?Spring Cloud Stream 透過 StreamOperations API 提供了一種便捷的方式,其開箱即用的實現稱為 StreamBridge。這是一個示例。

@Autowired
StreamBridge streamBridge;

@PostMapping("/send-data")
public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
}

在這種情況下,應用程式使用 REST 端點來觸發透過 StreamBridge 釋出資料。由於框架不會按計劃呼叫該函式,因此任何外部方都可以透過呼叫 REST 端點來啟動資料釋出。

在這些基本生產者中使用事務是否合適?

現在我們已經瞭解了 Spring Cloud Stream 提供的兩種釋出記錄的策略,讓我們回到我們討論的主要主題:事務性發布。假設一個場景,我們希望確保資料完整性並在使用一個或多個這些生產者時獲得事務保證。在這種情況下,問題是我們是否需要首先使用事務來實現它們。在上面的兩個示例中,你如何確保記錄是事務性地釋出的?簡短的回答是,你應該避免在這些型別的釋出用例中使用事務。這些示例中的記錄釋出是單次傳送場景。使用同步生產者,我們可以實現相同的語義事務保證。預設情況下,生產者是非同步的,當使其執行在同步模式下時,生產者會確保在將響應傳送給客戶端之前,它將記錄寫入 leader 和所有副本。可以透過將 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync 屬性設定為 true 來啟用同步釋出。

總而言之,在設計僅生產者的應用程式時,請謹慎使用事務。如果不一次傳送一條記錄,而是使用 SupplierStreamBridge,我們不建議使用事務,因為將生產者轉換為同步模式執行可以達到相同的結果,而無需事務開銷。然後,這個討論引出了一個有趣的問題。對於僅生產者的應用程式,何時有必要使用事務並獲得好處?正如在本系列部落格的上一部分中所討論的,這完全取決於應用程式的用例。在生產者的上下文中,這意味著只有在我們進行多個相關的釋出,或者除了釋出之外,還需要與外部事務管理器同步時,才需要使用事務。本文的下一部分將介紹前一種情況,而本部落格系列的下一篇文章將介紹後一種情況。

在 Spring Cloud Stream Kafka 繫結器中啟用事務

為 Spring Cloud Stream 的 Kafka 繫結器啟用事務的主要驅動程式是一個屬性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。當此屬性具有有效的 prefix 字串時,Spring Cloud Stream 中的 Kafka 繫結器會確保底層 KafkaTemplate 使用事務釋出資料。順便說一句,此屬性還會提示 Spring Cloud Stream 在使用處理器模式(consume-process-produceread-process-write 模式)時使消費者具有事務感知能力。

事務實戰

儘管這可能有些違反直覺,但讓我們回到前面描述的單個 SupplierStreamBridge 示例,並引入事務來理解事務元件的主要用途。如前所述,在這些情況下我們不需要使用事務,因為這會增加更多開銷。但是,這樣做有助於我們理解。

再次看程式碼

@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {

  @Bean
  public Supplier<Pojo> mySupplier() {
    return () -> {
      new Pojo();
    };
  }

  @Autowired
  StreamBridge streamBridge;

  @PostMapping("/send-data")
  public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
  }
}

現在我們提供所需的屬性。

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-

由於我們在應用程式的配置中提供此屬性,因此每次(透過框架)呼叫此示例中的 Supplier 時,或者當有人呼叫 StreamBridge#send 方法後面的 REST 端點時,底層到 Kafka 主題的釋出都將是完全事務性的。

當呼叫 Supplier 時,Kafka 繫結器使用 KafkaTemplate 釋出資料。當繫結器檢測到應用程式提供了 transaction-id-prefix 屬性時,每個 KafkaTemplate#send 呼叫都透過 KafkaTemplate#executeInTransaction 方法進行。因此,請放心,框架會以事務方式執行底層到 Kafka 主題的釋出。從應用程式的角度來看,應用程式開發人員需要為事務提供的唯一內容是 transaction-id-prefix 屬性。

在開發或除錯事務性應用程式時,將日誌級別設定為 TRACE 通常是值得的,這樣相關的底層事務類就可以為我們提供有關正在發生的事情的詳細資訊。

例如,如果將以下軟體包的日誌級別設定為 TRACE,你將在日誌中看到大量活動。

logging:
 level:
   org.springframework.transaction: TRACE
   org.springframework.kafka.transaction: TRACE
   org.springframework.kafka.producer: TRACE
   org.springframework.kafka.core: TRACE

每次框架呼叫 Supplier 方法時,我們可以在日誌中觀察到以下內容

o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()

從跟蹤日誌可以看出,每次以事務方式釋出記錄時,它會形成一個序列:beginTransactionSendingSentcommitTransaction。如果你執行應用程式,你會發現這些序列每秒都會出現一次,因為這是 Spring Cloud Stream 呼叫 Supplier 方法的預設計劃。

相同的事務流程也適用於 StreamBridge#send 的情況。當 Spring Cloud Stream 呼叫 send 方法時,輸出繫結使用的底層 KafkaTemplate 會確保記錄在事務中釋出,因為我們提供了 transaction-id-prefix

多條記錄釋出的事務

有了這些鋪墊之後,我們來討論一些使用事務更有意義的情況。如前所述,將多條記錄作為單個原子單元釋出是一個有效場景,在這種情況下使用事務變得必要。

我們來看下面的程式碼示例

public void publish(StreamBridge streamBridge {
  for (int i = 0; i < 5; i++) {
    streamBridge.send("mySupplier-out-0", "data-" + i);
  }
}

如你所見,這是一個為了演示而刻意設計的示例。我們釋出了多條記錄,而不是隻釋出一條。釋出到多個主題同樣是這裡一個有效的方法。我們可能會認為,透過設定 transaction-id-prefix 屬性,我們可以快速將多條記錄的釋出包裝在單個事務中。然而,僅憑這些還不足以幫助我們。我們仍然需要提供 prefix 屬性。但是,僅僅這樣,每次傳送仍然發生在各自獨立的事務中。為了確保所有五條記錄的整個端到端釋出都原子地發生,我們需要在方法上應用核心 Spring 框架的 @Transactional 註解。此外,我們必須提供一個事務管理器 bean - KafkaTransactionManager - 該 bean 使用 Spring Cloud Stream Kafka 繫結器建立的同一個生產者工廠。以下是我們的程式碼現在的樣子以及應用程式的配置。

@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {

   @Autowired
   StreamBridge streamBridge;

   @Autowired Sender sender;

   @Autowired
   DefaultBinderFactory binderFactory;

   public static void main(String[] args) {
       SpringApplication.run(SpringCloudStreamProducer.class, args);
   }

   @PostMapping("/send-data")
   public void publishData() throws InterruptedException {
       sender.send(streamBridge);
   }

   @Component
   static class Sender {

     @Transactional        
     public void send(StreamBridge streamBridge)      
     {
       for (int i = 0; i < 5; i++) {
     	   streamBridge.send("mySupplier-out-0", "data-" + i);           
       }
     }
   }

  @Bean
  KafkaTransactionManager customKafkaTransactionManager() {
     KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
     ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
     KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
     return kafkaTransactionManager;
  }
}

以及相應的配置

spring:
  cloud:
   stream:
     bindings:
       mySupplier-out-0:
         destination: my-topic
     kafka:
       binder:
         Transaction:
		transaction-id-prefix: mySupplier-
producer:
             configuration:
               retries: 1
               acks: all

請注意,前面程式碼中的事務性方法(用 @Transactional 註釋的方法)必須與呼叫該方法的類不同。如果呼叫發生在同一類的方法之間或未被 Spring 管理的 bean 之間的不同類之間,則沒有代理,事務攔截器也不會生效。JVM 在執行時不知道代理和攔截器機制。在方法上新增 @Transactional 註解時,Spring 會在後臺為該方法建立一個事務性代理。當 Spring Cloud Stream 呼叫事務性方法時,代理會攔截該呼叫,然後透過代理物件實際呼叫。

我們提供的自定義 KafkaTransactionManager bean 有兩個目的。首先,它使 Spring Boot 應用 @EnableTransactionManagerment。它還提供與繫結器內部使用的相同的生產者工廠,以便事務註解在使用事務時使用正確的資源。

當 Spring Boot 檢測到可用的事務管理器 bean 時,它會自動為我們應用 @EnableTransactionManagement 註解,該註解負責檢測 @Transactional 註解,然後透過 Spring AOP 代理和通知機制新增攔截器。換句話說,Spring AOP 為 @Transactional 方法建立一個代理,幷包含 AOP 通知。如果沒有應用 @EnableTransactionManagement 註解,Spring 將不會觸發任何這些代理和攔截機制。由於 EnableTransactionManagement 註解因各種原因至關重要,因此我們必須提供一個事務管理器 bean。否則,方法上的 Transactional 註解將不起作用。

請注意,我們從繫結器中獲取事務性生產者工廠,並在 KafkaTransactionManager 的建構函式中使用它。當此 bean 存在於應用程式中時,所有記錄的整個釋出現在都發生在一個事務的範圍內。我們在跟蹤日誌中只看到一個 beginTransaction…commitTransaction 序列,這意味著只有一個正確的事務執行了所有釋出操作。

在後臺,這是事件序列

  1. 一旦呼叫用 Transactional 註釋的方法,事務攔截器就會透過 AOP 代理機制生效,並使用自定義 KafkaTransactionManager 啟動新事務。
  2. 當事務管理器開始事務時,事務管理器使用的資源(事務性資源持有者,即從生產者工廠獲得的生產者)將被繫結到事務。
  3. 當方法呼叫 StreamBridge#send 方法時,底層 KafkaTemplate 將使用自定義 KafkaTransactionManager 建立的同一個事務性資源。由於事務已在進行中,它不會啟動另一個事務,而是透過同一個事務性生產者進行釋出。
  4. 當它呼叫更多 send 方法時,它不會啟動新事務。相反,它透過原始事務中使用的同一生產者資源進行釋出。
  5. 當方法退出時,如果不存在錯誤,攔截器會要求事務管理器提交事務。如果 send 操作或方法中的任何其他內容引發異常,攔截器會要求事務管理器回滾事務。這些呼叫最終會命中 KafkaResourceHoldercommitrollback 方法,這些方法呼叫 Kafka 生產者來 commitrollback 事務。

由於我們的示例中只有一個自定義 KafkaTransactionManager bean,我們可以簡單地使用 Transactional 註解。另一方面,如果我們有多個自定義 KafkaTransactionManager bean,我們就必須使用正確的 bean 名稱來限定 @Transactional 註解。

如果我們不使用自定義 KafkaTransactionManager 執行應用程式會怎樣?

如果我們移除自定義 KafkaTransactionManager 並執行此應用程式,你會發現它建立了五個單獨的事務,而不是一個事務。如果啟用 TRACE 日誌記錄,你將在日誌中看到五個 beginTransaction…commitTransaction 序列。

你可以透過編寫一個事務性消費者 Spring Cloud Stream 應用程式並將其隔離級別設定為 read_committed 來驗證此行為。你可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level 屬性並將其值設定為 read_committed 來做到這一點。為方便測試,請新增 Thread.sleep 或其他等待機制來模擬 for 迴圈中每次 StreamBridge#send 後的行為。你會發現,每次 send 方法呼叫返回後,無論是否等待,消費者都會收到資料,從而證明整個操作不是由一個事務完成的,而是每次 send 都在自己的事務中進行。

我們看到每個 send 都有單獨的事務,因為 Transactional 註解不起作用。Transactional 註解僅在存在事務管理器 bean 且其生產者工廠與繫結器使用的生產者工廠相同時才有效。

Spring Boot 會在配置中檢測到 transaction-id-prefix 屬性(透過 spring.kafka.producer.transaction-id-prefix)時自動配置一個 KafkaTransactionManager。然而,由於我們在 Spring Cloud Stream 的上下文中,我們必須使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因為這是我們向框架發出訊號以建立繫結器及其關聯的事務性生產者工廠的內部事務管理器的方式。如果我們提供正確的 spring.kafka 字首,以便 Spring Boot 為我們自動配置一個 KakaTransactionManager,會怎麼樣?雖然這很誘人,但它不起作用,因為自動配置的事務管理器使用與繫結器使用的生產者工廠不同的生產者工廠。因此,我們必須提供一個自定義 KafkaTransactionManager,它使用繫結器使用的同一個生產者工廠。這正是我們上面所做的。

在本系列部落格的下一部分,我們將學習如何同步到生產者和消費者發起的事務的外部事務管理器。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有