summaryrefslogtreecommitdiff
path: root/qpid/python/qmf2/agent.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-02-04 19:38:55 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-02-04 19:38:55 +0000
commitf98a59dd32d571096e1d10299d9d8434189d035b (patch)
tree2778aae7310a0c1ebefe8efd835a96e795536b3e /qpid/python/qmf2/agent.py
parent3a7f44a61ce4cada2bfce431e405652744368ab3 (diff)
downloadqpid-python-f98a59dd32d571096e1d10299d9d8434189d035b.tar.gz
QPID-2261: add multi-msg query response support. Fix mailbox code to allow mult-msg per correlation id.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/agent.py')
-rw-r--r--qpid/python/qmf2/agent.py76
1 files changed, 53 insertions, 23 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py
index 790cec283c..a6b3c39ad1 100644
--- a/qpid/python/qmf2/agent.py
+++ b/qpid/python/qmf2/agent.py
@@ -95,6 +95,8 @@ class Agent(Thread):
self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
self._heartbeat_interval = _heartbeat_interval
+ # @todo: currently, max # of objects in a single reply message, would
+ # be better if it were max bytesize of per-msg content...
self._max_msg_size = _max_msg_size
self._capacity = _capacity
@@ -456,6 +458,38 @@ class Agent(Thread):
except SendError, e:
logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
+ def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+ """
+ Send a response to a query, breaking the result into multiple
+ messages based on the agent's _max_msg_size config parameter
+ """
+
+ total = len(objects)
+ if self._max_msg_size:
+ max_count = self._max_msg_size
+ else:
+ max_count = total
+
+ start = 0
+ end = min(total, max_count)
+ while end <= total:
+ m = Message(properties={"qmf.subject":subject,
+ "method":"response"},
+ correlation_id = cid,
+ content={msgkey:objects[start:end]})
+ self._send_reply(m, reply_to)
+ if end == total:
+ break;
+ start = end
+ end = min(total, end + max_count)
+
+ # response terminator - last message has empty object array
+ if total:
+ m = Message(properties={"qmf.subject":subject,
+ "method":"response"},
+ correlation_id = cid,
+ content={msgkey: []} )
+ self._send_reply(m, reply_to)
def _dispatch(self, msg, _direct=False):
"""
@@ -615,12 +649,11 @@ class Agent(Thread):
finally:
self._lock.release()
- m = Message(properties={"qmf.subject":make_subject(OpCode.data_ind),
- "method":"response"},
- content={MsgKey.package_info: pnames} )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
- self._send_reply(m, msg.reply_to)
+ self._send_query_response(make_subject(OpCode.data_ind),
+ MsgKey.package_info,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
def _querySchema( self, msg, query, _idOnly=False ):
"""
@@ -652,17 +685,15 @@ class Agent(Thread):
self._lock.release()
if _idOnly:
- content = {MsgKey.schema_id: schemas}
+ msgkey = MsgKey.schema_id
else:
- content = {MsgKey.schema:schemas}
-
- m = Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.data_ind)},
- content=content )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
+ msgkey = MsgKey.schema
- self._send_reply(m, msg.reply_to)
+ self._send_query_response(make_subject(OpCode.data_ind),
+ msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
def _queryData( self, msg, query, _idOnly=False ):
@@ -736,17 +767,16 @@ class Agent(Thread):
self._lock.release()
if _idOnly:
- content = {MsgKey.object_id:data_objs}
+ msgkey = MsgKey.object_id
else:
- content = {MsgKey.data_obj:data_objs}
+ msgkey = MsgKey.data_obj
- m = Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.data_ind)},
- content=content )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
+ self._send_query_response(make_subject(OpCode.data_ind),
+ msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ data_objs)
- self._send_reply(m, msg.reply_to)
##==============================================================================