使用 Python 構建 RabbitMQ 應用

工程 | Greg L. Turnquist | 2010年8月19日 | ...

RabbitMQ 是一個基於 高階訊息佇列協議 (AMQP) 的強大訊息中介軟體。由於 AMQP 規範的通用性,可以輕鬆地從包括 Python 在內的許多平臺連線到它。在本篇博文中,我們將

  • 建立一個簡單的股票行情 Python 應用
  • 建立一個決定何時買賣的經紀商 Python 應用。
  • 比較由 RabbitMQ 團隊建立的 AMQP 庫 pikapy-amqplib
您可以在 http://github.com/gregturn/amqp-demo 找到此博文的所有原始碼。這需要您已經根據 您平臺的說明 安裝並啟動了 RabbitMQ。我個人是在我的 Mac OS X (Snow Leopard) 機器上執行的。

順便說一句

本篇博文中編寫的程式碼僅用於演示目的。請勿依賴這些演算法作為財務建議。
閒話不多說,讓我們開始寫程式碼吧!

構建股票行情

訊息解決方案的一個很好的例子是股票行情繫統。股票交易所向訊息中介軟體釋出訊息,指示股票名稱、價格和時間。
import pickle
import random
import time

class Ticker(object):
    def __init__(self, publisher, qname):
        self.publisher = publisher

        # This quickly creates four random stock symbols
        chars = range(ord("A"), ord("Z")+1)
        def random_letter(): return chr(random.choice(chars))
        self.stock_symbols = [random_letter()+random_letter()+random_letter() for i in range(4)]

        self.last_quote = {}
        self.counter = 0
        self.time_format = "%a, %d %b %Y %H:%M:%S +0000"
        self.qname = qname

    def get_quote(self):
        symbol = random.choice(self.stock_symbols)
        if symbol in self.last_quote:
            previous_quote = self.last_quote[symbol]
            new_quote = random.uniform(0.9*previous_quote, 1.1*previous_quote)
            if abs(new_quote) - 0 < 1.0:
                new_quote = 1.0
            self.last_quote[symbol] = new_quote
        else:
            new_quote = random.uniform(10.0, 250.0)
            self.last_quote[symbol] = new_quote
        self.counter += 1
        return (symbol, self.last_quote[symbol], time.gmtime(), self.counter)

    def monitor(self):
        while True:
            quote = self.get_quote()
            print("New quote is %s" % str(quote))
            self.publisher.publish(pickle.dumps((quote[0], quote[1], time.strftime(self.time_format, quote[2]), quote[3])), routing_key="")
            secs = random.uniform(0.1, 0.5)
            #print("Sleeping %s seconds..." % secs)
            time.sleep(secs)

此應用程式隨機建立四個股票程式碼,然後開始建立行情。它最初在 10.0 和 250.0 之間選擇一個隨機值,然後將價格在先前價格的 90% 到 110% 之間隨機調整。然後,它在進入下一個行情之前隨機等待 0.1 到 0.5 秒。此程式碼設計的一個重要部分是釋出到 AMQP 代理與股票行情程式解耦。相反,它期望在構造時注入一個釋出者服務。

重要的是要注意我們正在使用 pickle 來序列化股票行情資料元組。在 AMQP 中,訊息的正文只是一系列位元組。儲存什麼以及如何序列化不屬於規範的一部分,而必須在傳送方和接收方之間達成一致。在我們的情況下,釋出者和訂閱者都同意它包含一個 pickled 元組。

建立 AMQP 服務

下一步是建立我們的 AMQP 客戶端服務。它的目的是讓我們能夠輕鬆地正確隔離與 AMQP 伺服器的通訊,無論是透過釋出還是透過消費事件。
from amqplib import client_0_8 as amqp

