diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-25 21:16:47 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-25 21:16:47 +0000 |
| commit | 8139ebac20c94cb1526aa5a9c4bcb8fb2e2c410d (patch) | |
| tree | 093226798f61501055f99aeb6dff6e5c14f46b7d | |
| parent | 588219ac990912cecf77c90eaa77fba3dcdd5668 (diff) | |
| download | qpid-python-8139ebac20c94cb1526aa5a9c4bcb8fb2e2c410d.tar.gz | |
QPID-2261: add console async subscription api, and tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@916462 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/console.py | 309 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py | 224 |
2 files changed, 408 insertions, 125 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py index 7c55184bfe..afd20c3655 100644 --- a/qpid/extras/qmf/src/py/qmf2/console.py +++ b/qpid/extras/qmf/src/py/qmf2/console.py @@ -151,6 +151,24 @@ class _AsyncMailbox(_Mailbox): console._wake_thread() + def reset_timeout(self, _timeout=None): + """ Reset the expiration date for this mailbox. + """ + if _timeout is None: + _timeout = self.console._reply_timeout + self.console._lock.acquire() + try: + self.expiration_date = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=_timeout)) + self.console._next_mbox_expire = None + finally: + self.console._lock.release() + + # wake the console mgmt thread so it will learn about the mbox + # expiration date (and adjust its idle sleep period correctly) + + self.console._wake_thread() + def deliver(self, msg): """ """ @@ -353,32 +371,81 @@ class _MethodMailbox(_AsyncMailbox): class _SubscriptionMailbox(_AsyncMailbox): """ - A Mailbox for a single subscription. + A Mailbox for a single subscription. Allows only sychronous "subscribe" + and "refresh" requests. """ - def __init__(self, console, lifetime, context, agent): + def __init__(self, console, context, agent, duration, interval): """ Invoked by application thread. """ - super(_SubscriptionMailbox, self).__init__(console, lifetime) + super(_SubscriptionMailbox, self).__init__(console, duration) self.cv = Condition() self.data = [] self.result = [] self.context = context + self.duration = duration + self.interval = interval self.agent_name = agent.get_name() self.agent_subscription_id = None # from agent + def subscribe(self, query): + agent = self.console.get_agent(self.agent_name) + if not agent: + logging.warning("subscribed failed - unknown agent '%s'" % + self.agent_name) + return False + try: + logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name) + agent._send_subscribe_req(query, self.get_address(), self.interval, + self.duration) + except SendError, e: + logging.error(str(e)) + return False + return True + + def resubscribe(self, duration): + agent = self.console.get_agent(self.agent_name) + if not agent: + logging.warning("resubscribed failed - unknown agent '%s'" % + self.agent_name) + return False + try: + logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name) + agent._send_resubscribe_req(self.get_address(), + self.agent_subscription_id, duration) + except SendError, e: + logging.error(str(e)) + return False + return True + def deliver(self, msg): """ """ opcode = msg.properties.get("qmf.opcode") if (opcode == OpCode.subscribe_rsp or opcode == OpCode.subscribe_refresh_rsp): - # - # sync only - just deliver the msg - # + + error = msg.content.get("_error") + if error: + try: + e_map = QmfData.from_map(error) + except TypeError: + logging.warning("Invalid QmfData map received: '%s'" + % str(error)) + e_map = QmfData.create({"error":"Unknown error"}) + sp = SubscribeParams(None, None, None, e_map) + else: + self.agent_subscription_id = msg.content.get("_subscription_id") + self.duration = msg.content.get("_duration", self.duration) + self.interval = msg.content.get("_interval", self.interval) + self.reset_timeout(self.duration) + sp = SubscribeParams(self.get_address(), + self.interval, + self.duration, + None) self.cv.acquire() try: - self.data.append(msg) + self.data.append(sp) # if was empty, notify waiters if len(self.data) == 1: self.cv.notify() @@ -386,43 +453,7 @@ class _SubscriptionMailbox(_AsyncMailbox): self.cv.release() return - # sid = msg.content.get("_subscription_id") - # lifetime = msg.content.get("_duration") - # error = msg.content.get("_error") - # sp = SubscribeParams(sid, - # msg.content.get("_interval"), - # lifetime, error) - # if sid and self.subscription_id is None: - # self.subscription_id = sid - # if lifetime: - # self.console._lock.acquire() - # try: - # self.expiration_date = (datetime.datetime.utcnow() + - # datetime.timedelta(seconds=lifetime)) - # finally: - # self.console._lock.release() - - # if self.waiting: - # self.cv.acquire() - # try: - # self.data.append(sp) - # # if was empty, notify waiters - # if len(self._data) == 1: - # self._cv.notify() - # finally: - # self._cv.release() - # else: - # if opcode == OpCode.subscribe_rsp: - # wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, - # self.context, sp) - # else: - # wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, - # self.context, sp) - # self.console._work_q.put(wi) - # self.console._work_q_put = True - # if error: - # self.destroy() - + # else: data indication agent_name = msg.properties.get("qmf.agent") if not agent_name: logging.warning("Ignoring data_ind - no agent name given: %s" % @@ -472,6 +503,72 @@ class _SubscriptionMailbox(_AsyncMailbox): +class _AsyncSubscriptionMailbox(_SubscriptionMailbox): + """ + A Mailbox for a single subscription. Allows only asychronous "subscribe" + and "refresh" requests. + """ + def __init__(self, console, context, agent, duration, interval): + """ + Invoked by application thread. + """ + super(_AsyncSubscriptionMailbox, self).__init__(console, context, + agent, duration, + interval) + self.subscribe_pending = False + self.resubscribe_pending = False + + + def subscribe(self, query, reply_timeout): + if super(_AsyncSubscriptionMailbox, self).subscribe(query): + self.subscribe_pending = True + self.reset_timeout(reply_timeout) + return True + return False + + def resubscribe(self, duration, reply_timeout): + if super(_AsyncSubscriptionMailbox, self).resubscribe(duration): + self.resubscribe_pending = True + self.reset_timeout(reply_timeout) + return True + return False + + def deliver(self, msg): + """ + """ + super(_AsyncSubscriptionMailbox, self).deliver(msg) + sp = self.fetch(0) + if sp: + # if the message was a reply to a subscribe or + # re-subscribe, then we get here. + if self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, + self.context, sp) + else: + wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, + self.context, sp) + + self.subscribe_pending = False + self.resubscribe_pending = False + + self.console._work_q.put(wi) + self.console._work_q_put = True + + if not sp.succeeded(): + self.destroy() + + + def expire(self): + """ Either the subscription expired, or a request timedout. + """ + if self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, + self.context, None) + elif self.resubscribe_pending: + wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, + self.context, None) + self.destroy() + ##============================================================================== ## DATA MODEL @@ -1264,111 +1361,73 @@ class Console(Thread): def create_subscription(self, agent, query, console_handle, _interval=None, _duration=None, - _reply_handle=None, _timeout=None): + _blocking=True, _timeout=None): if not _duration: _duration = self._subscribe_timeout - if _reply_handle is not None: - assert(False) # async TBD - else: - mbox = _SubscriptionMailbox(self, _duration, console_handle, agent) - - cid = mbox.get_address() - - try: - logging.debug("Sending Subscribe to Agent (%s)" % time.time()) - agent._send_subscribe_req(query, cid, _interval, _duration) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return None + if _timeout is None: + _timeout = self._reply_timeout - if _reply_handle is not None: + if not _blocking: + mbox = _AsyncSubscriptionMailbox(self, console_handle, agent, + _duration, _interval) + if not mbox.subscribe(query, _timeout): + mbox.destroy() + return False return True + else: + mbox = _SubscriptionMailbox(self, console_handle, agent, _duration, + _interval) - # wait for reply - if _timeout is None: - _timeout = self._reply_timeout + if not mbox.subscribe(query): + mbox.destroy() + return None - logging.debug("Waiting for response to subscription (%s)" % _timeout) - # @todo: what if mbox expires here? - replyMsg = mbox.fetch(_timeout) + logging.debug("Waiting for response to subscription (%s)" % _timeout) + # @todo: what if mbox expires here? + sp = mbox.fetch(_timeout) - if not replyMsg: - logging.debug("Subscription request wait timed-out.") - mbox.destroy() - return None + if not sp: + logging.debug("Subscription request wait timed-out.") + mbox.destroy() + return None - error = replyMsg.content.get("_error") - if error: - mbox.destroy() - try: - e_map = QmfData.from_map(error) - except TypeError: - e_map = QmfData.create({"error":"Unknown error"}) - return SubscribeParams(None, None, None, e_map) + if not sp.succeeded(): + mbox.destroy() - mbox.agent_subscription_id = replyMsg.content.get("_subscription_id") - return SubscribeParams(mbox.get_address(), - replyMsg.content.get("_interval"), - replyMsg.content.get("_duration"), - None) + return sp def refresh_subscription(self, subscription_id, _duration=None, - _reply_handle=None, _timeout=None): - if _reply_handle is not None: - assert(False) # async TBD + _timeout=None): + if _timeout is None: + _timeout = self._reply_timeout mbox = self._get_mailbox(subscription_id) if not mbox: logging.warning("Subscription %s not found." % subscription_id) return None - agent = self.get_agent(mbox.agent_name) - if not agent: - logging.warning("Subscription %s agent %s not found." % - (mbox.agent_name, subscription_id)) - return None - - try: - logging.debug("Sending Subscribe to Agent (%s)" % time.time()) - agent._send_resubscribe_req(subscription_id, - mbox.agent_subscription_id, - _duration) - except SendError, e: - logging.error(str(e)) - # @todo ???? mbox.destroy() - return None + if isinstance(mbox, _AsyncSubscriptionMailbox): + return mbox.resubscribe(_duration, _timeout) + else: + # synchronous - wait for reply + if not mbox.resubscribe(_duration): + # @todo ???? mbox.destroy() + return None - if _reply_handle is not None: - return True + # wait for reply - # wait for reply - if _timeout is None: - _timeout = self._reply_timeout + logging.debug("Waiting for response to subscription (%s)" % _timeout) + sp = mbox.fetch(_timeout) - logging.debug("Waiting for response to subscription (%s)" % _timeout) - replyMsg = mbox.fetch(_timeout) + if not sp: + logging.debug("re-subscribe request wait timed-out.") + # @todo???? mbox.destroy() + return None - if not replyMsg: - logging.debug("Subscription request wait timed-out.") - # @todo???? mbox.destroy() - return None + return sp - error = replyMsg.content.get("_error") - if error: - # @todo mbox.destroy() - try: - e_map = QmfData.from_map(error) - except TypeError: - e_map = QmfData.create({"error":"Unknown error"}) - return SubscribeParams(None, None, None, e_map) - - return SubscribeParams(mbox.get_address(), - replyMsg.content.get("_interval"), - replyMsg.content.get("_duration"), - None) def cancel_subscription(self, subscription_id): """ diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py index 84726a775c..750952df46 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py @@ -582,3 +582,227 @@ class BaseTest(unittest.TestCase): self.console.destroy(10) + def test_async_by_obj_id_schema(self): + # create console + # find one agent + # async subscribe to changes to any object in package1/class1 + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + rc = self.console.create_subscription(agent, + query, + "my-handle", + _blocking=False) + self.assertTrue(rc) + + r_count = 0 + sp = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + sp = wi.get_params() + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + else: + self.assertTrue(wi.get_type() == + WorkItem.SUBSCRIBE_INDICATION) + # sp better be set up by now! + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 6) + + self.console.destroy(10) + + def test_async_refresh(self): + # create console + # find one agent + # async subscribe to changes to any object in package1/class1 + # refresh after third data indication + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + rc = self.console.create_subscription(agent, + query, + "my-handle", + _blocking=False) + self.assertTrue(rc) + + # refresh after three subscribe indications, count all + # indications to verify refresh worked + r_count = 0 + sp = None + rp = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + sp = wi.get_params() + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + rp = wi.get_params() + self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams)) + self.assertTrue(rp.succeeded()) + self.assertTrue(rp.get_error() == None) + else: + self.assertTrue(wi.get_type() == + WorkItem.SUBSCRIBE_INDICATION) + # sp better be set up by now! + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + if r_count == 4: # + 1 for subscribe reply + rp = self.console.refresh_subscription(sp.get_subscription_id()) + self.assertTrue(rp) + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription, + 2 replys + self.assertTrue(r_count > 7) + + self.console.destroy(10) + + + def test_async_cancel(self): + # create console + # find one agent + # async subscribe to changes to any object in package1/class1 + # cancel after first data indication + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + rc = self.console.create_subscription(agent, + query, + "my-handle", + _blocking=False) + self.assertTrue(rc) + + # refresh after three subscribe indications, count all + # indications to verify refresh worked + r_count = 0 + sp = None + rp = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + sp = wi.get_params() + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + else: + self.assertTrue(wi.get_type() == + WorkItem.SUBSCRIBE_INDICATION) + # sp better be set up by now! + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + self.console.cancel_subscription(sp.get_subscription_id()) + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 1 subscribe reply and 1 data_indication + self.assertTrue(r_count == 2) + + self.console.destroy(10) |
