Spring Integration 在 dm Server 上

工程 | Iwein Fuld | 2009 年 2 月 27 日 | ...

引言

在這篇部落格文章中,我將向您展示如何使用 Spring Integration 和 dm Server 建立松耦合、可擴充套件的應用程式。使用 OSGi 的額外好處是可以在執行時改變應用程式的行為,當然我們也會藉此找點樂子。首先,我將快速強調設計併發應用程式的原因,然後我將描述將 OSGi bundle 與訊息傳遞整合的不同策略。在此過程中,您將能瞥見我們的工具和一些 dm Server 功能。如果您已經下載並安裝了最新的 SpringSource Tool Suite 和 dm Server,您應該能夠自己完成這些。您不需要示例程式碼來跟隨故事,但如果您感興趣,可以在 Spring Integration sandbox 中找到它。

同步的弊端

應用程式中的同步呼叫表現良好,但它們無法擴充套件。這一點之前已有不少文獻記載,因此我將快速為您回顧一下,然後繼續。進行同步呼叫時,呼叫執行緒必須阻塞,直到呼叫完成。如果您在該方法中執行 I/O 操作,阻塞執行緒就會浪費 CPU 週期。如果您有一個大規模並行 Web 應用程式,這不是什麼大問題,因為您可以簡單地增加使用的執行緒池的最大大小。然而,如果您需要從一次呼叫中充分利用伺服器的 CPU 能力,那就沒轍了。由於一個執行緒執行在一個核心上,因此您擁有的核心越多,單執行緒程式就越低效。

訊息傳遞來解救

親自深入併發程式設計樂趣無窮,但這並非易事,而且您今天可能想早點完成工作。這就是 Spring Integration 可以幫助您的地方。它為您提供了所需的所有併發處理控制,而無需您承擔併發程式設計底層細節的責任。

OSGi 如何提供幫助?

沒有 OSGi,您也可以完全避免同步的弊端。但它讓生活變得如此美好,您不應該錯過它。每次進行微小更改時都進行 JVM 的完全重新部署和重啟會非常無趣。還有一件事讓 OSGi 和訊息傳遞的結合變得非常有趣。如果企業應用程式變得龐大,您遲早需要模組化。在同一個傳統伺服器中執行多個模組是一場災難(也稱為 jar hell)。因此,在傳統架構中,人們在許多情況下過早地進行了橫向擴充套件。這導致了非常糟糕的設定。不同的節點必須透過網路相互通訊,並且無法共享 CPU 和記憶體等資源。這使得架構效率低下,並嚴重影響效能。您不僅浪費了一臺伺服器中某些核心的週期,現在還跨多個節點重複同樣的低效率。更糟糕的是,網路延遲增加了 I/O 等待時間,而在本地方法呼叫就足夠的情況下。

有了 OSGi,您就不必承擔這樣的損失,您可以在同一個 JVM 中執行不同的 bundle,因此它們之間的呼叫就像普通方法呼叫一樣快速,而不會出現因在同一個 classpath 上部署不同團隊的 jar 包而導致的可怕的 jar hell 問題。擁有一個可以在執行時改變行為、可以由多個團隊協作開發並且可以幫助您將所需硬體減少到一臺強大伺服器的應用程式。聽起來很不錯,那麼我們來做吧。

準備工作

如果您喜歡使用實際的程式碼示例來跟隨部落格,或者自己並行編寫示例,您需要準備好適當的工具(和程式碼)。如果您只是想理解部落格中的想法,則無需這樣做。我將在此處包含必要的程式碼示例。本文並非詳細的教程,因此如果您想跟隨實踐,您將需要自己編寫程式碼,或者直接使用示例。為此,請遵循以下步驟。

  1. 下載並安裝 SpringSource Tool Suitedm Server(這就像解壓檔案一樣簡單)。如果您願意,也可以使用帶有 dm Server 工具和 Spring IDE 的 Eclipse。
  2. 以預設名稱配置 dm Server 並啟動它
    1. 開啟 Server 檢視,在其中右鍵單擊並選擇新建
    2. 在搜尋視窗中輸入 Spring 並選擇 SpringSource dm Server 1.0
    3. 指向已解壓的 dm Server 目錄
  3. 更新您的 bundle 倉庫並安裝 Spring Integration Core 1.0.1 和 Commons Logging 1.1.1
    1. 雙擊新建立的伺服器,伺服器管理控制檯將開啟
    2. 選擇 repository 選項卡,然後點選“Update local bundle and library repository index”(更新本地 bundle 和庫倉庫索引)
    3. 搜尋 Spring Integration 並下載核心 bundle
    4. 啟動伺服器
  4. Spring Integration sandbox 匯入專案
    1. 將專案簽出到本地工作目錄
    2. 從 STS 中選擇 File->Import->Existing projects into workspace(檔案->匯入->將現有專案匯入工作區)並指向工作目錄