class PyAmqpLibPublisher(object):
    def __init__(self, exchange_name):
        self.exchange_name = exchange_name
        self.queue_exists = False

    def publish(self, message, routing_key):
        conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest", virtual_host="/", insist=False)

        ch = conn.channel()

        ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)

        msg = amqp.Message(message)
        msg.properties["content_type"] = "text/plain"
        msg.properties["delivery_mode"] = 2
        ch.basic_publish(exchange=self.exchange_name,
                         routing_key=routing_key,
                         msg=msg)
        ch.close()
        conn.close()

這裡要特別注意的一點是,宣告的交換機型別為“fanout”。這意味著繫結到它的每個佇列都將收到訊息的副本,而無需在代理端進行昂貴的處理。

您可能會想,為什麼正文的 content_type 是“text/plain”,考慮到它是一個序列化的訊息。這是因為 Python 的 pickle 庫以 ASCII 編碼格式編碼資料,該格式可以用任何工具檢視而不會導致奇怪的行為。

低買高賣

一些簡單而明智的建議是,價格低時買入,價格高時賣出。在這裡,我們將檢視一個簡單的客戶端,它訂閱股票行情,收集價格的歷史趨勢以確定下一個價格是處於低端還是高階,然後決定買入或賣出。
import pickle
import random
import uuid

class Buyer(object):
    def __init__(self, client, qname, trend=5):
        self.holdings = {}
        self.cash = 100000.0
        self.history = {}
        self.qname = qname
        self.client = client
        self.trend = trend
        self.qname = uuid.uuid4().hex

    def decide_whether_to_buy_or_sell(self, quote):
        symbol, price, date, counter = quote
        #print "Thinking about whether to buy or sell %s at %s" % (symbol, price)

        if symbol not in self.history:
            self.history[symbol] = [price]
        else:
            self.history[symbol].append(price)

        if len(self.history[symbol]) >= self.trend:
            price_low = min(self.history[symbol][-self.trend:])
            price_max = max(self.history[symbol][-self.trend:])
            price_avg = sum(self.history[symbol][-self.trend:])/self.trend
            #print "Recent history of %s is %s" % (symbol, self.history[symbol][-self.trend:])
        else:
            price_low, price_max, price_avg = (-1, -1, -1)
            print "%s quotes until we start deciding whether to buy or sell %s" % (self.trend - len(self.history[symbol]), symbol)
            #print "Recent history of %s is %s" % (symbol, self.history[symbol])

        if price_low == -1: return

        #print "Trending minimum/avg/max of %s is %s-%s-%s" % (symbol, price_low, price_avg, price_max)
        #for symbol in self.holdings.keys():
        #    print "self.history[symbol][-1] = %s" % self.history[symbol][-1]
        #    print "self.holdings[symbol][0] = %s" % self.holdings[symbol][0]
        #    print "Value of %s is %s" % (symbol, float(self.holdings[symbol][0])*self.history[symbol][-1])
        value = sum([self.holdings[symbol][0]*self.history[symbol][-1] for symbol in self.holdings.keys()])
        print "Net worth is %s + %s = %s" % (self.cash, value, self.cash + value)

        if symbol not in self.holdings:
            if price < 1.01*price_low:
                shares_to_buy = random.choice([10, 15, 20, 25, 30])
                print "I don't own any %s yet, and the price is below the trending minimum of %s so I'm buying %s shares." % (symbol, price_low, shares_to_buy)
                cost = shares_to_buy * price
                print "Cost is %s, cash is %s" % (cost, self.cash)
                if cost < self.cash:
                    self.holdings[symbol] = (shares_to_buy, price, cost)
                    self.cash -= cost
                    print "Cash is now %s" % self.cash
                else:
                    print "Unfortunately, I don't have enough cash at this time."
        else:
            if price > self.holdings[symbol][1] and price > 0.99*price_max:
                print "+++++++ Price of %s is higher than my holdings, so I'm going to sell!" % symbol
                sale_value = self.holdings[symbol][0] * price
                print "Sale value is %s" % sale_value
                print "Holdings value is %s" % self.holdings[symbol][2]
                print "Total net is %s" % (sale_value - self.holdings[symbol][2])
                self.cash += sale_value
                print "Cash is now %s" % self.cash
                del self.holdings[symbol]

    def handle_pyamqplib_delivery(self, msg):
        self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)

    def handle(self, ch, delivery_tag, body):
        quote = pickle.loads(body)
        #print "New price for %s => %s at %s" % quote
        ch.basic_ack(delivery_tag = delivery_tag)
        print "Received message %s" % quote[3]
        self.decide_whether_to_buy_or_sell(quote)

    def monitor(self):
        self.client.monitor(self.qname, self.handle_pyamqplib_delivery)

