Spring Cloud Stream Kafka 應用中的事務介紹

工程 | Soby Chacko | 2023 年 9 月 27 日 | ...

我們將開始一個新的部落格系列,重點介紹如何在 Spring Cloud Stream Kafka 應用中使用事務。本系列部落格將涵蓋使用 Spring Cloud Stream 和 Apache Kafka 編寫事務性應用的許多底層細節。透過本系列部落格,我們希望為您提供足夠的資訊,幫助您為各種業務場景編寫事務性的 Spring Cloud Stream Kafka 應用。

基本組成部分

Spring Cloud Stream Kafka 應用中的事務基礎支援主要來自 Apache Kafka 本身以及 Spring for Apache Kafka 庫。然而,本系列部落格是關於如何特別結合 Spring Cloud Stream 使用這種支援的。如果您熟悉 Apache Kafka 中事務的工作原理,以及 Spring for Apache Kafka 如何以 Spring 友好的方式使其成為可能,那麼本系列會讓您感到得心應手。

雖然 Apache Kafka 提供了基礎的事務支援,但 Spring for Apache Kafka (也稱為 Spring Kafka) 庫在 Spring 端擴充套件了這種支援,使 Spring 開發者可以透過依賴 Spring Framework 中傳統的事務支援更自然地使用它。Spring Cloud Stream 中的 Kafka binder 在 Spring for Apache Kafka 的支援之上進一步構建,使得在 Spring Cloud Stream Kafka 應用中使用相同的支援成為可能。在本系列部落格的第一部分,我們將簡要介紹 Kafka 事務,分析一些依賴事務會有幫助的用例,以及 Apache Kafka 和 Spring 生態系統中的事務組成部分。

在許多用例中,在 Apache Kafka 中以事務方式釋出、消費和處理記錄是必要的。當在生產者發起或實現消費-處理-生產模式的應用中以事務方式生產記錄時,它們會被原子地寫入 Kafka。如果出現問題,整個過程會被回滾,並且事務不會被提交。需要記住的一點是,與支援事務的關係型資料庫不同,在關係型資料庫中事務回滾時不會持久化任何記錄,而 Apache Kafka 仍然會將記錄釋出到主題分割槽。這種行為是由於 Apache Kafka 基本的只追加的不可變日誌架構所致,該架構不允許對記錄進行任何修改,例如在將記錄新增到記錄日誌後將其刪除。有人可能想知道使用事務的好處是什麼,因為在事務被中止時記錄可能會被髮布到主題分割槽,從而可能導致消費者看到它們。然而,具有正確隔離級別的消費者永遠不會看到回滾的記錄,即使來自回滾事務的記錄位於主題分割槽中。因此,從端到端的角度來看,整個過程保證是完全事務性的。

事務性用例

事務通常會給 Kafka 應用帶來顯著的開銷。在 Apache Kafka 中使用事務時,每條記錄都必須向記錄新增特殊的事務日誌,向特殊的事務狀態主題傳送事務標記等等。所有這些步驟都需要時間和空間,從而增加了總體延遲。因此,每個應用都必須透過分析用例來仔細權衡對事務支援的需求。

事務主要提供了一種保護資料的方式,以提供 ACID 能力。它透過提供原子性 (atomicity)、一致性 (consistency)、資料隔離 (data isolation) 和永續性 (durability) 來確保資料完整性。

在當今的企業中,有幾個關鍵任務用例非常需要使用事務並依賴它們帶來的 ACID 語義。關於何時使用事務並證明其開銷是合理的,並沒有一個簡單直接的答案。您必須審視應用並評估所涉及的風險。事務的常見典型示例是任何需要處理財務資料的場景。Bob 向 Alice 匯款,這個操作會從 Bob 的賬戶中扣款,然後 Alice 的賬戶會增加金額。如果此過程中出現任何問題,整個過程會像什麼都沒發生一樣回滾,因為我們不希望流程處於雜亂的狀態。如果從 Bob 的賬戶中扣款了,但 Alice 的賬戶沒有增加金額(或反之),那就會出現問題。從 Apache Kafka 的角度來看,這裡有一些事情發生。首先,一條訊息傳送到 Kafka 處理器,以從 Bob 的賬戶中扣款幷包含接收方的 TTP 資訊。處理器處理該資訊,然後向另一個主題傳送一條訊息,指示已從 Bob 的賬戶中扣款。之後,另一條訊息指示 Alice 的賬戶現已增加金額。此過程中的各種操作需要複雜的協調以確保一切按預期進行。每當我們有多個像這樣相關的事件時,事務可能有助於確保資料完整性並提供 ACID 語義。在此示例中,單個事件本身沒有太大意義,但它們組合在一起形成整個流程,並且需要事務性來確保資料完整性。

