Cloud Events 和 Spring - 第 2 部分

工程 | Oleg Zhurakousky | 2020年12月23日 | ...

引言

我們首先快速回顧一下上一篇文章

  • 在 Spring 的上下文中,Message 是一個合適的結構和抽象,用於消費代表 Cloud Event 的資料。我們希望這一點很清楚。
  • 在 Spring 中,我們致力於隔離功能性關注點與非功能性關注點,這使我們能夠在框架層面處理非功能性方面(例如傳送、接收、重試、連線、轉換等),讓您(大部分時間)專注於實際的業務邏輯,並使您的程式碼保持簡單且可插入到各種*執行上下文*(稍後會詳細介紹)。

業務問題

正如承諾的,這篇文章將更具技術性,因為它涵蓋了可供您嘗試的具體示例。因此,事不宜遲,我們首先描述將要介紹的三個用例。實際上用例是相同的,但執行上下文不同。

“接收代表待招聘人員的資料,生成員工記錄。”

三種不同的變體在於執行上下文(典型非功能性關注點的一個例子)

  • HTTP 請求/響應
  • 從 AMQP 到 Apache Kafka
  • 從 RSocket 到 Apache Kafka。

無論是用例還是執行上下文,它們都不是真正新的或獨一無二的。在 Spring 中,我們處理它們已經有幾十年了,有成千上萬的應用在生產環境中執行。那麼,新增 Cloud Event 上下文會改變什麼嗎?換句話說,如果傳入和傳出的資料代表一個 Cloud Event,會有什麼變化嗎?這些正是我們試圖在本文中回答的問題。

這些示例的使用者程式碼是

@SpringBootApplication
public static class SampleApplication
  public static void main(String[] args) throws Exception {
    SpringApplication.run(SampleApplication.class, args);
  }

  @Bean
  public Function<Person, Employee> hire() {
    return person -> {
	Employee employee = new Employee(person);
	return employee;
    };
  }
}

是的,這有點無聊,因為它沒有展示任何非功能性方面,這些方面由特定於執行上下文的框架處理。我們還將函式的實現細節保持得相當簡單,因為它們與主題無關。框架並不關心您做什麼。它只關心您期望什麼——*輸入*——以及您產生什麼——*輸出*——這些資訊可以從函式簽名中獲取。

用例 1(基於 HTTP)

此示例的完整原始碼可在Spring Cloud Function 示例中找到。在此示例中,我們將 Cloud Event 作為 HTTP 請求傳送,並期望接收一個 Cloud Event 作為 HTTP 響應。這意味著,我們的 hire() 函式需要以某種方式成為 HTTP 端點。我們可以透過使用Spring Cloud Function框架來實現這一點。透過新增其 spring-cloud-function-web 依賴,我們可以新增將函式轉換為 HTTP 端點所需的 Spring Boot 自動配置和元件。配置選項和預設值超出了本文的範圍,但您可以從Spring Cloud Function 文件的相關部分獲取。重要的是,基於這些預設值,函式名稱成為執行在 localhost8080 上的 URL 路徑的一部分,形成 https://:8080/hire 端點。

現在您可以啟動應用程式並向其傳送請求。應用程式執行後,您可以使用以下命令 curl

curl -w'\n' localhost:8080/hire \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

您應該收到以下響應

. . .
{"person":{"firstName":"John","lastName":"Doe"},"id":172,"message":"Employee 172 was hired on 17-12-2020"}

嗯... 這跟 Cloud Events 一點關係也沒有!對吧...?

正確,但框架將函式暴露為 REST 端點、處理型別轉換、呼叫以及其他非功能性方面的能力是顯而易見的,並且與 Cloud Events 直接相關。請繼續閱讀...

這種能力的實現核心是Message——一種結構和型別,它使傳入的 HTTP(或任何其他)請求能夠採用規範形式,以便其他框架能夠以統一的方式處理其內容,而不管其來源或目的地。

但是等等,Cloud Events 呢?

