diff options
author | Alan Conway <aconway@apache.org> | 2015-02-27 16:37:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2015-02-27 16:37:06 +0000 |
commit | 3aaa53e9103b6019c9e31d15186b12a95a1993be (patch) | |
tree | f5950c063ff08f574c808023ece7745739ca7027 /qpid/cpp/src/tests/brokertest.py | |
parent | 9c9f0e2c935d11c0f8d1ebddf1bbb78c3c22c606 (diff) | |
download | qpid-python-3aaa53e9103b6019c9e31d15186b12a95a1993be.tar.gz |
QPID-4710: [AMQP 1.0] Support for transactions in qpid::messaging C++ client.
Implements the "transactional retire and settle immediately" option for
transactions as specified in AMQP 1.0 in the qpid::messaging C++ client.
NOTE: Transactions over AMQP 1.0 require proton 0.9 or greater. With older
versions, attempting a transactions over AMQP 1.0 will raise a link-detached
exception "Node not found: tx-transaction"
1. Added descriptor list to Variant with support in Encoder and PnData.
Required to support transactions, need to be able to create described lists.
Variant changes are source and binary compatible.
A Variant now has a Variant::List of descripors which can be numeric or string.
Nested descriptors are implemented by putting multiple descriptors in the list.
Other minor changes:
- Variant refactor: don't delete impl on every assignment.
- Add Variant constructors that take a string encoding.
(new constructors, not defaulted arguments, so the change is binary and source compatible.)
- Growable buffer support for Encoder.
- Printing described Variant prints descriptors in form @descriptor value
2. Added transaction support to AMQP 1.0 client code
Added messaging/amqp/Transaction.h,cpp: transaction logic
- communicate with coordinator, send declare/dischange messages.
- add tx state info to transfers and acknowledgements.
- Sync session after discharge.
- A transactional session automatically acks any message retrieved by fetch/get
to bring them into the transaction. This is consistent the 0-10 client.
Minor fixes to existing client code:
- Fix use of pn_drain API in C++ client to work with C++ and Java brokers.
- Make amqp::Exception derive from qpid::Exception
3. Fixes to existing broker code:
- Incoming.cpp fix: start async completion before processing message.
- Delay accept of dischage message till commit is complete.
- newSession - handle failover during session creation.
4. Added tests
interop_tests.py: transaction tests that can run against an external broker, see comments.
ha_tests.py: Enable transaction tests over AMQP 1.0.
Minor test fixes:
- brokertest.py don't set default logging if QPID_LOG env vars set.
- brokertest.py Pass kwargs to broker() create function.
- qpid-receive: capacity should never be larger than message count.
- Accept user:pass as well as user/pass in Url.
- brokertest.py: Always do a ready() check on all brokers.
If proton < 0.9 is used, transaction tests will be skipped or will downgrade to
the amqp0-10 protocol with a printed warning.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1662743 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 45 |
1 files changed, 36 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index ba65936df7..2566bc527d 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -21,6 +21,7 @@ import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal +import proton from qpid import connection, util from qpid.compat import format_exc from unittest import TestCase @@ -48,13 +49,18 @@ from qpidtoollibs import BrokerAgent import qpid.messaging qm = qpid.messaging qpid_messaging = None + +def env_has_log_config(): + """True if there are qpid log configuratoin settings in the environment.""" + return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ + if not os.environ.get("QPID_PY_NO_SWIG"): try: import qpid_messaging from qpid.datatypes import uuid4 qm = qpid_messaging # Silence warnings from swigged messaging library unless enabled in environment. - if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ: + if not env_has_log_config(): qm.Logger.configure(["--log-enable=error"]) except ImportError: print "Cannot load python SWIG bindings, falling back to native qpid.messaging." @@ -135,7 +141,7 @@ _popen_id = AtomicCounter() # Popen identifier for use in output file names. # Constants for file descriptor arguments to Popen FILE = "FILE" # Write to file named after process -PIPE = subprocess.PIPE +from subprocess import PIPE, STDOUT class Popen(subprocess.Popen): """ @@ -201,7 +207,7 @@ class Popen(subprocess.Popen): def communicate(self, input=None): ret = subprocess.Popen.communicate(self, input) - self.cleanup() + self._cleanup() return ret def is_running(self): return self.poll() is None @@ -253,6 +259,7 @@ class Popen(subprocess.Popen): def cmd_str(self): return " ".join([str(s) for s in self.cmd]) + def checkenv(name): value = os.getenv(name) if not value: raise Exception("Environment variable %s is not set" % name) @@ -307,7 +314,7 @@ class Broker(Popen): cmd += ["--log-to-stderr=no"] # Add default --log-enable arguments unless args already has --log arguments. - if not [l for l in args if l.startswith("--log")]: + if not env_has_log_config() and not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+"] if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, @@ -443,10 +450,11 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content): finally: r.close() -def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): +def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" + if msg is None: msg = "browse '%s' failed" % queue actual_contents = browse(session, queue, timeout, transform=transform) if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg @@ -485,6 +493,18 @@ class BrokerTest(TestCase): test_store_lib = os.getenv("TEST_STORE_LIB") rootdir = os.getcwd() + PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR) + PN_TX_VERSION = (0, 9) + + amqp_tx_supported = PN_VERSION >= PN_TX_VERSION + + @classmethod + def amqp_tx_warning(cls): + if not cls.amqp_tx_supported: + print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION) + return False + return True + def configure(self, config): self.config=config def setUp(self): @@ -497,8 +517,8 @@ class BrokerTest(TestCase): if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0" else: default_protocol="amqp0-10" self.protocol = defs.get("PROTOCOL") or default_protocol - self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0 - + self.tx_protocol = self.protocol + if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10" def tearDown(self): err = [] @@ -529,15 +549,22 @@ class BrokerTest(TestCase): self.teardown_add(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd) + b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b + def check_output(self, args, stdin=None): + p = self.popen(args, stdout=PIPE, stderr=STDOUT) + out = p.communicate(stdin) + if p.returncode != 0: + raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0])) + return out[0] + def browse(self, *args, **kwargs): browse(*args, **kwargs) def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) |