diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/tests/brokertest.py | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r-- | cpp/src/tests/brokertest.py | 102 |
1 files changed, 13 insertions, 89 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index aea4460e5a..70c145a51b 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -17,8 +17,7 @@ # under the License. # -# Support library for tests that start multiple brokers, e.g. cluster -# or federation +# Support library for tests that start multiple brokers, e.g. HA or federation import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal @@ -203,8 +202,10 @@ class Popen(subprocess.Popen): self.wait() def kill(self): - try: - subprocess.Popen.kill(self) + # Set to EXPECT_UNKNOWN, EXPECT_EXIT_FAIL creates a race condition + # if the process exits normally concurrent with the call to kill. + self.expect = EXPECT_UNKNOWN + try: subprocess.Popen.kill(self) except AttributeError: # No terminate method try: os.kill( self.pid , signal.SIGKILL) @@ -380,8 +381,7 @@ class Broker(Popen): if not retry(self.log_ready, timeout=timeout): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) - # Create a connection and a session. For a cluster broker this will - # return after cluster init has finished. + # Create a connection and a session. try: c = self.connect(**kwargs) try: c.session() @@ -389,54 +389,6 @@ class Broker(Popen): except Exception,e: raise RethrownException( "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) - def store_state(self): - f = open(os.path.join(self.datadir, "cluster", "store.status")) - try: uuids = f.readlines() - finally: f.close() - null_uuid="00000000-0000-0000-0000-000000000000\n" - if len(uuids) < 2: return "unknown" # we looked while the file was being updated. - if uuids[0] == null_uuid: return "empty" - if uuids[1] == null_uuid: return "dirty" - return "clean" - -class Cluster: - """A cluster of brokers in a test.""" - # Client connection options for use in failover tests. - CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true" - - _cluster_count = 0 - - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): - self.test = test - self._brokers=[] - self.name = "cluster%d" % Cluster._cluster_count - Cluster._cluster_count += 1 - # Use unique cluster name - self.args = copy(args) - self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] - self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] - assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" - self.args += [ "--load-module", BrokerTest.cluster_lib ] - self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) - - def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): - """Add a broker to the cluster. Returns the index of the new broker.""" - if not name: name="%s-%d" % (self.name, len(self._brokers)) - self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) - return self._brokers[-1] - - def ready(self): - for b in self: b.ready() - - def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): - for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) - - # Behave like a list of brokers. - def __len__(self): return len(self._brokers) - def __getitem__(self,index): return self._brokers[index] - def __iter__(self): return self._brokers.__iter__() - - def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) @@ -473,7 +425,6 @@ class BrokerTest(TestCase): # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) - cluster_lib = os.getenv("CLUSTER_LIB") ha_lib = os.getenv("HA_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") @@ -525,11 +476,6 @@ class BrokerTest(TestCase): raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): - """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) - return cluster - def browse(self, *args, **kwargs): browse(*args, **kwargs) def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) @@ -558,14 +504,17 @@ class StoppableThread(Thread): join(self) if self.error: raise self.error +# Options for a client that wants to reconnect automatically. +RECONNECT_OPTIONS="reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true" + class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ def __init__(self, broker, max_depth=None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS, - failover_updates=True, url=None): + connection_options=RECONNECT_OPTIONS, + failover_updates=True, url=None, args=[]): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. @@ -576,7 +525,7 @@ class NumberedSender(Thread): "--address", "%s;{create:always}"%queue, "--connection-options", "{%s}"%(connection_options), "--content-stdin" - ] + ] + args if failover_updates: cmd += ["--failover-updates"] self.sender = broker.test.popen( cmd, expect=EXPECT_RUNNING, stdin=PIPE) @@ -627,7 +576,7 @@ class NumberedReceiver(Thread): sequentially numbered messages. """ def __init__(self, broker, sender=None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS, + connection_options=RECONNECT_OPTIONS, failover_updates=True, url=None): """ sender: enable flow control. Call sender.received(n) for each message received. @@ -676,31 +625,6 @@ class NumberedReceiver(Thread): join(self) self.check() -class ErrorGenerator(StoppableThread): - """ - Thread that continuously generates errors by trying to consume from - a non-existent queue. For cluster regression tests, error handling - caused issues in the past. - """ - - def __init__(self, broker): - StoppableThread.__init__(self) - self.broker=broker - broker.test.cleanup_stop(self) - self.start() - - def run(self): - c = self.broker.connect_old() - try: - while not self.stopped: - try: - c.session(str(qpid.datatypes.uuid4())).message_subscribe( - queue="non-existent-queue") - assert(False) - except qpid.session.SessionException: pass - time.sleep(0.01) - except: pass # Normal if broker is killed. - def import_script(path): """ Import executable script at path as a module. |