Spring Cloud Stream - 事件路由

工程 | Oleg Zhurakousky | 2019 年 10 月 31 日 | ...

歡迎閱讀關於 Spring Cloud Stream (SCSt) 新特性系列的另一篇文章。在之前的文章中(可在此處獲取:此處此處此處),我們試圖解釋我們在 Spring Cloud Stream (SCSt) 中轉向函數語言程式設計模型的原因。它程式碼量更少、配置更少,而且您的程式碼完全獨立於 SCSt 的內部實現。

今天,我們將討論使用函式進行路由。在 SCSt 的上下文中,路由是指能夠 a) 將事件路由到特定的事件訂閱者b) 將事件訂閱者產生的事件路由到特定的目標。為了更好地理解上下文,讓我們快速回顧一下基於註解的程式設計模型的工作方式。在本文中,我們將這種方式稱為路由 'TO' 和路由 'FROM'。

對於路由 TO 事件訂閱者,我們使用了 StreamListener 註解的 condition 屬性,如下所示

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

此處提供了有關此方法的更多詳細資訊。

而對於路由 FROM 事件訂閱者,我們使用了動態繫結目標——這種方法允許框架根據每個事件中提供的某些指令繫結到目標。

使用函式進行事件路由

透過函式式方法,我們可以以更簡潔的方式實現上述所有功能,並附帶一些額外特性。

路由 TO

路由 'TO' 函式可以透過依賴 Spring Cloud Function (SCF) 中可用的路由函式特性來實現。您可以透過設定 spring.cloud.stream.function.routing.enabled 屬性來顯式啟用路由,或者透過設定 spring.cloud.function.routing-expression 屬性並使用 Spring Expression Language (SpEL) 提供路由指令來隱式啟用。路由指令應生成要路由 'TO' 的函式的定義。出於繫結的目的,路由目標的名稱是 functionRouter-in-0(請參閱 RoutingFunction.FUNCTION_NAME此處描述的繫結命名約定)。

當訊息被髮送到此目標時,路由函式會嘗試確定哪個實際函式需要處理此事件。它首先嚐試訪問 spring.cloud.function.routing-expression 訊息頭,如果提供,則確定要呼叫的實際函式的名稱。這是最具動態性的方法。次動態的方法是提供一個 spring.cloud.function.definition 頭,該頭應包含要路由 'TO' 的函式的定義。這兩種方法都需要透過設定 spring.cloud.stream.function.routing.enabled 屬性來顯式啟用路由函式。

儘管微不足道,但 spring.cloud.function.routing-expression 也可以用作應用程式屬性,這是一種額外的特性,在以前的版本中不可用。例如,考慮表示式與傳入事件無關的情況,就像本文前面顯示的基於註解的示例中一樣(例如,spring.cloud.function.routing-expression=headers['type']=='order')。對於這種方法,您無需顯式啟用路由函式,因為將 spring.cloud.function.routing-expression 作為應用程式屬性具有相同的效果。

雖然微不足道,但以下是上述方法之一的完整示例

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

透過向由繫結器(即 rabbit 或 kafka)公開的 functionRouter-in-0 目標傳送訊息,該訊息將根據訊息處理時 nanoTime() 的值被路由到適當的(“偶數”或“奇數”)Consumer Bean。

路由 FROM

和之前一樣,路由 'FROM' 依賴於 SCSt 的動態繫結目標特性。然而,與路由 'TO' 一樣,還有許多額外的特性。

以下示例展示了基礎知識

@Autowired
private BinderAwareChannelResolver resolver;

public Consumer<String> send(Message message) {   
     MessageChannel destination = resolver
        .resolveDestination(message.getHeaders().get("type"))
     Message outgoingMessage = . . . // your code
     destination.send(outgoingMessage);
}

您只需要一個對 BinderAwareChannelResolver 的引用(在下面的示例中自動裝配)。然後,您可以使用一些邏輯來確定目標名稱(在我們的示例中,我們使用 'type' 頭部的值)。一旦確定了目標名稱,您就可以使用 BinderAwareChannelResolver.resolveDestination(..) 操作獲取對其的引用並向其傳送訊息。這就是全部所需。

上述方法的缺點是一些特定於框架的抽象會滲透到您的程式碼中。例如,您需要了解 BinderAwareChannelResolverMessageChannel 等等。事實上,上面示例中的大部分程式碼都是樣板程式碼。

一種更動態且更少滲透的方法是依賴 spring.cloud.stream.sendto.destination 屬性,它有效地在幕後完成了上述所有操作。以下示例展示瞭如何使用這種方法

@SpringBootApplication
public class RoutingStreamApplication {

  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
		.withPayload("Hello")
		.setHeader("spring.cloud.stream.sendto.destination", "even")
		.build();
       return outgoingMessage;
     };
  }
}

我們不再需要注入 BinderAwareChannelResolver、執行 MessageChannel 的解析等等。我們只需建立一個新的 Message,其中指定了框架用於動態解析目標的頭部。

路由源

最後但同樣重要的一點,讓我們看看另一個路由 'FROM' 的常見用例,即資料來源來自 SCSt 上下文之外,但需要路由到適當的目標

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;

    private final EmitterProcessor<?> processor = EmitterProcessor.create();

    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get("id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader("spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

然後我們可以透過執行以下 curl 命令看到結果

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' https://:8080

在這裡,我們使用了函式式方法和響應式範例的一點點潤色,這得益於 Supplier<Flux<?>> bean。我們有一個簡單的 MVC 控制器,我們想根據內容中 'id' 屬性的值將請求向下遊路由。雖然 EmitterProcessor 的詳細資訊及其在此處的用法是另一篇文章的主題,但重要的是它展示了一個完整的函式式應用程式,其中 HTTP 請求動態路由到由目標繫結器管理的目標。

注意:截至本文撰寫時,參考文件正在積極更新以支援即將釋出的 SCSt 3.0.0.RELEASE 版本,但您始終可以使用參考文件的原始碼來獲取最新資訊。

請訪問 GitHub 上的 Spring Cloud Stream

此外,本系列先前的部落格文章

- Spring Cloud Stream - 函式式和響應式

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 透過簡單的訂閱,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群所有即將到來的活動。

檢視全部