使用 Spring Data 實現響應式程式設計

工程 | Mark Paluch | November 28, 2016 | ...

上週釋出的 Spring Data Kay M1 是首個提供響應式資料訪問支援的版本。它初步支援的儲存——MongoDB、Apache Cassandra 和 Redis——都已提供了響應式驅動,這使得它們成為此類原型的理想候選。讓我們更詳細地瞭解構成該支援的新程式設計模型和 API。

響應式倉庫

倉庫程式設計模型是 Spring Data 使用者通常使用的最高階抽象。它通常由 Spring Data 提供的介面中定義的一組 CRUD 方法和領域特定查詢方法組成。以下是一個響應式 Spring Data 倉庫定義的示例

public interface ReactivePersonRepository
  extends ReactiveCrudRepository<Person, String> {

  Flux<Person> findByLastname(Mono<String> lastname);

  @Query("{ 'firstname': ?0, 'lastname': ?1}")
  Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}

正如您所見,它與您習慣的傳統倉庫介面差異不大。然而,與傳統倉庫介面相比,響應式倉庫使用響應式型別作為返回型別,也可以用於引數型別。新引入的 `ReactiveCrudRepository` 中的 CRUD 方法自然也使用了這些型別。

預設情況下,響應式倉庫使用 Project Reactor 型別,但也可以使用其他響應式庫。我們為這些庫提供了自定義倉庫基礎介面(例如 `RxJava2CrudRepository`),並根據查詢方法需要自動調整型別,例如 RxJava 的 `Observable` 和 `Single`。其他方面基本保持不變。但請注意,當前里程碑版本尚不支援分頁,並且您需要在類路徑中包含必要的響應式庫才能啟用對特定庫的支援。

啟用響應式 Spring Data

與阻塞世界中的情況類似,響應式 Spring Data 的支援透過 `@Enable…` 註解以及一些基礎設施設定來啟用

@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {

  @Bean
  public MongoClient mongoClient() {
    return MongoClients.create();
  }

  @Override
  protected String getDatabaseName() {
    return "reactive";
  }
}

請注意我們如何為基礎設施配置使用不同的基類,因為我們需要利用 MongoDB 非同步驅動。

使用響應式倉庫

現在可以像使用阻塞式倉庫一樣使用該倉庫,不同之處在於結果的處理可以以響應式方式進行

@RestController
class PersonController {

  private final PersonRepository people;

  public PersonController(PersonRepository people) {
    this.people = people;
  }

  @GetMapping("/people")
  Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {

    Flux<Person> result = repository.findByLastname(lastname);
    return result.map(it -> it.getFullName());
  }
}

請看我們如何轉發 Spring Web Reactive 提供的響應式引數,將它們傳輸到倉庫中,然後獲取一個 `Flux`,並以響應式方式處理執行結果。通常,響應式查詢方法遵循與現有倉庫相同的查詢建立思路。傳遞給查詢方法的引數可以是普通型別(例如 `String`)、包裝型別(`Optional`、`Stream`)或響應式包裝引數(`Mono`、`Flux`)。如果您使用響應式包裝型別作為引數型別,則實際的查詢建立和執行會延遲到實際訂閱發生時。

響應式模板

正如傳統倉庫基於傳統模板實現一樣,響應式倉庫構建在響應式模板之上。阻塞模板 API 中可用的大多數操作在響應式模板中都有對應的部分。我們將把阻塞世界中的更多功能移植到響應式模板 API 中,但有些操作目前(或尚未)無法透過響應式驅動程式實現,或者在響應式世界中沒有意義。

以下是 Spring Data MongoDB 中 `ReactiveMongoOperations` 的一個片段。它由 `ReactiveMongoTemplate` 實現,並使用 Project Reactor 的響應式型別(如 `Mono` 和 `Flux`)來包裝響應。有些方法也接受響應式型別,以便將資料流式傳輸到您的資料儲存中。

public interface ReactiveMongoOperations {

  // …

  /**
   * Map the results of an ad-hoc query on the specified collection to a
   * single instance of an object of the specified type.
   */
  <T> Mono<T> findOne(Query query, Class<T> entityClass);

  /**
   * Map the results of an ad-hoc query on the collection for the entity
   * class to a List of the specified type.
   */
  <T> Flux<T> find(Query query, Class<T> entityClass);

  /**
   * Insert the object into the specified collection.
   */
  <T> Mono<T> insert(T objectToSave, String collectionName);

  /**
   * Insert the object into the collection for the entity type of the object
   * to save.
   */
  <T> Mono<T> insert(Mono<? extends T> objectToSave);

  // …
}

請注意,所有方法都遵循響應式執行模型,即在呼叫時不會執行任何包含 I/O 的操作,只有在訂閱返回值時才會執行。

讓我們透過模板插入一些資料

Flux<Person> flux = Flux.just(new Person("Walter", "White"),
  new Person("Skyler", "White"),
  new Person("Saul", "Goodman"),
  new Person("Jesse", "Pinkman"));

template.insertAll(flux).subscribe();

有些方法(例如 `insertAll(…)`)接受響應式型別,以便將傳入資料非同步流式傳輸到您的 MongoDB 資料庫中,例如來自您在 Spring Web Reactive 控制器中接收到的 `Flux`,該 Flux 將透過 Jackson 非同步對映一個 JSON 陣列

@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {

  return template.insertAll(people);
}

正如您所見,倉庫和模板 API 都允許您以響應式、非阻塞的方式描述請求處理。接下來,讓我們更深入地瞭解 Redis 對響應式資料訪問的支援。

Spring Data Redis 的響應式連線

Spring Data Redis 在連線層提供了初步的響應式支援,目前僅支援 Lettuce,因為它是唯一支援響應式資料訪問的 Redis 驅動。由於 Redis 通常在較低的抽象級別使用,Kay M1 版本從該較低級別的響應式抽象開始。`LettuceConnectionFactory` 允許訪問 `ReactiveRedisConnection`,後者又提供了 Redis 命令的響應式版本

使用運算子進行函式式鏈式呼叫建立訪問 Redis 資料的響應式鏈。同樣,所有 I/O 都是非同步的。

ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
  .flatMap(keyCommands::type)
  .flatMap(System.out::println)
  .subscribe();

這段程式碼獲取一個隨機鍵並列印其資料型別。一個不存在的隨機鍵將以空的 `Mono` 完成。

響應式 Redis 命令有兩種風格:接受普通引數和接受命令釋出者。命令釋出者發出特定的 Redis 命令,將資料直接流式傳輸到 Redis。每當命令執行後,每個發出的命令都會發出一個命令響應。

public interface ReactiveStringCommands {

  // …

  Mono<Boolean> set(ByteBuffer key, ByteBuffer value);

  Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);

  // …
}

傳統的 Spring Data Redis 在其阻塞 API 上使用 `byte[]` 來交換資料。如果資料已存在於緩衝區中(如 `ByteBuffer` 或 Netty 的 `ByteBuf`),`byte[]` 會強制進行資料複製。響應式支援很大程度上是為了提高資源使用效率,因此我們決定公開接受和返回 `ByteBuffer` 的方法。

總結

希望這篇博文為您介紹了 Kay 版本在不同抽象級別提供的響應式特性。您可以在我們的示例倉庫中找到所有這些的可執行示例。

我們期待在 2017 年 1 月釋出另一個里程碑版本,然後邁向釋出候選版本。

獲取 Spring 資訊

訂閱 Spring 資訊以保持連線

訂閱

提升自我

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 透過一次簡單訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群的所有即將舉行的活動。

檢視全部