Apache Kafka 在 Spring Cloud Stream Kafka 應用中的精確一次語義

工程 | Soby Chacko | 2023年10月16日 | ...

本系列部落格的其他部分

第一部分:Spring Cloud Stream Kafka 應用程式中的事務簡介

第二部分:Spring Cloud Stream Kafka 應用程式中的生產者啟動事務

第三部分:Spring Cloud Stream 中與外部事務管理器同步

第四部分:Spring Cloud Stream 和 Apache Kafka 的事務回滾策略

在上一系列討論中,我們已經對 Spring Cloud Stream Kafka 應用中的事務工作原理進行了基本分析。現在,我們終於迎來了重點:精確一次語義,這是流式應用中備受關注且必不可少的功能。在本篇部落格系列文章中,我們將探討如何透過 Apache Kafka 事務在 Spring Cloud Stream 應用中實現精確一次語義。由於我們已經掌握了前幾部分關於事務工作原理的知識,理解 Spring Cloud Stream Kafka 應用如何實現精確一次語義將相對容易。

需要注意的是,為了實現精確一次語義,我們無需編寫任何新的程式碼,除了我們在本系列部落格前幾篇文章中已經看到的程式碼之外。這篇部落格將闡明在 Spring Cloud Stream Kafka 應用中充分支援精確一次語義所必需的一些期望。

在分散式計算中實現精確一次語義是一項艱鉅的任務。我們在此不深入探討所有技術細節,以解釋為何這是一項如此困難的任務。有興趣的讀者如果想了解精確一次語義的所有底層原理及其在分散式系統中為何如此難以實現,可以參考相關文獻。Confluent 的這篇部落格是一個很好的起點,可以幫助理解這些技術挑戰以及 Apache Kafka 為實現這些目標所採用的解決方案。

雖然我們不會深入細節,但瞭解 Apache Kafka 提供的不同投遞保證是值得的。主要有三種此類投遞保證:

  • 至少一次語義
  • 至多一次語義
  • 精確一次語義

至少一次的投遞語義中,應用程式可能會接收資料一次或多次,但保證至少接收一次。在至多一次語義的投遞保證中,應用程式可能接收資料零次或一次,這意味著存在資料丟失的可能。另一方面,精確一次語義正如其名稱所示,僅保證一次投遞。根據應用程式的用例,您可能可以接受使用其中任何一種保證。預設情況下,Apache Kafka 提供至少一次投遞保證,這意味著一條記錄可能會被多次投遞。如果您的應用程式可以處理重複記錄或無記錄的後果,那麼使用非精確一次保證可能沒問題。相反,如果您處理的是金融系統或醫療資料等關鍵任務資料,則必須保證精確一次的投遞和處理,以避免災難性後果。由於 Apache Kafka 等系統的分散式特性,由於許多移動部分的存在,實現精確一次語義通常非常困難。

Spring Cloud Stream Kafka 和精確一次語義

我們在本系列部落格的前幾篇文章中看到了許多不同的場景。Apache Kafka 中的精確一次語義適用於讀-處理-寫(或消費-轉換-生產)應用。有時會讓人困惑的是,我們究竟在“一次”做什麼?是初始消費、資料處理還是最後的生產部分?Apache Kafka 保證的是整個讀->處理-寫序列的精確一次語義。在這個序列中,讀取和處理部分始終是至少一次的——例如,如果處理或寫入部分的任何原因失敗。當您依賴精確一次投遞時,事務至關重要,這樣資料的最終釋出才能成功或回滾。一個潛在的副作用是,初始消費和處理可能會發生多次。例如,如果事務被回滾,消費者偏移量不會被更新,下一次輪詢(無論是 Spring Cloud Stream 內部的重試還是應用程式重啟)都會重新投遞並再次處理同一條記錄。因此,消費和處理(轉換)部分的保證是至少一次,這是一個關鍵的理解點。任何以 read_committed 隔離級別執行的下游消費者將只會精確地接收一次上游處理器傳送的訊息。因此,必須理解,在精確一次投遞的世界裡,處理器和下游消費者都必須協調才能受益於精確一次語義。使用 read_uncommitted 隔離級別的已釋出主題的任何消費者可能會看到重複的資料。

另一個需要牢記的點是,由於記錄的消費及其處理可能發生多次,應用程式程式碼需要遵循冪等模式,這主要是在您的程式碼與資料庫等外部系統互動時需要考慮的。在這種情況下,應用程式需要確保使用者程式碼沒有副作用。

讓我們回顧一下我們之前看到的簡單消費-處理-生產迴圈的程式碼。

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

