雲事件與 Spring - 第二部分

工程 | Oleg Zhurakousky | 2020 年 12 月 23 日 | ...

引言

我們首先快速總結 上一篇文章

  • 訊息 是一個合適的結構和抽象,用於在 Spring 的上下文中消費表示雲事件的資料。我們希望這一點已經清楚。
  • 在 Spring 中,我們致力於將功能性關注點與非功能性關注點隔離,這使我們能夠在框架級別處理非功能性方面(例如傳送、接收、重試、連線、轉換等),讓您(大部分)專注於實際的業務邏輯,並讓您的程式碼保持簡單並可插入各種執行上下文(稍後將詳細介紹)。

業務問題

正如承諾的那樣,這篇博文更具技術性,因為它涵蓋了可供您嘗試的具體示例。因此,事不宜遲,我們首先描述我們將涵蓋的三個用例。實際上用例是相同的,但執行上下文有所不同。

“接收代表待聘人員的資料,生成員工記錄。”

這三種不同的變體在於執行上下文(典型的非功能性關注點的一個例子)

  • HTTP 請求/響應
  • 從 AMQP 到 Apache Kafka
  • 從 RSocket 到 Apache Kafka。

用例和執行上下文都不是真正新的或獨一無二的。在 Spring 中,我們已經處理它們幾十年了,有數千個應用程式在生產中執行。那麼,透過新增 Cloud Event 上下文會有什麼變化嗎?換句話說,如果傳入和傳出資料代表一個 Cloud Event,會有什麼變化嗎?這些是我們在本文中試圖回答的問題。

這些示例的使用者程式碼是

@SpringBootApplication
public static class SampleApplication
  public static void main(String[] args) throws Exception {
    SpringApplication.run(SampleApplication.class, args);
  }

  @Bean
  public Function<Person, Employee> hire() {
    return person -> {
	Employee employee = new Employee(person);
	return employee;
    };
  }
}

是的,這有點無聊,因為它沒有顯示任何非功能性方面,因為它們是由特定於執行上下文的框架處理的。我們還將函式的實現細節保持得相當簡單,因為它們與主題無關。框架並不真正關心你做什麼。它只關心你期望什麼——輸入——以及你產生什麼——輸出——這些資訊可以從簽名中獲得。

用例 1(透過 HTTP)

此示例的完整原始碼可在 Spring Cloud Function 示例中找到。在此示例中,我們傳送一個 Cloud Event 作為 HTTP 請求,並期望接收一個 Cloud Event 作為 HTTP 響應。這意味著,我們的 hire() 函式需要以某種方式成為一個 HTTP 端點。我們可以透過使用 Spring Cloud Function 框架來實現這一點。透過新增其 spring-cloud-function-web 依賴項,我們添加了將函式轉換為 HTTP 端點所需的 Spring Boot 自動配置和元件。配置選項和預設值超出了本文的範圍,但您可以從 Spring Cloud Function 文件的相關部分獲取它們。重要的是,基於這些預設值,函式名稱成為在 localhost8080 上執行的 URL 路徑的一部分,從而生成 https://:8080/hire 端點。

現在您可以啟動應用程式並對其進行釋出。應用程式執行後,您可以使用以下命令對其進行 curl

curl -w'\n' localhost:8080/hire \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

您應該收到以下響應

. . .
{"person":{"firstName":"John","lastName":"Doe"},"id":172,"message":"Employee 172 was hired on 17-12-2020"}

嗯。。。。這真的和 Cloud Events 沒什麼關係!對吧……?

沒錯,但框架將函式暴露為 REST 端點、處理型別轉換、呼叫和其他非功能性方面的能力是明確的,並且與 Cloud Events 直接相關。請繼續閱讀……

這種功能的實現核心是 Message——一種結構和型別,它允許傳入的 HTTP(或任何其他)請求採用規範形式,以便其他框架能夠以統一的方式處理其內容,無論其來源或目的地如何。

但是等等,Cloud Events 呢?