如果我們想概括這種模式,我們可以說,任何時候,如果存在一個關鍵任務的消費-處理-釋出模式,並且其中一個元件失敗時,整個處理器需要表現得好像什麼都沒發生一樣,那麼使用事務是一種可以考慮的潛在解決方案。

其他領域的一些高階示例

  • 想象一個航空預訂系統需要釋出一個包含多段行程的預訂資訊。如果由於任何原因,系統無法釋出整個預訂,則需要中止該過程並重新開始。
  • 一個經紀公司傳送包含多個買入訂單的請求到結算所。假設該流程無法將這些單獨的訂單作為一個原子單元釋出到供結算所消費的訊息系統中。在這種情況下,經紀公司必須重新發送訂單。
  • 一個醫療賬單系統向保險公司傳送患者測試資料時,必須將來自患者的各種相關測試作為一個整體釋出到訊息系統。
  • 一個線上遊戲系統需要跟蹤玩家在遊戲中的位置,並以事務方式將其傳送到中央伺服器,以確保所有玩家看到的是正確的座標,而不是部分更新的位置。
  • 零售商的庫存補貨系統需要將各種相關產品狀態資訊作為一個原子單元傳送。
  • 一個線上電子商務訂購系統,在單個原子聚合操作中釋出訂單詳細資訊(如訂單項、賬戶持有人資訊、配送資訊等)。

與外部資料庫同步

另一種需要事務的用例是您必須與其他事務系統同步時。除了釋出到 Kafka 之外,假設您必須將記錄或一些派生資訊持久化到關係型資料庫中,所有這些操作都必須在一個原子操作內完成。如果其中一個系統傳送資料失敗,我們必須回滾。正如本系列部落格下一部分將介紹的那樣,如果您每次只向 Kafka 釋出一條記錄,並且沒有其他相關操作,則無需使用事務。但是,即使您只向 Kafka 主題釋出一次,但作為同一過程的一部分使用了關係型資料庫操作,為了確保資料完整性,使用事務就變得必要了。

釋出到多個 Kafka 主題

生產者獨有的應用中事務的另一個用例是釋出到多個 Kafka 主題。假設您有一些業務關鍵型資料,例如一個重要的通知(如訂單詳情),您希望將其釋出到多個 Kafka 主題,訂單詳情的一部分發布到一個訂單主題,另一部分發布到一個發貨主題。在這種情況下,我們可以使用事務來確保端到端的資料完整性。

泛化上述事務性用例

上述用例集合並非需要事務的窮舉列表。在當今企業的各種領域中,還有許多其他用例與我們所 рассмотре 的總體方向類似,需要訊息系統中的事務處理。

以下列表總結了 Apache Kafka 中事務可能有用的通用用例:

  • 消費-處理-釋出系統,其中需要將記錄作為一個原子單元釋出,並提供 exactly-once-semantics(恰好一次)的交付保證。
  • 多個相關的釋出事件,單獨來看沒有意義。
  • 將資料作為一個原子單元釋出到多個主題。
  • 與外部事務管理器同步。

這是所有這些不同情況的圖示表示。它涵蓋了我們上面考慮的場景,例如消費-處理-生產、多個生產者、與外部事務同步等。一個處理器從一個入站主題消費資料,執行業務邏輯,將一些資訊持久化到資料庫系統,併發布到多個 Kafka 主題。

scst-kafka-txn-overview

Apache Kafka 中的事務

有大量文獻可供學習 Apache Kafka 中事務如何工作的底層細節,這裡有一篇文章可以作為這些細節的介紹。然而,從一個非常高的層面簡要了解 Kafka 客戶端 API 如何實現事務性仍然是值得的。需要注意的一點是,對於普通消費者來說,Kafka 中沒有所謂的事務性消費者,但有事務感知型消費者。消費者透過設定隔離級別來實現這種事務感知。預設情況下,Kafka 中的消費者會看到所有記錄,包括上游生產者未提交的記錄,因為 Kafka 消費者預設的隔離級別是 read_uncommitted。Kafka 消費者必須使用 read_committed 的隔離級別才能提供端到端的事務語義。在本系列部落格的後續章節中,我們將看到如何在 Spring Cloud Stream 中實現這一點。

