summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-05-12 17:19:19 +0000
committerTed Ross <tross@apache.org>2010-05-12 17:19:19 +0000
commit84cd6faf2c13f0a9a8b45a52e52fbaa4ad4775af (patch)
treeef862c1693390f2de26ce92480c0935b1d2c79e1
parent0fa302cbc61335e930bc78ed619973e1c0fcce83 (diff)
downloadqpid-python-84cd6faf2c13f0a9a8b45a52e52fbaa4ad4775af.tar.gz
Two fixes for qmf.console:
1) use proper binding-keys for filtering QMFv2 data indications 2) fix the problem where stat-updates were being lumped in with property updates for V1 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943589 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--extras/qmf/src/py/qmf/console.py72
1 files changed, 45 insertions, 27 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py
index 6f2afe1413..15c71fd799 100644
--- a/extras/qmf/src/py/qmf/console.py
+++ b/extras/qmf/src/py/qmf/console.py
@@ -564,7 +564,7 @@ class Session:
self.rcvObjects = False
self.rcvEvents = False
self.rcvHeartbeats = False
- self.bindingKeyList = self._bindingKeys()
+ self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys()
self.manageConnections = manageConnections
if self.userBindings and not self.rcvObjects:
@@ -650,24 +650,30 @@ class Session:
""" Request object updates for all table classes within a package. """
if not self.userBindings or not self.rcvObjects:
raise Exception("userBindings option not set for Session")
- key = "console.obj.*.*.%s.#" % packageName
- self.bindingKeyList.append(key)
+ v1key = "console.obj.*.*.%s.#" % packageName
+ v2key = "agent.ind.data.%s.#" % packageName
+ self.v1BindingKeyList.append(v1key)
+ self.v2BindingKeyList.append(v2key)
for broker in self.brokers:
if broker.isConnected():
- broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key=key)
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
+ if broker.brokerSupportsV2:
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key)
def bindClass(self, pname, cname):
""" Request object updates for a particular table class by package and class name. """
if not self.userBindings or not self.rcvObjects:
raise Exception("userBindings option not set for Session")
- key = "console.obj.*.*.%s.%s.#" % (pname, cname)
- self.bindingKeyList.append(key)
+ v1key = "console.obj.*.*.%s.%s.#" % (pname, cname)
+ v2key = "agent.ind.data.%s.%s" % (pname, cname)
+ self.v1BindingKeyList.append(v1key)
+ self.v2BindingKeyList.append(v2key)
for broker in self.brokers:
if broker.isConnected():
- broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key=key)
+ broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
+ if broker.brokerSupportsV2:
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key)
def bindClassKey(self, classKey):
@@ -790,20 +796,25 @@ class Session:
def _bindingKeys(self):
- keyList = []
- keyList.append("schema.#")
+ v1KeyList = []
+ v2KeyList = []
+ v1KeyList.append("schema.#")
+ v2KeyList.append("agent.ind.heartbeat.#")
if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
- keyList.append("console.#")
+ v1KeyList.append("console.#")
+ v2KeyList.append("agent.#")
else:
if self.rcvObjects and not self.userBindings:
- keyList.append("console.obj.#")
+ v1KeyList.append("console.obj.#")
+ v2KeyList.append("agent.ind.data.#")
else:
- keyList.append("console.obj.*.*.org.apache.qpid.broker.agent")
+ v1KeyList.append("console.obj.*.*.org.apache.qpid.broker.agent")
if self.rcvEvents:
- keyList.append("console.event.#")
+ v1KeyList.append("console.event.#")
+ v2KeyList.append("agent.ind.event.#")
if self.rcvHeartbeats:
- keyList.append("console.heartbeat.#")
- return keyList
+ v1KeyList.append("console.heartbeat.#")
+ return (v1KeyList, v2KeyList)
def _handleBrokerConnect(self, broker):
@@ -2014,8 +2025,6 @@ class Broker:
self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
self.amqpSession.exchange_bind(exchange="qmf.default.direct",
queue=self.v2_queue_name, binding_key=self.v2_queue_name)
- self.amqpSession.exchange_bind(exchange="qmf.default.topic",
- queue=self.v2_queue_name, binding_key="agent.#")
## Other bindings here...
self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
accept_mode=self.amqpSession.accept_mode.none,
@@ -2194,9 +2203,13 @@ class Broker:
self.reqsOutstanding -= 1
if self.reqsOutstanding == 0 and not self.topicBound:
self.topicBound = True
- for key in self.session.bindingKeyList:
+ for key in self.session.v1BindingKeyList:
self.amqpSession.exchange_bind(exchange="qpid.management",
queue=self.topicName, binding_key=key)
+ if self.brokerSupportsV2:
+ for key in self.session.v2BindingKeyList:
+ self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=self.v2_queue_name, binding_key=key)
if self.reqsOutstanding == 0 and self.syncInFlight:
self.syncInFlight = False
self.cv.notify()
@@ -2600,7 +2613,7 @@ class Agent:
finally:
self.lock.release()
- context.addV1QueryResult(obj)
+ context.addV1QueryResult(obj, prop, stat)
def _v2HandleDataInd(self, mp, ah, content):
@@ -2844,19 +2857,24 @@ class RequestContext(object):
self.sequence = sequence
- def addV1QueryResult(self, data):
+ def addV1QueryResult(self, data, has_props, has_stats):
values = {}
- for prop, val in data.getProperties():
- values[prop.name] = val
- for stat, val in data.getStatistics():
- values[stat.name] = val
+ if has_props:
+ for prop, val in data.getProperties():
+ values[prop.name] = val
+ if has_stats:
+ for stat, val in data.getStatistics():
+ values[stat.name] = val
for key in values:
val = values[key]
if key in self.selectors and val != self.selectors[key]:
return
if self.notifiable:
- self.notifiable(qmf_object=data)
+ if has_props:
+ self.notifiable(qmf_object=data)
+ if has_stats:
+ self.notifiable(qmf_object_stats=data)
else:
self.queryResults.append(data)