讓我們透過新增代表所需 Cloud Event 屬性的 HTTP 頭,將此 HTTP 請求轉換為 Cloud Event。請注意,這些頭帶有 ce- 字首,這是 Cloud Event 規範的 HTTP 協議繫結部分所要求的。

curl -w'\n' localhost:8080/hire \
 -H "ce-id: 0001" \
 -H "ce-specversion: 1.0" \
 -H "ce-type: hire" \
 -H "ce-source: spring.io/spring-event" \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

執行後,您不會看到任何差異。您的函式以相同的方式執行,並且您收到相同的響應。

當然,除非您檢視並分析響應頭,其中現在包含所需的 Cloud Event 屬性(儘管與請求中的屬性不同)

ce-source: https://springframework.tw/cloudevent
ce-specversion: 1.0
ce-type: sample
ce-id: 76208faf-f8e5-4267-9028-bb4392d66765
message-type: cloudevent
timestamp: 1608211771624
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 17 Dec 2020 13:29:31 GMT
{"person":{"firstName":"John","lastName":"Doe"},"id":171,"message":"Employee 171 was hired on 17-12-2020"}

但是怎麼回事?

這是我們再次提醒您我們承諾將非功能性方面外包給框架的部分,因為這也是其中之一。因此,預設情況下(由框架建立),我們假設如果請求是一個 Cloud Event,則響應也應是一個 Cloud Event。您還可以看到,四個必需的 Cloud Event 屬性的值也是透過遵循框架建立的某些預設規則生成的。specversion 預設為 1.0type 預設為返回物件的型別名稱,id 預設為生成的 UUID(以提供合理安全的唯一性期望),source 預設為 https://springframework.tw/

但我不喜歡預設值。我想要自己的,並且我想要新增額外的屬性?

正如我們在上一篇文章中提到的:“我們還公開了實用程式、庫和配置選項,讓您可以影響某些非功能性關注點,因為出於各種原因,這仍然可能是必需的。”在這裡,您有兩種選擇。第一個選項:您可以更改函式簽名並返回一個Message<Employee>,您可以在其中新增額外的元資料(即 Cloud Event 屬性)。一旦框架看到您返回了一個Message,它就不會嘗試對使用者新增的元資料做任何額外的事情。這是適用於大多數(如果不是所有)依賴 Spring Messaging 的框架的規則。雖然這個選項很簡單,但它確實會將非功能性方面洩露到您的業務邏輯中。畢竟,您需要建立一個Message例項,您需要新增代表 Cloud Event 屬性的頭(最好帶有正確的——規範強制的——屬性字首),等等。但這個選項最大的缺陷是它會要求您更改函式的簽名並將功能和非功能性方面混合在一起,這明顯違反了關注點分離規則。然而,為了論證,這裡是您如何做到這一點的方法

@Bean
public Function<Message<Person>, Message<Employee>> hire() {
  return message -> {
    Person person = message.getPayload();
    Employee employee = new Employee(person);
      return CloudEventMessageBuilder.withData(employee).setId("123456")
	.setSource(URI.create("https://spring.cloudevenets.sample")).build();
  };
}

示例原始碼包含其註釋版本。

第二個選項:您可以提供一個名為 CloudEventHeaderEnricher 的策略實現,它提供了一個單獨的地方,您可以在其中實現生成適當屬性和輸出頭的邏輯。此策略由框架在生成輸出 Message 時呼叫。以下示例顯示了此策略的一種可能實現(在示例中也已註釋掉,因此取消註釋,重新啟動應用程式,然後檢視差異)。

@Bean
public CloudEventHeaderEnricher cloudEventEnricher() {
  return messageBuilder -> messageBuilder.setSource("https://springframework.tw/cloudevent")
	.setType("sample").setId("987654");
}

在這裡,您還可以看到一個可以幫助您構建 Cloud Event 訊息的實用類:CloudEventMessageBuilder。它仿照標準的 Spring MessageBuilder,但具有 Cloud Event 特定的設定器。然而,這種方法的主要優點是關注點分離。您的業務邏輯(您的功能程式碼)保持乾淨。此外,您仍然需要編寫的非功能程式碼是寫在單獨的位置。

