summaryrefslogtreecommitdiff
path: root/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r--cpp/src/tests/brokertest.py671
1 files changed, 0 insertions, 671 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
deleted file mode 100644
index a19dd305e5..0000000000
--- a/cpp/src/tests/brokertest.py
+++ /dev/null
@@ -1,671 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Support library for tests that start multiple brokers, e.g. cluster
-# or federation
-
-import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
-import qpid, traceback, signal
-from qpid import connection, messaging, util
-from qpid.compat import format_exc
-from qpid.harness import Skipped
-from unittest import TestCase
-from copy import copy
-from threading import Thread, Lock, Condition
-from logging import getLogger
-import qmf.console
-
-log = getLogger("qpid.brokertest")
-
-# Values for expected outcome of process at end of test
-EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
-EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test.
-EXPECT_RUNNING=3 # Expect to still be running at end of test
-EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
-
-def find_exe(program):
- """Find an executable in the system PATH"""
- def is_exe(fpath):
- return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
- mydir, name = os.path.split(program)
- if mydir:
- if is_exe(program): return program
- else:
- for path in os.environ["PATH"].split(os.pathsep):
- exe_file = os.path.join(path, program)
- if is_exe(exe_file): return exe_file
- return None
-
-def is_running(pid):
- try:
- os.kill(pid, 0)
- return True
- except:
- return False
-
-class BadProcessStatus(Exception):
- pass
-
-def error_line(filename, n=1):
- """Get the last n line(s) of filename for error messages"""
- result = []
- try:
- f = open(filename)
- try:
- for l in f:
- if len(result) == n: result.pop(0)
- result.append(" "+l)
- finally:
- f.close()
- except: return ""
- return ":\n" + "".join(result)
-
-def retry(function, timeout=10, delay=.01):
- """Call function until it returns True or timeout expires.
- Double the delay for each retry. Return True if function
- returns true, False if timeout expires."""
- deadline = time.time() + timeout
- while not function():
- remaining = deadline - time.time()
- if remaining <= 0: return False
- delay = min(delay, remaining)
- time.sleep(delay)
- delay *= 2
- return True
-
-class AtomicCounter:
- def __init__(self):
- self.count = 0
- self.lock = Lock()
-
- def next(self):
- self.lock.acquire();
- ret = self.count
- self.count += 1
- self.lock.release();
- return ret
-
-_popen_id = AtomicCounter() # Popen identifier for use in output file names.
-
-# Constants for file descriptor arguments to Popen
-FILE = "FILE" # Write to file named after process
-PIPE = subprocess.PIPE
-
-class Popen(subprocess.Popen):
- """
- Can set and verify expectation of process status at end of test.
- Dumps command line, stdout, stderr to data dir for debugging.
- """
-
- def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
- """Run cmd (should be a list of program and arguments)
- expect - if set verify expectation at end of test.
- stdout, stderr - can have the same values as for subprocess.Popen as well as
- FILE (the default) which means write to a file named after the process.
- stdin - like subprocess.Popen but defauts to PIPE
- """
- self._clean = False
- self._clean_lock = Lock()
- assert find_exe(cmd[0]), "executable not found: "+cmd[0]
- if type(cmd) is type(""): cmd = [cmd] # Make it a list.
- self.cmd = [ str(x) for x in cmd ]
- self.expect = expect
- self.id = _popen_id.next()
- self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
- if stdout == FILE: stdout = open(self.outfile("out"), "w")
- if stderr == FILE: stderr = open(self.outfile("err"), "w")
- try:
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr,
- close_fds=True)
- except ValueError: # Windows can't do close_fds
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr)
-
- f = open(self.outfile("cmd"), "w")
- try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
- finally: f.close()
- log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
-
- def __str__(self): return "Popen<%s>"%(self.pname)
-
- def outfile(self, ext): return "%s.%s" % (self.pname, ext)
-
- def unexpected(self,msg):
- err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
- raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
-
- def stop(self): # Clean up at end of test.
- try:
- if self.expect == EXPECT_UNKNOWN:
- try: self.kill() # Just make sure its dead
- except: pass
- elif self.expect == EXPECT_RUNNING:
- try: self.kill()
- except: self.unexpected("expected running, exit code %d" % self.wait())
- else:
- retry(lambda: self.poll() is not None)
- if self.returncode is None: # Still haven't stopped
- self.kill()
- self.unexpected("still running")
- elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
- self.unexpected("exit code %d" % self.returncode)
- elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
- self.unexpected("expected error")
- finally:
- self.wait() # Clean up the process.
-
- def communicate(self, input=None):
- ret = subprocess.Popen.communicate(self, input)
- self.cleanup()
- return ret
-
- def is_running(self): return self.poll() is None
-
- def assert_running(self):
- if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
-
- def wait(self):
- ret = subprocess.Popen.wait(self)
- self._cleanup()
- return ret
-
- def terminate(self):
- try: subprocess.Popen.terminate(self)
- except AttributeError: # No terminate method
- try:
- os.kill( self.pid , signal.SIGTERM)
- except AttributeError: # no os.kill, using taskkill.. (Windows only)
- os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
-
- def kill(self):
- try: subprocess.Popen.kill(self)
- except AttributeError: # No terminate method
- try:
- os.kill( self.pid , signal.SIGKILL)
- except AttributeError: # no os.kill, using taskkill.. (Windows only)
- os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
-
- def _cleanup(self):
- """Clean up after a dead process"""
- self._clean_lock.acquire()
- if not self._clean:
- self._clean = True
- try: self.stdin.close()
- except: pass
- try: self.stdout.close()
- except: pass
- try: self.stderr.close()
- except: pass
- self._clean_lock.release()
-
- def cmd_str(self): return " ".join([str(s) for s in self.cmd])
-
-def checkenv(name):
- value = os.getenv(name)
- if not value: raise Exception("Environment variable %s is not set" % name)
- return value
-
-def find_in_file(str, filename):
- if not os.path.exists(filename): return False
- f = open(filename)
- try: return str in f.read()
- finally: f.close()
-
-class Broker(Popen):
- "A broker process. Takes care of start, stop and logging."
- _broker_count = 0
-
- def __str__(self): return "Broker<%s %s>"%(self.name, self.pname)
-
- def find_log(self):
- self.log = "%s.log" % self.name
- i = 1
- while (os.path.exists(self.log)):
- self.log = "%s-%d.log" % (self.name, i)
- i += 1
-
- def get_log(self):
- return os.path.abspath(self.log)
-
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
- """Start a broker daemon. name determines the data-dir and log
- file names."""
-
- self.test = test
- self._port=port
- if BrokerTest.store_lib:
- args = args + ['--load-module', BrokerTest.store_lib]
- if BrokerTest.sql_store_lib:
- args = args + ['--load-module', BrokerTest.sql_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
- if BrokerTest.sql_clfs_store_lib:
- args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
- cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir"] + args
- if not "--auth" in args: cmd.append("--auth=no")
- if wait != None:
- cmd += ["--wait", str(wait)]
- if name: self.name = name
- else:
- self.name = "broker%d" % Broker._broker_count
- Broker._broker_count += 1
- self.find_log()
- cmd += ["--log-to-file", self.log]
- cmd += ["--log-to-stderr=no"]
- if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
- self.datadir = self.name
- cmd += ["--data-dir", self.datadir]
- Popen.__init__(self, cmd, expect, stdout=PIPE)
- test.cleanup_stop(self)
- self._host = "127.0.0.1"
- log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
- self._log_ready = False
-
- def startQmf(self, handler=None):
- self.qmf_session = qmf.console.Session(handler)
- self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
-
- def host(self): return self._host
-
- def port(self):
- # Read port from broker process stdout if not already read.
- if (self._port == 0):
- try: self._port = int(self.stdout.readline())
- except ValueError:
- raise Exception("Can't get port for broker %s (%s)%s" %
- (self.name, self.pname, error_line(self.log,5)))
- return self._port
-
- def unexpected(self,msg):
- raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
-
- def connect(self, **kwargs):
- """New API connection to the broker."""
- return messaging.Connection.establish(self.host_port(), **kwargs)
-
- def connect_old(self):
- """Old API connection to the broker."""
- socket = qpid.util.connect(self.host(),self.port())
- connection = qpid.connection.Connection (sock=socket)
- connection.start()
- return connection;
-
- def declare_queue(self, queue):
- c = self.connect_old()
- s = c.session(str(qpid.datatypes.uuid4()))
- s.queue_declare(queue=queue)
- c.close()
-
- def _prep_sender(self, queue, durable, xprops):
- s = queue + "; {create:always, node:{durable:" + str(durable)
- if xprops != None: s += ", x-declare:{" + xprops + "}"
- return s + "}}"
-
- def send_message(self, queue, message, durable=True, xprops=None, session=None):
- if session == None:
- s = self.connect().session()
- else:
- s = session
- s.sender(self._prep_sender(queue, durable, xprops)).send(message)
- if session == None:
- s.connection.close()
-
- def send_messages(self, queue, messages, durable=True, xprops=None, session=None):
- if session == None:
- s = self.connect().session()
- else:
- s = session
- sender = s.sender(self._prep_sender(queue, durable, xprops))
- for m in messages: sender.send(m)
- if session == None:
- s.connection.close()
-
- def get_message(self, queue):
- s = self.connect().session()
- m = s.receiver(queue+"; {create:always}", capacity=1).fetch(timeout=1)
- s.acknowledge()
- s.connection.close()
- return m
-
- def get_messages(self, queue, n):
- s = self.connect().session()
- receiver = s.receiver(queue+"; {create:always}", capacity=n)
- m = [receiver.fetch(timeout=1) for i in range(n)]
- s.acknowledge()
- s.connection.close()
- return m
-
- def host_port(self): return "%s:%s" % (self.host(), self.port())
-
- def log_ready(self):
- """Return true if the log file exists and contains a broker ready message"""
- if not self._log_ready:
- self._log_ready = find_in_file("notice Broker running", self.log)
- return self._log_ready
-
- def ready(self, **kwargs):
- """Wait till broker is ready to serve clients"""
- # First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=60):
- raise Exception(
- "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
- # Create a connection and a session. For a cluster broker this will
- # return after cluster init has finished.
- try:
- c = self.connect(**kwargs)
- try: c.session()
- finally: c.close()
- except Exception,e: raise RethrownException(
- "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
-
- def store_state(self):
- f = open(os.path.join(self.datadir, "cluster", "store.status"))
- try: uuids = f.readlines()
- finally: f.close()
- null_uuid="00000000-0000-0000-0000-000000000000\n"
- if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
- if uuids[0] == null_uuid: return "empty"
- if uuids[1] == null_uuid: return "dirty"
- return "clean"
-
-class Cluster:
- """A cluster of brokers in a test."""
-
- _cluster_count = 0
-
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
- self.test = test
- self._brokers=[]
- self.name = "cluster%d" % Cluster._cluster_count
- Cluster._cluster_count += 1
- # Use unique cluster name
- self.args = copy(args)
- self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
- self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
- self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait)
-
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
- """Add a broker to the cluster. Returns the index of the new broker."""
- if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
- return self._brokers[-1]
-
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
- for i in range(count): self.start(expect=expect, wait=wait, args=args)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-class BrokerTest(TestCase):
- """
- Tracks processes started by test and kills at end of test.
- Provides a well-known working directory for each test.
- """
-
- # Environment settings.
- qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
- cluster_lib = os.getenv("CLUSTER_LIB")
- xml_lib = os.getenv("XML_LIB")
- qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
- qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
- receiver_exec = os.getenv("RECEIVER_EXEC")
- sender_exec = os.getenv("SENDER_EXEC")
- sql_store_lib = os.getenv("STORE_SQL_LIB")
- sql_clfs_store_lib = os.getenv("STORE_SQL_CLFS_LIB")
- sql_catalog = os.getenv("STORE_CATALOG")
- store_lib = os.getenv("STORE_LIB")
- test_store_lib = os.getenv("TEST_STORE_LIB")
- rootdir = os.getcwd()
-
- def configure(self, config): self.config=config
-
- def setUp(self):
- outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
- self.dir = os.path.join(self.rootdir, outdir, self.id())
- os.makedirs(self.dir)
- os.chdir(self.dir)
- self.stopem = [] # things to stop at end of test
-
- def tearDown(self):
- err = []
- for p in self.stopem:
- try: p.stop()
- except Exception, e: err.append(str(e))
- self.stopem = [] # reset in case more processes start
- os.chdir(self.rootdir)
- if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
-
- def cleanup_stop(self, stopable):
- """Call thing.stop at end of test"""
- self.stopem.append(stopable)
-
- def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
- """Start a process that will be killed at end of test, in the test dir."""
- os.chdir(self.dir)
- p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
- self.cleanup_stop(p)
- return p
-
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
- """Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
- if (wait):
- try: b.ready()
- except Exception, e:
- raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
- return b
-
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
- """Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait)
- return cluster
-
- def browse(self, session, queue, timeout=0):
- """Assert that the contents of messages on queue (as retrieved
- using session and timeout) exactly match the strings in
- expect_contents"""
- r = session.receiver("%s;{mode:browse}"%(queue))
- try:
- contents = []
- try:
- while True: contents.append(r.fetch(timeout=timeout).content)
- except messaging.Empty: pass
- finally: pass #FIXME aconway 2011-04-14: r.close()
- return contents
-
- def assert_browse(self, session, queue, expect_contents, timeout=0):
- """Assert that the contents of messages on queue (as retrieved
- using session and timeout) exactly match the strings in
- expect_contents"""
- actual_contents = self.browse(session, queue, timeout)
- self.assertEqual(expect_contents, actual_contents)
-
-def join(thread, timeout=10):
- thread.join(timeout)
- if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
-
-class RethrownException(Exception):
- """Captures the stack trace of the current exception to be thrown later"""
- def __init__(self, msg=""):
- Exception.__init__(self, msg+"\n"+format_exc())
-
-class StoppableThread(Thread):
- """
- Base class for threads that do something in a loop and periodically check
- to see if they have been stopped.
- """
- def __init__(self):
- self.stopped = False
- self.error = None
- Thread.__init__(self)
-
- def stop(self):
- self.stopped = True
- join(self)
- if self.error: raise self.error
-
-class NumberedSender(Thread):
- """
- Thread to run a sender client and send numbered messages until stopped.
- """
-
- def __init__(self, broker, max_depth=None, queue="test-queue"):
- """
- max_depth: enable flow control, ensure sent - received <= max_depth.
- Requires self.notify_received(n) to be called each time messages are received.
- """
- Thread.__init__(self)
- self.sender = broker.test.popen(
- ["qpid-send",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--content-stdin"
- ],
- expect=EXPECT_RUNNING,
- stdin=PIPE)
- self.condition = Condition()
- self.max = max_depth
- self.received = 0
- self.stopped = False
- self.error = None
-
- def write_message(self, n):
- self.sender.stdin.write(str(n)+"\n")
- self.sender.stdin.flush()
-
- def run(self):
- try:
- self.sent = 0
- while not self.stopped:
- if self.max:
- self.condition.acquire()
- while not self.stopped and self.sent - self.received > self.max:
- self.condition.wait()
- self.condition.release()
- self.write_message(self.sent)
- self.sent += 1
- except Exception: self.error = RethrownException(self.sender.pname)
-
- def notify_received(self, count):
- """Called by receiver to enable flow control. count = messages received so far."""
- self.condition.acquire()
- self.received = count
- self.condition.notify()
- self.condition.release()
-
- def stop(self):
- self.condition.acquire()
- try:
- self.stopped = True
- self.condition.notify()
- finally: self.condition.release()
- join(self)
- self.write_message(-1) # end-of-messages marker.
- if self.error: raise self.error
-
-class NumberedReceiver(Thread):
- """
- Thread to run a receiver client and verify it receives
- sequentially numbered messages.
- """
- def __init__(self, broker, sender = None, queue="test-queue"):
- """
- sender: enable flow control. Call sender.received(n) for each message received.
- """
- Thread.__init__(self)
- self.test = broker.test
- self.receiver = self.test.popen(
- ["qpid-receive",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--forever"
- ],
- expect=EXPECT_RUNNING,
- stdout=PIPE)
- self.lock = Lock()
- self.error = None
- self.sender = sender
-
- def read_message(self):
- return int(self.receiver.stdout.readline())
-
- def run(self):
- try:
- self.received = 0
- m = self.read_message()
- while m != -1:
- assert(m <= self.received) # Check for missing messages
- if (m == self.received): # Ignore duplicates
- self.received += 1
- if self.sender:
- self.sender.notify_received(self.received)
- m = self.read_message()
- except Exception:
- self.error = RethrownException(self.receiver.pname)
-
- def stop(self):
- """Returns when termination message is received"""
- join(self)
- if self.error: raise self.error
-
-class ErrorGenerator(StoppableThread):
- """
- Thread that continuously generates errors by trying to consume from
- a non-existent queue. For cluster regression tests, error handling
- caused issues in the past.
- """
-
- def __init__(self, broker):
- StoppableThread.__init__(self)
- self.broker=broker
- broker.test.cleanup_stop(self)
- self.start()
-
- def run(self):
- c = self.broker.connect_old()
- try:
- while not self.stopped:
- try:
- c.session(str(qpid.datatypes.uuid4())).message_subscribe(
- queue="non-existent-queue")
- assert(False)
- except qpid.session.SessionException: pass
- time.sleep(0.01)
- except: pass # Normal if broker is killed.
-
-def import_script(path):
- """
- Import executable script at path as a module.
- Requires some trickery as scripts are not in standard module format
- """
- f = open(path)
- try:
- name=os.path.split(path)[1].replace("-","_")
- return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
- finally: f.close()