From a7ff22a37baac189c1f433fe7785bd3a637953b1 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Wed, 27 Jan 2010 14:41:55 +0000 Subject: QPID-2261: add wait in tests for agent setup to complete git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903660 13f79535-47bb-0310-9956-ffa450edef68 --- python/qmf2/tests/agent_discovery.py | 5 +++ python/qmf2/tests/basic_method.py | 59 +++++++++++++++++++----------------- python/qmf2/tests/basic_query.py | 58 +++++++++++++++++++---------------- python/qmf2/tests/events.py | 10 ++++-- python/qmf2/tests/obj_gets.py | 47 ++++++++++++++-------------- 5 files changed, 102 insertions(+), 77 deletions(-) (limited to 'python') diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py index e820c2c839..277018f297 100644 --- a/python/qmf2/tests/agent_discovery.py +++ b/python/qmf2/tests/agent_discovery.py @@ -56,10 +56,14 @@ class _agentApp(Thread): _heartbeat_interval=heartbeat) # No database needed for this test self.running = False + self.ready = Event() def start_app(self): self.running = True self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") def stop_app(self): self.running = False @@ -78,6 +82,7 @@ class _agentApp(Thread): self.broker_url.password) conn.connect() self.agent.set_connection(conn) + self.ready.set() while self.running: self.notifier.wait_for_work(None) diff --git a/python/qmf2/tests/basic_method.py b/python/qmf2/tests/basic_method.py index a054a769d0..3db3af1d96 100644 --- a/python/qmf2/tests/basic_method.py +++ b/python/qmf2/tests/basic_method.py @@ -47,9 +47,10 @@ class _testNotifier(Notifier): class _agentApp(Thread): - def __init__(self, name, heartbeat): + def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) self.notifier = _testNotifier() + self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) @@ -108,33 +109,34 @@ class _agentApp(Thread): _obj2.set_value("field4", ["a", "list", "value"]) self.agent.add_object(_obj2) + self.running = False + self.ready = Event() + + def start_app(self): self.running = True self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") - def connect_agent(self, broker_url): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(broker_url.host, - broker_url.port, - broker_url.user, - broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - - def disconnect_agent(self, timeout): - if self.conn: - self.agent.remove_connection(timeout) - - def shutdown_agent(self, timeout): - self.agent.destroy(timeout) - - def stop(self): + def stop_app(self): self.running = False + # wake main thread self.notifier.indication() # hmmm... collide with daemon??? self.join(10) if self.isAlive(): raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + # Agent application main processing loop while self.running: self.notifier.wait_for_work(None) @@ -197,6 +199,10 @@ class _agentApp(Thread): self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + class BaseTest(unittest.TestCase): @@ -206,20 +212,19 @@ class BaseTest(unittest.TestCase): self.defines = self.config.defines def setUp(self): - # one second agent indication interval - self.agent1 = _agentApp("agent1", 1) - self.agent1.connect_agent(self.broker) - self.agent2 = _agentApp("agent2", 1) - self.agent2.connect_agent(self.broker) + # one second agent heartbeat interval + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() def tearDown(self): if self.agent1: - self.agent1.shutdown_agent(10) - self.agent1.stop() + self.agent1.stop_app() self.agent1 = None if self.agent2: - self.agent2.shutdown_agent(10) - self.agent2.stop() + self.agent2.stop_app() self.agent2 = None def test_described_obj(self): diff --git a/python/qmf2/tests/basic_query.py b/python/qmf2/tests/basic_query.py index 2009f5c594..4c55878d2c 100644 --- a/python/qmf2/tests/basic_query.py +++ b/python/qmf2/tests/basic_query.py @@ -47,9 +47,10 @@ class _testNotifier(Notifier): class _agentApp(Thread): - def __init__(self, name, heartbeat): + def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) self.notifier = _testNotifier() + self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) @@ -108,33 +109,34 @@ class _agentApp(Thread): _obj2.set_value("field4", ["a", "list", "value"]) self.agent.add_object(_obj2) + self.running = False + self.ready = Event() + + def start_app(self): self.running = True self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") - def connect_agent(self, broker_url): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(broker_url.host, - broker_url.port, - broker_url.user, - broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - - def disconnect_agent(self, timeout): - if self.conn: - self.agent.remove_connection(timeout) - - def shutdown_agent(self, timeout): - self.agent.destroy(timeout) - - def stop(self): + def stop_app(self): self.running = False + # wake main thread self.notifier.indication() # hmmm... collide with daemon??? self.join(10) if self.isAlive(): raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + while self.running: self.notifier.wait_for_work(None) wi = self.agent.get_next_workitem(timeout=0) @@ -143,6 +145,11 @@ class _agentApp(Thread): self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + + class BaseTest(unittest.TestCase): @@ -153,19 +160,18 @@ class BaseTest(unittest.TestCase): def setUp(self): # one second agent indication interval - self.agent1 = _agentApp("agent1", 1) - self.agent1.connect_agent(self.broker) - self.agent2 = _agentApp("agent2", 1) - self.agent2.connect_agent(self.broker) + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() def tearDown(self): if self.agent1: - self.agent1.shutdown_agent(10) - self.agent1.stop() + self.agent1.stop_app() self.agent1 = None if self.agent2: - self.agent2.shutdown_agent(10) - self.agent2.stop() + self.agent2.stop_app() self.agent2 = None def test_all_oids(self): diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py index 171cb80aaa..25c749cffa 100644 --- a/python/qmf2/tests/events.py +++ b/python/qmf2/tests/events.py @@ -75,10 +75,16 @@ class _agentApp(Thread): self.agent.register_object_class(_schema) self.running = False + self.ready = Event() def start_app(self): self.running = True self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + # time.sleep(1) + print("!!! agent=%s setup complete (%s)" % (self.agent, time.time())) def stop_app(self): self.running = False @@ -100,6 +106,8 @@ class _agentApp(Thread): raise Skipped(e) self.agent.set_connection(conn) + print("!!! agent=%s connection done (%s)" % (self.agent, time.time())) + self.ready.set() counter = 1 while self.running: @@ -164,9 +172,7 @@ class BaseTest(unittest.TestCase): # find the agents for aname in ["agent1", "agent2"]: - print("!!! finding aname=%s (%s)" % (aname, time.time())) agent = self.console.find_agent(aname, timeout=3) - print("!!! agent=%s aname=%s (%s)" % (agent, aname, time.time())) self.assertTrue(agent and agent.get_name() == aname) # now wait for events diff --git a/python/qmf2/tests/obj_gets.py b/python/qmf2/tests/obj_gets.py index 9287fc7173..f9be25409f 100644 --- a/python/qmf2/tests/obj_gets.py +++ b/python/qmf2/tests/obj_gets.py @@ -47,9 +47,10 @@ class _testNotifier(Notifier): class _agentApp(Thread): - def __init__(self, name, heartbeat): + def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) self.notifier = _testNotifier() + self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) @@ -138,26 +139,17 @@ class _agentApp(Thread): _obj.set_value("key-2", 2) self.agent.add_object(_obj) + self.running = False + self.ready = Event() + + def start_app(self): self.running = True self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") - def connect_agent(self, broker_url): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(broker_url.host, - broker_url.port, - broker_url.user, - broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - - def disconnect_agent(self, timeout): - if self.conn: - self.agent.remove_connection(timeout) - - def shutdown_agent(self, timeout): - self.agent.destroy(timeout) - - def stop(self): + def stop_app(self): self.running = False self.notifier.indication() # hmmm... collide with daemon??? self.join(10) @@ -165,6 +157,15 @@ class _agentApp(Thread): raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + while self.running: self.notifier.wait_for_work(None) wi = self.agent.get_next_workitem(timeout=0) @@ -173,6 +174,9 @@ class _agentApp(Thread): self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) class BaseTest(unittest.TestCase): @@ -186,15 +190,14 @@ class BaseTest(unittest.TestCase): def setUp(self): self.agents = [] for i in range(self.agent_count): - agent = _agentApp("agent-" + str(i), 1) - agent.connect_agent(self.broker) + agent = _agentApp("agent-" + str(i), self.broker, 1) + agent.start_app() self.agents.append(agent) def tearDown(self): for agent in self.agents: if agent is not None: - agent.shutdown_agent(10) - agent.stop() + agent.stop_app() def test_all_agents(self): -- cgit v1.2.1