diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-06-11 14:34:18 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-06-11 14:34:18 +0000 |
| commit | 38bf5de394cd3f0f9309924cbdabc96940f98a91 (patch) | |
| tree | a4c1732f74b2b2dda2ab6a55884897a3a03d3a23 /extras | |
| parent | f950644d0a9a3c6bd30bd304c9d3c6e344baa46e (diff) | |
| download | qpid-python-38bf5de394cd3f0f9309924cbdabc96940f98a91.tar.gz | |
QPID-2663: scale and performance optimizations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@953703 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
| -rw-r--r-- | extras/qmf/src/py/qmf/console.py | 352 |
1 files changed, 233 insertions, 119 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index 02c5ff1c6a..617a4f0234 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -34,8 +34,9 @@ from qpid.connection import Connection, ConnectionFailed, Timeout from qpid.datatypes import Message, RangedSet, UUID from qpid.util import connect, ssl, URL from qpid.codec010 import StringCodec as Codec -from threading import Lock, Condition, Thread -from time import time, strftime, gmtime +from threading import Lock, Condition, Thread, Semaphore +from Queue import Queue, Empty +from time import time, strftime, gmtime, sleep from cStringIO import StringIO #import qpid.log @@ -382,7 +383,7 @@ class Object(object): mp.user_id = self._broker.authUser mp.correlation_id = str(seq) mp.app_id = "qmf2" - mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name) + mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue) mp.application_headers = {'qmf.opcode':'_method_request'} sendCodec.write_map(call) smsg = Message(dp, mp, sendCodec.encoded) @@ -671,7 +672,8 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) if broker.brokerSupportsV2: - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key) + # data indications should arrive on the lo priority queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, bindingkey=v2key) def bindClass(self, pname, cname): @@ -686,7 +688,8 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key) if broker.brokerSupportsV2: - broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key) + # data indications should arrive on the lo priority queue + broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, bindingkey=v2key) def bindClassKey(self, classKey): @@ -1200,7 +1203,7 @@ class Session: if Session.ENCODINGS.has_key(klass): return self.ENCODINGS[klass] for base in klass.__bases__: - result = self._encoding(base, obj) + result = self._encoding(base) if result != None: return result @@ -1333,7 +1336,7 @@ class Session: mp.user_id = broker.authUser mp.correlation_id = str(seq) mp.app_id = "qmf2" - mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name) + mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_direct_queue) mp.application_headers = {'qmf.opcode':'_method_request'} sendCodec.write_map(call) msg = Message(dp, mp, sendCodec.encoded) @@ -1880,82 +1883,32 @@ class MethodResult(object): #=================================================================================================== -# ManagedConnection +# Broker #=================================================================================================== -class ManagedConnection(Thread): - """ Thread class for managing a connection. """ +class Broker(Thread): + """ This object represents a connection (or potential connection) to a QMF broker. """ + SYNC_TIME = 60 + nextSeq = 1 + + # for connection recovery DELAY_MIN = 1 DELAY_MAX = 128 DELAY_FACTOR = 2 - def __init__(self, broker): - Thread.__init__(self) - self.daemon = True - self.name = "Connection for broker: %s:%d" % (broker.host, broker.port) - self.broker = broker - self.cv = Condition() - self.canceled = False - - def stop(self): - """ Tell this thread to stop running and return. """ - try: - self.cv.acquire() - self.canceled = True - self.cv.notify() - finally: - self.cv.release() - - def disconnected(self): - """ Notify the thread that the connection was lost. """ - try: - self.cv.acquire() - self.cv.notify() - finally: - self.cv.release() - - def run(self): - """ Main body of the running thread. """ - delay = self.DELAY_MIN - while True: - try: - self.broker._tryToConnect() - try: - self.cv.acquire() - while (not self.canceled) and self.broker.connected: - self.cv.wait() - if self.canceled: - return - delay = self.DELAY_MIN - finally: - self.cv.release() - except socket.error: - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - except SessionDetached: - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - except Closed: - if delay < self.DELAY_MAX: - delay *= self.DELAY_FACTOR - - try: - self.cv.acquire() - self.cv.wait(delay) - if self.canceled: - return - finally: - self.cv.release() + class _q_item: + """ Broker-private class to encapsulate data sent to the broker thread + queue. + """ + type_wakeup = 0 + type_v1msg = 1 + type_v2msg = 2 - -#=================================================================================================== -# Broker -#=================================================================================================== -class Broker: - """ This object represents a connection (or potential connection) to a QMF broker. """ - SYNC_TIME = 60 - nextSeq = 1 + def __init__(self, typecode, data): + self.typecode = typecode + self.data = data def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None): + Thread.__init__(self) self.session = session self.host = host self.port = port @@ -1967,18 +1920,28 @@ class Broker: self.cv = Condition() self.seqToAgentMap = {} self.error = None + self.conn_exc = None # exception hit by _tryToConnect() self.brokerId = None self.connected = False self.brokerAgent = None self.brokerSupportsV2 = None + self.rcv_queue = Queue() # for msg received on session self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq) Broker.nextSeq += 1 - if self.session.manageConnections: - self.thread = ManagedConnection(self) - self.thread.start() - else: - self.thread = None - self._tryToConnect() + + # thread control + self.setDaemon(True) + self.setName("Thread for broker: %s:%d" % (host, port)) + self.canceled = False + self.ready = Semaphore(0) + self.start() + if not self.session.manageConnections: + # wait for connection setup to complete in subthread. + # On failure, propagate exeception to caller + self.ready.acquire() + if self.conn_exc: + raise self.conn_exc + def isConnected(self): """ Return True if there is an active connection to the broker. """ @@ -2064,6 +2027,8 @@ class Broker: self.cv.release() def _tryToConnect(self): + self.error = None + self.conn_exc = None try: try: self.cv.acquire() @@ -2106,9 +2071,9 @@ class Broker: accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) - self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL) + self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200) self.topicName = "topic-%s" % self.amqpSessionId self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) @@ -2116,9 +2081,9 @@ class Broker: accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("tdest").listen(self._v1Cb) - self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) - self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) + self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.message, value=200) ## ## Check to see if the broker has QMFv2 exchanges configured @@ -2139,21 +2104,44 @@ class Broker: ## Set up connectivity for QMFv2 ## if self.brokerSupportsV2: - self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId - self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True) + # set up 3 queues: + # 1 direct queue - for responses destined to this console. + # 2 topic queues - one for heartbeats (hi priority), one for all other indications. + self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True) + self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True) + self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True) + self.amqpSession.exchange_bind(exchange="qmf.default.direct", - queue=self.v2_queue_name, binding_key=self.v2_queue_name) + queue=self.v2_direct_queue, binding_key=self.v2_direct_queue) ## Other bindings here... - self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest", + + self.amqpSession.message_subscribe(queue=self.v2_direct_queue, destination="v2dest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb) - self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1) - self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL) - self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL) + self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50) - self.connected = True - self.session._handleBrokerConnect(self) + self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25) + + + self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window) + self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100) codec = Codec() self._setHeader(codec, 'B') @@ -2162,15 +2150,20 @@ class Broker: if self.brokerSupportsV2: self._v2SendAgentLocate() + return True # connection complete + except socket.error, e: self.error = "Socket Error %s - %s" % (e.__class__.__name__, e) - raise except Closed, e: self.error = "Connect Failed %s - %s" % (e.__class__.__name__, e) - raise except ConnectionFailed, e: self.error = "Connect Failed %s - %s" % (e.__class__.__name__, e) - raise + except: + e = Exception("Unknown connection exception") + self.error = str(e) + + self.conn_exc = e + return False # connection failed def _updateAgent(self, obj): bankKey = str(obj.agentBank) @@ -2233,7 +2226,7 @@ class Broker: mp.content_type = "amqp/map" mp.user_id = self.authUser mp.app_id = "qmf2" - mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name) + mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue) mp.application_headers = {'qmf.opcode':'_agent_locate_request'} sendCodec = Codec() sendCodec.write_map(predicate) @@ -2280,14 +2273,20 @@ class Broker: def _send(self, msg, dest="qpid.management"): self.amqpSession.message_transfer(destination=dest, message=msg) - def _shutdown(self): - if self.thread: - self.thread.stop() - self.thread.join() + def _shutdown(self, _timeout=10): + if self.isAlive(): + # kick the thread + self.canceled = True + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) + self.join(_timeout) if self.connected: self.amqpSession.incoming("rdest").stop() if self.session.console != None: self.amqpSession.incoming("tdest").stop() + if self.brokerSupportsV2: + self.amqpSession.incoming("v2dest").stop() + self.amqpSession.incoming("v2TopicLo").stop() + self.amqpSession.incoming("v2TopicHi").stop() self.amqpSession.close() self.conn.close() self.connected = False @@ -2325,9 +2324,20 @@ class Broker: self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key=key) if self.brokerSupportsV2: + # do not drop heartbeat indications when under load from data + # or event indications. Put heartbeats on their own dedicated + # queue. + # for key in self.session.v2BindingKeyList: - self.amqpSession.exchange_bind(exchange="qmf.default.topic", - queue=self.v2_queue_name, binding_key=key) + if key.startswith("agent.ind.heartbeat"): + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_topic_queue_hi, + binding_key=key) + else: + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_topic_queue_lo, + binding_key=key) + if self.reqsOutstanding == 0 and self.syncInFlight: self.syncInFlight = False self.cv.notify() @@ -2335,14 +2345,19 @@ class Broker: self.cv.release() def _v1Cb(self, msg): + """ Callback from session receive thread for V1 messages + """ + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v1msg, msg)) + + def _v1Dispatch(self, msg): try: - self._v1CbProtected(msg) + self._v1DispatchProtected(msg) except Exception, e: print "EXCEPTION in Broker._v1Cb:", e import traceback traceback.print_exc() - def _v1CbProtected(self, msg): + def _v1DispatchProtected(self, msg): """ This is the general message handler for messages received via the QMFv1 exchanges. """ @@ -2382,7 +2397,7 @@ class Broker: finally: self.cv.release() - if opcode == None: return + if opcode == None: break if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) @@ -2400,6 +2415,11 @@ class Broker: pass def _v2Cb(self, msg): + """ Callback from session receive thread for V2 messages + """ + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg)) + + def _v2Dispatch(self, msg): """ This is the general message handler for messages received via QMFv2 exchanges. """ @@ -2455,18 +2475,112 @@ class Broker: pass def _exceptionCb(self, data): - self.connected = False - self.error = data + """ Exception notification callback from session receive thread. + """ + self.cv.acquire() try: - self.cv.acquire() - if self.syncInFlight: - self.cv.notify() + self.connected = False + self.error = data finally: self.cv.release() - self.session._handleError(self.error) - self.session._handleBrokerDisconnect(self) - if self.thread: - self.thread.disconnected() + self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None)) + + def run(self): + """ Main body of the running thread. """ + + # First, attempt a connection. In the unmanaged case, + # failure to connect needs to cause the Broker() + # constructor to raise an exception. + delay = self.DELAY_MIN + while not self.canceled: + if self._tryToConnect(): # connection up + break + # unmanaged connection - fail & wake up constructor + if not self.session.manageConnections: + self.ready.release() + return + # managed connection - try again + count = 0 + while not self.canceled and count < delay: + sleep(1) + count += 1 + if delay < self.DELAY_MAX: + delay *= self.DELAY_FACTOR + + if self.canceled: + self.ready.release() + return + + # connection successful! + self.cv.acquire() + try: + self.connected = True + finally: + self.cv.release() + + self.session._handleBrokerConnect(self) + self.ready.release() + + while not self.canceled: + + item = self.rcv_queue.get() + + if self.canceled: + return + + if not self.connected: + # connection failure + while item: + # drain the queue + try: + item = self.rcv_queue.get(block=False) + except Empty: + break + + # notify any waiters, and callback + self.cv.acquire() + try: + edata = self.error; + if self.syncInFlight: + self.cv.notify() + finally: + self.cv.release() + self.session._handleError(edata) + self.session._handleBrokerDisconnect(self) + + if not self.session.manageConnections: + return # do not attempt recovery + + # retry connection setup + delay = self.DELAY_MIN + while not self.canceled: + if self._tryToConnect(): + break + # managed connection - try again + count = 0 + while not self.canceled and count < delay: + sleep(1) + count += 1 + if delay < self.DELAY_MAX: + delay *= self.DELAY_FACTOR + + if self.canceled: + return + + # connection successful! + self.cv.acquire() + try: + self.connected = True + finally: + self.cv.release() + + self.session._handleBrokerConnect(self) + + elif item.typecode == Broker._q_item.type_v1msg: + self._v1Dispatch(item.data) + elif item.typecode == Broker._q_item.type_v2msg: + self._v2Dispatch(item.data) + #=================================================================================================== @@ -2932,7 +3046,7 @@ class Agent: mp.user_id = self.broker.authUser mp.correlation_id = str(sequence) mp.app_id = "qmf2" - mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name) + mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue) mp.application_headers = {'qmf.opcode':'_query_request'} sendCodec = Codec() sendCodec.write_map(query) |
