diff options
Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r-- | cpp/src/tests/brokertest.py | 312 |
1 files changed, 150 insertions, 162 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 98f58ebfdd..16d7fb0b78 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -29,6 +29,7 @@ from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger +import qmf.console log = getLogger("qpid.brokertest") @@ -61,24 +62,6 @@ def is_running(pid): class BadProcessStatus(Exception): pass -class ExceptionWrapper: - """Proxy object that adds a message to exceptions raised""" - def __init__(self, obj, msg): - self.obj = obj - self.msg = msg - - def __getattr__(self, name): - func = getattr(self.obj, name) - if type(func) != callable: - return func - return lambda *args, **kwargs: self._wrap(func, args, kwargs) - - def _wrap(self, func, args, kwargs): - try: - return func(*args, **kwargs) - except Exception, e: - raise Exception("%s: %s" %(self.msg, str(e))) - def error_line(filename, n=1): """Get the last n line(s) of filename for error messages""" result = [] @@ -88,7 +71,8 @@ def error_line(filename, n=1): for l in f: if len(result) == n: result.pop(0) result.append(" "+l) - finally: f.close() + finally: + f.close() except: return "" return ":\n" + "".join(result) @@ -96,111 +80,90 @@ def retry(function, timeout=10, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" + deadline = time.time() + timeout while not function(): - if delay > timeout: delay = timeout + remaining = deadline - time.time() + if remaining <= 0: return False + delay = min(delay, remaining) time.sleep(delay) - timeout -= delay - if timeout <= 0: return False delay *= 2 return True +class AtomicCounter: + def __init__(self): + self.count = 0 + self.lock = Lock() + + def next(self): + self.lock.acquire(); + ret = self.count + self.count += 1 + self.lock.release(); + return ret + +_popen_id = AtomicCounter() # Popen identifier for use in output file names. + +# Constants for file descriptor arguments to Popen +FILE = "FILE" # Write to file named after process +PIPE = subprocess.PIPE + class Popen(subprocess.Popen): """ Can set and verify expectation of process status at end of test. Dumps command line, stdout, stderr to data dir for debugging. """ - class DrainThread(Thread): - """Thread to drain a file object and write the data to a file.""" - def __init__(self, infile, outname): - Thread.__init__(self) - self.infile, self.outname = infile, outname - self.outfile = None - - def run(self): - try: - for line in self.infile: - if self.outfile is None: - self.outfile = open(self.outname, "w") - self.outfile.write(line) - finally: - self.infile.close() - if self.outfile is not None: self.outfile.close() - - class OutStream(ExceptionWrapper): - """Wrapper for output streams, handles exceptions & draining output""" - def __init__(self, infile, outfile, msg): - ExceptionWrapper.__init__(self, infile, msg) - self.infile, self.outfile = infile, outfile - self.thread = None - - def drain(self): - if self.thread is None: - self.thread = Popen.DrainThread(self.infile, self.outfile) - self.thread.start() - - def outfile(self, ext): return "%s.%s" % (self.pname, ext) - - def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True): - """Run cmd (should be a list of arguments) + def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): + """Run cmd (should be a list of program and arguments) expect - if set verify expectation at end of test. - drain - if true (default) drain stdout/stderr to files. + stdout, stderr - can have the same values as for subprocess.Popen as well as + FILE (the default) which means write to a file named after the process. + stdin - like subprocess.Popen but defauts to PIPE """ self._clean = False self._clean_lock = Lock() assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] - self.returncode = None self.expect = expect + self.id = _popen_id.next() + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id) + if stdout == FILE: stdout = open(self.outfile("out"), "w") + if stderr == FILE: stderr = open(self.outfile("err"), "w") try: - subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True) - except ValueError: # Windows can't do close_fds - subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE) - self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid) - msg = "Process %s" % self.pname - self.stdin = ExceptionWrapper(self.stdin, msg) - self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg) - self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg) + subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, + stdin=stdin, stdout=stdout, stderr=stderr, + close_fds=True) + except ValueError: # Windows can't do close_fds + subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, + stdin=stdin, stdout=stdout, stderr=stderr) + f = open(self.outfile("cmd"), "w") - try: f.write(self.cmd_str()) + try: f.write("%s\n%d"%(self.cmd_str(), self.pid)) finally: f.close() log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) - if drain: self.drain() - def __str__(self): return "Popen<%s>"%(self.pname) + def __str__(self): return "Popen<%s>"%(self.pname) - def drain(self): - """Start threads to drain stdout/err""" - self.stdout.drain() - self.stderr.drain() - - def _cleanup(self): - """Close pipes to sub-process""" - self._clean_lock.acquire() - try: - if self._clean: return - self._clean = True - self.stdin.close() - self.drain() # Drain output pipes. - self.stdout.thread.join() # Drain thread closes pipe. - self.stderr.thread.join() - finally: self._clean_lock.release() + def outfile(self, ext): return "%s.%s" % (self.pname, ext) def unexpected(self,msg): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) - + def stop(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: try: self.kill() # Just make sure its dead except: pass elif self.expect == EXPECT_RUNNING: - try: - self.kill() - except: - self.unexpected("expected running, exit code %d" % self.wait()) + if self.poll() != None: + self.unexpected("expected running, exit code %d" % self.returncode) + else: + try: + self.kill() + except Exception,e: + self.unexpected("exception from kill: %s" % str(e)) else: retry(lambda: self.poll() is not None) if self.returncode is None: # Still haven't stopped @@ -212,40 +175,21 @@ class Popen(subprocess.Popen): self.unexpected("expected error") finally: self.wait() # Clean up the process. - + def communicate(self, input=None): - if input: - self.stdin.write(input) - self.stdin.close() - outerr = (self.stdout.read(), self.stderr.read()) - self.wait() - return outerr + ret = subprocess.Popen.communicate(self, input) + self.cleanup() + return ret - def is_running(self): - return self.poll() is None + def is_running(self): return self.poll() is None def assert_running(self): if not self.is_running(): self.unexpected("Exit code %d" % self.returncode) - def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4 - if self.returncode is None: - # Pass _deadstate only if it has been set, there is no _deadstate - # parameter in Python 2.6 - if _deadstate is None: ret = subprocess.Popen.poll(self) - else: ret = subprocess.Popen.poll(self, _deadstate) - - if (ret != -1): - self.returncode = ret - self._cleanup() - return self.returncode - def wait(self): - if self.returncode is None: - self.drain() - try: self.returncode = subprocess.Popen.wait(self) - except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e)) - self._cleanup() - return self.returncode + ret = subprocess.Popen.wait(self) + self._cleanup() + return ret def terminate(self): try: subprocess.Popen.terminate(self) @@ -254,7 +198,8 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - + self._cleanup() + def kill(self): try: subprocess.Popen.kill(self) except AttributeError: # No terminate method @@ -262,6 +207,20 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGKILL) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') + self._cleanup() + + def _cleanup(self): + """Clean up after a dead process""" + self._clean_lock.acquire() + if not self._clean: + self._clean = True + try: self.stdin.close() + except: pass + try: self.stdout.close() + except: pass + try: self.stderr.close() + except: pass + self._clean_lock.release() def cmd_str(self): return " ".join([str(s) for s in self.cmd]) @@ -288,11 +247,11 @@ class Broker(Popen): while (os.path.exists(self.log)): self.log = "%s-%d.log" % (self.name, i) i += 1 - + def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None): + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -318,15 +277,20 @@ class Broker(Popen): cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] if log_level != None: - cmd += ["--log-enable=%s" % log_level] + cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] - Popen.__init__(self, cmd, expect, drain=False) + if show_cmd: print cmd + Popen.__init__(self, cmd, expect, stdout=PIPE) test.cleanup_stop(self) self._host = "127.0.0.1" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) self._log_ready = False + def startQmf(self, handler=None): + self.qmf_session = qmf.console.Session(handler) + self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) + def host(self): return self._host def port(self): @@ -357,7 +321,7 @@ class Broker(Popen): s = c.session(str(qpid.datatypes.uuid4())) s.queue_declare(queue=queue) c.close() - + def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) if xprops != None: s += ", x-declare:{" + xprops + "}" @@ -401,13 +365,14 @@ class Broker(Popen): def log_ready(self): """Return true if the log file exists and contains a broker ready message""" - if self._log_ready: return True - self._log_ready = find_in_file("notice Broker running", self.log) + if not self._log_ready: + self._log_ready = find_in_file("notice Broker running", self.log) + return self._log_ready def ready(self, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=30): + if not retry(self.log_ready, timeout=60): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. For a cluster broker this will @@ -416,23 +381,27 @@ class Broker(Popen): c = self.connect(**kwargs) try: c.session() finally: c.close() - except: raise RethrownException( - "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5))) + except Exception,e: raise RethrownException( + "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) def store_state(self): - uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines() + f = open(os.path.join(self.datadir, "cluster", "store.status")) + try: uuids = f.readlines() + finally: f.close() null_uuid="00000000-0000-0000-0000-000000000000\n" if len(uuids) < 2: return "unknown" # we looked while the file was being updated. if uuids[0] == null_uuid: return "empty" if uuids[1] == null_uuid: return "dirty" return "clean" - + class Cluster: """A cluster of brokers in a test.""" + # Client connection options for use in failover tests. + CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true" _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count @@ -443,16 +412,19 @@ class Cluster: self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", BrokerTest.cluster_lib ] - self.start_n(count, expect=expect, wait=wait) + self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) - def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) - self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] - def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): - for i in range(count): self.start(expect=expect, wait=wait, args=args) + def ready(self): + for b in self: b.ready() + + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): + for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -481,7 +453,7 @@ class BrokerTest(TestCase): rootdir = os.getcwd() def configure(self, config): self.config=config - + def setUp(self): outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) @@ -502,41 +474,50 @@ class BrokerTest(TestCase): """Call thing.stop at end of test""" self.stopem.append(stopable) - def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True): + def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) - p = Popen(cmd, expect, drain) + p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level) + b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait) + cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster + def browse(self, session, queue, timeout=0): + """Return a list with the contents of each message on queue.""" + r = session.receiver("%s;{mode:browse}"%(queue)) + r.capacity = 100 + try: + contents = [] + try: + while True: contents.append(r.fetch(timeout=timeout).content) + except messaging.Empty: pass + finally: r.close() + return contents + def assert_browse(self, session, queue, expect_contents, timeout=0): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" - - r = session.receiver("%s;{mode:browse}"%(queue)) - actual_contents = [] - try: - for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content) - while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages. - except messaging.Empty: pass - r.close() + actual_contents = self.browse(session, queue, timeout) self.assertEqual(expect_contents, actual_contents) +def join(thread, timeout=10): + thread.join(timeout) + if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) + class RethrownException(Exception): """Captures the stack trace of the current exception to be thrown later""" def __init__(self, msg=""): @@ -554,15 +535,16 @@ class StoppableThread(Thread): def stop(self): self.stopped = True - self.join() + join(self) if self.error: raise self.error - + class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ - def __init__(self, broker, max_depth=None, queue="test-queue"): + def __init__(self, broker, max_depth=None, queue="test-queue", + connection_options=Cluster.CONNECTION_OPTIONS): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. @@ -573,9 +555,11 @@ class NumberedSender(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", + "--connection-options", "{%s}"%(connection_options), "--content-stdin" ], - expect=EXPECT_RUNNING) + expect=EXPECT_RUNNING, + stdin=PIPE) self.condition = Condition() self.max = max_depth self.received = 0 @@ -590,6 +574,7 @@ class NumberedSender(Thread): try: self.sent = 0 while not self.stopped: + self.sender.assert_running() if self.max: self.condition.acquire() while not self.stopped and self.sent - self.received > self.max: @@ -612,16 +597,17 @@ class NumberedSender(Thread): self.stopped = True self.condition.notify() finally: self.condition.release() - self.join() + join(self) self.write_message(-1) # end-of-messages marker. if self.error: raise self.error - + class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives sequentially numbered messages. """ - def __init__(self, broker, sender = None, queue="test-queue"): + def __init__(self, broker, sender = None, queue="test-queue", + connection_options=Cluster.CONNECTION_OPTIONS): """ sender: enable flow control. Call sender.received(n) for each message received. """ @@ -632,22 +618,24 @@ class NumberedReceiver(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", + "--connection-options", "{%s}"%(connection_options), "--forever" ], expect=EXPECT_RUNNING, - drain=False) + stdout=PIPE) self.lock = Lock() self.error = None self.sender = sender + self.received = 0 def read_message(self): return int(self.receiver.stdout.readline()) - + def run(self): try: - self.received = 0 m = self.read_message() while m != -1: + self.receiver.assert_running() assert(m <= self.received) # Check for missing messages if (m == self.received): # Ignore duplicates self.received += 1 @@ -659,7 +647,7 @@ class NumberedReceiver(Thread): def stop(self): """Returns when termination message is received""" - self.join() + join(self) if self.error: raise self.error class ErrorGenerator(StoppableThread): @@ -674,7 +662,7 @@ class ErrorGenerator(StoppableThread): self.broker=broker broker.test.cleanup_stop(self) self.start() - + def run(self): c = self.broker.connect_old() try: |