From a7ff22a37baac189c1f433fe7785bd3a637953b1 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 27 Jan 2010 14:41:55 +0000 Subject: QPID-2261: add wait in tests for agent setup to complete git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903660 13f79535-47bb-0310-9956-ffa450edef68 --- python/qmf2/tests/basic_method.py | 59 +++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 27 deletions(-) (limited to 'python/qmf2/tests/basic_method.py') diff --git a/python/qmf2/tests/basic_method.py b/python/qmf2/tests/basic_method.py index a054a769d0..3db3af1d96 100644 --- a/python/qmf2/tests/basic_method.py +++ b/python/qmf2/tests/basic_method.py @@ -47,9 +47,10 @@ class _testNotifier(Notifier): class _agentApp(Thread): - def __init__(self, name, heartbeat): + def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) self.notifier = _testNotifier() + self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) @@ -108,33 +109,34 @@ class _agentApp(Thread): _obj2.set_value("field4", ["a", "list", "value"]) self.agent.add_object(_obj2) + self.running = False + self.ready = Event() + + def start_app(self): self.running = True self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") - def connect_agent(self, broker_url): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(broker_url.host, - broker_url.port, - broker_url.user, - broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - - def disconnect_agent(self, timeout): - if self.conn: - self.agent.remove_connection(timeout) - - def shutdown_agent(self, timeout): - self.agent.destroy(timeout) - - def stop(self): + def stop_app(self): self.running = False + # wake main thread self.notifier.indication() # hmmm... collide with daemon??? self.join(10) if self.isAlive(): raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + # Agent application main processing loop while self.running: self.notifier.wait_for_work(None) @@ -197,6 +199,10 @@ class _agentApp(Thread): self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + class BaseTest(unittest.TestCase): @@ -206,20 +212,19 @@ class BaseTest(unittest.TestCase): self.defines = self.config.defines def setUp(self): - # one second agent indication interval - self.agent1 = _agentApp("agent1", 1) - self.agent1.connect_agent(self.broker) - self.agent2 = _agentApp("agent2", 1) - self.agent2.connect_agent(self.broker) + # one second agent heartbeat interval + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() def tearDown(self): if self.agent1: - self.agent1.shutdown_agent(10) - self.agent1.stop() + self.agent1.stop_app() self.agent1 = None if self.agent2: - self.agent2.shutdown_agent(10) - self.agent2.stop() + self.agent2.stop_app() self.agent2 = None def test_described_obj(self): -- cgit v1.2.1