summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-04-09 10:54:07 +0000
committerRafael H. Schloming <rhs@apache.org>2010-04-09 10:54:07 +0000
commitdf0f59227e5947aa4620e8c319823e96c5796234 (patch)
tree8455f285068e911137d753f7c390781432032373 /python/qpid/messaging
parent7a0d795940c5c68383b0224740f085e8e6b4e60b (diff)
downloadqpid-python-df0f59227e5947aa4620e8c319823e96c5796234.tar.gz
Changes to connection lifecycle methods and Connection parameters:
- Connection.open -> Connection.establish - Connection.connect() split into Connection.open(), Connection.attach() - Connection.disconnect() -> Connection.detach() - reconnect_hosts -> reconnect_urls - transport now takes tcp, ssl, and tcp+tls git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932352 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/driver.py25
-rw-r--r--python/qpid/messaging/endpoints.py149
-rw-r--r--python/qpid/messaging/exceptions.py4
-rw-r--r--python/qpid/messaging/transports.py9
-rw-r--r--python/qpid/messaging/util.py20
5 files changed, 140 insertions, 67 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 7b165ec94b..cf8498794b 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -324,9 +324,10 @@ class Driver:
self._selector = Selector.default()
self._attempts = 0
self._delay = self.connection.reconnect_interval_min
+ urls = [URL(u) for u in self.connection.reconnect_urls]
self._hosts = [(self.connection.host, self.connection.port)] + \
- self.connection.reconnect_hosts
- self._reconnect_log = self.connection.options.get("reconnect_log", True)
+ [(u.host, u.port) for u in urls]
+ self._reconnect_log = self.connection.reconnect_log
self._host = 0
self._retrying = False
self._transport = None
@@ -463,7 +464,7 @@ class Driver:
self.engine = Engine(self.connection)
self.engine.open()
rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
- trans = getattr(transports, self.connection.transport, None)
+ trans = transports.TRANSPORTS.get(self.connection.transport)
if trans:
self._transport = trans(host, port)
else:
@@ -507,9 +508,7 @@ class Engine:
self._channels = 0
self._sessions = {}
- options = self.connection.options
-
- self.address_cache = Cache(options.get("address_ttl", 60))
+ self.address_cache = Cache(self.connection.address_ttl)
self._status = CLOSED
self._buf = ""
@@ -528,11 +527,11 @@ class Engine:
self._sasl.setAttr("password", self.connection.password)
if self.connection.host:
self._sasl.setAttr("host", self.connection.host)
- self._sasl.setAttr("service", options.get("service", "qpidd"))
- if "min_ssf" in options:
- self._sasl.setAttr("minssf", options["min_ssf"])
- if "max_ssf" in options:
- self._sasl.setAttr("maxssf", options["max_ssf"])
+ self._sasl.setAttr("service", self.connection.sasl_service)
+ if self.connection.sasl_min_ssf is not None:
+ self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
+ if self.connection.sasl_max_ssf is not None:
+ self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
self._sasl.init()
self._sasl_encode = False
self._sasl_decode = False
@@ -619,8 +618,8 @@ class Engine:
(cli_major, cli_minor, major, minor))
def do_connection_start(self, start):
- if self.connection.mechanisms:
- permitted = self.connection.mechanisms.split()
+ if self.connection.sasl_mechanisms:
+ permitted = self.connection.sasl_mechanisms.split()
mechs = [m for m in start.mechanisms if m in permitted]
else:
mechs = start.mechanisms
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 7ac3881bac..17807f20d2 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -36,7 +36,7 @@ from qpid.messaging.constants import *
from qpid.messaging.exceptions import *
from qpid.messaging.message import *
from qpid.ops import PRIMITIVE
-from qpid.util import default
+from qpid.util import default, URL
from threading import Thread, RLock
log = getLogger("qpid.messaging")
@@ -51,61 +51,110 @@ class Connection:
"""
@static
- def open(host, port=None, username="guest", password="guest", **options):
+ def establish(url=None, **options):
"""
- Creates an AMQP connection and connects it to the given host and port.
-
- @type host: str
- @param host: the name or ip address of the remote host
- @type port: int
- @param port: the port number of the remote host
- @rtype: Connection
- @return: a connected Connection
+ Constructs a L{Connection} with the supplied parameters and opens
+ it.
"""
- conn = Connection(host, port, username, password, **options)
- conn.connect()
+ conn = Connection(url, **options)
+ conn.open()
return conn
- def __init__(self, host, port=None, username="guest", password="guest", **options):
+ def __init__(self, url=None, **options):
"""
Creates a connection. A newly created connection must be connected
with the Connection.connect() method before it can be used.
+ @type url: str
+ @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
@type host: str
- @param host: the name or ip address of the remote host
+ @param host: the name or ip address of the remote host (overriden by url)
@type port: int
- @param port: the port number of the remote host
+ @param port: the port number of the remote host (overriden by url)
+ @type transport: str
+ @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
+ @type heartbeat: int
+ @param heartbeat: heartbeat interval in seconds
+
+ @type username: str
+ @param username: the username for authentication (overriden by url)
+ @type password: str
+ @param password: the password for authentication (overriden by url)
+
+ @type sasl_mechanisms: str
+ @param sasl_mechanisms: space separated list of permitted sasl mechanisms
+ @type sasl_service: str
+ @param sasl_service: ???
+ @type sasl_min_ssf: ???
+ @param sasl_min_ssf: ???
+ @type sasl_max_ssf: ???
+ @param sasl_max_ssf: ???
+
+ @type reconnect: bool
+ @param reconnect: enable/disable automatic reconnect
+ @type reconnect_timeout: float
+ @param reconnect_timeout: total time to attempt reconnect
+ @type reconnect_internal_min: float
+ @param reconnect_internal_min: minimum interval between reconnect attempts
+ @type reconnect_internal_max: float
+ @param reconnect_internal_max: maximum interval between reconnect attempts
+ @type reconnect_internal: float
+ @param reconnect_interval: set both min and max reconnect intervals
+ @type reconnect_limit: int
+ @param reconnect_limit: limit the total number of reconnect attempts
+ @type reconnect_urls: list[str]
+ @param reconnect_urls: list of backup hosts specified as urls
+
+ @type address_ttl: float
+ @param address_ttl: time until cached address resolution expires
+
@rtype: Connection
@return: a disconnected Connection
"""
- self.host = host
- self.username = username
- self.password = password
- self.mechanisms = options.get("mechanisms")
+ if url is None:
+ url = options.get("host")
+ if isinstance(url, basestring):
+ url = URL(url)
+ self.host = url.host
+ if url.scheme == url.AMQP:
+ self.transport = "tcp"
+ elif url.scheme == url.AMQPS:
+ self.transport = "ssl"
+ else:
+ self.transport = options.get("transport", "tcp")
+ if self.transport in ("ssl", "tcp+tls"):
+ self.port = default(url.port, options.get("port", AMQPS_PORT))
+ else:
+ self.port = default(url.port, options.get("port", AMQP_PORT))
self.heartbeat = options.get("heartbeat")
+ self.username = default(url.user, options.get("username", "guest"))
+ self.password = default(url.password, options.get("password", "guest"))
+
+ self.sasl_mechanisms = options.get("sasl_mechanisms")
+ self.sasl_service = options.get("sasl_service", "qpidd")
+ self.sasl_min_ssf = options.get("sasl_min_ssf")
+ self.sasl_max_ssf = options.get("sasl_max_ssf")
+
self.reconnect = options.get("reconnect", False)
self.reconnect_timeout = options.get("reconnect_timeout")
- if "reconnect_interval_min" in options:
- self.reconnect_interval_min = options["reconnect_interval_min"]
- else:
- self.reconnect_interval_min = options.get("reconnect_interval", 1)
- if "reconnect_interval_max" in options:
- self.reconnect_interval_max = options["reconnect_interval_max"]
- else:
- self.reconnect_interval_max = options.get("reconnect_interval", 2*60)
+ reconnect_interval = options.get("reconnect_interval")
+ self.reconnect_interval_min = options.get("reconnect_interval_min",
+ default(reconnect_interval, 1))
+ self.reconnect_interval_max = options.get("reconnect_interval_max",
+ default(reconnect_interval, 2*60))
self.reconnect_limit = options.get("reconnect_limit")
- self.reconnect_hosts = options.get("reconnect_hosts", [])
- self.transport = options.get("transport", "plain")
+ self.reconnect_urls = options.get("reconnect_urls", [])
+ self.reconnect_log = options.get("reconnect_log", True)
+
+ self.address_ttl = options.get("address_ttl", 60)
+
self.options = options
- if self.transport == "tls":
- self.port = default(port, AMQPS_PORT)
- else:
- self.port = default(port, AMQP_PORT)
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
+ self._open = False
self._connected = False
self._transport_connected = False
self._lock = RLock()
@@ -164,9 +213,26 @@ class Connection:
del self.sessions[ssn.name]
@synchronized
- def connect(self):
+ def open(self):
+ """
+ Opens a connection.
+ """
+ if self._open:
+ raise ConnectionError("already open")
+ self._open = True
+ self.attach()
+
+ @synchronized
+ def opened(self):
+ """
+ Return true if the connection is open, false otherwise.
+ """
+ return self._open
+
+ @synchronized
+ def attach(self):
"""
- Connect to the remote endpoint.
+ Attach to the remote endpoint.
"""
self._connected = True
self._driver.start()
@@ -181,9 +247,9 @@ class Connection:
if not (l.linked or l.error or l.closed)]
@synchronized
- def disconnect(self):
+ def detach(self):
"""
- Disconnect from the remote endpoint.
+ Detach from the remote endpoint.
"""
self._connected = False
self._wakeup()
@@ -192,9 +258,9 @@ class Connection:
self._condition.gc()
@synchronized
- def connected(self):
+ def attached(self):
"""
- Return true if the connection is connected, false otherwise.
+ Return true if the connection is attached, false otherwise.
"""
return self._connected
@@ -207,7 +273,8 @@ class Connection:
for ssn in self.sessions.values():
ssn.close()
finally:
- self.disconnect()
+ self.detach()
+ self._open = False
class Session:
@@ -680,7 +747,7 @@ class Sender:
"""
if not self.session.connection._connected or self.session.closing:
- raise Disconnected()
+ raise Detached()
self._ewait(lambda: self.linked)
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
index 5c8bdedc26..10ad529806 100644
--- a/python/qpid/messaging/exceptions.py
+++ b/python/qpid/messaging/exceptions.py
@@ -33,10 +33,10 @@ class ConnectError(ConnectionError):
class SessionError(Exception):
pass
-class Disconnected(SessionError):
+class Detached(SessionError):
"""
Exception raised when an operation is attempted that is illegal when
- disconnected.
+ detached.
"""
pass
diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py
index 1dea469fe5..8133a45604 100644
--- a/python/qpid/messaging/transports.py
+++ b/python/qpid/messaging/transports.py
@@ -19,7 +19,9 @@
from qpid.util import connect
-class plain:
+TRANSPORTS = {}
+
+class tcp:
def __init__(self, host, port):
self.socket = connect(host, port)
@@ -42,6 +44,8 @@ class plain:
def close(self):
self.socket.close()
+TRANSPORTS["tcp"] = tcp
+
try:
from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
SSL_ERROR_WANT_WRITE
@@ -105,3 +109,6 @@ else:
self.socket.setblocking(1)
# this closes the underlying socket
self.tls.close()
+
+ TRANSPORTS["ssl"] = tls
+ TRANSPORTS["tcp+tls"] = tls
diff --git a/python/qpid/messaging/util.py b/python/qpid/messaging/util.py
index 30b54f3ad4..42bc280454 100644
--- a/python/qpid/messaging/util.py
+++ b/python/qpid/messaging/util.py
@@ -26,31 +26,31 @@ from threading import Thread
log = getLogger("qpid.messaging.util")
-def auto_fetch_reconnect_hosts(conn):
- ssn = conn.session("auto-fetch-reconnect-hosts")
+def auto_fetch_reconnect_urls(conn):
+ ssn = conn.session("auto-fetch-reconnect-urls")
rcv = ssn.receiver("amq.failover")
rcv.capacity = 10
def main():
while True:
msg = rcv.fetch()
- set_reconnect_hosts(conn, msg)
+ set_reconnect_urls(conn, msg)
ssn.acknowledge(msg, sync=False)
- thread = Thread(name="auto-fetch-reconnect-hosts", target=main)
+ thread = Thread(name="auto-fetch-reconnect-urls", target=main)
thread.setDaemon(True)
thread.start()
-def set_reconnect_hosts(conn, msg):
- reconnect_hosts = []
+def set_reconnect_urls(conn, msg):
+ reconnect_urls = []
urls = msg.properties["amq.failover"]
for u in urls:
if u.startswith("amqp:tcp:"):
parts = u.split(":")
host, port = parts[2:4]
- reconnect_hosts.append((host, port))
- conn.reconnect_hosts = reconnect_hosts
- log.warn("set reconnect_hosts for conn %s: %s", conn, reconnect_hosts)
+ reconnect_urls.append("%s:%s" % (host, port))
+ conn.reconnect_urls = reconnect_urls
+ log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)
-__all__ = ["auto_fetch_reconnect_hosts", "set_reconnect_hosts"]
+__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"]