還有一件事。。。示例程式碼假定您只對 Cloud Event 的 data 部分感興趣,並且您希望它以 POJO 的形式出現。但如果情況並非如此呢?如果您想要 Cloud Event 的完整檢視呢?或者如果您還想要原始形式(即 byte[])的 Cloud Event 資料呢?如前所述,框架從函式的簽名中獲取指令。因此,透過將輸入和輸出型別宣告為 Message,您實際上是在指示框架為您提供整個 Cloud Event(而不僅僅是它的 data)。此外,透過指定 Message 的泛型型別,您指示框架將 Cloud Event 的 data 部分作為該 Java 型別提供,實質上是要求它在必要時執行型別轉換。所以請嘗試以下簽名:public Function<Message<byte[]>, Message<Employee>> hire() {...}public Function<byte[], Employee> hire() {...} 或其他。

目前就這些了。README 檔案和原始碼中的註釋也提供了必要的額外說明。

用例 2(從 AMQP 到 Kafka)

該示例的完整原始碼可在 Spring Cloud Function 示例中找到。它假設您對 AMQP 和 Apache Kafka 有一定程度的熟悉。在此示例中,我們使用 RabbitMQ(作為 AMQP 訊息代理)和 Apache Kafka。

雖然這個用例可能看起來比前一個更復雜,但本節和後續部分(第三個用例)卻出奇地短。那是因為上一節中解釋的一切也適用於這裡。實際上,我們在這裡改變的唯一是執行上下文。我們透過相同的機制來做到這一點:新增相關的基於 Spring Boot 的自動配置。因此,在這種情況下,我們添加了兩個自動配置:一個用於 RabbitMQ(AMQP 訊息代理)繫結器,一個用於 Spring Cloud Stream 框架中可用的 Apache Kafka 繫結器。還有一些額外的應用程式配置(您可以在 application.properties 檔案中看到),用於指示框架如何將 hire 函式的輸入端繫結到 RabbitMQ(透過 RabbitMQ 繫結器)以及輸出端繫結到 Apache Kafka(透過 Apache Kafka 繫結器)。

假設您已執行 RabbitMQ 和 Kafka,請啟動應用程式並向 RabbitMQ 傳送一條訊息。您可以使用 RabbitMQ 儀表板(如果已安裝)向 hire-in-0 交換機發送訊息。
為了符合 Cloud Event 規範,您應該提供帶有 AMQP 適當字首(即 cloudEvents:)的屬性。請考慮以下示例

cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001

然後考慮以下資料:{"firstName":"John", "lastName":"Doe"}

為了簡化這個演示部分,我們包含了一個測試用例,透過向 RabbitMQ 傳送一個 Cloud Event 並從 Apache Kafka 接收一個 Cloud Event 來有效地自動化這個演示。

Message<byte[]> messageToAMQP = CloudEventMessageBuilder
	.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
	.setSource("https://cloudevent.demo")
	.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
	.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);

rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .

請注意,我們如何在這裡使用 CloudEventMessageBuilder 僅將 source 設定為 Cloud Event 屬性,同時依賴其餘必需的 Cloud Event 屬性的預設值。我們還使用 build(CloudEventMessageUtils.AMQP_ATTR_PREFIX) 來確保屬性以 cloudEvents: 字首開頭(參見 Cloud Events AMQP 協議繫結)。此外,請注意,在接收端,Cloud Events 屬性現在以 ce_ 字首開頭(參見 Cloud Events Kafka 協議繫結),因為框架已確定目標目的地是 Apache Kafka。最後一點值得稍微詳細說明一下。我們已經確定設定 Cloud Event 屬性是一個非功能性方面,因此,我們已經公開了一種機制,讓您可以在業務邏輯之外處理它。但是屬性字首呢?請注意,我們正在不同的執行上下文中執行相同的程式碼。這意味著屬性字首實際上取決於執行上下文。因此,通過了解執行上下文,框架可以確保 Cloud Event 屬性字首的正確性。

