summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
committerTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
commitd5913038788795bd964c534bd28983e5732c2fce (patch)
treeb86ab7ab84ec2f25421c161e76ef2924043ca30a /python/qpid
parente173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff)
downloadqpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/management.py69
-rw-r--r--python/qpid/qmfconsole.py145
2 files changed, 129 insertions, 85 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
index e6ff5852f2..485b64b99f 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -162,8 +162,12 @@ class managementChannel:
ssn.exchange_bind (exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
- ssn.message_subscribe (queue=self.topicName, destination="tdest")
- ssn.message_subscribe (queue=self.replyName, destination="rdest")
+ ssn.message_subscribe (queue=self.topicName, destination="tdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
+ ssn.message_subscribe (queue=self.replyName, destination="rdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb)
ssn.incoming ("rdest").listen (self.replyCb)
@@ -202,9 +206,6 @@ class managementChannel:
if self.enabled:
self.qpidChannel.message_transfer (destination=exchange, message=msg)
- def accept (self, msg):
- self.qpidChannel.message_accept(RangedSet(msg.id))
-
def message (self, body, routing_key="broker"):
dp = self.qpidChannel.delivery_properties()
dp.routing_key = routing_key
@@ -349,28 +350,27 @@ class managementClient:
def topicCb (self, ch, msg):
""" Receive messages via the topic queue of a particular channel. """
codec = Codec (self.spec, msg.body)
- hdr = self.checkHeader (codec)
- if hdr == None:
- raise ValueError ("outer header invalid");
+ while True:
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ return
- if hdr[0] == 'p':
- self.handlePackageInd (ch, codec)
- elif hdr[0] == 'q':
- self.handleClassInd (ch, codec)
- elif hdr[0] == 'h':
- self.handleHeartbeat (ch, codec)
- elif hdr[0] == 'e':
- self.handleEvent (ch, codec)
- else:
- self.parse (ch, codec, hdr[0], hdr[1])
- ch.accept(msg)
+ if hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ elif hdr[0] == 'h':
+ self.handleHeartbeat (ch, codec)
+ elif hdr[0] == 'e':
+ self.handleEvent (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
def replyCb (self, ch, msg):
""" Receive messages via the reply queue of a particular channel. """
codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
- ch.accept(msg)
return
if hdr[0] == 'm':
@@ -385,7 +385,6 @@ class managementClient:
self.handleClassInd (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
- ch.accept(msg)
def exceptCb (self, ch, data):
if self.closeCb != None:
@@ -403,20 +402,22 @@ class managementClient:
codec.write_uint32 (seq)
def checkHeader (self, codec):
- """ Check the header of a management message and extract the opcode and
- class. """
- octet = chr (codec.read_uint8 ())
- if octet != 'A':
- return None
- octet = chr (codec.read_uint8 ())
- if octet != 'M':
- return None
- octet = chr (codec.read_uint8 ())
- if octet != '2':
+ """ Check the header of a management message and extract the opcode and class. """
+ try:
+ octet = chr (codec.read_uint8 ())
+ if octet != 'A':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != 'M':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != '2':
+ return None
+ opcode = chr (codec.read_uint8 ())
+ seq = codec.read_uint32 ()
+ return (opcode, seq)
+ except:
return None
- opcode = chr (codec.read_uint8 ())
- seq = codec.read_uint32 ()
- return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index 5e1df10e5b..ed4565dac4 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -62,11 +62,11 @@ class Console:
""" Invoked when a QMF agent disconects. """
pass
- def objectProps(self, broker, id, record):
+ def objectProps(self, broker, record):
""" Invoked when an object is updated. """
pass
- def objectStats(self, broker, id, record):
+ def objectStats(self, broker, record):
""" Invoked when an object is updated. """
pass
@@ -111,10 +111,10 @@ class Session:
_CONTEXT_STARTUP = 2
_CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 10
+ GET_WAIT_TIME = 60
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
- manageConnections=False):
+ manageConnections=False, userBindings=False):
"""
Initialize a session. If the console argument is provided, the
more advanced asynchronous features are available. If console is
@@ -131,6 +131,12 @@ class Session:
If manageConnections is set to False, the user is responsible for handing failures. In
this case, an unreachable broker will cause addBroker to raise an exception.
+
+ If userBindings is set to False (the default) and rcvObjects is True, the console will
+ receive data for all object classes. If userBindings is set to True, the user must select
+ which classes the console shall receive by invoking the bindPackage or bindClass methods.
+ This allows the console to be configured to receive only information that is relavant to
+ a particular application. If rcvObjects id False, userBindings has no meaning.
"""
self.console = console
self.brokers = []
@@ -141,14 +147,21 @@ class Session:
self.getResult = []
self.getSelect = []
self.error = None
+ self.rcvObjects = rcvObjects
+ self.rcvEvents = rcvEvents
+ self.rcvHeartbeats = rcvHeartbeats
+ self.userBindings = userBindings
if self.console == None:
- rcvObjects = False
- rcvEvents = False
- rcvHeartbeats = False
- self.bindingKeyList = self._bindingKeys(rcvObjects, rcvEvents, rcvHeartbeats)
+ self.rcvObjects = False
+ self.rcvEvents = False
+ self.rcvHeartbeats = False
+ self.bindingKeyList = self._bindingKeys()
self.manageConnections = manageConnections
- if (manageConnections):
+ if self.userBindings and not self.rcvObjects:
+ raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+
+ if manageConnections:
raise Exception("manageConnections - not yet implemented")
def __repr__(self):
@@ -200,6 +213,23 @@ class Session:
if (cname, hash) in self.packages[pname]:
return self.packages[pname][(cname, hash)]
+ def bindPackage(self, packageName):
+ """ """
+ if not self.userBindings or not self.rcvObjects:
+ raise Exception("userBindings option not set for Session")
+ for broker in self.brokers:
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+ binding_key="console.obj.%s" % packageName)
+
+ def bindClass(self, classKey):
+ """ """
+ if not self.userBindings or not self.rcvObjects:
+ raise Exception("userBindings option not set for Session")
+ pname, cname, hash = classKey
+ for broker in self.brokers:
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+ binding_key="console.obj.%s.%s" % (pname, cname))
+
def getAgents(self, broker=None):
""" Get a list of currently known agents """
brokerList = []
@@ -322,20 +352,19 @@ class Session:
""" """
pass
- def _bindingKeys(self, rcvObjects, rcvEvents, rcvHeartbeats):
+ def _bindingKeys(self):
keyList = []
keyList.append("schema.#")
- if rcvObjects and rcvEvents and rcvHeartbeats:
+ if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
keyList.append("console.#")
else:
- if rcvObjects:
- keyList.append("console.prop.#")
- keyList.append("console.stat.#")
+ if self.rcvObjects and not self.userBindings:
+ keyList.append("console.obj.#")
else:
- keyList.append("console.prop.org.apache.qpid.broker.agent")
- if rcvEvents:
+ keyList.append("console.obj.org.apache.qpid.broker.agent")
+ if self.rcvEvents:
keyList.append("console.event.#")
- if rcvHeartbeats:
+ if self.rcvHeartbeats:
keyList.append("console.heartbeat")
return keyList
@@ -488,9 +517,9 @@ class Session:
if self.console != None:
if prop:
- self.console.objectProps(broker, object.getObjectId(), object)
+ self.console.objectProps(broker, object)
if stat:
- self.console.objectStats(broker, object.getObjectId(), object)
+ self.console.objectStats(broker, object)
def _handleError(self, error):
self.error = error
@@ -858,6 +887,15 @@ class Object(object):
def getStatistics(self):
return self._statistics
+ def mergeUpdate(self, newer):
+ """ Replace properties and/or statistics with a newly received update """
+ if self._objectId != newer._objectId:
+ raise Exception("Objects with different object-ids")
+ if len(newer.getProperties()) > 0:
+ self.properties = newer.getProperties()
+ if len(newer.getStatistics()) > 0:
+ self.statistics = newer.getStatistics()
+
def __repr__(self):
return self.getIndex()
@@ -960,7 +998,7 @@ class MethodResult(object):
class Broker:
""" """
- SYNC_TIME = 10
+ SYNC_TIME = 60
def __init__(self, session, host, port, authMech, authUser, authPass):
self.session = session
@@ -1024,7 +1062,9 @@ class Broker:
self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
self.amqpSession.exchange_bind(exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
- self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest")
+ self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF)
@@ -1032,7 +1072,9 @@ class Broker:
self.topicName = "topic-%s" % self.amqpSessionId
self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
- self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest")
+ self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("tdest").listen(self._replyCb)
self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
@@ -1076,18 +1118,21 @@ class Broker:
def _checkHeader(self, codec):
""" Check the header of a management message and extract the opcode and class. """
- octet = chr(codec.read_uint8())
- if octet != 'A':
- return None, None
- octet = chr(codec.read_uint8())
- if octet != 'M':
- return None, None
- octet = chr(codec.read_uint8())
- if octet != '2':
+ try:
+ octet = chr(codec.read_uint8())
+ if octet != 'A':
+ return None, None
+ octet = chr(codec.read_uint8())
+ if octet != 'M':
+ return None, None
+ octet = chr(codec.read_uint8())
+ if octet != '2':
+ return None, None
+ opcode = chr(codec.read_uint8())
+ seq = codec.read_uint32()
+ return opcode, seq
+ except:
return None, None
- opcode = chr(codec.read_uint8())
- seq = codec.read_uint32()
- return opcode, seq
def _message (self, body, routing_key="broker"):
dp = self.amqpSession.delivery_properties()
@@ -1143,23 +1188,21 @@ class Broker:
self.cv.release()
def _replyCb(self, msg):
- self.amqpSession.message_accept(RangedSet(msg.id))
codec = Codec(self.conn.spec, msg.body)
- opcode, seq = self._checkHeader(codec)
- if opcode == None:
- return
-
- if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
- elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
- elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
- elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq)
- elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
- elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
- elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
- elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
+ while True:
+ opcode, seq = self._checkHeader(codec)
+ if opcode == None: return
+ if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
+ elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
+ elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
+ elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
+ elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq)
+ elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
+ elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
+ elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
+ elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
+ elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
def _exceptionCb(self, data):
self.isConnected = False
@@ -1286,10 +1329,10 @@ class DebugConsole(Console):
def delAgent(self, agent):
print "delAgent:", agent
- def objectProps(self, broker, id, record):
+ def objectProps(self, broker, record):
print "objectProps:", record.getClassKey()
- def objectStats(self, broker, id, record):
+ def objectStats(self, broker, record):
print "objectStats:", record.getClassKey()
def event(self, broker, event):