Spring Cloud Stream Kafka 應用中的生產者啟動事務

工程 | Soby Chacko | 2023 年 9 月 28 日 | ...

本部落格系列的其他部分

第一部分:Spring Cloud Stream Kafka 應用中的事務簡介

本文是部落格系列的第二部分,我們將詳細探討 Spring Cloud Stream 和 Apache Kafka 中的事務。在前一部分中,我們對事務進行了總體介紹,觸及了基本思想。在本部分部落格系列中,我們將深入瞭解一些實現細節及其實際應用。

在本文中,我們主要從生產者的角度來理解事務如何在 Spring Cloud Stream 和 Apache Kafka 中工作。

Spring Cloud Stream 中的生產者

在我們深入探討生產者啟動的事務之前,讓我們先透過一個簡單的生產者瞭解一些基礎知識。在 Spring Cloud Stream 中,有幾種方式可以編寫生產者(在訊息領域也稱為釋出者)。如果您需要在預定時間生成資料,可以編寫一個 java.util.function.Supplier 方法,如下所示。

@Bean
public Supplier<Pojo> mySupplier() {
  return () -> {
        new Pojo();
  };
}

當將上述 Supplier 作為 Spring bean 提供時(如程式碼所示),Spring Cloud Stream 將其視為釋出者,並且由於我們在這裡是在 Apache Kafka 的上下文中,它會將 POJO 記錄傳送到 Kafka topic。

預設情況下,Spring Cloud Stream 每秒呼叫一次 supplier,但您可以透過配置更改該計劃。有關更多詳細資訊,請參閱參考文件

如果您不想輪詢 supplier,而是想控制它的釋出頻率怎麼辦?Spring Cloud Stream 透過 StreamOperations API 及其開箱即用的實現 StreamBridge 提供了一種便捷的方式。示例如下。

@Autowired
StreamBridge streamBridge;

@PostMapping("/send-data")
public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
}

在這種情況下,應用程式使用 REST 端點透過 StreamBridge 觸發資料釋出。由於框架不是按計劃呼叫該函式,因此任何外部方都可以透過呼叫 REST 端點來啟動資料釋出。

在這些基本生產者中使用事務是否合適?

既然我們已經瞭解了 Spring Cloud Stream 提供的兩種記錄釋出策略,讓我們回到主要討論主題:事務性發布。假設一個場景,在使用一個或多個這些生產者時,我們希望確保資料完整性並獲得事務保證。在這種情況下,問題是首先是否需要使用事務來實現這些目標。在上面的這兩個示例中,如何確保記錄是事務性發布的?簡而言之,您應該避免將事務用於這些型別的釋出用例。這些示例中的記錄釋出是單次傳送場景。使用同步生產者,我們可以獲得相同的語義事務保證。預設情況下,生產者是非同步的,當使其在同步模式下執行時,生產者會確保在向客戶端傳送響應之前將記錄寫入 leader 和所有副本。您可以透過將 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync 屬性設定為 true 來啟用同步釋出。

總而言之,在設計純生產者應用程式時,請謹慎使用事務。如果您使用 Supplier 或透過 StreamBridge 一次傳送一條記錄,我們不建議使用事務,因為將生產者轉換為同步模式執行可以在沒有事務開銷的情況下達到相同的效果。這次討論引出了一個有趣的問題。對於純生產者應用程式,何時有必要使用事務並獲得收益?正如本部落格系列的前一部分所討論的,這完全取決於應用程式的用例。在生產者的上下文中,這意味著我們只有在進行多個相關的釋出,或者除了釋出之外還需要與外部事務管理器同步時,才需要使用事務。本文的下一部分將介紹前一種場景,本部落格系列的下一篇文章將介紹後一種場景。

在 Spring Cloud Stream Kafka Binder 中啟用事務

在 Spring Cloud Stream 的 Kafka binder 中啟用事務的主要驅動因素是單個屬性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。當此屬性具有有效的字串字首時,Spring Cloud Stream 中的 Kafka binder 會確保底層 KafkaTemplate 使用事務釋出資料。順便提一下,此屬性會指示 Spring Cloud Stream 在使用處理器模式(consume-process-produceread-process-write 模式)時使消費者具有事務感知能力。

事務實操

儘管這有點反直覺,但讓我們回到之前的單例 SupplierStreamBridge 示例,並引入事務來理解事務元件的主要用法。如前所述,在那些情況下我們不需要使用事務,因為這會增加額外的開銷。然而,這樣做有助於我們理解事物。

程式碼如下所示

@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {

  @Bean
  public Supplier<Pojo> mySupplier() {
    return () -> {
      new Pojo();
    };
  }

  @Autowired
  StreamBridge streamBridge;

  @PostMapping("/send-data")
  public void publishData() {
   streamBridge.send("mySupplier-out-0", new Pojo());
  }
}