在這裡,我們依賴於 Spring Cloud Stream 框架及其預設值,例如目的地自動配置(Kafka 和 Rabbit)、繫結名稱、連線等。這些預設值和配置選項的詳細資訊超出了本文的範圍,因為它們都與 Cloud Events 無關。有關框架本身及其配置選項的更多詳細資訊,請參閱 Spring Cloud Stream 文件

此外,與前面的示例一樣,這個示例也包含註釋掉的變體,歡迎您進行試驗。

用例 3(從 RSocket 到 Kafka)

該示例的完整原始碼可在 Spring Cloud Function 示例中找到。它假定您對 RSocket 和 Apache Kafka 有一定程度的熟悉。本節應該比上一節更短,因為它非常相似。然而,這裡有一些有趣的變體值得討論。顯而易見的一個是 RSocket。我們引入了一種不同的交付機制。但真正使其更有趣的是,RSocket 沒有定義協議繫結。我們可以選擇遵守 Kafka、HTTP 或 AMQP 規範之一,或者我們可以以結構化模式通訊 Cloud Event,其中整個事件被編碼為某種結構(例如 JSON)。

在此示例中,一些實現細節也與其他用例不同。然而,這些細節與 Cloud Event 沒有任何關係。相反,它們是您可以使用的其他機制的演示。例如,我們使用 Consumer 而不是 Function,並使用 Spring Cloud Stream 框架提供的 StreamBridge 元件手動傳送輸出訊息。

因此,事不宜遲,這是我們的應用程式程式碼

@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
  return person -> {
    Employee employee = new Employee(person);
    streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
	.setSource("https://springframework.tw/rsocket")
	.setId("1234567890")
	.build());
  };
}

請注意我們如何使用 CloudEventMessageBuilder 將輸出 Message 生成為 Cloud Event。

我們透過 RSocket 將結構化的 Cloud Event 表示(編碼為 JSON)傳送到 hire() 函式

String payload = "{\n" +
	"    \"specversion\" : \"1.0\",\n" +
	"    \"type\" : \"org.springframework\",\n" +
	"    \"source\" : \"https://springframework.tw/\",\n" +
	"    \"id\" : \"A234-1234-1234\",\n" +
	"    \"datacontenttype\" : \"application/json\",\n" +
	"    \"data\" : {\n" +
	"        \"firstName\" : \"John\",\n" +
	"        \"lastName\" : \"Doe\"\n" +
	"    }\n" +
	"}";

rsocketRequesterBuilder.tcp("localhost", 55555)
	.route("hire")        // target function
	.data(payload).       // data we're sending
	.send()

預期的輸出應與之前的用例相似,因為目標目的地相同。

結論

正如你所看到的,在 Spring 的上下文中處理 Cloud Events 時,您有多種選擇

  • 您可以選擇只關心 Cloud Event 的內容,同時完全控制出站 Cloud Event 的外觀。
  • 您可以選擇透過 Message 處理 Cloud Event 本身,並依賴提供的實用程式來簡化對 Cloud Event 特定資料的訪問。
  • 您可以選擇一個執行上下文,而不會影響您的業務邏輯(使用者程式碼),同時將某些 Cloud Event 特性(例如屬性字首)的正確性委託給框架來確保。

這些只是與本文相關的少數幾個,但還有更多。

成熟且經過驗證的模式、實現這些模式的框架以及分層和有主見的 Spring Boot 自動配置使其成為可能。層次非常重要,因為它們允許您將問題分解為可以在存在相同問題的其他專案和整合中重複使用的解決方案。這有效地使當前的 Cloud Event 整合成為一項簡單的努力,因為大多數與 Cloud Event 無關的非功能性方面(即連線、傳送、接收、轉換、重試等)都已經由 Spring Cloud Function 和 Spring Cloud Stream 背後的各個框架解決。

最後但並非最不重要的一點是,還有另一種處理 Cloud Events 和 Spring 的方法,那就是透過 Cloud Events Java SDK,您可以在其中找到一個 示例

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有