領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多上週釋出的 Spring Data Kay M1 是首個支援響應式資料訪問的版本。其初始支援的儲存 — MongoDB、Apache Cassandra 和 Redis — 都已具備響應式驅動,這使得它們成為這類原型的自然選擇。讓我們更詳細地瞭解一下支援響應式程式設計的新程式設計模型和 API。
Repositories 程式設計模型是 Spring Data 使用者通常打交道的最高階抽象。它們通常由一組在 Spring Data 提供的介面中定義的 CRUD 方法以及領域特定的查詢方法組成。下面是一個響應式 Spring Data repository 定義的樣子:
public interface ReactivePersonRepository
extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(Mono<String> lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}
正如你所見,與你習慣的相比,並沒有太大的區別。然而,與傳統的 repository 介面相比,響應式 repository 使用響應式型別作為返回值,並且也可以用於引數型別。新引入的 ReactiveCrudRepository 中的 CRUD 方法當然也使用了這些型別。
預設情況下,響應式 repository 使用 Project Reactor 型別,但也可以使用其他響應式庫。我們為這些庫提供了自定義的 repository 基礎介面(例如 RxJava2CrudRepository),並且還會根據需要自動適配查詢方法所需的型別,例如 RxJava 的 Observable 和 Single。其餘部分基本保持不變。但請注意,當前的里程碑版本還不支援分頁,並且你當然需要將必要的響應式庫新增到類路徑中才能啟用對特定庫的支援。
與阻塞式程式設計中的情況類似,響應式 Spring Data 的支援是透過 @Enable… 註解以及一些基礎設施設定來啟用的。
@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {
@Bean
public MongoClient mongoClient() {
return MongoClients.create();
}
@Override
protected String getDatabaseName() {
return "reactive";
}
}
可以看到,我們為基礎設施配置使用了不同的基類,因為我們需要利用 MongoDB 的非同步驅動。
現在可以使用 repository 了,就像使用阻塞式 repository 一樣,只是結果的處理現在可以以響應式的方式進行。
@RestController
class PersonController {
private final PersonRepository people;
public PersonController(PersonRepository people) {
this.people = people;
}
@GetMapping("/people")
Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {
Flux<Person> result = repository.findByLastname(lastname);
return result.map(it -> it.getFullName());
}
}
請看我們如何轉發 Spring Web Reactive 提供的響應式引數,將它們傳入 repository,然後獲得一個 Flux,並以響應式的方式處理執行結果。總的來說,響應式查詢方法遵循與已知 repository 相同的查詢建立思路。傳遞給查詢方法的引數可以是普通的(例如 String)、包裝的(Optional<String>、Stream<String>)或響應式包裝的引數(Mono<String>、Flux<String>)。如果你使用響應式包裝器作為引數型別,實現會將實際的查詢建立和執行推遲到實際訂閱時。
正如傳統的 repository 基於傳統的 template 實現一樣,響應式的 repository 是構建在響應式 template 之上的。阻塞式 template API 中可用的大多數操作在響應式 template 中都有對應的功能。我們將把阻塞式程式設計中的更多功能移植到響應式 template API 中,但有些操作(目前)只是無法透過響應式驅動程式獲得,或者在響應式程式設計中沒有意義。
這是 Spring Data MongoDB 中 ReactiveMongoOperations 的一部分。它由 ReactiveMongoTemplate 實現,並使用 Project Reactor 的響應式型別,如 Mono 和 Flux 來包裝響應。一些方法還接受響應式型別,以便將資料流式傳輸到你的資料儲存中。
public interface ReactiveMongoOperations {
// …
/**
* Map the results of an ad-hoc query on the specified collection to a
* single instance of an object of the specified type.
*/
<T> Mono<T> findOne(Query query, Class<T> entityClass);
/**
* Map the results of an ad-hoc query on the collection for the entity
* class to a List of the specified type.
*/
<T> Flux<T> find(Query query, Class<T> entityClass);
/**
* Insert the object into the specified collection.
*/
<T> Mono<T> insert(T objectToSave, String collectionName);
/**
* Insert the object into the collection for the entity type of the object
* to save.
*/
<T> Mono<T> insert(Mono<? extends T> objectToSave);
// …
}
請注意,所有方法都遵循響應式執行模型,在呼叫時不會執行任何涉及 I/O 的操作,而只在訂閱返回的值時執行。
讓我們透過 template 插入一些資料:
Flux<Person> flux = Flux.just(new Person("Walter", "White"),
new Person("Skyler", "White"),
new Person("Saul", "Goodman"),
new Person("Jesse", "Pinkman"));
template.insertAll(flux).subscribe();
一些方法 — 例如 insertAll(…) — 接受響應式型別,以便將傳入的資料非同步流式傳輸到你的 MongoDB 資料庫,例如,這些資料可能來自你在 Spring Web Reactive 控制器中接收到的 Flux,該控制器將透過 Jackson 非同步對映一個 JSON 陣列。
@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {
return template.insertAll(people);
}
正如你所見,repository 和 template API 都允許你以響應式、非阻塞的方式描述請求處理。說了這些,讓我們更深入地研究一下 Redis 對響應式資料訪問的支援。
Spring Data Redis 在連線層面提供了初步的響應式支援,目前僅限於 Lettuce,因為它是唯一支援響應式資料訪問的 Redis 驅動。由於 Redis 通常在更低的抽象級別上使用,Kay M1 版本從更低級別的響應式抽象開始。LettuceConnectionFactory 允許訪問 ReactiveRedisConnection,進而提供對 Redis 命令響應式版本的訪問。
透過運算子進行函式式鏈式呼叫,建立鏈來以響應式的方式訪問 Redis 資料。同樣,所有 I/O 都是非同步的。
ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
.flatMap(keyCommands::type)
.flatMap(System.out::println)
.subscribe();
這段程式碼獲取一個隨機鍵並列印其資料型別。一個不存在的隨機鍵會被完成為一個空的 Mono。
響應式 Redis 命令有兩種形式:接受普通引數和接受命令釋出者。命令釋出者會發出特定的 Redis 命令,將資料流式傳輸到 Redis。每個發出的命令在執行後都會發出一個命令響應。
public interface ReactiveStringCommands {
// …
Mono<Boolean> set(ByteBuffer key, ByteBuffer value);
Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);
// …
}
傳統的 Spring Data Redis 在其阻塞式 API 中使用 byte[] 來交換資料。byte[] 會強制進行資料複製,如果資料已經在緩衝區中,例如 ByteBuffer 或 Netty 的 ByteBuf。響應式支援很大程度上是為了高效利用資源,因此我們決定暴露接受和返回 ByteBuffer 的方法。
我希望這篇博文能讓你瞭解 Kay 在各種抽象級別上提供的響應式功能。你可以在我們的示例倉庫中找到所有這些功能的可用示例。
我們期待在 2017 年 1 月份釋出另一個里程碑版本,然後走向釋出候選版本。