summaryrefslogtreecommitdiff
path: root/python/qmf2/tests
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-01-26 19:38:05 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-01-26 19:38:05 +0000
commitfaeceaf81e73b23260d32503bc09e87f6ed2e735 (patch)
tree9b89ee4964ad533a77dbf54119eb11e7a8768f9f /python/qmf2/tests
parent988872f0eb969a68d53ae303cc0a2aaddd87420f (diff)
downloadqpid-python-faeceaf81e73b23260d32503bc09e87f6ed2e735.tar.gz
QPID-2261: checkpoint qmfv2 python api
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/tests')
-rw-r--r--python/qmf2/tests/__init__.py6
-rw-r--r--python/qmf2/tests/agent_discovery.py255
-rw-r--r--python/qmf2/tests/events.py15
3 files changed, 225 insertions, 51 deletions
diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py
index 2e742b79be..e942c2fbc2 100644
--- a/python/qmf2/tests/__init__.py
+++ b/python/qmf2/tests/__init__.py
@@ -19,4 +19,8 @@
# under the License.
#
-import agent_discovery, basic_query, basic_method, obj_gets, events
+import agent_discovery
+import basic_query
+import basic_method
+import obj_gets
+import events
diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py
index 3c530cc060..e820c2c839 100644
--- a/python/qmf2/tests/agent_discovery.py
+++ b/python/qmf2/tests/agent_discovery.py
@@ -17,6 +17,7 @@
#
import unittest
import logging
+import time
from threading import Thread, Event
import qpid.messaging
@@ -45,40 +46,39 @@ class _testNotifier(qmf2.common.Notifier):
class _agentApp(Thread):
- def __init__(self, name, heartbeat):
+ def __init__(self, name, broker_url, heartbeat):
Thread.__init__(self)
+ self.timeout = 3
+ self.broker_url = broker_url
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
_notifier=self.notifier,
_heartbeat_interval=heartbeat)
# No database needed for this test
+ self.running = False
+
+ def start_app(self):
self.running = True
self.start()
- def connect_agent(self, broker_url):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(broker_url.host,
- broker_url.port,
- broker_url.user,
- broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
-
- def disconnect_agent(self, timeout):
- if self.conn:
- self.agent.remove_connection(timeout)
-
- def shutdown_agent(self, timeout):
- self.agent.destroy(timeout)
-
- def stop(self):
+ def stop_app(self):
self.running = False
+ # wake main thread
self.notifier.indication() # hmmm... collide with daemon???
self.join(10)
if self.isAlive():
- logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
def run(self):
+ # Connect the agent to the broker,
+ # broker_url = "user/passwd@hostname:port"
+ conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ conn.connect()
+ self.agent.set_connection(conn)
+
while self.running:
self.notifier.wait_for_work(None)
wi = self.agent.get_next_workitem(timeout=0)
@@ -87,6 +87,9 @@ class _agentApp(Thread):
self.agent.release_workitem(wi)
wi = self.agent.get_next_workitem(timeout=0)
+ # done, cleanup agent
+ self.agent.remove_connection(self.timeout)
+ self.agent.destroy(self.timeout)
class BaseTest(unittest.TestCase):
@@ -97,26 +100,27 @@ class BaseTest(unittest.TestCase):
def setUp(self):
# one second agent indication interval
- self.agent1 = _agentApp("agent1", 1)
- self.agent1.connect_agent(self.broker)
- self.agent2 = _agentApp("agent2", 1)
- self.agent2.connect_agent(self.broker)
+ self.agent_heartbeat = 1
+ self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+ self.agent2.start_app()
def tearDown(self):
if self.agent1:
- self.agent1.shutdown_agent(10)
- self.agent1.stop()
+ self.agent1.stop_app()
self.agent1 = None
if self.agent2:
- self.agent2.shutdown_agent(10)
- self.agent2.stop()
+ self.agent2.stop_app()
self.agent2 = None
def test_discover_all(self):
- # create console
- # enable agent discovery
- # wait
- # expect agent add for agent1 and agent2
+ """
+ create console
+ enable agent discovery
+ wait
+ expect agent add for agent1 and agent2
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -154,10 +158,12 @@ class BaseTest(unittest.TestCase):
def test_discover_one(self):
- # create console
- # enable agent discovery, filter for agent1 only
- # wait until timeout
- # expect agent add for agent1 only
+ """
+ create console
+ enable agent discovery, filter for agent1 only
+ wait until timeout
+ expect agent add for agent1 only
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -198,10 +204,12 @@ class BaseTest(unittest.TestCase):
def test_heartbeat(self):
- # create console with 2 sec agent timeout
- # enable agent discovery, find all agents
- # stop agent1, expect timeout notification
- # stop agent2, expect timeout notification
+ """
+ create console with 2 sec agent timeout
+ enable agent discovery, find all agents
+ stop agent1, expect timeout notification
+ stop agent2, expect timeout notification
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=2)
@@ -239,8 +247,7 @@ class BaseTest(unittest.TestCase):
agent1 = self.agent1
self.agent1 = None
- agent1.shutdown_agent(10)
- agent1.stop()
+ agent1.stop_app()
wi = self.console.get_next_workitem(timeout=4)
while wi is not None:
@@ -265,8 +272,7 @@ class BaseTest(unittest.TestCase):
agent2 = self.agent2
self.agent2 = None
- agent2.shutdown_agent(10)
- agent2.stop()
+ agent2.stop_app()
wi = self.console.get_next_workitem(timeout=4)
while wi is not None:
@@ -291,11 +297,13 @@ class BaseTest(unittest.TestCase):
def test_find_agent(self):
- # create console
- # do not enable agent discovery
- # find agent1, expect success
- # find agent-none, expect failure
- # find agent2, expect success
+ """
+ create console
+ do not enable agent discovery
+ find agent1, expect success
+ find agent-none, expect failure
+ find agent2, expect success
+ """
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier)
self.conn = qpid.messaging.Connection(self.broker.host,
@@ -318,3 +326,154 @@ class BaseTest(unittest.TestCase):
self.console.destroy(10)
+ def test_heartbeat_x2(self):
+ """
+ create 2 consoles with 2 sec agent timeout
+ enable agent discovery, find all agents
+ stop agent1, expect timeout notification on both consoles
+ stop agent2, expect timeout notification on both consoles
+ """
+ console_count = 2
+ self.consoles = []
+ for i in range(console_count):
+ console = qmf2.console.Console("test-console-" + str(i),
+ notifier=_testNotifier(),
+ agent_timeout=2)
+ conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ conn.connect()
+ console.addConnection(conn)
+ console.enable_agent_discovery()
+ self.consoles.append(console)
+
+ # now wait for all consoles to discover all agents,
+ # agents send a heartbeat once a second
+ for console in self.consoles:
+ agent1_found = agent2_found = False
+ wi = console.get_next_workitem(timeout=2)
+ while wi and not (agent1_found and agent2_found):
+ if wi.get_type() == wi.AGENT_ADDED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = True
+ elif agent.get_name() == "agent2":
+ agent2_found = True
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_found and agent2_found:
+ break;
+ wi = console.get_next_workitem(timeout=2)
+
+ self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+ # now kill agent1 and wait for expiration
+
+ agent1 = self.agent1
+ self.agent1 = None
+ agent1.stop_app()
+
+ for console in self.consoles:
+ agent1_found = True
+ wi = console.get_next_workitem(timeout=4)
+ while wi is not None:
+ if wi.get_type() == wi.AGENT_DELETED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_found = False
+ break
+ else:
+ self.fail("Unexpected agent_deleted received: %s" %
+ agent.get_name())
+
+ wi = console.get_next_workitem(timeout=4)
+
+ self.assertFalse(agent1_found, "agent1 did not delete!")
+
+ # now kill agent2 and wait for expiration
+
+ agent2 = self.agent2
+ self.agent2 = None
+ agent2.stop_app()
+
+ for console in self.consoles:
+ agent2_found = True
+ wi = console.get_next_workitem(timeout=4)
+ while wi is not None:
+ if wi.get_type() == wi.AGENT_DELETED:
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf2.console.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent2":
+ agent2_found = False
+ break
+ else:
+ self.fail("Unexpected agent_deleted received: %s" %
+ agent.get_name())
+
+ wi = console.get_next_workitem(timeout=4)
+
+ self.assertFalse(agent2_found, "agent2 did not delete!")
+
+
+ for console in self.consoles:
+ console.destroy(10)
+
+
+ def test_find_agent_x2(self):
+ """
+ create 2 consoles, do not enable agent discovery
+ console-1: find agent1, expect success
+ console-2: find agent2, expect success
+ Verify console-1 does -not- know agent2
+ Verify console-2 does -not- know agent1
+ """
+ console_count = 2
+ self.consoles = []
+ for i in range(console_count):
+ console = qmf2.console.Console("test-console-" + str(i),
+ notifier=_testNotifier(),
+ agent_timeout=2)
+ conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ conn.connect()
+ console.addConnection(conn)
+ self.consoles.append(console)
+
+ agent1 = self.consoles[0].find_agent("agent1", timeout=3)
+ self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+ agent2 = self.consoles[1].find_agent("agent2", timeout=3)
+ self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+ # wait long enough for agent heartbeats to be sent...
+
+ time.sleep(self.agent_heartbeat * 2)
+
+ agents = self.consoles[0].get_agents()
+ self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1")
+ agent1 = self.consoles[0].get_agent("agent1")
+ self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+
+ agents = self.consoles[1].get_agents()
+ self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2")
+ agent2 = self.consoles[1].get_agent("agent2")
+ self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+ # verify no new agents were learned
+
+ for console in self.consoles:
+ console.destroy(10)
+
diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py
index b2d934728d..171cb80aaa 100644
--- a/python/qmf2/tests/events.py
+++ b/python/qmf2/tests/events.py
@@ -22,6 +22,7 @@ import logging
from threading import Thread, Event
import qpid.messaging
+from qpid.harness import Skipped
from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
QmfData, QmfQueryPredicate, SchemaEventClass,
@@ -93,7 +94,11 @@ class _agentApp(Thread):
self.broker_url.port,
self.broker_url.user,
self.broker_url.password)
- conn.connect()
+ try:
+ conn.connect()
+ except qpid.messaging.ConnectError, e:
+ raise Skipped(e)
+
self.agent.set_connection(conn)
counter = 1
@@ -150,12 +155,18 @@ class BaseTest(unittest.TestCase):
self.broker.port,
self.broker.user,
self.broker.password)
- self.conn.connect()
+ try:
+ self.conn.connect()
+ except qpid.messaging.ConnectError, e:
+ raise Skipped(e)
+
self.console.addConnection(self.conn)
# find the agents
for aname in ["agent1", "agent2"]:
+ print("!!! finding aname=%s (%s)" % (aname, time.time()))
agent = self.console.find_agent(aname, timeout=3)
+ print("!!! agent=%s aname=%s (%s)" % (agent, aname, time.time()))
self.assertTrue(agent and agent.get_name() == aname)
# now wait for events