非同步移交給訊息傳遞 bundle

在本節中,我們將使用一個依賴於 Spring Integration 的單一 bundle,並將我們的工作移交給它。這個 bundle 將隱藏所有訊息傳遞的細節,如果您系統中只有一小部分需要進行密集工作,這可能正是您所需要的。

我將展示的示例應用程式是為一箇中世紀小鎮開發的。在這些小鎮上,過去有一個報信員,他會走街串巷,大聲宣佈當地新聞和公告。這個小鎮(希望保持匿名)被報信員工會迫使其改善報信員的工作條件,並且由於預算限制,他們希望同時減少人數。

他們得出的結論是,實現這些目標的唯一方法是用自動化系統取代報信員的大部分體力勞動。報信員只需在他舒適的辦公室裡將資訊傳遞給這個系統,然後資訊就會分發給鎮上的所有公民。市政廳已經記錄了所有公民的資訊,因此最簡單的方法是將這個分發邏輯放在市政廳。

[caption id="attachment_1096" align="alignnone" width="653" caption="示例中各種 bundle 的概覽。"]System design of various bundles[/caption]

報信員 bundle 決定不重新培訓報信員以適應新系統,因此他仍然會呼叫鎮上的 cry() 方法,只是不再需要四處奔波並多次執行以覆蓋鎮上的每個地方。公告的分發將由鎮上的新系統透明地完成。報信員 bundle 只有一個類 TownCrier.java,它被注入了應該向其喊話的 Town。


@Component
public class TownCrier {
	private static final Log logger = LogFactory.getLog(TownCrier.class);

	@Autowired //Spring will take care of the wiring as usual
	private Town town;

	private ScheduledThreadPoolExecutor schedule;

	@PostConstruct
	public void start() {
		logger.info("Starting feeder");
		schedule = new ScheduledThreadPoolExecutor(1);

		TimerTask task = new TimerTask() {
			public void run() {
				logger.info("Crying the time");
				town.cry("Oyez! Oyez! Oyez! It is now " + new Date());
			}
		};
		schedule.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS);
	}
}

如果您瞭解 Spring Integration,您可能會對這段程式碼感到不適,並希望改用 inbound channel adapter 和 poller,但我們在這裡只是模擬遺留程式碼,所以請耐心聽我說。

為了幫助 Spring 引用將放在另一個 bundle 中的 town,我們將使用 Spring DM。如果您以前從未接觸過 OSGi 和 Spring DM,現在是閱讀 入門 的好時機。我們只需在 osgi-context.xml 中新增一個 OSGi reference 元素。


<osgi:reference id="collectorService"
	interface="com.springsource.samples.integration.osgi.town.input.Town" />

最後,在 MANIFEST.mf 中存在一個對匯出服務的 town bundle 的依賴。

Import-Package: com.springsource.samples.integration.osgi.town.input,
 org.apache.commons.logging;version="[1.1.1,1.1.1]",
 javax.annotation

同樣,完整程式碼位於 Spring Integration sandbox

Town bundle Town(bundle)已經完全接受了訊息傳遞。他們正在使用 Spring Integration 來處理訊息傳遞和併發。他們還定義了報信員和公民(稍後提及)所需的輸入和輸出介面。

Town 介面有一個 cry 方法,我們希望將它的實現暴露給報信員。我們可以讓 Spring Integration 為我們建立一個實現,該實現接受字串引數並將其封裝在 Message 中放在通道上。這透過 gateway 完成。


<gateway id="inputGateway" default-request-channel="announcements"
	service-interface="com.springsource.samples.integration.osgi.town.input.Town" />

<publish-subscribe-channel id="announcements" />

public interface Town {
	void cry(String string);
}

由於 Town 介面只有一個方法,我們不需要告訴 Spring Integration 呼叫哪個方法,因此這裡的 @Gateway 是可選的。

執行時監聽

鎮上的公民會在市政廳註冊,但他們不知道訊息傳遞。Citizen 可以使用其 onCry(String string) 方法監聽喊話,但它不依賴於 Spring Integration。在某些情況下,您可能希望執行一個不依賴 Spring Integration 的 bundle,但在使用 Spring Integration 的事件驅動架構中啟用它。