現在讓我們提供所需的屬性。

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-

由於我們在應用程式的配置中提供了該屬性,因此在此示例中,每次 supplier 被呼叫(透過框架)或有人呼叫 StreamBridge#send 方法背後的 REST 端點時,底層向 Kafka topic 的釋出都將完全具備事務性。

當 supplier 觸發時,Kafka binder 使用 KafkaTemplate 釋出資料。當 binder 檢測到應用程式提供了 transaction-id-prefix 屬性時,每次 KafkaTemplate#send 呼叫都會透過 KafkaTemplate#executeInTransaction 方法完成。因此,請放心,框架會事務性地完成底層向 Kafka topic 的釋出。從應用程式開發者的角度來看,為了事務目的唯一需要提供的是 transaction-id-prefix 屬性。

在開發或除錯事務性應用程式時,將日誌級別設定為 TRACE 通常很有價值,這樣相關的底層事務類可以為我們提供正在發生的事情的詳細資訊。

例如,如果您將以下包的日誌級別設定為 TRACE,您將在日誌中看到大量活動。

logging:
 level:
   org.springframework.transaction: TRACE
   org.springframework.kafka.transaction: TRACE
   org.springframework.kafka.producer: TRACE
   org.springframework.kafka.core: TRACE

每次框架呼叫 supplier 方法時,我們可以在日誌中觀察到以下內容

o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()

從跟蹤日誌中可以看出,每次事務性地釋出記錄時,都會形成一個序列:beginTransactionSendingSentcommitTransaction。如果您執行應用程式,您會觀察到每秒都會看到這些序列,因為這是 Spring Cloud Stream 呼叫 Supplier 方法的預設計劃。

同樣的事務流程也適用於 StreamBridge#send 的情況。當 Spring Cloud Stream 呼叫 send 方法時,輸出繫結使用的底層 KafkaTemplate 會確保記錄在事務內釋出,因為我們提供了 transaction-id-prefix

多記錄釋出的事務

瞭解了基礎知識後,我們來看看何時使用事務是合理的。正如我們之前討論的,將多個記錄作為單個原子單元釋出的需求是一個有效的場景,在這種情況下使用事務是必要的。

讓我們看下面的程式碼示例

