diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-04 19:38:55 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-04 19:38:55 +0000 |
| commit | f98a59dd32d571096e1d10299d9d8434189d035b (patch) | |
| tree | 2778aae7310a0c1ebefe8efd835a96e795536b3e /qpid/python/qmf2/agent.py | |
| parent | 3a7f44a61ce4cada2bfce431e405652744368ab3 (diff) | |
| download | qpid-python-f98a59dd32d571096e1d10299d9d8434189d035b.tar.gz | |
QPID-2261: add multi-msg query response support. Fix mailbox code to allow mult-msg per correlation id.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/agent.py')
| -rw-r--r-- | qpid/python/qmf2/agent.py | 76 |
1 files changed, 53 insertions, 23 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py index 790cec283c..a6b3c39ad1 100644 --- a/qpid/python/qmf2/agent.py +++ b/qpid/python/qmf2/agent.py @@ -95,6 +95,8 @@ class Agent(Thread): self._address = QmfAddress.direct(self.name, self._domain) self._notifier = _notifier self._heartbeat_interval = _heartbeat_interval + # @todo: currently, max # of objects in a single reply message, would + # be better if it were max bytesize of per-msg content... self._max_msg_size = _max_msg_size self._capacity = _capacity @@ -456,6 +458,38 @@ class Agent(Thread): except SendError, e: logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) + def _send_query_response(self, subject, msgkey, cid, reply_to, objects): + """ + Send a response to a query, breaking the result into multiple + messages based on the agent's _max_msg_size config parameter + """ + + total = len(objects) + if self._max_msg_size: + max_count = self._max_msg_size + else: + max_count = total + + start = 0 + end = min(total, max_count) + while end <= total: + m = Message(properties={"qmf.subject":subject, + "method":"response"}, + correlation_id = cid, + content={msgkey:objects[start:end]}) + self._send_reply(m, reply_to) + if end == total: + break; + start = end + end = min(total, end + max_count) + + # response terminator - last message has empty object array + if total: + m = Message(properties={"qmf.subject":subject, + "method":"response"}, + correlation_id = cid, + content={msgkey: []} ) + self._send_reply(m, reply_to) def _dispatch(self, msg, _direct=False): """ @@ -615,12 +649,11 @@ class Agent(Thread): finally: self._lock.release() - m = Message(properties={"qmf.subject":make_subject(OpCode.data_ind), - "method":"response"}, - content={MsgKey.package_info: pnames} ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id - self._send_reply(m, msg.reply_to) + self._send_query_response(make_subject(OpCode.data_ind), + MsgKey.package_info, + msg.correlation_id, + msg.reply_to, + pnames) def _querySchema( self, msg, query, _idOnly=False ): """ @@ -652,17 +685,15 @@ class Agent(Thread): self._lock.release() if _idOnly: - content = {MsgKey.schema_id: schemas} + msgkey = MsgKey.schema_id else: - content = {MsgKey.schema:schemas} - - m = Message(properties={"method":"response", - "qmf.subject":make_subject(OpCode.data_ind)}, - content=content ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id + msgkey = MsgKey.schema - self._send_reply(m, msg.reply_to) + self._send_query_response(make_subject(OpCode.data_ind), + msgkey, + msg.correlation_id, + msg.reply_to, + schemas) def _queryData( self, msg, query, _idOnly=False ): @@ -736,17 +767,16 @@ class Agent(Thread): self._lock.release() if _idOnly: - content = {MsgKey.object_id:data_objs} + msgkey = MsgKey.object_id else: - content = {MsgKey.data_obj:data_objs} + msgkey = MsgKey.data_obj - m = Message(properties={"method":"response", - "qmf.subject":make_subject(OpCode.data_ind)}, - content=content ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id + self._send_query_response(make_subject(OpCode.data_ind), + msgkey, + msg.correlation_id, + msg.reply_to, + data_objs) - self._send_reply(m, msg.reply_to) ##============================================================================== |
