高效解析響應式緩衝區流

工程 | Arjen Poutsma | 2021年9月14日 | ...

Spring Framework 5.3 釋出已有一段時間了。該版本的一個特性是對我們的響應式 Multipart 支援進行了重大改進。在這篇博文中,我們分享了在開發此功能時學到的一些知識。具體來說,我們重點討論了在位元組緩衝區流中查詢標記的方法。

Multipart Form Data

每當您上傳檔案時,您的瀏覽器會將其以及表單中的其他欄位作為 multipart/form-data 訊息傳送到伺服器。這些訊息的確切格式在 RFC 7578 中有所描述。如果您提交一個包含一個名為 foo 的簡單文字欄位和一個名為 file 的檔案選擇器的簡單表單,那麼 multipart/form-data 訊息看起來像這樣

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary" (1)

--boundary (2)
Content-Disposition: form-data; name="foo" (3)

bar
--boundary (4)
Content-Disposition: form-data; name="file"; filename="lorum.txt" (5)
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary-- (6)
  1. 訊息的 Content-Type 頭包含 boundary 引數。

  2. 邊界用於開始第一部分。它前面是 --

  3. 第一部分包含文字欄位 foo 的值,如部分頭中所示。欄位的值是 bar

  4. 邊界用於分隔第一部分和第二部分。同樣,它前面是 --

  5. 第二部分包含提交檔案 lorum.txt 的內容。

  6. 訊息的結尾由邊界指示。它前面和後面都是 --

查詢邊界

multipart/form-data 訊息中的邊界非常重要。它被指定為 Content-Type 頭的一個引數。當前面有兩個連字元 (--) 時,邊界表示新部分的開始。當後面也跟著 -- 時,邊界表示訊息的結束。

在傳入位元組緩衝區流中查詢邊界是解析多部分訊息的關鍵。這樣做看起來足夠簡單

private int indexOf(DataBuffer source, byte[] target) {
  int max = source.readableByteCount() - target.length + 1;
  for (int i = 0; i < max; i++) {
    boolean found = true;
    for (int j = 0; j < target.length; j++) {
      if (source.getByte(i + j) != target[j]) {
        found = false;
        break;
      }
    }
    if (found) {
      return i;
    }
  }
  return -1;
}

然而,有一個複雜之處:邊界可以跨越兩個緩衝區,這在 Reactive 環境中可能不會同時到達。例如,給定前面顯示的多部分訊息示例,第一個緩衝區可能包含以下內容

POST / HTTP/1.1
Host: example.com
Content-Type: multipart/form-data;boundary="boundary"

--boundary
Content-Disposition: form-data; name="foo"

bar
--bou

而下一個緩衝區包含剩餘部分

ndary
Content-Disposition: form-data; name="file"; filename="lorum.txt"
Content-Type: text/plain

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Integer iaculis metus id vestibulum nullam.

--boundary--

如果我們一次檢查一個緩衝區,我們就無法找到這樣的分割邊界。相反,我們需要跨多個緩衝區查詢邊界。

解決這個問題的一種方法是等到所有緩衝區都已接收,將它們合併,然後定位邊界。以下示例使用示例流和前面定義的 indexOf 方法來完成此操作

Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
  .map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);

Mono<Integer> result = DataBufferUtils.join(stream)
  .map(joined -> indexOf(joined, boundary));

StepVerifier.create(result)
  .expectNext(6)
  .verifyComplete();

使用 Reactor 的 StepVerifier,我們看到邊界從索引 6 開始。

這種方法有一個主要缺點:將多個緩衝區合併為一個,實際上會將整個多部分訊息儲存在記憶體中。多部分訊息主要用於上傳(大)檔案,因此這不是一個可行的選擇。相反,我們需要一種更智慧的方法來定位邊界。

Knuth 演算法來幫忙!

幸運的是,這種方法以 Knuth-Morris-Pratt 演算法的形式存在。該演算法的主要思想是,如果我們已經匹配了邊界的幾個位元組,但下一個位元組不匹配,我們就不需要從頭開始。為此,該演算法維護狀態,其形式是預計算表中包含不匹配後可以跳過的位元組數的某個位置。

在 Spring Framework 中,我們已經在 Matcher 介面中實現了 Knuth-Morris-Pratt 演算法,您可以透過 DataBufferUtils::matcher 獲取其例項。您還可以檢視原始碼

這裡,我們使用 Matcher 來獲取 streamboundary 的結束索引,使用與前面相同的示例輸入

Flux<DataBuffer> stream = Flux.just("foo", "bar", "--boun", "dary", "baz")
  .map(s -> factory.wrap(s.getBytes(UTF_8)));
byte[] boundary = "--boundary".getBytes(UTF_8);

DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(boundary);
Flux<Integer> result = stream.map(matcher::match);

StepVerifier.create(result)
  .expectNext(-1)
  .expectNext(-1)
  .expectNext(-1)
  .expectNext(3)
  .expectNext(-1)
  .verifyComplete();

請注意,Knuth-Morris-Pratt 演算法給出邊界的**結束**索引,這解釋了測試結果:邊界直到倒數第二個緩衝區中的索引 3 才結束。

正如所料,Spring Framework 的 MultipartParser 大量使用了 Matcher,用於

如果您需要在位元組緩衝區流中查詢一系列位元組,請嘗試使用 Matcher

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有