此客戶端將買賣股票的策略很好地隔離了從 RabbitMQ 接收訊息的機制。

  1. monitor 是啟動監聽新股票行情的關鍵入口。它將 handle_pyamqplib_delivery 註冊為每次收到新行情時呼叫的回撥方法。
  2. handle_pyamqplib_delivery 提取訊息的重要部分,然後將其交給 handle。插入此額外方法呼叫的原因是支援用 pika 替換 py-amqplib,我們稍後將對此進行介紹。
  3. handle 對訊息的不透明正文進行反序列化,在通道上向代理確認訊息的接收,然後啟動其關於決定買入還是賣出的演算法。
  4. decide_whether_to_buy_or_sell 分割股票行情元組,然後將其價格新增到其股票程式碼歷史記錄中。它旨在收集最少數量的行情後再做決定。您不想嗎?然後它計算趨勢的最小值和最大值,如果價格相對接近最小值,它就買入。但是,如果它已經持有股票,那麼它會等到價格上漲到超過其最初的買入價。當這種情況發生時,它就會賣出。
這裡缺少的部分是 self.client.monitor 函式。 self.client 是我們之前編寫的 AMQP 服務的鉤子,我們需要一種方法將我們的佇列繫結到交換機以接收訊息。以下函式需要新增到 PyAmqpLibPublisher
    def monitor(self, qname, callback):
        conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest")

        ch = conn.channel()

        if not self.queue_exists:
            ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
            ch.queue_bind(queue=qname, exchange=self.exchange_name)
            print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
            #ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
            self.queue_exists = True

        ch.basic_consume(callback=callback, queue=qname)

        while True:
            ch.wait()
        print 'Close reason:', conn.connection_close

這展示了連線到我們的 RabbitMQ 代理,宣告佇列,將其繫結到 fanout 交換機,然後註冊回撥的基本模式。

但是,讓我們不要過於糾結於如何讓這個演算法在挑選贏家和輸家方面做得更好。相反,讓我們認識到這使得任何金融公司都可以透過建立唯一的佇列,繫結到股票系統的 fanout 交換機,然後編寫自己的金融決策演算法來輕鬆訂閱股票行情。

用 pika 替換 py-amqplib

AMQP 是一個設計精良的規範。它包括一種 XML 格式,支援自動生成客戶端庫的能力。這意味著為規範而編寫的庫很容易被替換,並根據其實現的優點進行選擇。Python 社群中一個流行的庫是 py-amqplib。它的一個侷限性,正如在其專案站點上指出的那樣,是它會阻塞並且目前不提供併發。pika 同時提供這兩者。

關鍵點是,從 py-amqplib 遷移到 pika 其實非常容易。基於 AMQP 的方法是相同的,並且底層概念也是相同的。讓我們看看使用 pika 編寫一個替代的 AMQP 服務。

import pika