示例在 CityHall 的 Town bundle 中使用了一個 service activator,它負責解包訊息並將它們推送給公民


@ServiceActivator
public void onCry(String announcement) {
	for (Citizen consumer : citizens) {
		consumer.onCry(announcement);
	}
}

為了允許公民自行註冊,CitizenRegistry 被公開為一個 OSGi 服務。


<osgi:service id="citizenRegistry" ref="cityHall"
	interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />

公民註冊服務也由 CityHall 實現,用於維護接收公告的公民集合。

在 scribe bundle 中實現了一個 Citizen 的示例。它透過一個 OSGi reference 引用了 CitizenRegistry。


<osgi:reference id="citizenRegistry"
	interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />

Scribe 在初始化期間向公民註冊中心註冊自己,隨後記錄所有接收到的公告。

這樣,只有 town bundle 的程式碼需要更改才能開始使用新系統,而 citizen bundle 可以保持不變。當然,可能有充分的理由擁有多個瞭解訊息傳遞並直接共享通道的 bundle。在下一節中,我們將探討這一點。

將通道公開為 OSGi 服務

假設這個小鎮也與中央政府打交道。政府以 government bundle 的形式派代表來到鎮上。這個代表直接瞭解 announcementsChannel,因此無需使用 gateway 或 service activator 將其遮蔽。小鎮將 announcementsChannel 公開為一個 OSGi 服務


<osgi:service id="announcementsChannel" ref="announcements"
	interface="org.springframework.integration.channel.SubscribableChannel" />

為了將政府法令傳送到鎮上的公告通道,我們在示例中的 government bundle 中有一個 gateway,但這可以是任何端點。重要的是在 government bundle 中引用 town bundle 中的通道


	<osgi:reference id="announcementsChannel"
		interface="org.springframework.integration.channel.SubscribableChannel" />

現在您可以在另一個 bundle 中將其作為普通通道使用


<si:service-activator input-channel="announcementsChannel"
	ref="nationalArchive" />

<si:gateway
	service-interface="com.springsource.samples.integration.osgi.government.GovernmentDecreeGateway"
	id="decreeer" default-request-channel="announcementsChannel" />

為了簡單起見,示例實現了非常簡單的本地端點,但一旦通道連線了 bundle,您就可以使用 Spring Integration 的介面卡來離開 JVM。

我如何在現實世界中應用這一點?

如果訊息有效載荷是中央銀行或美聯儲的利率變化,小鎮是內部銀行應用程式的中心訊息系統,公民是各個交易服務(例如:外匯、債券交易、期權和權證等)。每個都需要模組化,因為
  • 它們由不同的團隊頻繁更改
  • 定期新增更多模組
  • 每個模組需要根據新的利率採取不同的行動

假設 government bundle 成為銀行內部的貸款/信貸部門(其自身對其他機構的貸款利率可能因美聯儲利率而異 - 可能基於不同的架構構建)。我不是金融專家,也不是您所在領域的專家,因此我不會武斷地告訴您這個技術解決方案具體如何應用。將來我們將把示例重構到一個更真實的領域,並將其新增到示例中,為您提供一些思路。如果您有特殊請求,請務必在此處評論反饋。

結論

要設計一個可擴充套件的應用程式,您需要考慮同步移交的問題。訊息傳遞可以解決許多這些問題。為了演進大型應用程式,使其在單個 JVM 上獲得最佳效能,OSGi 是一個引人注目的替代方案。將 Spring Integration 和 dm Server 兩者結合使用,強大而簡單。在本文中,您已經瞭解瞭如何

  • 使用 gateway 處理不依賴 Spring Integration 的 bundle 的入站呼叫
  • 使用 service activator 處理對不依賴 Spring Integration 的 bundle 的出站呼叫
  • 公開 SubscribableChannel,以便使用 Spring Integration 的 bundle 可以輪詢訊息
  • 公開 MessageChannel,以便使用 Spring Integration 的 bundle 可以推送訊息
同樣的設定使用其他訊息傳遞框架和 OSGi 容器也是可能的,但這個示例的簡單性樹立了一個難以超越的良好榜樣。是的,這是一個挑戰,來吧,咬鉤吧,你知道你想要...

連結

訂閱 Spring 通訊

透過 Spring 通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您飛速發展。

瞭解更多

獲取支援

Tanzu Spring 在一項簡單訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群中所有即將舉行的活動。

檢視全部