Spring Cloud Stream 2.0 - 輪詢消費者

工程 | Gary Russell | 2018 年 2 月 27 日 | ...

這是 Spring Cloud Stream 2.0.0.RELEASE 準備工作中的一系列預釋出部落格中的第二篇。

前言

Spring Cloud Stream 2.0 引入了輪詢消費者,應用程式可以控制訊息處理速率。

介紹

Spring Cloud Stream 具有生產者和消費者的概念;當使用訊息傳遞範例時,MessageChannel 被繫結到目標(例如 Kafka 主題、Rabbit 交換器/佇列)。 迄今為止,在消費者端,只要有空閒消費者可用,就會傳遞訊息。 實際上,broker 控制了傳遞速率;通常,當前訊息被處理後,會立即傳遞下一條訊息。

2.0 引入了輪詢消費者,應用程式可以控制訊息消耗速率。 Kafka 和 RabbitMQ binder 支援輪詢消費者。

詳情

使用輪詢消費者,我們繫結 PollableMessageSource 而不是將 MessageChannel 繫結到目標;例如,可以這樣配置 PolledProcessor 繫結

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

訊息源有一個方法

boolean poll(MessageHandler handler);

在處理程式的 handleRequest 方法退出之前,訊息不會被確認。

MessageHandler 是來自 spring-messaging 的介面;您可以提供標準的 Spring Integration 訊息處理程式之一,或者您自己的實現(通常是一個 lambda)。 因為 handleMessage 方法採用 Message<?> 引數,所以沒有型別資訊,並且不會轉換 payload。

但是,訊息轉換正如本系列的第一篇部落格中所討論的也可以應用於輪詢消費者。 為了將型別資訊傳遞給轉換服務,我們在過載的 poll() 方法中提供了一個引數化型別引用

boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)

並且訊息 payload 將被轉換為該型別,它可以很簡單,例如,內容型別為 text/plain

  • new ParameterizedTypeReference<String>() {}

或者更復雜,例如 JSON 內容型別

  • new ParameterizedTypeReference<Map<String, Foo>>() {}

整合到一起

以下簡單的 Spring Boot 應用程式提供了一個完整的示例;它接收 String payload,將其轉換為大寫並將結果轉發到另一個目標。

@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {

  private final Logger logger =
  	  LoggerFactory.getLogger(Blog2Application.class);

  public static void main(String[] args) {
    SpringApplication.run(Blog2Application.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
  	    MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}

結論

應用程式現在可以控制訊息的消耗速率。

有關更多資訊,請參閱參考手冊中的使用輪詢消費者

我們鼓勵您使用以下工具之一提供反饋

盡情享用!

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

更進一步

VMware 提供培訓和認證,以加速您的進步。

瞭解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群中所有即將舉行的活動。

檢視全部