RabbitMQ:在 Cloud Foundry 上啟用 Grails 全文搜尋

工程 | Peter Ledbrook | 2011 年 8 月 29 日 | ...

在我的 關於 Grails 和 Cloud Foundry 的第二篇部落格 中,我介紹了 Grails Twitter 示例 的一個變種,可以託管在 CloudFoundry.com 上。當時我提到,使用 Searchable 外掛進行全文搜尋會將你限制在單個應用程式例項上,因為搜尋索引對於每個例項來說是唯一的。換句話說,根據瀏覽器路由到的應用程式例項不同,你很可能會得到不同的搜尋結果。

我還說過,解決這個問題的一個選項是在例項之間同步搜尋索引。但這聽起來並不是特別容易,對吧?巧合的是,Cloud Foundry 引入 RabbitMQ 服務意味著所需的程式碼更改比你預期的要小得多。因此,讓我們看看我是如何為 Grails Twitter 狀態訊息新增全文搜尋功能的。

使狀態訊息可搜尋

Searchable 外掛強烈假設您希望索引標準的 GORM 領域類。這意味著使用 Hibernate/SQL。但 Grails Twitter 的狀態訊息儲存在 MongoDB 中,而不是 MySQL 中。我們能否使其可搜尋?是的,我們可以,但會犧牲一些功能。

與普通領域類一樣,搜尋Status例項的第一步是新增一個searchable屬性

package org.grails.twitter

import org.grails.twitter.auth.Person

class Status {
    static mapWith = "mongo"
    static transients = ["author"]

    static searchable = {
        only = ["message", "dateCreated"]
        authorId index: "no", store: "yes"
    }
	
    String message
    Long authorId
    List<String> tags = []
    Date dateCreated
	
    Person getAuthor() {
        return Person.get(authorId)
    }

    static constraints = {
        message maxSize: 160
    }
}

在本例中,我希望能夠根據訊息的建立日期和內容進行搜尋,而不是其他。我還希望從搜尋結果連結到訊息的作者。但是,如果 authorId 沒有被索引,那麼搜尋結果將不包含釋出者的 ID。因此,我將authorId儲存authorId在索引中,但不使其可搜尋 (index: "no")。簡單吧?當顯示搜尋結果時,現在可以包含每條訊息作者的姓名了。

索引非 Hibernate 領域類的一個顯著限制是映象(mirroring)不起作用。這意味著新訊息儲存時不會自動被索引。幸運的是,我們這裡實際上並不需要這種行為,因此我在Config.groovy:

searchable {
    ...
    mirrorChanges = false
    bulkIndexOnStartup = false
}

中停用了映象和“啟動時批次索引”(bulk indexing on startup)。當然,我們確實希望在啟動時索引狀態訊息,因為 Cloud Foundry 上的檔案系統是短暫的,因此搜尋索引需要在每次啟動時重建。但自動索引對非 Hibernate 領域類也無效,所以我只好在BootStrap.groovy:

...
class BootStrap {

    def searchableService
    def springSecurityService

    def init = { servletContext ->
        ...
        // Index all Hibernate mapped domain classes.
        searchableService.reindex()

        // Index all status messages.
        def statusMessages = Status.list()
        log.info "Indexing ${statusMessages.size()} status messages"
        Status.reindex(statusMessages)
        log.info "Finished indexing"
    }
    ...
}

的末尾進行手動索引。這並不是很多程式碼,但足以讓狀態訊息可搜尋。剩下的就是確保新訊息被索引,並且搜尋索引在應用程式例項之間同步。

使用 RabbitMQ 同步

保持搜尋索引同步的基本模型非常簡單直觀

每當儲存一條狀態訊息時,都會向 RabbitMQ 代理傳送一條訊息,然後由代理將其轉發給所有應用程式例項。然後,每個例項索引由該訊息標識的Status例項。

在我們實現這個功能之前,需要安裝 RabbitMQ 外掛

    grails install-plugin rabbitmq

接下來的工作是使用適當的交換機和佇列來配置代理。我之前寫過關於 AMQP 協議RabbitMQ 外掛 的部落格,所以這裡不再詳細介紹交換機和佇列。只需要知道我們需要一個 fanout 交換機(所有訊息都路由到所有監聽器)以及一個訂閱該交換機的 Grails 服務即可。所以在Config.groovyConfig.groovy

