summaryrefslogtreecommitdiff
path: root/qpid/cpp/bindings
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-04-07 21:22:55 +0000
committerAlan Conway <aconway@apache.org>2014-04-07 21:22:55 +0000
commita6f044f40d70b73c320cc909169e3518909365e2 (patch)
treea23df30ce4b52f1d23e55a0afa0e90c1fa20ffd6 /qpid/cpp/bindings
parent8ea0f79d78edd0a0825547ecc618e3fa63a2b93f (diff)
downloadqpid-python-a6f044f40d70b73c320cc909169e3518909365e2.tar.gz
QPID-5560: HA tests do not use AMQP 1.0
The HA tests were using only AMQP 0-10. Modified the tests to use AMQP 1.0 if available (still use 0-10 if 1.0 is not available) Fixed bugs uncovered both in the tests and in the AMQP 1.0 implementation. Summary of changes: - brokertest.py: configurable support for of swig vs. native and amqp0-10 vs. 1.0 - default to swig+amqp1.0 if swig is available, native+amqp0-10 otherwise - qpidtoollibs/broker.py: enable use of swig client with BrokerAgent - Swig python client: - support for passing client_properties/properties. - expose AddressHelper pn_data read/write as PnData helper class - set sender/receiver capacity on creation - limited disposition support - rejected messages. - support for additional timeout parameters - expose messaging::Logger, allow log configuration to be set from python. - ha_tests.py: - bind, delete policies not supported by AMQP 1.0, switched to using BrokerAgent QMF. - pass protocol:amqp1.0 connection-option to c++ test clients (qpid-send, qpid-receive) - TX tests forsce use of 0-10 protocol (but still with Swig client if enabled.) - Broker fixes: - Queue::Settings::isTemporary was set in the 0-10 SessionAdapter, moved to Broker::createQueue. - broker::amqp::Session was always setting an exclusive owner in createQueue git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1585588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/bindings')
-rw-r--r--qpid/cpp/bindings/qpid/python/qpid_messaging.i83
1 files changed, 73 insertions, 10 deletions
diff --git a/qpid/cpp/bindings/qpid/python/qpid_messaging.i b/qpid/cpp/bindings/qpid/python/qpid_messaging.i
index 8063db5967..e728c90334 100644
--- a/qpid/cpp/bindings/qpid/python/qpid_messaging.i
+++ b/qpid/cpp/bindings/qpid/python/qpid_messaging.i
@@ -141,7 +141,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
* don't even know why there is a non-const version of the method. */
%rename(opened) qpid::messaging::Connection::isOpen();
%rename(_close) qpid::messaging::Connection::close();
-%rename(receiver) qpid::messaging::Session::createReceiver;
+%rename(_receiver) qpid::messaging::Session::createReceiver;
%rename(_sender) qpid::messaging::Session::createSender;
%rename(_acknowledge_all) qpid::messaging::Session::acknowledge(bool);
%rename(_acknowledge_msg) qpid::messaging::Session::acknowledge(
@@ -161,6 +161,16 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
%rename(_getTtl) qpid::messaging::Message::getTtl;
%rename(_setTtl) qpid::messaging::Message::setTtl;
+%rename(_sync) qpid::messaging::Session::sync;
+
+// Capitalize constant names correctly for python
+%rename(TRACE) qpid::messaging::trace;
+%rename(DEBUG) qpid::messaging::debug;
+%rename(INFO) qpid::messaging::info;
+%rename(NOTICE) qpid::messaging::notice;
+%rename(WARNING) qpid::messaging::warning;
+%rename(ERROR) qpid::messaging::error;
+%rename(CRITICAL) qpid::messaging::critical;
%include "qpid/qpid.i"
@@ -221,31 +231,77 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
%pythoncode %{
@staticmethod
- def establish(url=None, **options) :
+ def establish(url=None, timeout=None, **options) :
+ if timeout and "reconnect-timeout" not in options:
+ options["reconnect-timeout"] = timeout
conn = Connection(url, **options)
conn.open()
return conn
%}
}
+%pythoncode %{
+ # Disposition class from messaging/message.py
+ class Disposition:
+ def __init__(self, type, **options):
+ self.type = type
+ self.options = options
+
+ def __repr__(self):
+ args = [str(self.type)] + ["%s=%r" % (k, v) for k, v in self.options.items()]
+ return "Disposition(%s)" % ", ".join(args)
+
+ # Consntants from messaging/constants.py
+ __SELF__ = object()
+
+ class Constant:
+
+ def __init__(self, name, value=__SELF__):
+ self.name = name
+ if value is __SELF__:
+ self.value = self
+ else:
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+ AMQP_PORT = 5672
+ AMQPS_PORT = 5671
+
+ UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+ REJECTED = Constant("REJECTED")
+ RELEASED = Constant("RELEASED")
+%}
+
%extend qpid::messaging::Session {
%pythoncode %{
def acknowledge(self, message=None, disposition=None, sync=True) :
- if disposition :
- raise Exception("SWIG does not support dispositions yet. Use "
- "Session.reject and Session.release instead")
if message :
- self._acknowledge_msg(message, sync)
+ if disposition is None: self._acknowledge_msg(message, sync)
+ # FIXME aconway 2014-02-11: the following does not repsect the sync flag.
+ elif disposition.type == REJECTED: self.reject(message)
+ elif disposition.type == RELEASED: self.release(message)
else :
+ if disposition : # FIXME aconway 2014-02-11: support this
+ raise Exception("SWIG does not support dispositions yet. Use "
+ "Session.reject and Session.release instead")
self._acknowledge_all(sync)
__swig_getmethods__["connection"] = getConnection
if _newclass: connection = property(getConnection)
- def sender(self, target, **options) :
- s = self._sender(target)
- s._setDurable(options.get("durable"))
- return s
+ def receiver(self, source, capacity=None):
+ r = self._receiver(source)
+ if capacity is not None: r.capacity = capacity
+ return r
+
+ def sender(self, target, durable=None, capacity=None) :
+ s = self._sender(target)
+ if capacity is not None: s.capacity = capacity
+ s._setDurable(durable)
+ return s
def next_receiver(self, timeout=None) :
if timeout is None :
@@ -254,6 +310,11 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
# Python API uses timeouts in seconds,
# but C++ API uses milliseconds
return self._next_receiver(Duration(int(1000*timeout)))
+
+ def sync(self, timeout=None):
+ if timeout == 0: self._sync(False) # Non-blocking sync
+ else: self._sync(True) # Blocking sync, C++ has not timeout.
+
%}
}
@@ -302,6 +363,8 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
message.durable = self.durable
return self._send(message, sync)
+ def sync(self, timeout=None): self.session.sync(timeout)
+
__swig_getmethods__["capacity"] = getCapacity
__swig_setmethods__["capacity"] = setCapacity
if _newclass: capacity = property(getCapacity, setCapacity)