領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本系列部落格的其他部分
第一部分:Spring Cloud Stream Kafka 應用程式中的事務簡介
第二部分:Spring Cloud Stream Kafka 應用程式中的生產者啟動事務
第三部分:Spring Cloud Stream 中與外部事務管理器同步
在本部落格系列的最後三個部分,我們分析了 Spring Cloud Stream Kafka 應用程式中的事務是如何工作的。我們探討了事務在不同場景下的作用,包括生產者和消費者應用程式,以及應用程式如何正確使用它們。現在這些基礎知識我們已經掌握,讓我們來探討事務的另一個方面:發生錯誤時回滾事務。當發生錯誤且事務管理器無法提交事務時,事務管理器會回滾事務,並且不會將任何內容持久化供下游消費者檢視。如果應用程式能夠控制這種回滾機制的工作方式,那就太有用了。Spring Cloud Stream 透過對 Apache Kafka 的 Spring 提供底層支援,簡化了這種回滾定製。我們需要注意關於生產者和消費者(consume-process-produce)事務性應用程式的一些事項。我們將在下面逐一介紹。
這是我們在上一篇文章中已經看過的程式碼片段。
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "my data: " + i);
}
}
如果事務方法丟擲異常,我們該怎麼辦?答案是,從 Spring Cloud Stream 的角度來看,我們無需做任何事情。事務攔截器會發起回滾,最終 Kafka 中的事務協調器會中止該事務。最終,異常會傳播給呼叫者,然後呼叫者可以決定是否重新觸發事務方法(如果錯誤是暫時的)。框架不會重試,因為這是生產者發起的事務。這種情況很簡單,因為在事務回滾期間,我們不需要在應用程式或框架層面做任何事情。如果發生錯誤,它保證會被回滾。然而,請記住,即使事務被回滾了,Kafka 日誌中可能仍有未提交的記錄。隔離級別為 read_uncommitted(預設值)的消費者仍然會收到這些記錄。因此,消費者應用程式必須確保它們使用 read_committed 的隔離級別,這樣它們就不會收到上游事務回滾的任何記錄。
我們在本部落格系列的最後一部分看到了這種情況。與第一種情況一樣,如果方法丟擲異常併發生回滾,即使 Kafka 事務與資料庫事務同步,應用程式也無需做任何事情來處理錯誤。事務會從資料庫和 Kafka 釋出中回滾。
如果生產者發起的事務回滾如此簡單,你可能會想,有什麼大不了的,為什麼我們必須專門用一篇文章來討論這個話題?什麼時候有必要讓應用程式提供特定的回滾策略?當你有一個正在進行的消費者發起的事務時,這是有意義的,因為我們需要特別注意如何處理已消費記錄的狀態及其偏移量。讓我們重新審視我們在上一篇部落格中使用的消費者發起的事務方法程式碼。
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
你可能還記得,這是一個端到端的事務性 consume-process-produce 模式。如果事務方法丟擲異常怎麼辦?在這裡,我們需要理解框架在回滾事務時如何處理已消費的記錄。Spring for Apache Kafka 的底層訊息監聽器容器允許設定一個 回滾處理器。
訊息監聽器容器使用失敗記錄作為列表的開頭,呼叫 AfterRollbackProcessor API 處理來自上次消費者輪詢的剩餘記錄。實現會使用 topic/partition 資訊來確保在下次輪詢時重新獲取失敗的記錄。當應用程式在 Spring Cloud Stream 中啟用事務時,我們使用一個名為 DefaultAfterRollbackProcessor 的預設實現,它實現了 AfterRollbackProcessor API。因此,當事務回滾時,這個實現會預設啟動。讓我們來看看當這個 AfterRollbackProcessor 在起作用時會發生什麼。
Spring Cloud Stream 允許你透過消費者繫結設定方法呼叫重試的最大次數。例如,spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts。max attempts 值包括初始嘗試。此值的預設值為三。如果你想停用重試,可以將此值設定為一。在這種情況下,框架只嘗試一次記錄。此值包含記錄的第一次嘗試。因此,在預設的三次嘗試的情況下,繫結器會在初始嘗試後重試兩次。
當用戶方法丟擲異常時,容器最初啟動的事務將被回滾。由於我們在事務上下文中,容器隨後會在新事務中使用 transaction template 呼叫 AfterRollbackProcessor 的 process 方法,啟動一個新的 Kafka 事務。在執行 AfterRollbackProcessor 的 process 方法時,它會根據 max attempts 配置檢查是否還有剩餘的重試次數。如果發現還有重試次數,它會提交當前事務,這是一個 no-op 操作,因為在檢查過程中沒有發生任何事情。消費者會進行一次 seek 操作,定位到失敗的記錄,以便下次輪詢返回該失敗記錄。然後消費者會輪詢更多記錄,這會導致失敗的記錄被重新傳遞。整個流程會重新開始並繼續。如果再次失敗,它會重複此過程,直到耗盡所有可用的重試次數。一旦所有重試次數都耗盡,AfterRollbackProcessor 會呼叫註冊的 recoverer。Spring Cloud Stream 會註冊一個 recoverer,將錯誤中的記錄傳送到錯誤通道。之後,輸入(已恢復)記錄的偏移量會被髮送到新事務中。之後,當前事務被提交,該提交原子地將偏移量傳送到事務並提交記錄的偏移量。此時流程完成。已恢復的記錄不包含在消費者 seek 中,下次輪詢會返回新記錄。
如果恢復因任何原因失敗,容器的行為將如同未用盡重試次數一樣,並陷入無限重試。如上所述,當恢復成功時,失敗的記錄不包含在 seek 中,因此下次輪詢不會返回該記錄。
假設應用程式將最大嘗試次數設定為兩次,並且記錄在兩次嘗試中都失敗了,以下是使用事務時的事件順序。
TransactionTemplate 的 execute 方法中被呼叫,這會觸發 KafkaTransactionManager 啟動一個新的事務。@Transactional 註解的使用者方法。StreamBridge 呼叫 send 方法,該方法釋出到 Kafka topic。這裡不會啟動新的 Kafka 事務,因為已經有一個 Kafka 事務在進行中。KafkaTemplate 使用相同的事務資源,即生產者,來發布。TransactionTemplate 的 execute 方法呼叫了使用者方法。然後它會回滾 Kafka 事務。AfterRollbackProcessor,因為我們在事務上下文中。它會透過 TransactionTemplate 啟動另一個 execute 操作,由 KafkaTransactionManager 建立一個新的 Kafka 事務。TransactionTemplate 的 execute 方法呼叫 AfterRollbackProcessor API 的 process 方法,並立即返回,因為還有一個重試次數(因為我們最多有兩次嘗試)。TransactionTemplate 的 execute 方法呼叫 AfterRollbackProcessor 的 process 方法,並發現沒有剩餘的重試次數了。AfterRollbackProcessor 中的 process 方法返回後,容器會呼叫事務的 commit 操作,該操作將偏移量原子地傳送到事務中,並執行消費者偏移量的提交。為什麼在上面的步驟 8 和每次後續呼叫 AfterRollbackProcessor 失敗後,我們需要一個新的事務?為什麼不能在提交原始 Kafka 事務之前呼叫 AfterRollbackProcessor?雖然在每次失敗的嘗試後建立一個新的 Kafka 事務來執行後回滾任務可能聽起來是不必要的開銷,但這是必要的。當原始事務發生回滾時,它不會將偏移量傳送到事務。如果有重試,容器會在新事務中再次呼叫監聽器,然後迴圈繼續,直到重試次數用盡並恢復記錄。潛在地,容器建立並回滾的事務數量可能與 max attempts 的數量相同,而不會將偏移量傳送到事務。每次原始事務回滾時,容器都會為 AfterRollbackProcessor 呼叫啟動一個相應的(no-op 提交的)新事務,直到最後一次恢復後的提交。在恢復記錄後,最後一次呼叫會將偏移量傳送到事務中,以原子地提交偏移量並在 Kafka 端執行必要的事務清理。因此,正如我們所見,要將偏移量傳送到事務,我們需要在新事務中呼叫 AfterRollbackProcessor。
如果應用程式想要自定義後回滾任務,而不是使用 Spring Cloud Stream 使用的預設 DefaultAfterRollbackProcessor,那麼它可以使用 ListenerContainerCustomizer 來提供自定義的 AfterRollbackProcessor。下面的列表展示瞭如何做到這一點。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 1)));
}
在提供上述自定義時,recoverer 會記錄錯誤並繼續。DefaultAfterRollbackProcessor 的建構函式還帶有一個具有無重試的退避。因此,在這個例子中,一旦方法中發生異常,記錄就會被記錄下來。
Spring Cloud Stream 允許你在耗盡所有重試次數後,將失敗的記錄作為恢復過程的一部分發送到一個唯一的DLQ(死信佇列)topic。我們提到 Spring Cloud Stream Kafka 繫結器使用的 DefaultAfterRollbackProcessor 會將記錄傳送到錯誤通道。當應用程式啟用DLQ時,繫結器會將失敗的記錄傳送到一個特殊的DLT topic。這如何發生的細節超出了我們事務討論的範圍。然而,問題在於DLT釋出是否是事務性的。在設定DLQ基礎設施時,如果應用程式使用事務(即,它提供了 transaction-id-prefix),繫結器將使用與 KafkaTransactionManager 中相同的原始事務生產者工廠。因此,框架保證事務性地釋出到DLT。
透過本文的討論,我們涵蓋了在 Spring Cloud Stream Kafka 應用程式中使用事務時的所有主要構建塊。在本部落格系列的下一部分,我們將探討 Kafka 中事務的一個實際應用,即流行的 exactly-once-semantics,以及我們如何在 Spring Cloud Stream Kafka 應用程式中啟用它們。