public void publish(StreamBridge streamBridge {
  for (int i = 0; i < 5; i++) {
    streamBridge.send("mySupplier-out-0", "data-" + i);
  }
}

如您所見,這是一個特意設計的示例,以演示問題的關鍵。我們不是釋出一次,而是釋出多條記錄。釋出到多個 topic 在這裡也是同樣有效的方法。我們可能會認為,透過設定 transaction-id-prefix 屬性,我們可以快速將多個記錄的釋出包裝在單個事務中。然而,僅憑這一點是不夠的。我們仍然需要提供字首屬性。但是,僅僅提供字首屬性,每次傳送仍然發生在各自的事務中。為了確保所有五條記錄的整個端到端釋出原子性地發生,我們需要在方法上應用核心 Spring Framework 中的 @Transactional 註解。此外,我們必須提供一個使用 Spring Cloud Stream Kafka binder 建立的相同生產者工廠的事務管理器 bean——KafkaTransactionManager。下面是我們的程式碼和應用程式配置現在看起來的樣子:

@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {

   @Autowired
   StreamBridge streamBridge;

   @Autowired Sender sender;

   @Autowired
   DefaultBinderFactory binderFactory;

   public static void main(String[] args) {
       SpringApplication.run(SpringCloudStreamProducer.class, args);
   }

   @PostMapping("/send-data")
   public void publishData() throws InterruptedException {
       sender.send(streamBridge);
   }

   @Component
   static class Sender {

     @Transactional        
     public void send(StreamBridge streamBridge)      
     {
       for (int i = 0; i < 5; i++) {
     	   streamBridge.send("mySupplier-out-0", "data-" + i);           
       }
     }
   }

  @Bean
  KafkaTransactionManager customKafkaTransactionManager() {
     KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
     ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
     KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
     return kafkaTransactionManager;
  }
}

以及相應的配置

spring:
  cloud:
   stream:
     bindings:
       mySupplier-out-0:
         destination: my-topic
     kafka:
       binder:
         Transaction:
		transaction-id-prefix: mySupplier-
producer:
             configuration:
               retries: 1
               acks: all

請注意,前面程式碼中的事務性方法(帶有 @Transactional 註解的方法)必須與呼叫該方法的類不同。如果呼叫發生在同一類的方法之間或不是 Spring 管理的 bean 的不同類之間,則不會建立代理,並且事務攔截器不會生效。JVM 在執行時不知道代理和攔截器機制。當在方法上新增 @Transactional 註解時,Spring 會在幕後為該方法建立一個事務代理。當 Spring Cloud Stream 呼叫事務性方法時,代理會攔截該呼叫,然後透過代理物件發生實際呼叫。

我們提供的自定義 KafkaTransactionManager bean 有兩個作用。首先,它使 Spring Boot 應用 @EnableTransactionManagerment。它還提供 binder 內部使用的相同生產者工廠,以便 Transactional 註解在應用事務時使用正確的資源。

當 Spring Boot 檢測到可用的事務管理器 bean 時,它會自動為我們應用 @EnableTransactionManagement 註解,該註解負責檢測 @Transactional 註解,然後透過 Spring AOP 代理和通知機制新增攔截器。換句話說,Spring AOP 會為 @Transactional 方法建立一個代理幷包含 AOP 通知。如果沒有應用 @EnableTransactionManagement 註解,Spring 不會觸發任何這些代理和攔截機制。由於 EnableTransactionManagement 註解對於這些各種原因至關重要,我們必須提供一個事務管理器 bean。否則,方法上的 Transactional 註解將不起作用。

請注意,我們從 binder 獲取事務性生產者工廠,並在 KafkaTransactionManager 的建構函式中使用它。當應用程式中存在此 bean 時,所有記錄的整個釋出現在都在單個事務的範圍內進行。我們在跟蹤日誌中只看到一個 beginTransaction…commitTransaction 序列,這意味著只有一個真正的事務執行了所有釋出操作。

在幕後,事件序列如下

  1. 帶有 Transactional 註解的方法被呼叫後,事務攔截器透過 AOP 代理機制啟動,並使用自定義的 KafkaTransactionManager 開啟一個新事務。
  2. 當事務管理器開始事務時,事務管理器使用的資源——事務性資源持有者(即從生產者工廠獲得的生產者)——將繫結到該事務。
  3. 當方法呼叫 StreamBridge#send 方法時,底層的 KafkaTemplate 將使用自定義 KafkaTransactionManager 建立的相同事務性資源。由於事務已經進行中,它不會開啟另一個事務,而是在同一個事務性生產者上進行釋出。
  4. 當它呼叫更多 send 方法時,它不會開啟新的事務。相反,它會透過原始事務中使用的相同生產者資源進行釋出。
  5. 當方法退出時,如果沒有錯誤,攔截器會要求事務管理器提交事務。如果在方法中的任何傳送操作或其他任何地方丟擲異常,攔截器會要求事務管理器回滾事務。這些呼叫最終會到達 KafkaResourceHoldercommitrollback 方法,從而呼叫 Kafka 生產者來 commitrollback 事務。

由於我們的示例中只有一個自定義 KafkaTransactionManager bean,我們可以直接使用 Transactional 註解。另一方面,如果我們有多個自定義 KafkaTransactionManager bean,我們就必須使用正確的 bean 名稱來限定 @Transactional 註解。

如果我們執行應用程式時不使用自定義 KafkaTransactionManager 會怎樣?

如果我們移除自定義的 KafkaTransactionManager 並執行此應用程式,您會看到它建立了五個獨立的事務,而不是單個事務。如果您啟用 TRACE 日誌記錄,您可以在日誌中看到五個 beginTransaction…commitTransaction 序列。

您可以透過編寫一個事務性消費者 Spring Cloud Stream 應用程式,並將其隔離級別設定為 read_committed 來驗證此行為。您可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level 屬性並將其值設定為 read_committed 來實現。為了測試目的,在 for 迴圈中的每個 StreamBridge#send 之後新增一個 Thread.sleep 或其他等待機制來模擬行為。您會看到,無論等待多久,只要每個 send 方法呼叫返回,消費者就會收到資料,這證明了並非單個事務執行了整個操作,而是每個 send 都發生在自己的事務中。

我們看到每次傳送都有獨立的事務,因為 Transactional 註解沒有達到我們預期的效果。Transactional 註解只有在存在可用的事務管理器 bean 並且其生產者工廠與 binder 使用的生產者工廠相同時才會起作用。

如果 Spring Boot 透過 spring.kafka.producer.transaction-id-prefix 在配置中檢測到 transaction-id-prefix 屬性,它會自動配置一個 KafkaTransactionManager。然而,由於我們處於 Spring Cloud Stream 上下文中,我們必須使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因為這是我們指示框架為 binder 建立內部事務管理器和相關事務性生產者工廠的方式。如果我們提供正確的 spring.kafka 字首,讓 Spring Boot 自動配置一個 KakaTransactionManager 會怎樣?雖然這很誘人,但它不起作用,因為自動配置的事務管理器使用的生產者工廠與 binder 使用的不同。因此,我們必須提供一個使用 binder 使用的相同生產者工廠的自定義 KafkaTransactionManager。這正是我們在上面所做的。

在本部落格系列的下一部分中,我們將學習如何與外部事務管理器同步,以實現生產者和消費者啟動的事務。

訂閱 Spring 郵件列表

透過 Spring 郵件列表保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助力您快速成長。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部