diff options
Diffstat (limited to 'python/qmf2/tests/agent_discovery.py')
| -rw-r--r-- | python/qmf2/tests/agent_discovery.py | 255 |
1 files changed, 207 insertions, 48 deletions
diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py index 3c530cc060..e820c2c839 100644 --- a/python/qmf2/tests/agent_discovery.py +++ b/python/qmf2/tests/agent_discovery.py @@ -17,6 +17,7 @@ # import unittest import logging +import time from threading import Thread, Event import qpid.messaging @@ -45,40 +46,39 @@ class _testNotifier(qmf2.common.Notifier): class _agentApp(Thread): - def __init__(self, name, heartbeat): + def __init__(self, name, broker_url, heartbeat): Thread.__init__(self) + self.timeout = 3 + self.broker_url = broker_url self.notifier = _testNotifier() self.agent = qmf2.agent.Agent(name, _notifier=self.notifier, _heartbeat_interval=heartbeat) # No database needed for this test + self.running = False + + def start_app(self): self.running = True self.start() - def connect_agent(self, broker_url): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(broker_url.host, - broker_url.port, - broker_url.user, - broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - - def disconnect_agent(self, timeout): - if self.conn: - self.agent.remove_connection(timeout) - - def shutdown_agent(self, timeout): - self.agent.destroy(timeout) - - def stop(self): + def stop_app(self): self.running = False + # wake main thread self.notifier.indication() # hmmm... collide with daemon??? self.join(10) if self.isAlive(): - logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!") + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") def run(self): + # Connect the agent to the broker, + # broker_url = "user/passwd@hostname:port" + conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + conn.connect() + self.agent.set_connection(conn) + while self.running: self.notifier.wait_for_work(None) wi = self.agent.get_next_workitem(timeout=0) @@ -87,6 +87,9 @@ class _agentApp(Thread): self.agent.release_workitem(wi) wi = self.agent.get_next_workitem(timeout=0) + # done, cleanup agent + self.agent.remove_connection(self.timeout) + self.agent.destroy(self.timeout) class BaseTest(unittest.TestCase): @@ -97,26 +100,27 @@ class BaseTest(unittest.TestCase): def setUp(self): # one second agent indication interval - self.agent1 = _agentApp("agent1", 1) - self.agent1.connect_agent(self.broker) - self.agent2 = _agentApp("agent2", 1) - self.agent2.connect_agent(self.broker) + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() def tearDown(self): if self.agent1: - self.agent1.shutdown_agent(10) - self.agent1.stop() + self.agent1.stop_app() self.agent1 = None if self.agent2: - self.agent2.shutdown_agent(10) - self.agent2.stop() + self.agent2.stop_app() self.agent2 = None def test_discover_all(self): - # create console - # enable agent discovery - # wait - # expect agent add for agent1 and agent2 + """ + create console + enable agent discovery + wait + expect agent add for agent1 and agent2 + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) @@ -154,10 +158,12 @@ class BaseTest(unittest.TestCase): def test_discover_one(self): - # create console - # enable agent discovery, filter for agent1 only - # wait until timeout - # expect agent add for agent1 only + """ + create console + enable agent discovery, filter for agent1 only + wait until timeout + expect agent add for agent1 only + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) @@ -198,10 +204,12 @@ class BaseTest(unittest.TestCase): def test_heartbeat(self): - # create console with 2 sec agent timeout - # enable agent discovery, find all agents - # stop agent1, expect timeout notification - # stop agent2, expect timeout notification + """ + create console with 2 sec agent timeout + enable agent discovery, find all agents + stop agent1, expect timeout notification + stop agent2, expect timeout notification + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=2) @@ -239,8 +247,7 @@ class BaseTest(unittest.TestCase): agent1 = self.agent1 self.agent1 = None - agent1.shutdown_agent(10) - agent1.stop() + agent1.stop_app() wi = self.console.get_next_workitem(timeout=4) while wi is not None: @@ -265,8 +272,7 @@ class BaseTest(unittest.TestCase): agent2 = self.agent2 self.agent2 = None - agent2.shutdown_agent(10) - agent2.stop() + agent2.stop_app() wi = self.console.get_next_workitem(timeout=4) while wi is not None: @@ -291,11 +297,13 @@ class BaseTest(unittest.TestCase): def test_find_agent(self): - # create console - # do not enable agent discovery - # find agent1, expect success - # find agent-none, expect failure - # find agent2, expect success + """ + create console + do not enable agent discovery + find agent1, expect success + find agent-none, expect failure + find agent2, expect success + """ self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier) self.conn = qpid.messaging.Connection(self.broker.host, @@ -318,3 +326,154 @@ class BaseTest(unittest.TestCase): self.console.destroy(10) + def test_heartbeat_x2(self): + """ + create 2 consoles with 2 sec agent timeout + enable agent discovery, find all agents + stop agent1, expect timeout notification on both consoles + stop agent2, expect timeout notification on both consoles + """ + console_count = 2 + self.consoles = [] + for i in range(console_count): + console = qmf2.console.Console("test-console-" + str(i), + notifier=_testNotifier(), + agent_timeout=2) + conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + conn.connect() + console.addConnection(conn) + console.enable_agent_discovery() + self.consoles.append(console) + + # now wait for all consoles to discover all agents, + # agents send a heartbeat once a second + for console in self.consoles: + agent1_found = agent2_found = False + wi = console.get_next_workitem(timeout=2) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + wi = console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.stop_app() + + for console in self.consoles: + agent1_found = True + wi = console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + break + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + + wi = console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.stop_app() + + for console in self.consoles: + agent2_found = True + wi = console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + break + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + + wi = console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + + for console in self.consoles: + console.destroy(10) + + + def test_find_agent_x2(self): + """ + create 2 consoles, do not enable agent discovery + console-1: find agent1, expect success + console-2: find agent2, expect success + Verify console-1 does -not- know agent2 + Verify console-2 does -not- know agent1 + """ + console_count = 2 + self.consoles = [] + for i in range(console_count): + console = qmf2.console.Console("test-console-" + str(i), + notifier=_testNotifier(), + agent_timeout=2) + conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + conn.connect() + console.addConnection(conn) + self.consoles.append(console) + + agent1 = self.consoles[0].find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + agent2 = self.consoles[1].find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + # wait long enough for agent heartbeats to be sent... + + time.sleep(self.agent_heartbeat * 2) + + agents = self.consoles[0].get_agents() + self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1") + agent1 = self.consoles[0].get_agent("agent1") + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + + agents = self.consoles[1].get_agents() + self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2") + agent2 = self.consoles[1].get_agent("agent2") + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + # verify no new agents were learned + + for console in self.consoles: + console.destroy(10) + |
