更進一步
VMware 提供培訓和認證,以加速您的進步。
瞭解更多這是 Spring Cloud Stream 2.0.0.RELEASE 準備工作中的一系列預釋出部落格中的第二篇。
Spring Cloud Stream 2.0 引入了輪詢消費者,應用程式可以控制訊息處理速率。
Spring Cloud Stream 具有生產者和消費者的概念;當使用訊息傳遞範例時,MessageChannel
被繫結到目標(例如 Kafka 主題、Rabbit 交換器/佇列)。 迄今為止,在消費者端,只要有空閒消費者可用,就會傳遞訊息。 實際上,broker 控制了傳遞速率;通常,當前訊息被處理後,會立即傳遞下一條訊息。
2.0 引入了輪詢消費者,應用程式可以控制訊息消耗速率。 Kafka 和 RabbitMQ binder 支援輪詢消費者。
使用輪詢消費者,我們繫結 PollableMessageSource
而不是將 MessageChannel
繫結到目標;例如,可以這樣配置 PolledProcessor
繫結
public interface PolledProcessor {
@Input
PollableMessageSource destIn();
@Output
MessageChannel destOut();
}
訊息源有一個方法
boolean poll(MessageHandler handler);
在處理程式的 handleRequest
方法退出之前,訊息不會被確認。
MessageHandler
是來自 spring-messaging 的介面;您可以提供標準的 Spring Integration 訊息處理程式之一,或者您自己的實現(通常是一個 lambda)。 因為 handleMessage
方法採用 Message<?>
引數,所以沒有型別資訊,並且不會轉換 payload。
但是,訊息轉換正如本系列的第一篇部落格中所討論的也可以應用於輪詢消費者。 為了將型別資訊傳遞給轉換服務,我們在過載的 poll()
方法中提供了一個引數化型別引用
boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)
並且訊息 payload 將被轉換為該型別,它可以很簡單,例如,內容型別為 text/plain
new ParameterizedTypeReference<String>() {}
或者更復雜,例如 JSON 內容型別
new ParameterizedTypeReference<Map<String, Foo>>() {}
以下簡單的 Spring Boot 應用程式提供了一個完整的示例;它接收 String payload,將其轉換為大寫並將結果轉發到另一個目標。
@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {
private final Logger logger =
LoggerFactory.getLogger(Blog2Application.class);
public static void main(String[] args) {
SpringApplication.run(Blog2Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source,
MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
logger.info("Received: " + payload);
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
logger.info("Processed a message");
}
else {
logger.info("Nothing to do");
}
Thread.sleep(5_000);
}
};
}
public static interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
}
應用程式現在可以控制訊息的消耗速率。
有關更多資訊,請參閱參考手冊中的使用輪詢消費者。
我們鼓勵您使用以下工具之一提供反饋
盡情享用!