讓我們透過新增表示所需 Cloud Event 屬性的 HTTP 頭,將此 HTTP 請求轉換為 Cloud Event。請注意,這些頭帶有 Cloud Event 規範的HTTP 協議繫結部分要求的 ce- 字首。

curl -w'\n' localhost:8080/hire \
 -H "ce-id: 0001" \
 -H "ce-specversion: 1.0" \
 -H "ce-type: hire" \
 -H "ce-source: spring.io/spring-event" \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

執行後,您不會看到任何區別。您的函式行為相同,您收到相同的響應。

當然,除非您檢視並分析響應頭,它們現在包含所需的 Cloud Event 屬性(儘管與請求中的不同)

ce-source: https://springframework.tw/cloudevent
ce-specversion: 1.0
ce-type: sample
ce-id: 76208faf-f8e5-4267-9028-bb4392d66765
message-type: cloudevent
timestamp: 1608211771624
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 17 Dec 2020 13:29:31 GMT
{"person":{"firstName":"John","lastName":"Doe"},"id":171,"message":"Employee 171 was hired on 17-12-2020"}

但是怎麼做到的呢?

這部分我們再次提醒您,我們致力於將非功能性方面外包給框架,因為這就是其中之一。因此,預設情況下(由框架建立),我們假設如果請求是一個 Cloud Event,響應也應該是一個 Cloud Event。您還可以看到,四個必需的 Cloud Event 屬性的值也是根據框架建立的某些預設規則生成的。specversion 預設為 1.0type 預設為返回物件的型別名稱,id 預設為生成的 UUID(以提供合理安全的唯一性預期),而 source 預設為 https://springframework.tw/

但我 不喜歡預設值。我想要自己的值,並且想新增額外的屬性?

正如我們在上一篇文章中提到的:*“我們還提供了實用工具、庫和配置選項,讓您可以影響某些非功能性關注點,因為出於各種原因,這可能仍然是必需的。”* 在這裡,您有兩種選擇。***第一種選擇:*** 您可以更改函式簽名並返回一個 Message<Employee>,您可以在其中新增額外的元資料(即 Cloud Event 屬性)。一旦框架看到您返回了 Message,它就不會嘗試對使用者新增的元資料進行任何額外處理。這條規則實際上適用於大多數(如果不是全部)依賴 Spring Messaging 的框架。雖然這個選項很簡單,但它確實會將非功能性方面洩露到您的業務邏輯中。畢竟,您需要建立一個 Message 例項,您需要新增表示 Cloud Event 屬性的頭(最好帶有正確——規範要求的——屬性字首),等等。但是這個選項最大的缺點是它要求您更改函式簽名並將功能性和非功能性方面混在一起,這明顯違反了*關注點分離*原則。然而,為了論證起見,這裡展示瞭如何做到這一點

@Bean
public Function<Message<Person>, Message<Employee>> hire() {
  return message -> {
    Person person = message.getPayload();
    Employee employee = new Employee(person);
      return CloudEventMessageBuilder.withData(employee).setId("123456")
	.setSource(URI.create("https://spring.cloudevenets.sample")).build();
  };
}

示例原始碼中包含其註釋版本。

***第二種選擇:*** 您可以提供一個名為 CloudEventHeaderEnricher 的策略的實現,它提供了一個單獨的地方,您可以在其中實現為輸出生成適當屬性和頭的邏輯。此策略在框架生成輸出 Message 時被呼叫。以下示例展示了此策略的一種可能實現(在示例中也被註釋掉了,所以請取消註釋,重啟應用程式,然後檢視區別)。

@Bean
public CloudEventHeaderEnricher cloudEventEnricher() {
  return messageBuilder -> messageBuilder.setSource("https://springframework.tw/cloudevent")
	.setType("sample").setId("987654");
}

在這裡,您還可以看到一個可以幫助您構建 Cloud Event 訊息的實用類:CloudEventMessageBuilder。它是模仿標準的 Spring MessageBuilder 設計的,但具有 Cloud Event 特定的 setter。然而,這種方法的主要優點是關注點分離。您的業務邏輯(您的功能程式碼)保持清晰。此外,您仍然需要編寫的非功能程式碼被寫在一個單獨的地方。

