領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多歡迎閱讀本系列文章的又一篇,本系列文章旨在展示 Spring Cloud Stream (SCSt) 的新特性。在之前的文章(可在此處、此處和此處檢視)中,我們試圖為 Spring Cloud Stream (SCSt) 轉向函數語言程式設計模型提供理由。它程式碼量更少,配置更少,並且您的程式碼與 SCSt 的內部實現完全解耦。
今天,我們將討論函式路由。在 SCSt 的上下文中,路由是指:a) 將事件路由到特定的事件訂閱者,或 b) 將事件訂閱者產生的事件路由到特定的目標。為了更好地理解上下文,讓我們快速回顧一下基於註解的程式設計模型的工作方式。在這篇文章中,我們將把它們稱為“路由到”和“路由自”。
對於路由到事件訂閱者,我們使用 StreamListener 註解的 condition 屬性,如下所示:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}
此處提供了有關此方法的更多詳細資訊。
對於從事件訂閱者路由,我們使用了動態繫結目的地 - 這種方法允許框架根據單個事件中提供的某些指令繫結到目的地。
使用函式式方法,我們可以透過一些附加功能,以更簡潔明瞭的方式完成上述所有操作。
“路由到”函式可以透過依賴 Spring Cloud Function (SCF) 中提供的路由函式功能來實現。您可以透過設定 spring.cloud.stream.function.routing.enabled 屬性顯式啟用路由,或者透過設定 spring.cloud.function.routing-expression 屬性並提供 Spring Expression Language (SpEL) 路由指令來隱式啟用路由。路由指令應導致路由到的函式定義。出於繫結目的,路由目標的名稱是 functionRouter-in-0(請參閱 RoutingFunction.FUNCTION_NAME 和此處描述的繫結命名約定)。
當訊息被髮送到此目標時,路由函式會嘗試確定哪個實際函式需要處理此類事件。它首先嚐試訪問 `spring.cloud.function.routing-expression` 訊息頭,如果提供,則確定要呼叫的實際函式的名稱。這是最動態的方法。第二動態的方法是提供 `spring.cloud.function.definition` 頭,它應該包含要“路由到”的函式的定義。兩種方法都需要透過設定 `spring.cloud.stream.function.routing.enabled` 屬性來顯式啟用路由函式。
至於以前版本中未提供的附加功能,`spring.cloud.function.routing-expression` 也可以用作應用程式屬性。例如,考慮當表示式與傳入事件無關時的情況,如本文前面所示的基於註解的示例(例如,`spring.cloud.function.routing-expression=headers['type']=='order'`)。對於這種方法,您不需要顯式啟用路由函式,因為 `spring.cloud.function.routing-expression` 作為應用程式屬性具有相同的效果。
儘管微不足道,以下是上述方法之一的完整示例
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
透過將訊息傳送到繫結器(即 rabbit 或 kafka)公開的 `functionRouter-in-0` 目標,該訊息將根據訊息處理時 `nanoTime()` 的值路由到適當的(“偶數”或“奇數”)`Consumer` bean。
與之前一樣,"路由自"依賴於 SCSt 的動態繫結目的地功能。然而,與"路由到"一樣,還有一些附加功能。
以下示例展示了基本用法
@Autowired
private BinderAwareChannelResolver resolver;
public Consumer<String> send(Message message) {
MessageChannel destination = resolver
.resolveDestination(message.getHeaders().get("type"))
Message outgoingMessage = . . . // your code
destination.send(outgoingMessage);
}
您只需要引用 `BinderAwareChannelResolver`(在前面的示例中自動裝配)。然後,您可以使用一些邏輯來確定目標名稱(在我們的示例中,我們使用“type”頭的值)。一旦確定了目標名稱,您可以透過 `BinderAwareChannelResolver.resolveDestination(..)` 操作獲取對其的引用,並向其傳送訊息。這真的是全部。
上述方法的缺點是某些框架特定的抽象會滲透到您的程式碼中。例如,您需要了解 `BinderAwareChannelResolver` 和 `MessageChannel` 等等。實際上,前面示例中的大部分程式碼都是樣板程式碼。
一種更動態且侵入性更小的方法是依賴 `spring.cloud.stream.sendto.destination` 屬性,它有效地在幕後完成了上述所有操作。以下示例展示瞭如何使用這種方法
@SpringBootApplication
public class RoutingStreamApplication {
@Bean
public Function<Message<String>, Message<String>> process() {
return message -> {
// some logic to process incoming message
Message<String> outgoingMessage = MessageBuilder
.withPayload("Hello")
.setHeader("spring.cloud.stream.sendto.destination", "even")
.build();
return outgoingMessage;
};
}
}
我們不再需要注入 `BinderAwareChannelResolver`、執行 `MessageChannel` 的解析等等。我們只需建立一個新的 `Message`,它指定一個由框架用於動態解析目標的訊息頭。
最後但並非最不重要的是,讓我們看看“路由自”的另一個流行用例,即資料來源來自 SCSt 上下文之外,但需要路由到適當的目標。
@Controller
public class SourceWithDynamicDestination {
@Autowired
private ObjectMapper jsonMapper;
private final EmitterProcessor<?> processor = EmitterProcessor.create();
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType)
throws Exception {
Map<String, String> payload = jsonMapper.readValue(body, Map.class);
String destination = payload.get("id");
Message<?> message =
MessageBuilder.withPayload(payload)
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
processor.onNext(message);
}
@Bean
public Supplier<Flux<?>> source() {
return () -> processor;
}
}
然後我們可以透過執行以下 `curl` 命令檢視結果
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' https://:8080
在這裡,我們透過 `Supplier
注意:在撰寫本文時,參考文件正在積極更新以支援即將釋出的 SCSt 3.0.0.RELEASE 版本,但您始終可以使用參考文件的來源來獲取最新資訊。
請在 GitHub 上檢視 Spring Cloud Stream。
此外,本系列之前的部落格文章有