領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本部落格系列的其他部分
第 1 部分:Spring Cloud Stream Kafka 應用中的事務簡介
第 2 部分:Spring Cloud Stream Kafka 應用中的生產者發起事務
第 3 部分:在 Spring Cloud Stream Kafka 應用中與外部事務管理器同步
在本部落格系列的最後三部分中,我們分析了事務在 Spring Cloud Stream Kafka 應用中的工作方式。我們遇到了事務在不同場景下(包括生產者和消費者應用)的幫助作用以及應用如何正確使用它們。既然這些基本要素已經掌握,讓我們繼續探討事務的另一個方面:錯誤發生時回滾事務。當發生錯誤且事務系統無法提交事務時,事務管理器會回滾事務,並且不會持久化任何資料供下游消費者檢視。如果應用能夠指定此回滾機制的工作方式,那將很有幫助。Spring Cloud Stream 透過 Spring 對 Apache Kafka 的基礎支援,使得這種回滾定製成為可能。我們必須瞭解一些關於生產者和消費者(消費-處理-生產)事務性應用的事項。我們將對此進行講解。
下面是我們在上一篇文章中看到的程式碼片段。
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "my data: " + i);
}
}
如果事務方法丟擲異常,我們該怎麼辦?答案是,從 Spring Cloud Stream 的角度來看,我們什麼都不需要做。事務攔截器會發起回滾,最終 Kafka 的事務協調器會中止事務。最終,異常會傳播給呼叫方,然後呼叫方可以決定是否在錯誤是瞬時的情況下重新觸發事務方法。由於這是生產者發起事務,框架不會進行重試。這種情況很簡單,因為在事務回滾期間,我們不需要從應用或框架的角度做任何事情。如果發生錯誤,事務保證會回滾。然而,請記住,即使事務已回滾,Kafka 日誌中可能仍存在未提交的記錄。隔離級別為 read_uncommitted
(預設)的消費者仍然會收到這些記錄。因此,消費者應用必須確保使用 read_committed
隔離級別,以便它們不會收到上游事務回滾的任何記錄。
我們在本部落格系列的最後一部分看到了這種情況。與第一種情況一樣,如果方法丟擲異常併發生回滾,即使 Kafka 事務與資料庫事務同步,應用也不需要做任何事情來處理錯誤。資料庫和 Kafka 釋出操作的事務都會回滾。
如果生產者發起事務回滾如此簡單,您可能會想這有什麼大不了的,以及為什麼我們必須專門用一整篇文章來討論這個話題。何時需要應用提供特定的回滾策略?當您有正在進行的消費者發起事務時,這才有意義,因為我們需要特別注意如何處理已消費記錄的狀態及其偏移量。讓我們重新審視本系列上一篇部落格中執行的消費者發起事務方法程式碼。
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
正如您所記得的,這是一個端到端事務性的消費-處理-生產模式。如果事務方法丟擲異常怎麼辦?在這裡,我們需要了解框架在回滾事務時如何處理已消費的記錄。Spring for Apache Kafka 中的底層訊息監聽器容器允許設定一個回滾處理器(rollback processor)
訊息監聽器容器會呼叫 AfterRollbackProcessor
API,傳入上次消費者拉取剩餘的記錄,失敗的記錄位於列表開頭。實現會使用主題/分割槽資訊,確保在下次拉取時再次獲取失敗的記錄。當應用在 Spring Cloud Stream 中啟用事務時,我們預設使用名為 DefaultAfterRollbackProcessor
的實現,它實現了 AfterRollbackProcessor
API。因此,當事務回滾時,此實現會預設生效。讓我們看看這個 AfterRollbackProcessor
工作時會發生什麼。
Spring Cloud Stream 允許您透過消費者繫結設定方法呼叫的最大重試次數。例如,spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts
。最大嘗試次數值包含初始嘗試。此值的預設值為三。如果您想停用重試,可以將此值設定為一。在這種情況下,框架只會嘗試一次記錄。此值包含記錄的首次嘗試。因此,在預設值三的情況下,繫結器在初始嘗試後會重試兩次。
當用戶方法丟擲異常時,容器最初啟動的事務會回滾。由於我們處於事務上下文中,容器隨後會使用事務模板在一個新事務中呼叫 AfterRollbackProcessor
的 process 方法,這會啟動一個新的 Kafka 事務。在執行 AfterRollbackProcessor
的 process 方法時,它會根據最大嘗試次數配置檢查是否還有待處理的重試。如果發現還有更多重試,它會提交當前事務,這實際上是一個空操作,因為在檢查期間沒有發生任何事情。消費者會對失敗的記錄進行 seek 操作,以便下次拉取時返回此失敗的記錄。然後消費者會拉取更多記錄,這會重新投遞失敗的記錄。整個流程再次開始並持續。如果再次失敗,它會重複,直到所有可用的重試次數耗盡。一旦所有重試次數耗盡,AfterRollbackProcessor
會呼叫註冊的恢復器。Spring Cloud Stream 註冊了一個恢復器,將出錯的記錄傳送到錯誤通道。之後,輸入(已恢復)記錄的偏移量會發送到新事務。在此之後,當前事務會提交,這會原子性地將偏移量傳送到事務並提交記錄的偏移量。現在處理完成。已恢復的記錄不會包含在消費者 seek 中,下次拉取會返回新的記錄。
如果恢復由於任何原因失敗,容器的行為就像重試沒有耗盡一樣,並進入無限重試。如上所述,當恢復成功時,失敗的記錄不會包含在 seek 中,因此下次拉取不會返回該記錄。
假設應用設定的最大嘗試次數為兩次,並且記錄兩次都失敗,以下是使用事務時事件發生的順序。
TransactionTemplate
的 execute 方法內被呼叫,這會觸發 KafkaTransactionManager
啟動一個新的事務。@Transactional
註解進行標記。StreamBridge
呼叫 send 方法,該方法會發布到 Kafka 主題。這裡不會啟動新的 Kafka 事務,因為已經有一個 Kafka 事務正在進行中。KafkaTemplate
使用相同的事務資源(生產者)來發布訊息。TransactionTemplate
的 execute 方法呼叫了使用者方法。然後,它會回滾 Kafka 事務。AfterRollbackProcessor
。它會在其 TransactionTemplate
上開始另一個 execute 操作,由 KafkaTransactionManager
建立一個新的 Kafka 事務。TransactionTemplate
的 execute 方法呼叫 AfterRollbackProcessor
API 中的 process 方法並立即返回,因為還剩一次重試機會(因為我們最多嘗試兩次)。TransactionTemplate
的 execute 方法呼叫 AfterRollbackProcessor
的 process 方法,並發現沒有更多的重試機會了。AfterRollbackProcessor
中的 process 方法返回,容器就會呼叫事務上的 commit 操作,該操作會原子性地將偏移量傳送到事務並執行消費者偏移量提交。為什麼在上面的步驟 8 中需要一個新的事務,以及在失敗嘗試後每次呼叫 AfterRollbackProcessor
時都需要一個新的事務?為什麼不能在提交原始 Kafka 事務之前呼叫 AfterRollbackProcessor
?雖然每次失敗嘗試後建立一個新的 Kafka 事務來執行回滾後任務可能聽起來像是不必要的開銷,但這卻是必需的。當原始事務發生回滾時,它不會將偏移量傳送到事務。如果需要重試,容器會在一個新的事務中再次呼叫監聽器,這個迴圈會一直持續,直到重試次數耗盡並且記錄被恢復。容器建立並回滾的事務數量可能與最大嘗試次數一樣多,而沒有將偏移量傳送到事務。每次原始事務回滾時,容器都會為 AfterRollbackProcessor
呼叫啟動一個相應的新事務,其提交都是空操作(no-op)(恢復後的最後一次提交除外)。在恢復記錄後,這最後一次呼叫會將偏移量傳送到事務,以便原子地提交偏移量並在 Kafka 端進行必要的事務清理。因此,正如我們所見,為了將偏移量傳送到事務,我們需要在一個新的事務中呼叫 AfterRollbackProcessor
。
如果應用想要定製回滾後任務,而不是使用 Spring Cloud Stream 使用的預設 DefaultAfterRollbackProcessor
,那麼可以使用 ListenerContainerCustomizer
提供一個自定義的 AfterRollbackProcessor
。以下列表顯示瞭如何做到這一點
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 1)));
}
提供上述定製後,恢復器會記錄錯誤並繼續。DefaultAfterRollbackProcessor
的建構函式也接受一個沒有重試的 backoff。因此,在本例中,一旦方法中首次發生異常,記錄就會透過日誌記錄的方式得到恢復。
Spring Cloud Stream 允許您在重試次數耗盡後,將失敗的記錄作為恢復過程的一部分發送到一個唯一的死信佇列 (DLQ) 主題。我們提到過,Spring Cloud Stream Kafka 繫結器使用的 DefaultAfterRollbackProcessor
會將記錄傳送到一個錯誤通道。當應用啟用 DLQ 時,繫結器會將失敗的記錄傳送到一個特殊的 DLT 主題。這方面的具體細節超出了我們事務討論的範圍。然而,問題在於 DLT 釋出是否具有事務性。在設定 DLQ 基礎設施時,如果應用使用了事務(即提供了 transaction-id-prefix
),繫結器會使用與 KafkaTransactionManager
中使用的相同的原始事務性生產者工廠。因此,框架保證以事務方式釋出到 DLT。
透過本文的討論,我們涵蓋了在 Spring Cloud Stream Kafka 應用中使用事務時的所有主要構建塊。在本部落格系列的下一部分中,我們將探討 Kafka 中事務的一個實際應用,即流行的精確一次語義(exactly-once-semantics),以及如何在 Spring Cloud Stream Kafka 應用中啟用它們。