還有一件事... 示例程式碼假設您只關心 Cloud Event 的 data 部分,並且希望它是 POJO 的形式。但如果不是這樣呢?如果您想要 Cloud Event 的完整檢視怎麼辦?或者如果您也想要 Cloud Event 資料的原始形式(即 byte[])怎麼辦?如前所述,框架從函式的簽名中獲取指令。因此,透過將輸入和輸出型別宣告為 Message,您實際上是指示框架為您提供整個 Cloud Event(不僅僅是其 data)。此外,透過指定 Message 的泛型型別,您指示框架將 Cloud Event 的 data 部分作為該 Java 型別提供,這實際上是要求它在必要時執行型別轉換。所以請嘗試以下簽名:public Function<Message<byte[]>, Message<Employee>> hire() {...}public Function<byte[], Employee> hire() {...} 或其他。

目前就這些了。README 檔案和原始碼中的註釋也在需要的地方提供了額外的說明。

用例 2(從 AMQP 到 Kafka)

此示例的完整原始碼可在Spring Cloud Function 示例中找到。它假設您對 AMQP 和 Apache Kafka 有一定程度的瞭解。在此示例中,我們使用 RabbitMQ(作為 AMQP 訊息代理)和 Apache Kafka。

雖然這個用例可能看起來比上一個更復雜,但本節和下一節(第三個用例)卻出奇地短。這是因為上一節解釋的所有內容也適用於這裡。實際上,我們在這裡改變的唯一一件事就是執行上下文。我們透過同樣的機制實現這一點:新增相關的基於 Spring Boot 的自動配置。因此,在這種情況下,我們添加了兩個自動配置:一個用於 RabbitMQ(AMQP 訊息代理)繫結器,另一個用於Spring Cloud Stream框架中提供的 Apache Kafka 繫結器。還有一些額外的應用程式配置(您可以在 application.properties 檔案中看到),用於指示框架如何將 hire 函式的輸入端繫結到 RabbitMQ(透過 RabbitMQ 繫結器),以及將輸出端繫結到 Apache Kafka(透過 Apache Kafka 繫結器)。

假設您已經運行了 RabbitMQ 和 Kafka,請啟動應用程式並向 RabbitMQ 傳送一條 Message。您可以使用RabbitMQ 控制檯(如果已安裝)向 hire-in-0 交換機發送訊息。
為了符合 Cloud Event 規範,您應該提供帶有 AMQP 適當字首的屬性(即 cloudEvents:)。請考慮以下示例

cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001

然後考慮以下資料:{"firstName":"John", "lastName":"Doe"}

為了簡化本演示部分,我們包含了一個測試用例,透過將 Cloud Event 傳送到 RabbitMQ 並從 Apache Kafka 接收來有效地自動化此演示。

Message<byte[]> messageToAMQP = CloudEventMessageBuilder
	.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
	.setSource("https://cloudevent.demo")
	.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
	.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);

rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .

請注意我們在此如何使用 CloudEventMessageBuilder 僅將 source 設定為 Cloud Event 屬性,而其餘必需的 Cloud Event 屬性則依賴預設值。我們還使用 build(CloudEventMessageUtils.AMQP_ATTR_PREFIX) 來確保屬性帶有 cloudEvents: 字首(參見Cloud Events AMQP 協議繫結)。另請注意,在接收端,Cloud Events 屬性現在帶有 ce_ 字首(參見Cloud Events Kafka 協議繫結),因為框架確定目標目的地是 Apache Kafka。最後一點值得稍微詳細說明。我們已經確定設定 Cloud Event 屬性是非功能性方面,因此我們暴露了一種機制,讓您可以在業務邏輯之外處理它。但是屬性字首呢?請注意,我們在不同的執行上下文中執行相同的程式碼。這意味著屬性字首實際上取決於執行上下文。因此,框架通過了解執行上下文來確保 Cloud Event 屬性字首的正確性。

