搶佔先機
VMware 提供培訓和認證,助您加速發展。
瞭解更多本部落格系列的其它部分
第 1 部分:Spring Cloud Stream Kafka 應用中的事務介紹
第 2 部分:Spring Cloud Stream Kafka 應用中的生產者發起事務
第 3 部分:在 Spring Cloud Stream Kafka 應用中與外部事務管理器同步
第 4 部分:Spring Cloud Stream 和 Apache Kafka 的事務回滾策略
在本系列之前的討論中,我們已經對事務在 Spring Cloud Stream Kafka 應用中的工作方式進行了基礎分析,現在終於到了房間裡的大象(指顯而易見但被忽視的問題):exactly-once 語義,這是流處理應用中一個備受討論且必需的功能。在本部落格系列的這部分中,我們將探討如何在 Spring Cloud Stream 應用中透過 Apache Kafka 事務實現 exactly-once 語義。瞭解前幾節中事務的工作原理,將有助於相對容易地理解 Spring Cloud Stream Kafka 應用如何實現 exactly-once 語義。
這裡需要注意一件重要的事情是,為了實現 exactly-once 語義,我們不需要編寫超出本部落格系列之前文章中已展示程式碼之外的新程式碼。本部落格將澄清充分支援 Spring Cloud Stream Kafka 應用中 exactly-once 語義所需的某些預期。
exactly-once 語義在分散式計算中很難實現。本文不打算回顧所有技術細節來探討為何這是一項如此困難的任務。對 exactly-once 語義的底層原理以及其在分散式系統中為何如此難以實現的細節感興趣的讀者,可以參考該主題的更廣泛文獻。Confluent 的這篇部落格是一個很好的起點,可以幫助理解這些技術挑戰以及 Apache Kafka 為實現它們所實施的解決方案。
儘管我們不會深入細節,但瞭解 Apache Kafka 提供的不同交付保證是值得的。主要有三種交付保證:
在至少一次(at-least-once)的交付語義中,應用程式可能會收到資料一次或多次,但保證至少收到一次。在至多一次(at-most-once)的交付保證中,應用程式可能會收到資料零次或一次,這意味著存在資料丟失的可能性。另一方面,恰好一次(exactly-once)語義則保證,正如其名稱所示,只交付一次。根據應用程式的使用場景,您可能可以選擇其中任何一種保證。預設情況下,Apache Kafka 提供至少一次的交付保證,這意味著一條記錄可能會被多次交付。如果您的應用程式可以處理重複記錄或沒有記錄的後果,那麼選擇非恰好一次的保證可能是可以接受的。相反,如果您處理關鍵任務資料,例如金融系統或醫療資料,您必須保證恰好一次的交付和處理,以避免嚴重後果。由於像 Apache Kafka 這樣的系統具有分散式特性,由於涉及許多活動部件,通常很難實現恰好一次語義。
在本部落格系列之前的文章中,我們看到了許多不同的場景。Apache Kafka 中的 exactly-once 語義針對的是讀取-處理-寫入(或消費-轉換-生產)應用程式。有時會困惑於我們究竟是“一次性”做了什麼?是初始消費、資料處理還是最終的生產部分?Apache Kafka 為整個讀取->處理-寫入序列保證 exactly-once 語義。在此序列中,讀取和處理部分始終是至少一次(at-least-once)——例如,如果處理或寫入的某個部分因任何原因失敗。當您依賴 exactly-once 交付時,事務至關重要,這樣資料的最終釋出才能成功完成或回滾。一個潛在的副作用是初始消費和處理可能會發生多次。例如,如果事務回滾,消費者偏移量就不會更新,下次輪詢(如果是在 Spring Cloud Stream 內部重試或應用程式重啟時)將重新發送相同的記錄並再次處理。因此,在消費和處理(轉換)部分,保證是至少一次,這是理解的關鍵點。任何使用 read_committed
隔離級別執行的下游消費者將只從上游處理器獲得一次準確的訊息。因此,必須理解在 exactly-once 交付的世界中,處理器和下游消費者都必須協調,才能從 exactly-once 語義中受益。任何使用 read_uncommitted
隔離級別執行的生產主題消費者可能會看到重複資料。
另一個需要記住的點是,由於記錄的消費和處理可能會發生多次,應用程式程式碼需要遵循冪等模式。這主要是當您的程式碼與外部系統(例如資料庫)互動時需要考慮的問題。在這種情況下,由應用程式有責任確保使用者程式碼沒有副作用。
讓我們回顧一下之前看到的用於簡單消費-處理-生產迴圈的程式碼。
@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
如前所述,為了使此應用程式具有事務性,我們必須提供具有適當值的 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
配置屬性。提供此屬性是 Spring Cloud Stream 中啟用上述程式碼段完全具備 exactly-once 交付能力所需的全部操作。整個端到端過程在事務邊界內執行(儘管在上述示例中我們有兩個事務)。我們有一個外部的 Kafka 事務,它在容器呼叫監聽器時啟動,還有一個由事務攔截器啟動的 JPA 事務。當 StreamBridge
傳送發生時,使用來自初始 Kafka 事務的相同事務資源,但在控制權返回容器之前不會提交。當方法退出時,JPA 事務會被提交。假設這裡出了問題,資料庫操作丟擲異常。在這種情況下,JPA 不會提交,它將回滾,異常會傳播回監聽器容器,此時 Kafka 事務也會回滾。另一方面,如果 JPA 操作成功,但 Kafka 釋出失敗並丟擲異常,JPA 不會提交但會回滾,並且異常會傳播到監聽器。
在上面的程式碼中,如果我們不與外部事務管理器同步,而只是釋出到 Kafka,那麼我們不需要使用 @Transactional
註解,甚至可以將 txCode
方法中的程式碼內聯到消費者 lambda 中。
@Bean
public Consumer<PersonEvent> process() {
return pe -> {
Person person = new Person();
person.setName(pe.getName());
PersonEvent event = new PersonEvent();
event.setName(person.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
在這種情況下,我們只有在容器呼叫監聽器時由容器發起的 Kafka 事務。當代碼透過 StreamBridge
的 send 方法釋出記錄時,KafkaTemplate
使用來自初始事務的相同事務性生產者工廠。
在這兩種場景下,情況是:我們是完全事務性的,並且最終的釋出對於該事務而言只執行一次。使用 read_committed
隔離級別的下游消費者應該恰好消費一次。
到目前為止,在本系列中我們還沒有討論 Kafka Streams。有點諷刺的是,最初 Kafka Streams 應用是 Apache Kafka 新增事務支援和 exactly-once 語義的原因,但我們至今尚未提及。原因是,在 Kafka Streams 應用中實現 exactly-once 語義非常簡單,幾乎是微不足道的。正如他們所說,它只是一個配置開關。要了解更多關於 Kafka Streams 中 exactly-once 語義的資訊,請參閱Confluent 的這篇部落格。
與基於常規 Kafka 客戶端的應用一樣,在 Kafka Streams 的情況下,當您在消費-處理-生產模式中產生最終輸出時,exactly-once 保證就會生效,這意味著只要使用 read_committed
隔離級別,下游消費者就會恰好消費一次產生的資料。
Kafka Streams 配置屬性 processing.guarantee
屬性可以在 Kafka Streams 應用中啟用 exactly-once 語義。您可以在 Spring Cloud Stream 中透過設定 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee
屬性來配置它。您需要將值設定為 exactly_once
。預設情況下,Kafka Streams 使用的值是 at_least_once
。
有狀態的 Kafka Streams 應用中通常會發生三個主要活動:
模式是:記錄被接收並處理。在此過程中,任何狀態資訊都會具體化到狀態儲存中,本質上是更新特定的變更日誌主題。最後,出站記錄被髮布到另一個 Kafka 主題。如果您注意到這個模式,它看起來類似於我們已經見過的許多場景,除了狀態儲存部分。當將 processing.guarantee
設定為 exactly_once
時,Kafka Streams 保證如果在這些活動期間發生異常或應用程式崩潰,整個單元將原子地回滾,就像什麼都沒發生一樣。應用程式重啟後,處理器會再次消費該記錄,處理它,並最終釋出資料。由於這種釋出在幕後是事務性的,所以使用 read_committed
隔離級別的下游消費者在記錄完全釋出之前不會消費它,並且會處理所有實現事務性所需的事務(例如提交已消費記錄的偏移量等等),從而保證 exactly-once 交付。
Kafka Streams 的 exactly-once 交付保證是從 Kafka 相關活動的角度來看,針對記錄的端到端消費、處理和釋出。當存在外部系統時,它不提供此保證。例如,假設您的程式碼與外部系統(例如資料庫插入或更新操作)有互動。在這種情況下,由應用程式決定如何參與事務。Spring 的事務支援在這種情況下再次派上用場。我們不想在這裡重複程式碼。但是,正如我們在本系列中多次看到的那樣,您可以將與資料庫互動的程式碼封裝在一個單獨的方法中,使用 @Transactional
註解進行標註,並提供適當的事務管理器,例如我們已經見過的 JPA 事務管理器。當這樣的方法丟擲異常時,JPA 事務會回滾,並且異常會傳播到 Kafka Streams 處理器程式碼,最終再傳播回 Kafka Streams 框架本身,然後框架回滾原始的 Kafka 事務。這裡值得再次強調的是,重要的是要理解這些從流拓撲中的處理器呼叫的操作必須編寫成處理冪等性,因為“exactly once”僅適用於整個過程,而不是序列中的單個讀取和處理。
正如我們在本文開頭已經提到的,exactly-once 交付語義是分散式計算中的一個複雜主題。然而,藉助 Kafka 原生提供的實現 exactly-once 語義的解決方案以及 Spring 在 Spring for Apache Kafka 和 Spring Cloud Stream 框架中的支援,在 Spring Cloud Stream Kafka 應用中實現 exactly-once 交付語義變得相對容易。