class PikaPublisher(object):
    def __init__(self, exchange_name):
        self.exchange_name = exchange_name
        self.queue_exists = False

    def publish(self, message, routing_key):
        conn = pika.AsyncoreConnection(pika.ConnectionParameters(
                '127.0.0.1',
                credentials=pika.PlainCredentials('guest', 'guest')))

        ch = conn.channel()

        ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)

        ch.basic_publish(exchange=self.exchange_name,
                         routing_key=routing_key,
                         body=message,
                         properties=pika.BasicProperties(
                                content_type = "text/plain",
                                delivery_mode = 2, # persistent
                                ),
                         block_on_flow_control = True)
        ch.close()
        conn.close()

    def monitor(self, qname, callback):
        conn = pika.AsyncoreConnection(pika.ConnectionParameters(
                '127.0.0.1',
                credentials=pika.PlainCredentials('guest', 'guest')))

        ch = conn.channel()

        if not self.queue_exists:
            ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
            ch.queue_bind(queue=qname, exchange=self.exchange_name)
            print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
            #ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
            self.queue_exists = True

        ch.basic_consume(callback, queue=qname)

        pika.asyncore_loop()
        print 'Close reason:', conn.connection_close

這與前面展示的另一個服務非常相似。建立連線略有不同,但包含相同的 प्रकारचे資料,如 broker 的主機,以及 usernamepasswordbasic_publish 略有不同,訊息及其屬性被放在方法呼叫內部。py-amqplib 以稍有不同的結構宣告整個訊息及其屬性,然後將其作為一個引數傳遞給 basic_publish。關於規範的好處是知道所有重要的部分都在這兩個庫中。

與 py-amqplib 相比,pika 支援不同的等待機制。py-amqplib 具有阻塞等待,而 pika 同時提供阻塞機制和使用 Python 的 asyncore 工具 進行非同步操作的機制。我們可以在關於 RabbitMQ 和 Python 的未來部落格文章中探討這一點。

這些庫的回撥方法簽名略有不同。我們需要更新我們的經紀客戶端以適當地處理它。

    def handle_pyamqplib_delivery(self, msg):
        self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)

將此與 pika 的回撥方法簽名進行比較。

    def handle_pika_delivery(self, ch, method, header, body):
        self.handle(ch, delivery_tag, body)

它們非常接近。重要的部分都在那裡。區別在於 pika 將訊息的各個部分分開,而 py-amqplib 將它們全部組合在一個類中。這就是為什麼回撥方法與提取我們訊息正文的實際方法之間存在解耦。透過提取必要的部分,可以輕鬆地在這些庫之間切換,而無需重寫我們的買賣演算法。

執行

有了所有這些程式碼,我們需要執行它們。很容易編寫一個執行指令碼並直接啟動。
########################################
# To run this demo using py-amqplib,
# uncomment this block, and  comment out
# the next block.
########################################

#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name="my_exchange")

########################################
# To run this demo using pika,
# uncomment this block, and comment out
# the previous block
########################################

from pika_client import *
publisher = PikaPublisher(exchange_name="my_exchange")

########################################
# This part doesn't have to change
########################################

from ticker_system import *
ticker = Ticker(publisher, "")
ticker.monitor()

這個執行器可以在執行 py-amqplib 或 pika 版本的股票行情繫統之間切換。現在我們只需要一個執行器來執行經紀服務。

########################################
# To run this demo using py-amqplib,
# uncomment this block, and  comment out
# the next block.
########################################

#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name="my_exchange")

########################################
# To run this demo using pika,
# uncomment this block, and comment out
# the previous block
########################################

from pika_client import *
publisher = PikaPublisher(exchange_name="my_exchange")

########################################
# This part doesn't have to change
########################################

from buy_low_sell_high import *
buyer = Buyer(publisher, "", trend=25)
print "Buyer = %s" % id(buyer)
buyer.monitor()

在未來的部落格文章中,我們可以考慮使用 Pythonic 的 DI 容器來執行相同的程式碼。

一個好的規範提供了很好的選擇

AMQP 規範使我們能夠基於技術優點以外的更多因素來選擇庫。透過將 AMQP 的機制與生成行情和解析行情的功能分離開來,很容易替換 py-amqplib 和 pika。核心方法名稱相同。幾個引數也相同。但更重要的是:架構概念是相同的。選擇哪個庫現在可以不僅包括技術優點,還包括客戶支援、規範遵從性、同步與非同步支援以及可用性等。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有