diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 10:54:07 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-09 10:54:07 +0000 |
| commit | df0f59227e5947aa4620e8c319823e96c5796234 (patch) | |
| tree | 8455f285068e911137d753f7c390781432032373 /python/qpid/messaging | |
| parent | 7a0d795940c5c68383b0224740f085e8e6b4e60b (diff) | |
| download | qpid-python-df0f59227e5947aa4620e8c319823e96c5796234.tar.gz | |
Changes to connection lifecycle methods and Connection parameters:
- Connection.open -> Connection.establish
- Connection.connect() split into Connection.open(), Connection.attach()
- Connection.disconnect() -> Connection.detach()
- reconnect_hosts -> reconnect_urls
- transport now takes tcp, ssl, and tcp+tls
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932352 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
| -rw-r--r-- | python/qpid/messaging/driver.py | 25 | ||||
| -rw-r--r-- | python/qpid/messaging/endpoints.py | 149 | ||||
| -rw-r--r-- | python/qpid/messaging/exceptions.py | 4 | ||||
| -rw-r--r-- | python/qpid/messaging/transports.py | 9 | ||||
| -rw-r--r-- | python/qpid/messaging/util.py | 20 |
5 files changed, 140 insertions, 67 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 7b165ec94b..cf8498794b 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -324,9 +324,10 @@ class Driver: self._selector = Selector.default() self._attempts = 0 self._delay = self.connection.reconnect_interval_min + urls = [URL(u) for u in self.connection.reconnect_urls] self._hosts = [(self.connection.host, self.connection.port)] + \ - self.connection.reconnect_hosts - self._reconnect_log = self.connection.options.get("reconnect_log", True) + [(u.host, u.port) for u in urls] + self._reconnect_log = self.connection.reconnect_log self._host = 0 self._retrying = False self._transport = None @@ -463,7 +464,7 @@ class Driver: self.engine = Engine(self.connection) self.engine.open() rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) - trans = getattr(transports, self.connection.transport, None) + trans = transports.TRANSPORTS.get(self.connection.transport) if trans: self._transport = trans(host, port) else: @@ -507,9 +508,7 @@ class Engine: self._channels = 0 self._sessions = {} - options = self.connection.options - - self.address_cache = Cache(options.get("address_ttl", 60)) + self.address_cache = Cache(self.connection.address_ttl) self._status = CLOSED self._buf = "" @@ -528,11 +527,11 @@ class Engine: self._sasl.setAttr("password", self.connection.password) if self.connection.host: self._sasl.setAttr("host", self.connection.host) - self._sasl.setAttr("service", options.get("service", "qpidd")) - if "min_ssf" in options: - self._sasl.setAttr("minssf", options["min_ssf"]) - if "max_ssf" in options: - self._sasl.setAttr("maxssf", options["max_ssf"]) + self._sasl.setAttr("service", self.connection.sasl_service) + if self.connection.sasl_min_ssf is not None: + self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) + if self.connection.sasl_max_ssf is not None: + self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) self._sasl.init() self._sasl_encode = False self._sasl_decode = False @@ -619,8 +618,8 @@ class Engine: (cli_major, cli_minor, major, minor)) def do_connection_start(self, start): - if self.connection.mechanisms: - permitted = self.connection.mechanisms.split() + if self.connection.sasl_mechanisms: + permitted = self.connection.sasl_mechanisms.split() mechs = [m for m in start.mechanisms if m in permitted] else: mechs = start.mechanisms diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 7ac3881bac..17807f20d2 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -36,7 +36,7 @@ from qpid.messaging.constants import * from qpid.messaging.exceptions import * from qpid.messaging.message import * from qpid.ops import PRIMITIVE -from qpid.util import default +from qpid.util import default, URL from threading import Thread, RLock log = getLogger("qpid.messaging") @@ -51,61 +51,110 @@ class Connection: """ @static - def open(host, port=None, username="guest", password="guest", **options): + def establish(url=None, **options): """ - Creates an AMQP connection and connects it to the given host and port. - - @type host: str - @param host: the name or ip address of the remote host - @type port: int - @param port: the port number of the remote host - @rtype: Connection - @return: a connected Connection + Constructs a L{Connection} with the supplied parameters and opens + it. """ - conn = Connection(host, port, username, password, **options) - conn.connect() + conn = Connection(url, **options) + conn.open() return conn - def __init__(self, host, port=None, username="guest", password="guest", **options): + def __init__(self, url=None, **options): """ Creates a connection. A newly created connection must be connected with the Connection.connect() method before it can be used. + @type url: str + @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] @type host: str - @param host: the name or ip address of the remote host + @param host: the name or ip address of the remote host (overriden by url) @type port: int - @param port: the port number of the remote host + @param port: the port number of the remote host (overriden by url) + @type transport: str + @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) + @type heartbeat: int + @param heartbeat: heartbeat interval in seconds + + @type username: str + @param username: the username for authentication (overriden by url) + @type password: str + @param password: the password for authentication (overriden by url) + + @type sasl_mechanisms: str + @param sasl_mechanisms: space separated list of permitted sasl mechanisms + @type sasl_service: str + @param sasl_service: ??? + @type sasl_min_ssf: ??? + @param sasl_min_ssf: ??? + @type sasl_max_ssf: ??? + @param sasl_max_ssf: ??? + + @type reconnect: bool + @param reconnect: enable/disable automatic reconnect + @type reconnect_timeout: float + @param reconnect_timeout: total time to attempt reconnect + @type reconnect_internal_min: float + @param reconnect_internal_min: minimum interval between reconnect attempts + @type reconnect_internal_max: float + @param reconnect_internal_max: maximum interval between reconnect attempts + @type reconnect_internal: float + @param reconnect_interval: set both min and max reconnect intervals + @type reconnect_limit: int + @param reconnect_limit: limit the total number of reconnect attempts + @type reconnect_urls: list[str] + @param reconnect_urls: list of backup hosts specified as urls + + @type address_ttl: float + @param address_ttl: time until cached address resolution expires + @rtype: Connection @return: a disconnected Connection """ - self.host = host - self.username = username - self.password = password - self.mechanisms = options.get("mechanisms") + if url is None: + url = options.get("host") + if isinstance(url, basestring): + url = URL(url) + self.host = url.host + if url.scheme == url.AMQP: + self.transport = "tcp" + elif url.scheme == url.AMQPS: + self.transport = "ssl" + else: + self.transport = options.get("transport", "tcp") + if self.transport in ("ssl", "tcp+tls"): + self.port = default(url.port, options.get("port", AMQPS_PORT)) + else: + self.port = default(url.port, options.get("port", AMQP_PORT)) self.heartbeat = options.get("heartbeat") + self.username = default(url.user, options.get("username", "guest")) + self.password = default(url.password, options.get("password", "guest")) + + self.sasl_mechanisms = options.get("sasl_mechanisms") + self.sasl_service = options.get("sasl_service", "qpidd") + self.sasl_min_ssf = options.get("sasl_min_ssf") + self.sasl_max_ssf = options.get("sasl_max_ssf") + self.reconnect = options.get("reconnect", False) self.reconnect_timeout = options.get("reconnect_timeout") - if "reconnect_interval_min" in options: - self.reconnect_interval_min = options["reconnect_interval_min"] - else: - self.reconnect_interval_min = options.get("reconnect_interval", 1) - if "reconnect_interval_max" in options: - self.reconnect_interval_max = options["reconnect_interval_max"] - else: - self.reconnect_interval_max = options.get("reconnect_interval", 2*60) + reconnect_interval = options.get("reconnect_interval") + self.reconnect_interval_min = options.get("reconnect_interval_min", + default(reconnect_interval, 1)) + self.reconnect_interval_max = options.get("reconnect_interval_max", + default(reconnect_interval, 2*60)) self.reconnect_limit = options.get("reconnect_limit") - self.reconnect_hosts = options.get("reconnect_hosts", []) - self.transport = options.get("transport", "plain") + self.reconnect_urls = options.get("reconnect_urls", []) + self.reconnect_log = options.get("reconnect_log", True) + + self.address_ttl = options.get("address_ttl", 60) + self.options = options - if self.transport == "tls": - self.port = default(port, AMQPS_PORT) - else: - self.port = default(port, AMQP_PORT) self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} + self._open = False self._connected = False self._transport_connected = False self._lock = RLock() @@ -164,9 +213,26 @@ class Connection: del self.sessions[ssn.name] @synchronized - def connect(self): + def open(self): + """ + Opens a connection. + """ + if self._open: + raise ConnectionError("already open") + self._open = True + self.attach() + + @synchronized + def opened(self): + """ + Return true if the connection is open, false otherwise. + """ + return self._open + + @synchronized + def attach(self): """ - Connect to the remote endpoint. + Attach to the remote endpoint. """ self._connected = True self._driver.start() @@ -181,9 +247,9 @@ class Connection: if not (l.linked or l.error or l.closed)] @synchronized - def disconnect(self): + def detach(self): """ - Disconnect from the remote endpoint. + Detach from the remote endpoint. """ self._connected = False self._wakeup() @@ -192,9 +258,9 @@ class Connection: self._condition.gc() @synchronized - def connected(self): + def attached(self): """ - Return true if the connection is connected, false otherwise. + Return true if the connection is attached, false otherwise. """ return self._connected @@ -207,7 +273,8 @@ class Connection: for ssn in self.sessions.values(): ssn.close() finally: - self.disconnect() + self.detach() + self._open = False class Session: @@ -680,7 +747,7 @@ class Sender: """ if not self.session.connection._connected or self.session.closing: - raise Disconnected() + raise Detached() self._ewait(lambda: self.linked) diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py index 5c8bdedc26..10ad529806 100644 --- a/python/qpid/messaging/exceptions.py +++ b/python/qpid/messaging/exceptions.py @@ -33,10 +33,10 @@ class ConnectError(ConnectionError): class SessionError(Exception): pass -class Disconnected(SessionError): +class Detached(SessionError): """ Exception raised when an operation is attempted that is illegal when - disconnected. + detached. """ pass diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py index 1dea469fe5..8133a45604 100644 --- a/python/qpid/messaging/transports.py +++ b/python/qpid/messaging/transports.py @@ -19,7 +19,9 @@ from qpid.util import connect -class plain: +TRANSPORTS = {} + +class tcp: def __init__(self, host, port): self.socket = connect(host, port) @@ -42,6 +44,8 @@ class plain: def close(self): self.socket.close() +TRANSPORTS["tcp"] = tcp + try: from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \ SSL_ERROR_WANT_WRITE @@ -105,3 +109,6 @@ else: self.socket.setblocking(1) # this closes the underlying socket self.tls.close() + + TRANSPORTS["ssl"] = tls + TRANSPORTS["tcp+tls"] = tls diff --git a/python/qpid/messaging/util.py b/python/qpid/messaging/util.py index 30b54f3ad4..42bc280454 100644 --- a/python/qpid/messaging/util.py +++ b/python/qpid/messaging/util.py @@ -26,31 +26,31 @@ from threading import Thread log = getLogger("qpid.messaging.util") -def auto_fetch_reconnect_hosts(conn): - ssn = conn.session("auto-fetch-reconnect-hosts") +def auto_fetch_reconnect_urls(conn): + ssn = conn.session("auto-fetch-reconnect-urls") rcv = ssn.receiver("amq.failover") rcv.capacity = 10 def main(): while True: msg = rcv.fetch() - set_reconnect_hosts(conn, msg) + set_reconnect_urls(conn, msg) ssn.acknowledge(msg, sync=False) - thread = Thread(name="auto-fetch-reconnect-hosts", target=main) + thread = Thread(name="auto-fetch-reconnect-urls", target=main) thread.setDaemon(True) thread.start() -def set_reconnect_hosts(conn, msg): - reconnect_hosts = [] +def set_reconnect_urls(conn, msg): + reconnect_urls = [] urls = msg.properties["amq.failover"] for u in urls: if u.startswith("amqp:tcp:"): parts = u.split(":") host, port = parts[2:4] - reconnect_hosts.append((host, port)) - conn.reconnect_hosts = reconnect_hosts - log.warn("set reconnect_hosts for conn %s: %s", conn, reconnect_hosts) + reconnect_urls.append("%s:%s" % (host, port)) + conn.reconnect_urls = reconnect_urls + log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls) -__all__ = ["auto_fetch_reconnect_hosts", "set_reconnect_hosts"] +__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"] |
