領先一步
VMware 提供培訓和認證,助力您快速成長。
瞭解更多本部落格系列的其他部分
第一部分:Spring Cloud Stream Kafka 應用中的事務簡介
本文是部落格系列的第二部分,我們將詳細探討 Spring Cloud Stream 和 Apache Kafka 中的事務。在前一部分中,我們對事務進行了總體介紹,觸及了基本思想。在本部分部落格系列中,我們將深入瞭解一些實現細節及其實際應用。
在本文中,我們主要從生產者的角度來理解事務如何在 Spring Cloud Stream 和 Apache Kafka 中工作。
在我們深入探討生產者啟動的事務之前,讓我們先透過一個簡單的生產者瞭解一些基礎知識。在 Spring Cloud Stream 中,有幾種方式可以編寫生產者(在訊息領域也稱為釋出者)。如果您需要在預定時間生成資料,可以編寫一個 java.util.function.Supplier
方法,如下所示。
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
當將上述 Supplier 作為 Spring bean 提供時(如程式碼所示),Spring Cloud Stream 將其視為釋出者,並且由於我們在這裡是在 Apache Kafka 的上下文中,它會將 POJO 記錄傳送到 Kafka topic。
預設情況下,Spring Cloud Stream 每秒呼叫一次 supplier,但您可以透過配置更改該計劃。有關更多詳細資訊,請參閱參考文件。
如果您不想輪詢 supplier,而是想控制它的釋出頻率怎麼辦?Spring Cloud Stream 透過 StreamOperations
API 及其開箱即用的實現 StreamBridge
提供了一種便捷的方式。示例如下。
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
在這種情況下,應用程式使用 REST 端點透過 StreamBridge
觸發資料釋出。由於框架不是按計劃呼叫該函式,因此任何外部方都可以透過呼叫 REST 端點來啟動資料釋出。
在這些基本生產者中使用事務是否合適?
既然我們已經瞭解了 Spring Cloud Stream 提供的兩種記錄釋出策略,讓我們回到主要討論主題:事務性發布。假設一個場景,在使用一個或多個這些生產者時,我們希望確保資料完整性並獲得事務保證。在這種情況下,問題是首先是否需要使用事務來實現這些目標。在上面的這兩個示例中,如何確保記錄是事務性發布的?簡而言之,您應該避免將事務用於這些型別的釋出用例。這些示例中的記錄釋出是單次傳送場景。使用同步生產者,我們可以獲得相同的語義事務保證。預設情況下,生產者是非同步的,當使其在同步模式下執行時,生產者會確保在向客戶端傳送響應之前將記錄寫入 leader 和所有副本。您可以透過將 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync
屬性設定為 true
來啟用同步釋出。
總而言之,在設計純生產者應用程式時,請謹慎使用事務。如果您使用 Supplier
或透過 StreamBridge
一次傳送一條記錄,我們不建議使用事務,因為將生產者轉換為同步模式執行可以在沒有事務開銷的情況下達到相同的效果。這次討論引出了一個有趣的問題。對於純生產者應用程式,何時有必要使用事務並獲得收益?正如本部落格系列的前一部分所討論的,這完全取決於應用程式的用例。在生產者的上下文中,這意味著我們只有在進行多個相關的釋出,或者除了釋出之外還需要與外部事務管理器同步時,才需要使用事務。本文的下一部分將介紹前一種場景,本部落格系列的下一篇文章將介紹後一種場景。
在 Spring Cloud Stream 的 Kafka binder 中啟用事務的主要驅動因素是單個屬性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
。當此屬性具有有效的字串字首時,Spring Cloud Stream 中的 Kafka binder 會確保底層 KafkaTemplate
使用事務釋出資料。順便提一下,此屬性會指示 Spring Cloud Stream 在使用處理器模式(consume-process-produce 或 read-process-write 模式)時使消費者具有事務感知能力。
儘管這有點反直覺,但讓我們回到之前的單例 Supplier
或 StreamBridge
示例,並引入事務來理解事務元件的主要用法。如前所述,在那些情況下我們不需要使用事務,因為這會增加額外的開銷。然而,這樣做有助於我們理解事物。
程式碼如下所示
@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
}
現在讓我們提供所需的屬性。
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-
由於我們在應用程式的配置中提供了該屬性,因此在此示例中,每次 supplier 被呼叫(透過框架)或有人呼叫 StreamBridge#send
方法背後的 REST 端點時,底層向 Kafka topic 的釋出都將完全具備事務性。
當 supplier 觸發時,Kafka binder 使用 KafkaTemplate
釋出資料。當 binder 檢測到應用程式提供了 transaction-id-prefix
屬性時,每次 KafkaTemplate#send
呼叫都會透過 KafkaTemplate#executeInTransaction
方法完成。因此,請放心,框架會事務性地完成底層向 Kafka topic 的釋出。從應用程式開發者的角度來看,為了事務目的唯一需要提供的是 transaction-id-prefix
屬性。
在開發或除錯事務性應用程式時,將日誌級別設定為 TRACE
通常很有價值,這樣相關的底層事務類可以為我們提供正在發生的事情的詳細資訊。
例如,如果您將以下包的日誌級別設定為 TRACE,您將在日誌中看到大量活動。
logging:
level:
org.springframework.transaction: TRACE
org.springframework.kafka.transaction: TRACE
org.springframework.kafka.producer: TRACE
org.springframework.kafka.core: TRACE
每次框架呼叫 supplier 方法時,我們可以在日誌中觀察到以下內容
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()
從跟蹤日誌中可以看出,每次事務性地釋出記錄時,都會形成一個序列:beginTransaction、Sending、Sent 和 commitTransaction。如果您執行應用程式,您會觀察到每秒都會看到這些序列,因為這是 Spring Cloud Stream 呼叫 Supplier
方法的預設計劃。
同樣的事務流程也適用於 StreamBridge#send
的情況。當 Spring Cloud Stream 呼叫 send 方法時,輸出繫結使用的底層 KafkaTemplate
會確保記錄在事務內釋出,因為我們提供了 transaction-id-prefix
。
瞭解了基礎知識後,我們來看看何時使用事務是合理的。正如我們之前討論的,將多個記錄作為單個原子單元釋出的需求是一個有效的場景,在這種情況下使用事務是必要的。
讓我們看下面的程式碼示例
public void publish(StreamBridge streamBridge {
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
如您所見,這是一個特意設計的示例,以演示問題的關鍵。我們不是釋出一次,而是釋出多條記錄。釋出到多個 topic 在這裡也是同樣有效的方法。我們可能會認為,透過設定 transaction-id-prefix
屬性,我們可以快速將多個記錄的釋出包裝在單個事務中。然而,僅憑這一點是不夠的。我們仍然需要提供字首屬性。但是,僅僅提供字首屬性,每次傳送仍然發生在各自的事務中。為了確保所有五條記錄的整個端到端釋出原子性地發生,我們需要在方法上應用核心 Spring Framework 中的 @Transactional
註解。此外,我們必須提供一個使用 Spring Cloud Stream Kafka binder 建立的相同生產者工廠的事務管理器 bean——KafkaTransactionManager
。下面是我們的程式碼和應用程式配置現在看起來的樣子:
@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {
@Autowired
StreamBridge streamBridge;
@Autowired Sender sender;
@Autowired
DefaultBinderFactory binderFactory;
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamProducer.class, args);
}
@PostMapping("/send-data")
public void publishData() throws InterruptedException {
sender.send(streamBridge);
}
@Component
static class Sender {
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
}
@Bean
KafkaTransactionManager customKafkaTransactionManager() {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
return kafkaTransactionManager;
}
}
以及相應的配置
spring:
cloud:
stream:
bindings:
mySupplier-out-0:
destination: my-topic
kafka:
binder:
Transaction:
transaction-id-prefix: mySupplier-
producer:
configuration:
retries: 1
acks: all
請注意,前面程式碼中的事務性方法(帶有 @Transactional
註解的方法)必須與呼叫該方法的類不同。如果呼叫發生在同一類的方法之間或不是 Spring 管理的 bean 的不同類之間,則不會建立代理,並且事務攔截器不會生效。JVM 在執行時不知道代理和攔截器機制。當在方法上新增 @Transactional
註解時,Spring 會在幕後為該方法建立一個事務代理。當 Spring Cloud Stream 呼叫事務性方法時,代理會攔截該呼叫,然後透過代理物件發生實際呼叫。
我們提供的自定義 KafkaTransactionManager
bean 有兩個作用。首先,它使 Spring Boot 應用 @EnableTransactionManagerment
。它還提供 binder 內部使用的相同生產者工廠,以便 Transactional 註解在應用事務時使用正確的資源。
當 Spring Boot 檢測到可用的事務管理器 bean 時,它會自動為我們應用 @EnableTransactionManagement
註解,該註解負責檢測 @Transactional
註解,然後透過 Spring AOP 代理和通知機制新增攔截器。換句話說,Spring AOP 會為 @Transactional
方法建立一個代理幷包含 AOP 通知。如果沒有應用 @EnableTransactionManagement
註解,Spring 不會觸發任何這些代理和攔截機制。由於 EnableTransactionManagement
註解對於這些各種原因至關重要,我們必須提供一個事務管理器 bean。否則,方法上的 Transactional
註解將不起作用。
請注意,我們從 binder 獲取事務性生產者工廠,並在 KafkaTransactionManager
的建構函式中使用它。當應用程式中存在此 bean 時,所有記錄的整個釋出現在都在單個事務的範圍內進行。我們在跟蹤日誌中只看到一個 beginTransaction…commitTransaction 序列,這意味著只有一個真正的事務執行了所有釋出操作。
在幕後,事件序列如下
Transactional
註解的方法被呼叫後,事務攔截器透過 AOP 代理機制啟動,並使用自定義的 KafkaTransactionManager
開啟一個新事務。StreamBridge#send
方法時,底層的 KafkaTemplate
將使用自定義 KafkaTransactionManager
建立的相同事務性資源。由於事務已經進行中,它不會開啟另一個事務,而是在同一個事務性生產者上進行釋出。send
方法時,它不會開啟新的事務。相反,它會透過原始事務中使用的相同生產者資源進行釋出。KafkaResourceHolder
的 commit 或 rollback 方法,從而呼叫 Kafka 生產者來 commit 或 rollback 事務。由於我們的示例中只有一個自定義 KafkaTransactionManager
bean,我們可以直接使用 Transactional
註解。另一方面,如果我們有多個自定義 KafkaTransactionManager
bean,我們就必須使用正確的 bean 名稱來限定 @Transactional
註解。
如果我們移除自定義的 KafkaTransactionManager
並執行此應用程式,您會看到它建立了五個獨立的事務,而不是單個事務。如果您啟用 TRACE
日誌記錄,您可以在日誌中看到五個 beginTransaction…commitTransaction 序列。
您可以透過編寫一個事務性消費者 Spring Cloud Stream 應用程式,並將其隔離級別設定為 read_committed
來驗證此行為。您可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level
屬性並將其值設定為 read_committed
來實現。為了測試目的,在 for 迴圈中的每個 StreamBridge#send
之後新增一個 Thread.sleep
或其他等待機制來模擬行為。您會看到,無論等待多久,只要每個 send
方法呼叫返回,消費者就會收到資料,這證明了並非單個事務執行了整個操作,而是每個 send
都發生在自己的事務中。
我們看到每次傳送都有獨立的事務,因為 Transactional
註解沒有達到我們預期的效果。Transactional
註解只有在存在可用的事務管理器 bean 並且其生產者工廠與 binder 使用的生產者工廠相同時才會起作用。
如果 Spring Boot 透過 spring.kafka.producer.transaction-id-prefix
在配置中檢測到 transaction-id-prefix
屬性,它會自動配置一個 KafkaTransactionManager
。然而,由於我們處於 Spring Cloud Stream 上下文中,我們必須使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
,因為這是我們指示框架為 binder 建立內部事務管理器和相關事務性生產者工廠的方式。如果我們提供正確的 spring.kafka
字首,讓 Spring Boot 自動配置一個 KakaTransactionManager
會怎樣?雖然這很誘人,但它不起作用,因為自動配置的事務管理器使用的生產者工廠與 binder 使用的不同。因此,我們必須提供一個使用 binder 使用的相同生產者工廠的自定義 KafkaTransactionManager
。這正是我們在上面所做的。
在本部落格系列的下一部分中,我們將學習如何與外部事務管理器同步,以實現生產者和消費者啟動的事務。