正如我們之前所見,為了使此應用程式具有事務性,我們必須為 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 配置屬性提供一個合適的值。在 Spring Cloud Stream 中,提供此屬性是使上述程式碼段完全具備精確一次投遞能力所需的所有操作。整個端到端過程在事務邊界內執行(儘管在上面的示例中有兩個事務)。我們有一個外部 Kafka 事務,它在容器呼叫監聽器時啟動,另一個由事務攔截器啟動的 JPA 事務。當發生 StreamBridge 傳送時,會使用與初始 Kafka 事務相同的事務資源,但直到控制權返回容器後才提交。當方法退出時,JPA 事務就會提交。假設在這裡出現問題,並且資料庫操作丟擲異常。在這種情況下,JPA 不會提交,它會回滾,異常會傳播回監聽器容器,此時 Kafka 事務也會被回滾。另一方面,如果 JPA 操作成功,但 Kafka 釋出失敗並丟擲異常,JPA 不會提交而是回滾,異常會傳播回監聽器。

在上述程式碼中,如果我們不與外部事務管理器同步而僅僅釋出到 Kafka,那麼就不需要使用 @Transactional 註釋,我們甚至可以將程式碼內聯到 txCode 方法中作為消費者 lambda 的一部分。

@Bean
public Consumer<PersonEvent> process() {
   return pe -> {
	  Person person = new Person();
       person.setName(pe.getName());
       PersonEvent event = new PersonEvent();
       event.setName(person.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);

   }
}

在這種情況下,我們只有一個在容器呼叫監聽器時啟動的 Kafka 事務。當代碼透過 StreamBridge 傳送方法釋出記錄時,KafkaTemplate 會使用與初始事務相同的事務生產者工廠。

這兩種場景下的故事都是,我們是完全事務性的,並且最終的釋出僅在事務中進行一次。具有 read_committed 隔離級別的下游消費者應該精確地消費它們一次。

Kafka Streams 和精確一次語義

在本系列中,直到目前為止,我們還沒有討論 Kafka Streams。這有些諷刺,因為最初,Kafka Streams 應用是 Apache Kafka 新增事務支援和精確一次語義的原因,但我們還沒有討論它。原因是,在 Kafka Streams 應用中實現精確一次語義非常簡單,幾乎是預設的。他們稱之為,它是一個單一的配置開關。要了解更多關於 Kafka Streams 中精確一次語義的資訊,請參閱 Confluent 的這篇部落格

與常規的基於 Kafka 客戶端的應用一樣,對於 Kafka Streams,當您在消費-處理-生產模式中生產最終輸出時,精確一次保證就會起作用,這意味著已釋出的任何下游消費者都將精確地消費一次,只要它們使用 read_committed 隔離級別。

Kafka Streams 的 processing.guarantee 配置屬性在 Kafka Streams 應用中啟用精確一次語義。您可以透過設定 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee 屬性來在 Spring Cloud Stream 中進行設定。您需要將其值設定為 exactly_once。預設情況下,Kafka Streams 使用 at_least_once 的值。

狀態ful Kafka Streams 應用通常會發生三種主要活動:

  1. 記錄的初始消費
  2. 透過變更日誌主題更新狀態儲存。
  3. 生產資料

模式是接收並處理記錄。在此過程中,任何狀態資訊都會具體化到狀態儲存中,本質上是更新特定的變更日誌主題。最後,出站記錄會被髮布到另一個 Kafka 主題。如果您注意到這個模式,它看起來與我們已經看到的許多場景相似,除了狀態儲存部分。當將 processing.guarantee 設定為 exactly_once 時,Kafka Streams 保證,如果在這些活動期間發生異常或應用程式崩潰,整個單元將原子地回滾,就好像什麼都沒發生一樣。應用程式重新啟動後,處理器會再次消費記錄,處理它,並最終釋出資料。由於這在後臺以事務方式進行釋出,因此任何使用 read_committed 隔離級別的下游消費者都不會在成功釋出之前消費該記錄,從而處理了實現事務性所需的所有操作(例如,提交已消費記錄的偏移量等),從而保證了精確一次投遞。

Kafka Streams 的精確一次投遞保證是指從 Kafka 相關活動的角度來看,端到端的消費、處理和記錄的釋出。它在存在外部系統時並不提供此保證。例如,如果您的程式碼與資料庫插入或更新操作等外部系統互動,那麼由應用程式決定如何參與事務。Spring 的事務支援在這種情況下再次派上用場。我們不想在此重複程式碼,但正如我們在此係列中多次看到的,您可以將與資料庫互動的程式碼封裝在單獨的方法中,用 @Transactional 註釋進行註解,並提供一個合適的事務管理器,例如我們之前看到的 JPA 管理器。當此類方法丟擲異常時,JPA 事務會回滾,異常會傳播到 Kafka Streams 處理器程式碼,後者最終會將其傳播回 Kafka Streams 框架本身,然後該框架會回滾原始 Kafka 事務。再次強調,理解這一點很重要,從流拓撲中的處理器呼叫的這些操作必須被編寫成能夠處理冪等性,因為“精確一次”僅適用於整個過程,而不是序列中單獨的讀取和處理。

結論

正如我們在本文開頭提到的,精確一次投遞語義在分散式計算中是一個複雜的主題。然而,藉助 Kafka 原生提供的實現精確一次語義的解決方案,以及 Spring 在 Spring for Apache Kafka 和 Spring Cloud Stream 框架中的支援,在 Spring Cloud Stream Kafka 應用中實現精確一次投遞語義相對容易。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有