搶佔先機
VMware 提供培訓和認證,助您加速進步。
瞭解更多RabbitMQ 是一個強大的訊息代理,基於高階訊息佇列協議 (AMQP)。由於 AMQP 規範的中立性,可以輕鬆地從包括 Python 在內的許多平臺連線到它。在這篇部落格文章中,我們將
順便說一下
本部落格文章中編寫的程式碼僅用於演示目的。請勿依賴這些演算法進行金融建議。說完這些,讓我們寫一些程式碼吧!
import pickle
import random
import time
class Ticker(object):
def __init__(self, publisher, qname):
self.publisher = publisher
# This quickly creates four random stock symbols
chars = range(ord("A"), ord("Z")+1)
def random_letter(): return chr(random.choice(chars))
self.stock_symbols = [random_letter()+random_letter()+random_letter() for i in range(4)]
self.last_quote = {}
self.counter = 0
self.time_format = "%a, %d %b %Y %H:%M:%S +0000"
self.qname = qname
def get_quote(self):
symbol = random.choice(self.stock_symbols)
if symbol in self.last_quote:
previous_quote = self.last_quote[symbol]
new_quote = random.uniform(0.9*previous_quote, 1.1*previous_quote)
if abs(new_quote) - 0 < 1.0:
new_quote = 1.0
self.last_quote[symbol] = new_quote
else:
new_quote = random.uniform(10.0, 250.0)
self.last_quote[symbol] = new_quote
self.counter += 1
return (symbol, self.last_quote[symbol], time.gmtime(), self.counter)
def monitor(self):
while True:
quote = self.get_quote()
print("New quote is %s" % str(quote))
self.publisher.publish(pickle.dumps((quote[0], quote[1], time.strftime(self.time_format, quote[2]), quote[3])), routing_key="")
secs = random.uniform(0.1, 0.5)
#print("Sleeping %s seconds..." % secs)
time.sleep(secs)
此應用程式隨機建立四個股票程式碼,然後開始建立報價。它最初選擇 10.0 到 250.0 之間的隨機值,然後繼續將價格隨機調整到前一個價格的 90% 到 110% 之間。然後它隨機等待 0.1 到 0.5 秒,然後再產生下一個報價。此程式碼設計的一個重要部分是將釋出到 AMQP 代理與股票行情解耦。相反,它期望在構造時注入一個釋出者服務。
值得注意的是,我們使用 pickle 來序列化我們的股票報價資料元組。在 AMQP 中,訊息的主體只是一系列位元組。儲存什麼以及如何序列化不屬於規範的一部分,而是必須在傳送方和接收方之間達成一致。在我們的情況下,釋出者和訂閱者都同意它包含一個 pickled 元組。
from amqplib import client_0_8 as amqp
class PyAmqpLibPublisher(object):
def __init__(self, exchange_name):
self.exchange_name = exchange_name
self.queue_exists = False
def publish(self, message, routing_key):
conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest", virtual_host="/", insist=False)
ch = conn.channel()
ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)
msg = amqp.Message(message)
msg.properties["content_type"] = "text/plain"
msg.properties["delivery_mode"] = 2
ch.basic_publish(exchange=self.exchange_name,
routing_key=routing_key,
msg=msg)
ch.close()
conn.close()
這裡需要注意的一點是宣告的交換器型別為“fanout”。這意味著繫結到它的每個佇列都將收到訊息的副本,而不會在代理端進行昂貴的處理。
您可能想知道為什麼主體內容型別是“text/plain”,考慮到它是一個序列化訊息。這是因為 Python 的 pickle 庫以 ASCII 鎧甲格式編碼資料,可以使用任何工具檢視而不會導致奇怪的行為。
import pickle
import random
import uuid
class Buyer(object):
def __init__(self, client, qname, trend=5):
self.holdings = {}
self.cash = 100000.0
self.history = {}
self.qname = qname
self.client = client
self.trend = trend
self.qname = uuid.uuid4().hex
def decide_whether_to_buy_or_sell(self, quote):
symbol, price, date, counter = quote
#print "Thinking about whether to buy or sell %s at %s" % (symbol, price)
if symbol not in self.history:
self.history[symbol] = [price]
else:
self.history[symbol].append(price)
if len(self.history[symbol]) >= self.trend:
price_low = min(self.history[symbol][-self.trend:])
price_max = max(self.history[symbol][-self.trend:])
price_avg = sum(self.history[symbol][-self.trend:])/self.trend
#print "Recent history of %s is %s" % (symbol, self.history[symbol][-self.trend:])
else:
price_low, price_max, price_avg = (-1, -1, -1)
print "%s quotes until we start deciding whether to buy or sell %s" % (self.trend - len(self.history[symbol]), symbol)
#print "Recent history of %s is %s" % (symbol, self.history[symbol])
if price_low == -1: return
#print "Trending minimum/avg/max of %s is %s-%s-%s" % (symbol, price_low, price_avg, price_max)
#for symbol in self.holdings.keys():
# print "self.history[symbol][-1] = %s" % self.history[symbol][-1]
# print "self.holdings[symbol][0] = %s" % self.holdings[symbol][0]
# print "Value of %s is %s" % (symbol, float(self.holdings[symbol][0])*self.history[symbol][-1])
value = sum([self.holdings[symbol][0]*self.history[symbol][-1] for symbol in self.holdings.keys()])
print "Net worth is %s + %s = %s" % (self.cash, value, self.cash + value)
if symbol not in self.holdings:
if price < 1.01*price_low:
shares_to_buy = random.choice([10, 15, 20, 25, 30])
print "I don't own any %s yet, and the price is below the trending minimum of %s so I'm buying %s shares." % (symbol, price_low, shares_to_buy)
cost = shares_to_buy * price
print "Cost is %s, cash is %s" % (cost, self.cash)
if cost < self.cash:
self.holdings[symbol] = (shares_to_buy, price, cost)
self.cash -= cost
print "Cash is now %s" % self.cash
else:
print "Unfortunately, I don't have enough cash at this time."
else:
if price > self.holdings[symbol][1] and price > 0.99*price_max:
print "+++++++ Price of %s is higher than my holdings, so I'm going to sell!" % symbol
sale_value = self.holdings[symbol][0] * price
print "Sale value is %s" % sale_value
print "Holdings value is %s" % self.holdings[symbol][2]
print "Total net is %s" % (sale_value - self.holdings[symbol][2])
self.cash += sale_value
print "Cash is now %s" % self.cash
del self.holdings[symbol]
def handle_pyamqplib_delivery(self, msg):
self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)
def handle(self, ch, delivery_tag, body):
quote = pickle.loads(body)
#print "New price for %s => %s at %s" % quote
ch.basic_ack(delivery_tag = delivery_tag)
print "Received message %s" % quote[3]
self.decide_whether_to_buy_or_sell(quote)
def monitor(self):
self.client.monitor(self.qname, self.handle_pyamqplib_delivery)
此客戶端的買賣股票策略與接收 RabbitMQ 訊息的機制很好地隔離開來。
def monitor(self, qname, callback):
conn = amqp.Connection(host="127.0.0.1", userid="guest", password="guest")
ch = conn.channel()
if not self.queue_exists:
ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
ch.queue_bind(queue=qname, exchange=self.exchange_name)
print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
#ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
self.queue_exists = True
ch.basic_consume(callback=callback, queue=qname)
while True:
ch.wait()
print 'Close reason:', conn.connection_close
這展示了連線到我們的 RabbitMQ 代理、宣告佇列、將其繫結到 fanout 交換器,然後註冊回撥的基本模式。
但我們不要糾結於如何使這個演算法更好地挑選贏家和輸家。相反,讓我們認識到,這使得任何金融公司都可以透過建立一個唯一的佇列,繫結到股票系統的 fanout 交換器,然後編寫自己的演算法來做出金融決策,從而非常容易地訂閱股票報價。
重點是,從 py-amqplib 遷移到 pika 其實非常容易。基於 AMQP 的方法是相同的,並且基本概念也相同。讓我們看看使用 pika 編寫另一個 AMQP 服務。
import pika
class PikaPublisher(object):
def __init__(self, exchange_name):
self.exchange_name = exchange_name
self.queue_exists = False
def publish(self, message, routing_key):
conn = pika.AsyncoreConnection(pika.ConnectionParameters(
'127.0.0.1',
credentials=pika.PlainCredentials('guest', 'guest')))
ch = conn.channel()
ch.exchange_declare(exchange=self.exchange_name, type="fanout", durable=False, auto_delete=False)
ch.basic_publish(exchange=self.exchange_name,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(
content_type = "text/plain",
delivery_mode = 2, # persistent
),
block_on_flow_control = True)
ch.close()
conn.close()
def monitor(self, qname, callback):
conn = pika.AsyncoreConnection(pika.ConnectionParameters(
'127.0.0.1',
credentials=pika.PlainCredentials('guest', 'guest')))
ch = conn.channel()
if not self.queue_exists:
ch.queue_declare(queue=qname, durable=False, exclusive=False, auto_delete=False)
ch.queue_bind(queue=qname, exchange=self.exchange_name)
print "Binding queue %s to exchange %s" % (qname, self.exchange_name)
#ch.queue_bind(queue=qname, exchange=self.exchange_name, routing_key=qname)
self.queue_exists = True
ch.basic_consume(callback, queue=qname)
pika.asyncore_loop()
print 'Close reason:', conn.connection_close
這與前面展示的另一個服務非常相似。建立連線略有不同,但包含相同的資料,例如 broker 的主機以及 username 和 password。basic_publish 略有不同,訊息及其屬性一起放在方法呼叫內部。py-amqplib 在稍微不同的結構中聲明瞭整個訊息及其屬性,然後將其作為單個引數傳遞給 basic_publish。規範的好處在於知道所有重要的部分都存在於這兩個庫中。
與 py-amqplib 相比,pika 支援不同的等待機制。py-amqplib 有阻塞式等待,而 pika 既提供阻塞機制,也提供使用Python 的 asyncore 工具進行非同步操作的機制。我們可以在未來的關於 RabbitMQ 和 Python 的部落格文章中探討這一點。
這兩個庫之間的回撥方法簽名略有不同。我們需要更新我們的券商客戶端以適當地處理它。
def handle_pyamqplib_delivery(self, msg):
self.handle(msg.delivery_info["channel"], msg.delivery_info["delivery_tag"], msg.body)
將其與 pika 的回撥方法簽名進行比較。
def handle_pika_delivery(self, ch, method, header, body):
self.handle(ch, delivery_tag, body)
它們非常接近。重要的部分都在那裡。區別在於 pika 將訊息的部分拆分開,而 py-amqplib 將所有部分組合在一個單一類中。這就是為什麼回撥方法和實際提取訊息主體的方**法**之間存在解耦的原因。透過提取必要的部分,可以在這兩個庫之間切換,而無需重寫我們的買/賣演算法。
########################################
# To run this demo using py-amqplib,
# uncomment this block, and comment out
# the next block.
########################################
#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name="my_exchange")
########################################
# To run this demo using pika,
# uncomment this block, and comment out
# the previous block
########################################
from pika_client import *
publisher = PikaPublisher(exchange_name="my_exchange")
########################################
# This part doesn't have to change
########################################
from ticker_system import *
ticker = Ticker(publisher, "")
ticker.monitor()
這個執行程式可以在執行 py-amqplib 版本或 pika 版本的股票行情繫統之間切換。現在我們只需要一個券商服務的執行程式。
########################################
# To run this demo using py-amqplib,
# uncomment this block, and comment out
# the next block.
########################################
#from amqplib_client import *
#publisher = PyAmqpLibPublisher(exchange_name="my_exchange")
########################################
# To run this demo using pika,
# uncomment this block, and comment out
# the previous block
########################################
from pika_client import *
publisher = PikaPublisher(exchange_name="my_exchange")
########################################
# This part doesn't have to change
########################################
from buy_low_sell_high import *
buyer = Buyer(publisher, "", trend=25)
print "Buyer = %s" % id(buyer)
buyer.monitor()
在未來的部落格文章中,我們可以考慮使用 Pythonic DI 容器執行相同的程式碼。