rabbitmq {
    connectionfactory {
        username = 'guest'
        password = 'guest'
        hostname = 'localhost'
    }

    queues = {
        exchange name: 'search.sync', type: fanout, durable: false
    }
}

中,我添加了以下內容:重要的是交換機的宣告:當應用程式部署到 Cloud Foundry 時,連線工廠設定會被忽略,因為 RabbitMQ 服務在執行時繫結到應用程式。

傳送訊息只需一行程式碼

...
class StatusService {
    def springSecurityService
    def tagService
    
    void updateStatus(long userId, String message) {
        def status = new Status(message: message, authorId: userId).save(flush: true, failOnError: true)
        rabbitSend 'search.sync', '', "${status.id}:${status.class.name}"
        
        runAsync {
            tagService.extractTagsFromMessage(status)
        }
    }
    ...
}

而用於索引狀態訊息的服務也沒有複雜多少

package org.grails.twitter

class SyncService {
    static rabbitSubscribe = "search.sync"
    static transactional = false

    def grailsApplication
    def searchableService

    void handleMessage(String message) {
        def parts = message.split(/:/)
        if (parts.size() != 2) {
            log.error "Invalid message: $message"
            return
        }

        def domainClass = grailsApplication.getDomainClass(parts[1])
        log.debug "Reindexing instance ${parts[0]} of ${parts[1]}"
        try {
            searchableService.reindex(domainClass.clazz.get(parts[0]))
        }
        catch (Exception ex) {
            log.error "Failed to index instance ${parts[0]} of ${parts[1]}", ex
        }
    }
}

所以rabbitSend()方法用於傳送一個簡單的字串,其中包含Status例項 ID 和類名。在這種情況下,我們只處理Status例項,但讓服務對所有潛在的可搜尋領域類通用是很有用的。此外,使用 Groovy 意味著我們不必做任何糟糕的反射:我們只需獲取類,然後直接呼叫我們想要的方法!

SyncService的重要部分是rabbitSubscribe屬性和handleMessage()方法。前者宣告該服務應訂閱 "search.sync" 交換機,也就是我傳送訊息的交換機。handleMessage()方法。前者宣告該服務應訂閱 "search.sync" 交換機,也就是我傳送訊息的交換機。方法在每次從該交換機接收到訊息時被呼叫,訊息內容作為其引數。因此,該方法提取類名和例項 ID,並使用 Grails 的DomainClass.get()方法從資料儲存(對於我們的Status訊息來說是 MongoDB)中檢索相關例項。最後,searchableService.reindex()方法將狀態訊息新增到本地搜尋索引中。當然,這一切都在每個應用程式例項上發生。

現在應用程式已準備好部署到 Cloud Foundry 並擴充套件到允許的最大例項數!您可以在 CloudFoundry.com 上看到結果。請注意,在 GitHub 專案中,我做了一些 UI 工作來支援全文搜尋,但這些更改與當前主題並不十分相關。

總結

不得不說,我自己也很驚訝,要實現搜尋索引同步所需的程式碼竟然如此之少。不僅如此,我還能專注於如何解決問題,而不是如何編寫程式碼,因為編碼過程非常簡單直觀。最重要的是,使用 Cloud Foundry 意味著部署包括建立和繫結 RabbitMQ 服務,然後執行grails prod cf-update命令將更改推送到伺服器。真是簡單。

如您所見,RabbitMQ 可以為與雲相關的問題提供創新的解決方案,而 Grails 外掛透過其約定使它非常易於使用。您可以在同一應用程式的不同例項之間、不同的 Grails 應用程式之間,甚至使用不同語言和框架編寫的應用程式之間進行通訊。例如,我們可以部署一個簡單的 Node.js 或 Sinatra 應用程式來記錄和顯示“search.sync”訊息,以便您跟蹤它們。基本上,RabbitMQ 是您的雲工具箱中必不可少的一項。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

搶先一步

VMware 提供培訓和認證,助您快速前進。

瞭解更多

獲取支援

Tanzu Spring 透過一項簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部