領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這是 2.0.0.RELEASE 版本釋出前系列部落格中的第二篇。
Spring Cloud Stream 2.0 引入了輪詢式消費者,應用程式可以控制訊息處理速率。
Spring Cloud Stream 包含生產者和消費者的概念;在使用訊息傳遞正規化時,MessageChannel 會繫結到目標(例如 Kafka 主題、Rabbit Exchanges/Queues)。到目前為止,在消費者端,只要有空閒的消費者可用,就會傳遞訊息。實際上,訊息代理控制著傳遞速率;通常,下一條訊息會在當前訊息處理完畢後立即傳遞。
2.0 版本引入了輪詢式消費者,應用程式可以控制訊息的消費速率。Kafka 和 RabbitMQ 繫結器都支援輪詢式消費者。
使用輪詢式消費者時,我們不繫結 MessageChannel 到目標,而是繫結一個 PollableMessageSource;例如,PolledProcessor 繫結可能配置如下:
public interface PolledProcessor {
@Input
PollableMessageSource destIn();
@Output
MessageChannel destOut();
}
訊息源有一個方法
boolean poll(MessageHandler handler);
在處理程式 `handleRequest` 方法退出之前,訊息不會被確認。
MessageHandler 是 spring-messaging 中的介面;您可以提供標準的 Spring Integration 訊息處理程式之一,或者您自己的實現(通常是 lambda)。由於 `handleMessage` 方法接受一個 `Message<?>` 引數,因此沒有型別資訊,並且訊息體不會被轉換。
然而,如本系列第一篇部落格 所討論的,訊息轉換同樣可以應用於輪詢式消費者。為了將型別資訊傳遞給轉換服務,我們在過載的 `poll()` 方法中提供了一個引數化型別引用:
boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)
訊息體將被轉換為指定型別,這可以是簡單的,例如 `text/plain` 內容型別:
new ParameterizedTypeReference<String>() {}或者更復雜的,例如 JSON 內容型別:
new ParameterizedTypeReference<Map<String, Foo>>() {}下面的簡單 Spring Boot 應用程式提供了一個完整的示例;它接收 String 訊息體,將其轉換為大寫,並將結果轉發到另一個目標。
@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {
private final Logger logger =
LoggerFactory.getLogger(Blog2Application.class);
public static void main(String[] args) {
SpringApplication.run(Blog2Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source,
MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
logger.info("Received: " + payload);
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
logger.info("Processed a message");
}
else {
logger.info("Nothing to do");
}
Thread.sleep(5_000);
}
};
}
public static interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
}
現在應用程式可以控制訊息的消費速率。
有關更多資訊,請參閱參考手冊中 使用輪詢式消費者。
我們鼓勵您使用以下任一方式提供反饋:
祝您使用愉快!