在這裡,我們依賴於Spring Cloud Stream框架及其預設設定,例如目標自動配置(Kafka 和 Rabbit)、繫結名稱、連線性等。這些預設設定和配置選項的詳細資訊超出了本文的範圍,因為它們與 Cloud Events 無關。有關框架本身及其配置選項的更多詳細資訊,請參閱Spring Cloud Stream 文件

此外,與上一個示例一樣,此示例也包含註釋掉的變體,歡迎您進行實驗。

用例 3(從 RSocket 到 Kafka)

此示例的完整原始碼可在Spring Cloud Function 示例中找到。它假設您對 RSocket 和 Apache Kafka 有一定程度的瞭解。本節應該比上一節更短,因為它非常相似。但是,這裡有一些有趣的變體值得討論。嗯,最明顯的是RSocket。我們引入了一種不同的傳遞機制。但真正讓它更有趣的是,RSocket 沒有定義協議繫結。我們可以選擇遵循 Kafka、HTTP 或 AMQP 規範之一,或者我們可以以結構化模式傳遞 Cloud Event,其中整個事件被編碼為某種結構(例如 JSON)。

此示例中的一些實現細節也與其他用例不同。然而,這些細節與 Cloud Event 無關。相反,它們是您可以使用的其他機制的演示。例如,我們使用 Consumer 代替 Function,並透過使用 Spring Cloud Stream 框架提供的 StreamBridge 元件手動傳送輸出訊息。

所以,事不宜遲,這是我們的應用程式程式碼

@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
  return person -> {
    Employee employee = new Employee(person);
    streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
	.setSource("https://springframework.tw/rsocket")
	.setId("1234567890")
	.build());
  };
}

請注意我們如何使用 CloudEventMessageBuilder 將輸出 Message 生成為 Cloud Event。

我們透過 RSocket 向 hire() 函式傳送一個結構化的 Cloud Event 表示形式,編碼為 JSON。

String payload = "{\n" +
	"    \"specversion\" : \"1.0\",\n" +
	"    \"type\" : \"org.springframework\",\n" +
	"    \"source\" : \"https://springframework.tw/\",\n" +
	"    \"id\" : \"A234-1234-1234\",\n" +
	"    \"datacontenttype\" : \"application/json\",\n" +
	"    \"data\" : {\n" +
	"        \"firstName\" : \"John\",\n" +
	"        \"lastName\" : \"Doe\"\n" +
	"    }\n" +
	"}";

rsocketRequesterBuilder.tcp("localhost", 55555)
	.route("hire")        // target function
	.data(payload).       // data we're sending
	.send()

預期的輸出應該與之前的用例類似,因為目標目的地相同。

結論

正如您所見,在 Spring 的上下文中處理 Cloud Events 時,您有多種選擇

  • 您可以選擇只關心 Cloud Event 的內容,同時完全控制出站 Cloud Event 的外觀。
  • 您可以透過 Message 處理 Cloud Event 本身,並依賴提供的實用工具來簡化對 Cloud Event 特定資料的訪問。
  • 您可以選擇執行上下文而不影響您的業務邏輯(使用者程式碼),同時委託框架確保某些 Cloud Event 特性的正確性,例如屬性字首。

這些只是與本文上下文相關的幾個選項,但還有更多。

既定和經過驗證的模式、實現這些模式的框架以及分層和有主見的 Spring Boot 自動配置使其成為可能。分層很重要,因為它們使您可以將問題分解為可以在存在相同問題的其他專案和整合中重用的解決方案。這有效地使當前的 Cloud Event 整合變得相當簡單,因為大多數與 Cloud Event 無關的非功能性方面(即連線、傳送、接收、轉換、重試等)已經由 Spring Cloud Function 和 Spring Cloud Stream 後面的各個框架解決了。

最後但同樣重要的是,還有另一種處理 Cloud Events 和 Spring 的方法,即透過Cloud Events Java SDK,您也可以在其中找到一個示例

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,加速您的進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部