summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--qpid/cpp/src/tests/brokertest.py45
1 files changed, 36 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index ba65936df7..2566bc527d 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -21,6 +21,7 @@
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
+import proton
from qpid import connection, util
from qpid.compat import format_exc
from unittest import TestCase
@@ -48,13 +49,18 @@ from qpidtoollibs import BrokerAgent
import qpid.messaging
qm = qpid.messaging
qpid_messaging = None
+
+def env_has_log_config():
+ """True if there are qpid log configuratoin settings in the environment."""
+ return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ
+
if not os.environ.get("QPID_PY_NO_SWIG"):
try:
import qpid_messaging
from qpid.datatypes import uuid4
qm = qpid_messaging
# Silence warnings from swigged messaging library unless enabled in environment.
- if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+ if not env_has_log_config():
qm.Logger.configure(["--log-enable=error"])
except ImportError:
print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
@@ -135,7 +141,7 @@ _popen_id = AtomicCounter() # Popen identifier for use in output file names.
# Constants for file descriptor arguments to Popen
FILE = "FILE" # Write to file named after process
-PIPE = subprocess.PIPE
+from subprocess import PIPE, STDOUT
class Popen(subprocess.Popen):
"""
@@ -201,7 +207,7 @@ class Popen(subprocess.Popen):
def communicate(self, input=None):
ret = subprocess.Popen.communicate(self, input)
- self.cleanup()
+ self._cleanup()
return ret
def is_running(self): return self.poll() is None
@@ -253,6 +259,7 @@ class Popen(subprocess.Popen):
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
+
def checkenv(name):
value = os.getenv(name)
if not value: raise Exception("Environment variable %s is not set" % name)
@@ -307,7 +314,7 @@ class Broker(Popen):
cmd += ["--log-to-stderr=no"]
# Add default --log-enable arguments unless args already has --log arguments.
- if not [l for l in args if l.startswith("--log")]:
+ if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
args += ["--log-enable=info+"]
if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
@@ -443,10 +450,11 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content):
finally:
r.close()
-def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
+ if msg is None: msg = "browse '%s' failed" % queue
actual_contents = browse(session, queue, timeout, transform=transform)
if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
assert expect_contents == actual_contents, msg
@@ -485,6 +493,18 @@ class BrokerTest(TestCase):
test_store_lib = os.getenv("TEST_STORE_LIB")
rootdir = os.getcwd()
+ PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR)
+ PN_TX_VERSION = (0, 9)
+
+ amqp_tx_supported = PN_VERSION >= PN_TX_VERSION
+
+ @classmethod
+ def amqp_tx_warning(cls):
+ if not cls.amqp_tx_supported:
+ print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION)
+ return False
+ return True
+
def configure(self, config): self.config=config
def setUp(self):
@@ -497,8 +517,8 @@ class BrokerTest(TestCase):
if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0"
else: default_protocol="amqp0-10"
self.protocol = defs.get("PROTOCOL") or default_protocol
- self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
-
+ self.tx_protocol = self.protocol
+ if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10"
def tearDown(self):
err = []
@@ -529,15 +549,22 @@ class BrokerTest(TestCase):
self.teardown_add(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
+ def check_output(self, args, stdin=None):
+ p = self.popen(args, stdout=PIPE, stderr=STDOUT)
+ out = p.communicate(stdin)
+ if p.returncode != 0:
+ raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0]))
+ return out[0]
+
def browse(self, *args, **kwargs): browse(*args, **kwargs)
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)