在生產者端,應用依賴於 Kafka 客戶端提供的一些 API 方法。我們來看一些重要的方法。

為了使應用具有事務性,Kafka 客戶端需要一個事務 ID。應用透過 Kafka 生產者屬性 transactional.id 來提供此 ID,事務協調器使用此 ID 來註冊並啟動事務。事務協調器使用此 ID 來跟蹤事務的所有方面,例如初始化、進行中的進度、提交等。

以下列表總結了與事務相關的關鍵生產者 API 方法。

Producer#initTransactions() - 每個生產者呼叫一次以初始化事務支援。初始化 Kafka 事務。

Producer#beginTransaction() - 在傳送記錄之前開始事務。

Producer#sendOffsetsToTransaction() - 將消費記錄的偏移量傳送到事務。

Producer#commitTransaction() - 提交事務。

Producer#abortTransaction() - 中止事務。

在傳送記錄之前,我們需要初始化並開始事務。然後,繼續進行資料處理。如果為了執行此釋出而消費了記錄,我們必須使用生產者將消費記錄的偏移量傳送到事務。在此之後,可以繼續執行事務提交或中止操作 (commitTransaction 或 abortTransaction)。當我們呼叫 commitTransaction 方法時,正是 Kafka 客戶端將偏移量原子地傳送到 consumer_offsets 主題的時候。

Spring for Apache Kafka 中的事務支援

使用像 Spring for Apache Kafka 或依賴於它的 Spring Cloud Stream Kafka binder 這樣的框架時,它們帶來的好處是允許應用主要關注業務邏輯,因為框架處理了我們上面看到的底層事務性模板程式碼序列。使用 Spring for Apache Kafka 或其他框架(例如使用它的 Spring Cloud Stream)是有益的,因為它使我們不必擔心編寫(上面描述的)底層模板程式碼序列來確保所有事務步驟都成功。正如您可以想象的那樣,這裡有很多活動部件,如果您遺漏一個步驟或未按預期執行一個步驟,可能會導致應用容易出錯。在使用 Spring 的情況下,我們提到的這些框架代表應用開發者處理了這些問題。讓我們簡要看看它是如何做到的。

Spring for Apache Kafka 框架透過提供 Spring 開發者熟悉的、一致的事務性程式設計模型來隱藏所有這些底層細節。結果是,使用 Spring for Apache Kafka 或 Spring Cloud Stream 等其他框架的應用,可以簡單地專注於應用的業務邏輯,而不必處理複雜的底層事務相關事務。

KafkaTransactionManager

Spring for Apache Kafka 如何提供這種一致的事務性程式設計模型?簡而言之,Spring 開發者傳統上使用 @Transactional 註解或程式設計式方法(例如直接在應用中使用 TransactionTemplate)來建立本地事務。這些機制需要一個事務管理器實現來驅動事務方面。Spring for Apache Kafka 提供了一個事務管理器實現。KafkaTransactionManager 是 Spring Framework 中 PlatformTransactionManager 的一個實現。您可以將此事務管理器與 @Transactional 註解一起使用,或透過使用 TransactionTemplate 在本地事務中使用它。KafkaTransactionManager 使用生產者工廠建立 Kafka 生產者,並提供 API 來開始、提交和回滾事務。

KafkaResourceHolder

Spring for Apache Kafka 還提供了一個 KafkaResourceHolder,用於持有 Kafka 生產者資源。Spring for Apache Kafka 中的 KafkaTemplate 為給定的生產者工廠在當前執行緒上觸發 KafkaResourceHolder 的繫結。在消費者發起的事務中,訊息監聽容器執行此繫結,並且生產者工廠與 KafkaTransactionManager 使用的生產者工廠相同。這樣,事務為所有釋出需求都使用同一個事務性生產者。

除了上述元件外,Spring for Apache Kafka 還提供了其他用於處理事務相關問題的實用工具。在後續系列章節中,我們會在必要時看到其中的一些工具。

在本系列部落格的第二部分,我們將深入探討在 Spring Cloud Stream 應用中使用事務的更實際的實現細節。

訂閱 Spring 時事通訊

保持與 Spring 時事通訊的聯絡

訂閱

搶先一步

VMware 提供培訓和認證,助您加速前進。

瞭解更多

獲得支援

Tanzu Spring 透過一個簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部