diff options
| author | Alan Conway <aconway@apache.org> | 2014-04-07 21:22:55 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-04-07 21:22:55 +0000 |
| commit | a6f044f40d70b73c320cc909169e3518909365e2 (patch) | |
| tree | a23df30ce4b52f1d23e55a0afa0e90c1fa20ffd6 /qpid/cpp/bindings | |
| parent | 8ea0f79d78edd0a0825547ecc618e3fa63a2b93f (diff) | |
| download | qpid-python-a6f044f40d70b73c320cc909169e3518909365e2.tar.gz | |
QPID-5560: HA tests do not use AMQP 1.0
The HA tests were using only AMQP 0-10.
Modified the tests to use AMQP 1.0 if available (still use 0-10 if 1.0 is not available)
Fixed bugs uncovered both in the tests and in the AMQP 1.0 implementation.
Summary of changes:
- brokertest.py: configurable support for of swig vs. native and amqp0-10 vs. 1.0
- default to swig+amqp1.0 if swig is available, native+amqp0-10 otherwise
- qpidtoollibs/broker.py: enable use of swig client with BrokerAgent
- Swig python client:
- support for passing client_properties/properties.
- expose AddressHelper pn_data read/write as PnData helper class
- set sender/receiver capacity on creation
- limited disposition support - rejected messages.
- support for additional timeout parameters
- expose messaging::Logger, allow log configuration to be set from python.
- ha_tests.py:
- bind, delete policies not supported by AMQP 1.0, switched to using BrokerAgent QMF.
- pass protocol:amqp1.0 connection-option to c++ test clients (qpid-send, qpid-receive)
- TX tests forsce use of 0-10 protocol (but still with Swig client if enabled.)
- Broker fixes:
- Queue::Settings::isTemporary was set in the 0-10 SessionAdapter, moved to Broker::createQueue.
- broker::amqp::Session was always setting an exclusive owner in createQueue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1585588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/bindings')
| -rw-r--r-- | qpid/cpp/bindings/qpid/python/qpid_messaging.i | 83 |
1 files changed, 73 insertions, 10 deletions
diff --git a/qpid/cpp/bindings/qpid/python/qpid_messaging.i b/qpid/cpp/bindings/qpid/python/qpid_messaging.i index 8063db5967..e728c90334 100644 --- a/qpid/cpp/bindings/qpid/python/qpid_messaging.i +++ b/qpid/cpp/bindings/qpid/python/qpid_messaging.i @@ -141,7 +141,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) * don't even know why there is a non-const version of the method. */ %rename(opened) qpid::messaging::Connection::isOpen(); %rename(_close) qpid::messaging::Connection::close(); -%rename(receiver) qpid::messaging::Session::createReceiver; +%rename(_receiver) qpid::messaging::Session::createReceiver; %rename(_sender) qpid::messaging::Session::createSender; %rename(_acknowledge_all) qpid::messaging::Session::acknowledge(bool); %rename(_acknowledge_msg) qpid::messaging::Session::acknowledge( @@ -161,6 +161,16 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) %rename(_getTtl) qpid::messaging::Message::getTtl; %rename(_setTtl) qpid::messaging::Message::setTtl; +%rename(_sync) qpid::messaging::Session::sync; + +// Capitalize constant names correctly for python +%rename(TRACE) qpid::messaging::trace; +%rename(DEBUG) qpid::messaging::debug; +%rename(INFO) qpid::messaging::info; +%rename(NOTICE) qpid::messaging::notice; +%rename(WARNING) qpid::messaging::warning; +%rename(ERROR) qpid::messaging::error; +%rename(CRITICAL) qpid::messaging::critical; %include "qpid/qpid.i" @@ -221,31 +231,77 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) %pythoncode %{ @staticmethod - def establish(url=None, **options) : + def establish(url=None, timeout=None, **options) : + if timeout and "reconnect-timeout" not in options: + options["reconnect-timeout"] = timeout conn = Connection(url, **options) conn.open() return conn %} } +%pythoncode %{ + # Disposition class from messaging/message.py + class Disposition: + def __init__(self, type, **options): + self.type = type + self.options = options + + def __repr__(self): + args = [str(self.type)] + ["%s=%r" % (k, v) for k, v in self.options.items()] + return "Disposition(%s)" % ", ".join(args) + + # Consntants from messaging/constants.py + __SELF__ = object() + + class Constant: + + def __init__(self, name, value=__SELF__): + self.name = name + if value is __SELF__: + self.value = self + else: + self.value = value + + def __repr__(self): + return self.name + + AMQP_PORT = 5672 + AMQPS_PORT = 5671 + + UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + + REJECTED = Constant("REJECTED") + RELEASED = Constant("RELEASED") +%} + %extend qpid::messaging::Session { %pythoncode %{ def acknowledge(self, message=None, disposition=None, sync=True) : - if disposition : - raise Exception("SWIG does not support dispositions yet. Use " - "Session.reject and Session.release instead") if message : - self._acknowledge_msg(message, sync) + if disposition is None: self._acknowledge_msg(message, sync) + # FIXME aconway 2014-02-11: the following does not repsect the sync flag. + elif disposition.type == REJECTED: self.reject(message) + elif disposition.type == RELEASED: self.release(message) else : + if disposition : # FIXME aconway 2014-02-11: support this + raise Exception("SWIG does not support dispositions yet. Use " + "Session.reject and Session.release instead") self._acknowledge_all(sync) __swig_getmethods__["connection"] = getConnection if _newclass: connection = property(getConnection) - def sender(self, target, **options) : - s = self._sender(target) - s._setDurable(options.get("durable")) - return s + def receiver(self, source, capacity=None): + r = self._receiver(source) + if capacity is not None: r.capacity = capacity + return r + + def sender(self, target, durable=None, capacity=None) : + s = self._sender(target) + if capacity is not None: s.capacity = capacity + s._setDurable(durable) + return s def next_receiver(self, timeout=None) : if timeout is None : @@ -254,6 +310,11 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) # Python API uses timeouts in seconds, # but C++ API uses milliseconds return self._next_receiver(Duration(int(1000*timeout))) + + def sync(self, timeout=None): + if timeout == 0: self._sync(False) # Non-blocking sync + else: self._sync(True) # Blocking sync, C++ has not timeout. + %} } @@ -302,6 +363,8 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) message.durable = self.durable return self._send(message, sync) + def sync(self, timeout=None): self.session.sync(timeout) + __swig_getmethods__["capacity"] = getCapacity __swig_setmethods__["capacity"] = setCapacity if _newclass: capacity = property(getCapacity, setCapacity) |
