Spring Cloud Stream 2.0 - 輪詢式消費者

工程 | Gary Russell | 2018年2月27日 | ...

這是 2.0.0.RELEASE 版本釋出前系列部落格中的第二篇。

序言

Spring Cloud Stream 2.0 引入了輪詢式消費者,應用程式可以控制訊息處理速率。

引言

Spring Cloud Stream 包含生產者和消費者的概念;在使用訊息傳遞正規化時,MessageChannel 會繫結到目標(例如 Kafka 主題、Rabbit Exchanges/Queues)。到目前為止,在消費者端,只要有空閒的消費者可用,就會傳遞訊息。實際上,訊息代理控制著傳遞速率;通常,下一條訊息會在當前訊息處理完畢後立即傳遞。

2.0 版本引入了輪詢式消費者,應用程式可以控制訊息的消費速率。Kafka 和 RabbitMQ 繫結器都支援輪詢式消費者。

詳情

使用輪詢式消費者時,我們不繫結 MessageChannel 到目標,而是繫結一個 PollableMessageSource;例如,PolledProcessor 繫結可能配置如下:

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

訊息源有一個方法

boolean poll(MessageHandler handler);

在處理程式 `handleRequest` 方法退出之前,訊息不會被確認。

MessageHandler 是 spring-messaging 中的介面;您可以提供標準的 Spring Integration 訊息處理程式之一,或者您自己的實現(通常是 lambda)。由於 `handleMessage` 方法接受一個 `Message<?>` 引數,因此沒有型別資訊,並且訊息體不會被轉換。

然而,如本系列第一篇部落格 所討論的,訊息轉換同樣可以應用於輪詢式消費者。為了將型別資訊傳遞給轉換服務,我們在過載的 `poll()` 方法中提供了一個引數化型別引用:

boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)

訊息體將被轉換為指定型別,這可以是簡單的,例如 `text/plain` 內容型別:

  • new ParameterizedTypeReference<String>() {}

或者更復雜的,例如 JSON 內容型別:

  • new ParameterizedTypeReference<Map<String, Foo>>() {}

綜合示例

下面的簡單 Spring Boot 應用程式提供了一個完整的示例;它接收 String 訊息體,將其轉換為大寫,並將結果轉發到另一個目標。

@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {

  private final Logger logger =
  	  LoggerFactory.getLogger(Blog2Application.class);

  public static void main(String[] args) {
    SpringApplication.run(Blog2Application.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
  	    MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}

結論

現在應用程式可以控制訊息的消費速率。

有關更多資訊,請參閱參考手冊中 使用輪詢式消費者

我們鼓勵您使用以下任一方式提供反饋:

祝您使用愉快!

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有