diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 10:54:07 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 10:54:07 +0000 |
| commit | 5e0d449d9de02b4937747cc0b67cf50e62310b81 (patch) | |
| tree | 1a1133ebe7b70ff84aaede1d6c8cab84b4ceaf48 /qpid/python | |
| parent | 5f4ccf5da76ae6ebfdd3252ed6f865e0a72fd745 (diff) | |
| download | qpid-python-5e0d449d9de02b4937747cc0b67cf50e62310b81.tar.gz | |
Changes to connection lifecycle methods and Connection parameters:
- Connection.open -> Connection.establish
- Connection.connect() split into Connection.open(), Connection.attach()
- Connection.disconnect() -> Connection.detach()
- reconnect_hosts -> reconnect_urls
- transport now takes tcp, ssl, and tcp+tls
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@932352 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rwxr-xr-x | qpid/python/examples/api/drain | 8 | ||||
| -rwxr-xr-x | qpid/python/examples/api/server | 8 | ||||
| -rwxr-xr-x | qpid/python/examples/api/spout | 8 | ||||
| -rw-r--r-- | qpid/python/qpid/brokertest.py | 3 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 25 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 149 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/exceptions.py | 4 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/transports.py | 9 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/util.py | 20 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/__init__.py | 6 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 74 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/message.py | 3 |
12 files changed, 183 insertions, 134 deletions
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain index 372426c801..a2e40ec8a0 100755 --- a/qpid/python/examples/api/drain +++ b/qpid/python/examples/api/drain @@ -51,7 +51,6 @@ if opts.verbose: else: enable("qpid", WARN) -url = URL(opts.broker) if args: addr = args.pop(0) else: @@ -72,15 +71,12 @@ class Formatter: def __getitem__(self, st): return eval(st, self.environ) -# XXX: should make URL default the port for us -conn = Connection(url.host, url.port, - username=url.user, - password=url.password, +conn = Connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit) try: - conn.connect() + conn.open() ssn = conn.session() rcv = ssn.receiver(addr) diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server index 3103dd795a..0500e6f380 100755 --- a/qpid/python/examples/api/server +++ b/qpid/python/examples/api/server @@ -44,16 +44,12 @@ if opts.verbose: else: enable("qpid", WARN) -url = URL(opts.broker) if args: addr = args.pop(0) else: parser.error("address is required") -# XXX: should make URL default the port for us -conn = Connection(url.host, url.port, - username=url.user, - password=url.password, +conn = Connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit) @@ -75,7 +71,7 @@ def dispatch(msg): return result try: - conn.connect() + conn.open() ssn = conn.session() rcv = ssn.receiver(addr) diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout index 9e5759b2dd..dacebb5d1a 100755 --- a/qpid/python/examples/api/spout +++ b/qpid/python/examples/api/spout @@ -65,7 +65,6 @@ if opts.verbose: else: enable("qpid", WARN) -url = URL(opts.broker) if opts.id is None: spout_id = str(uuid4()) else: @@ -92,15 +91,12 @@ if opts.entries: else: content = text -# XXX: should make URL default the port for us -conn = Connection(url.host, url.port, - username=url.user, - password=url.password, +conn = Connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit) try: - conn.connect() + conn.open() ssn = conn.session() snd = ssn.sender(addr) diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py index 3608a959b1..ff9b3aaeb6 100644 --- a/qpid/python/qpid/brokertest.py +++ b/qpid/python/qpid/brokertest.py @@ -295,7 +295,8 @@ class Broker(Popen): def connect(self): """New API connection to the broker.""" - return messaging.Connection.open(self.host(), self.port()) + return messaging.Connection.establish(host=self.host(), + port=self.port()) def connect_old(self): """Old API connection to the broker.""" diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 7b165ec94b..cf8498794b 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -324,9 +324,10 @@ class Driver: self._selector = Selector.default() self._attempts = 0 self._delay = self.connection.reconnect_interval_min + urls = [URL(u) for u in self.connection.reconnect_urls] self._hosts = [(self.connection.host, self.connection.port)] + \ - self.connection.reconnect_hosts - self._reconnect_log = self.connection.options.get("reconnect_log", True) + [(u.host, u.port) for u in urls] + self._reconnect_log = self.connection.reconnect_log self._host = 0 self._retrying = False self._transport = None @@ -463,7 +464,7 @@ class Driver: self.engine = Engine(self.connection) self.engine.open() rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) - trans = getattr(transports, self.connection.transport, None) + trans = transports.TRANSPORTS.get(self.connection.transport) if trans: self._transport = trans(host, port) else: @@ -507,9 +508,7 @@ class Engine: self._channels = 0 self._sessions = {} - options = self.connection.options - - self.address_cache = Cache(options.get("address_ttl", 60)) + self.address_cache = Cache(self.connection.address_ttl) self._status = CLOSED self._buf = "" @@ -528,11 +527,11 @@ class Engine: self._sasl.setAttr("password", self.connection.password) if self.connection.host: self._sasl.setAttr("host", self.connection.host) - self._sasl.setAttr("service", options.get("service", "qpidd")) - if "min_ssf" in options: - self._sasl.setAttr("minssf", options["min_ssf"]) - if "max_ssf" in options: - self._sasl.setAttr("maxssf", options["max_ssf"]) + self._sasl.setAttr("service", self.connection.sasl_service) + if self.connection.sasl_min_ssf is not None: + self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) + if self.connection.sasl_max_ssf is not None: + self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) self._sasl.init() self._sasl_encode = False self._sasl_decode = False @@ -619,8 +618,8 @@ class Engine: (cli_major, cli_minor, major, minor)) def do_connection_start(self, start): - if self.connection.mechanisms: - permitted = self.connection.mechanisms.split() + if self.connection.sasl_mechanisms: + permitted = self.connection.sasl_mechanisms.split() mechs = [m for m in start.mechanisms if m in permitted] else: mechs = start.mechanisms diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 7ac3881bac..17807f20d2 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -36,7 +36,7 @@ from qpid.messaging.constants import * from qpid.messaging.exceptions import * from qpid.messaging.message import * from qpid.ops import PRIMITIVE -from qpid.util import default +from qpid.util import default, URL from threading import Thread, RLock log = getLogger("qpid.messaging") @@ -51,61 +51,110 @@ class Connection: """ @static - def open(host, port=None, username="guest", password="guest", **options): + def establish(url=None, **options): """ - Creates an AMQP connection and connects it to the given host and port. - - @type host: str - @param host: the name or ip address of the remote host - @type port: int - @param port: the port number of the remote host - @rtype: Connection - @return: a connected Connection + Constructs a L{Connection} with the supplied parameters and opens + it. """ - conn = Connection(host, port, username, password, **options) - conn.connect() + conn = Connection(url, **options) + conn.open() return conn - def __init__(self, host, port=None, username="guest", password="guest", **options): + def __init__(self, url=None, **options): """ Creates a connection. A newly created connection must be connected with the Connection.connect() method before it can be used. + @type url: str + @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] @type host: str - @param host: the name or ip address of the remote host + @param host: the name or ip address of the remote host (overriden by url) @type port: int - @param port: the port number of the remote host + @param port: the port number of the remote host (overriden by url) + @type transport: str + @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) + @type heartbeat: int + @param heartbeat: heartbeat interval in seconds + + @type username: str + @param username: the username for authentication (overriden by url) + @type password: str + @param password: the password for authentication (overriden by url) + + @type sasl_mechanisms: str + @param sasl_mechanisms: space separated list of permitted sasl mechanisms + @type sasl_service: str + @param sasl_service: ??? + @type sasl_min_ssf: ??? + @param sasl_min_ssf: ??? + @type sasl_max_ssf: ??? + @param sasl_max_ssf: ??? + + @type reconnect: bool + @param reconnect: enable/disable automatic reconnect + @type reconnect_timeout: float + @param reconnect_timeout: total time to attempt reconnect + @type reconnect_internal_min: float + @param reconnect_internal_min: minimum interval between reconnect attempts + @type reconnect_internal_max: float + @param reconnect_internal_max: maximum interval between reconnect attempts + @type reconnect_internal: float + @param reconnect_interval: set both min and max reconnect intervals + @type reconnect_limit: int + @param reconnect_limit: limit the total number of reconnect attempts + @type reconnect_urls: list[str] + @param reconnect_urls: list of backup hosts specified as urls + + @type address_ttl: float + @param address_ttl: time until cached address resolution expires + @rtype: Connection @return: a disconnected Connection """ - self.host = host - self.username = username - self.password = password - self.mechanisms = options.get("mechanisms") + if url is None: + url = options.get("host") + if isinstance(url, basestring): + url = URL(url) + self.host = url.host + if url.scheme == url.AMQP: + self.transport = "tcp" + elif url.scheme == url.AMQPS: + self.transport = "ssl" + else: + self.transport = options.get("transport", "tcp") + if self.transport in ("ssl", "tcp+tls"): + self.port = default(url.port, options.get("port", AMQPS_PORT)) + else: + self.port = default(url.port, options.get("port", AMQP_PORT)) self.heartbeat = options.get("heartbeat") + self.username = default(url.user, options.get("username", "guest")) + self.password = default(url.password, options.get("password", "guest")) + + self.sasl_mechanisms = options.get("sasl_mechanisms") + self.sasl_service = options.get("sasl_service", "qpidd") + self.sasl_min_ssf = options.get("sasl_min_ssf") + self.sasl_max_ssf = options.get("sasl_max_ssf") + self.reconnect = options.get("reconnect", False) self.reconnect_timeout = options.get("reconnect_timeout") - if "reconnect_interval_min" in options: - self.reconnect_interval_min = options["reconnect_interval_min"] - else: - self.reconnect_interval_min = options.get("reconnect_interval", 1) - if "reconnect_interval_max" in options: - self.reconnect_interval_max = options["reconnect_interval_max"] - else: - self.reconnect_interval_max = options.get("reconnect_interval", 2*60) + reconnect_interval = options.get("reconnect_interval") + self.reconnect_interval_min = options.get("reconnect_interval_min", + default(reconnect_interval, 1)) + self.reconnect_interval_max = options.get("reconnect_interval_max", + default(reconnect_interval, 2*60)) self.reconnect_limit = options.get("reconnect_limit") - self.reconnect_hosts = options.get("reconnect_hosts", []) - self.transport = options.get("transport", "plain") + self.reconnect_urls = options.get("reconnect_urls", []) + self.reconnect_log = options.get("reconnect_log", True) + + self.address_ttl = options.get("address_ttl", 60) + self.options = options - if self.transport == "tls": - self.port = default(port, AMQPS_PORT) - else: - self.port = default(port, AMQP_PORT) self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} + self._open = False self._connected = False self._transport_connected = False self._lock = RLock() @@ -164,9 +213,26 @@ class Connection: del self.sessions[ssn.name] @synchronized - def connect(self): + def open(self): + """ + Opens a connection. + """ + if self._open: + raise ConnectionError("already open") + self._open = True + self.attach() + + @synchronized + def opened(self): + """ + Return true if the connection is open, false otherwise. + """ + return self._open + + @synchronized + def attach(self): """ - Connect to the remote endpoint. + Attach to the remote endpoint. """ self._connected = True self._driver.start() @@ -181,9 +247,9 @@ class Connection: if not (l.linked or l.error or l.closed)] @synchronized - def disconnect(self): + def detach(self): """ - Disconnect from the remote endpoint. + Detach from the remote endpoint. """ self._connected = False self._wakeup() @@ -192,9 +258,9 @@ class Connection: self._condition.gc() @synchronized - def connected(self): + def attached(self): """ - Return true if the connection is connected, false otherwise. + Return true if the connection is attached, false otherwise. """ return self._connected @@ -207,7 +273,8 @@ class Connection: for ssn in self.sessions.values(): ssn.close() finally: - self.disconnect() + self.detach() + self._open = False class Session: @@ -680,7 +747,7 @@ class Sender: """ if not self.session.connection._connected or self.session.closing: - raise Disconnected() + raise Detached() self._ewait(lambda: self.linked) diff --git a/qpid/python/qpid/messaging/exceptions.py b/qpid/python/qpid/messaging/exceptions.py index 5c8bdedc26..10ad529806 100644 --- a/qpid/python/qpid/messaging/exceptions.py +++ b/qpid/python/qpid/messaging/exceptions.py @@ -33,10 +33,10 @@ class ConnectError(ConnectionError): class SessionError(Exception): pass -class Disconnected(SessionError): +class Detached(SessionError): """ Exception raised when an operation is attempted that is illegal when - disconnected. + detached. """ pass diff --git a/qpid/python/qpid/messaging/transports.py b/qpid/python/qpid/messaging/transports.py index 1dea469fe5..8133a45604 100644 --- a/qpid/python/qpid/messaging/transports.py +++ b/qpid/python/qpid/messaging/transports.py @@ -19,7 +19,9 @@ from qpid.util import connect -class plain: +TRANSPORTS = {} + +class tcp: def __init__(self, host, port): self.socket = connect(host, port) @@ -42,6 +44,8 @@ class plain: def close(self): self.socket.close() +TRANSPORTS["tcp"] = tcp + try: from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \ SSL_ERROR_WANT_WRITE @@ -105,3 +109,6 @@ else: self.socket.setblocking(1) # this closes the underlying socket self.tls.close() + + TRANSPORTS["ssl"] = tls + TRANSPORTS["tcp+tls"] = tls diff --git a/qpid/python/qpid/messaging/util.py b/qpid/python/qpid/messaging/util.py index 30b54f3ad4..42bc280454 100644 --- a/qpid/python/qpid/messaging/util.py +++ b/qpid/python/qpid/messaging/util.py @@ -26,31 +26,31 @@ from threading import Thread log = getLogger("qpid.messaging.util") -def auto_fetch_reconnect_hosts(conn): - ssn = conn.session("auto-fetch-reconnect-hosts") +def auto_fetch_reconnect_urls(conn): + ssn = conn.session("auto-fetch-reconnect-urls") rcv = ssn.receiver("amq.failover") rcv.capacity = 10 def main(): while True: msg = rcv.fetch() - set_reconnect_hosts(conn, msg) + set_reconnect_urls(conn, msg) ssn.acknowledge(msg, sync=False) - thread = Thread(name="auto-fetch-reconnect-hosts", target=main) + thread = Thread(name="auto-fetch-reconnect-urls", target=main) thread.setDaemon(True) thread.start() -def set_reconnect_hosts(conn, msg): - reconnect_hosts = [] +def set_reconnect_urls(conn, msg): + reconnect_urls = [] urls = msg.properties["amq.failover"] for u in urls: if u.startswith("amqp:tcp:"): parts = u.split(":") host, port = parts[2:4] - reconnect_hosts.append((host, port)) - conn.reconnect_hosts = reconnect_hosts - log.warn("set reconnect_hosts for conn %s: %s", conn, reconnect_hosts) + reconnect_urls.append("%s:%s" % (host, port)) + conn.reconnect_urls = reconnect_urls + log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls) -__all__ = ["auto_fetch_reconnect_hosts", "set_reconnect_hosts"] +__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"] diff --git a/qpid/python/qpid/tests/messaging/__init__.py b/qpid/python/qpid/tests/messaging/__init__.py index b2a600982a..b106377cc9 100644 --- a/qpid/python/qpid/tests/messaging/__init__.py +++ b/qpid/python/qpid/tests/messaging/__init__.py @@ -50,7 +50,7 @@ class Base(Test): self.rcv = self.setup_receiver() def teardown(self): - if self.conn is not None and self.conn.connected(): + if self.conn is not None and self.conn.attached(): self.conn.close() def content(self, base, count = None): @@ -146,9 +146,9 @@ class Base(Test): def transport(self): if self.broker.scheme == self.broker.AMQPS: - return "tls" + return "ssl" else: - return "plain" + return "tcp" def connection_options(self): return {"reconnect": self.reconnect(), diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 2eeba3b068..40388bc9a8 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -28,26 +28,21 @@ from qpid.tests.messaging import Base class SetupTests(Base): - def testOpen(self): - # XXX: need to flesh out URL support/syntax - self.conn = Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + def testEstablish(self): + self.conn = Connection.establish(self.broker, **self.connection_options()) self.ping(self.conn.session()) - def testConnect(self): - # XXX: need to flesh out URL support/syntax - self.conn = Connection(self.broker.host, self.broker.port, - **self.connection_options()) - self.conn.connect() + def testOpen(self): + self.conn = Connection(self.broker, **self.connection_options()) + self.conn.open() self.ping(self.conn.session()) def testConnectError(self): try: - self.conn = Connection.open("localhost", 0) + self.conn = Connection.establish("localhost:0") assert False, "connect succeeded" except ConnectError, e: - # XXX: should verify that e includes appropriate diagnostic info - pass + assert "Connection refused" in str(e) def use_fds(self): fds = [] @@ -66,8 +61,7 @@ class SetupTests(Base): for i in range(32): if fds: os.close(fds.pop()) for i in xrange(64): - conn = Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + conn = Connection.establish(self.broker, **self.connection_options()) conn.close() finally: while fds: @@ -76,8 +70,8 @@ class SetupTests(Base): def testReconnect(self): options = self.connection_options() import socket - from qpid.messaging import transports - real = transports.plain + from qpid.messaging.transports import TRANSPORTS + real = TRANSPORTS["tcp"] class flaky: @@ -112,7 +106,7 @@ class SetupTests(Base): def close(self): self.real.close() - transports.flaky = flaky + TRANSPORTS["flaky"] = flaky options["reconnect"] = True options["reconnect_interval"] = 0 @@ -120,7 +114,7 @@ class SetupTests(Base): options["reconnect_log"] = False options["transport"] = "flaky" - self.conn = Connection.open(self.broker.host, self.broker.port, **options) + self.conn = Connection.establish(self.broker, **options) ssn = self.conn.session() snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") rcv = ssn.receiver(snd.target) @@ -153,8 +147,7 @@ class SetupTests(Base): class ConnectionTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def testSessionAnon(self): ssn1 = self.conn.session() @@ -172,23 +165,23 @@ class ConnectionTests(Base): assert ssn1 is self.conn.session("one") assert ssn2 is self.conn.session("two") - def testDisconnect(self): + def testDetach(self): ssn = self.conn.session() self.ping(ssn) - self.conn.disconnect() + self.conn.detach() try: self.ping(ssn) assert False, "ping succeeded" - except Disconnected: - # this is the expected failure when pinging on a disconnected + except Detached: + # this is the expected failure when pinging on a detached # connection pass - self.conn.connect() + self.conn.attach() self.ping(ssn) def testClose(self): self.conn.close() - assert not self.conn.connected() + assert not self.conn.attached() ACK_QC = 'test-ack-queue; {create: always}' ACK_QD = 'test-ack-queue; {delete: always}' @@ -196,8 +189,7 @@ ACK_QD = 'test-ack-queue; {delete: always}' class SessionTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def setup_session(self): return self.conn.session() @@ -230,11 +222,11 @@ class SessionTests(Base): self.ssn.acknowledge(msg) snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') - def testDisconnectedReceiver(self): - self.conn.disconnect() + def testDetachedReceiver(self): + self.conn.detach() rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") - m = self.content("testDisconnectedReceiver") - self.conn.connect() + m = self.content("testDetachedReceiver") + self.conn.attach() snd = self.ssn.sender("test-dis-rcv-queue") snd.send(m) self.drain(rcv, expected=[m]) @@ -475,7 +467,7 @@ class SessionTests(Base): try: self.ping(self.ssn) assert False, "ping succeeded" - except Disconnected: + except Detached: pass RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' @@ -483,8 +475,7 @@ RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' class ReceiverTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def setup_session(self): return self.conn.session() @@ -653,8 +644,7 @@ class ReceiverTests(Base): class AddressTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def setup_session(self): return self.conn.session() @@ -896,8 +886,8 @@ test-link-bindings-queue; { rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) for m in messages: snd.send(m) - self.conn.disconnect() - self.conn.connect() + self.conn.detach() + self.conn.attach() self.drain(rcv, expected=expected) def testReliabilityUnreliable(self): @@ -924,8 +914,7 @@ UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" class AddressErrorTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def setup_session(self): return self.conn.session() @@ -991,8 +980,7 @@ SENDER_Q = 'test-sender-q; {create: always, delete: always}' class SenderTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def setup_session(self): return self.conn.session() diff --git a/qpid/python/qpid/tests/messaging/message.py b/qpid/python/qpid/tests/messaging/message.py index 9272be7fa4..51d7879d55 100644 --- a/qpid/python/qpid/tests/messaging/message.py +++ b/qpid/python/qpid/tests/messaging/message.py @@ -53,8 +53,7 @@ ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' class MessageEchoTests(Base): def setup_connection(self): - return Connection.open(self.broker.host, self.broker.port, - **self.connection_options()) + return Connection.establish(self.broker, **self.connection_options()) def setup_session(self): return self.conn.session() |
