diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-02-26 00:02:33 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-26 00:02:33 +0000 |
| commit | 5b98d7bb9d34362eda787829c8c308c8bd279ea1 (patch) | |
| tree | 7e1ea543dc743d4e38d6f38aab6e1427df6f1c5c /python | |
| parent | e7b97eac85d94d739cba9d9a324e9c7319271278 (diff) | |
| download | qpid-python-5b98d7bb9d34362eda787829c8c308c8bd279ea1.tar.gz | |
split engine into a separate class
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@916499 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/messaging/driver.py | 224 | ||||
| -rw-r--r-- | python/qpid/messaging/endpoints.py | 5 |
2 files changed, 122 insertions, 107 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index a55ba3c360..10ff97418e 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -295,9 +295,6 @@ class Driver: self.log_id = "%x" % id(self.connection) self._lock = self.connection._lock - self._in = LinkIn() - self._out = LinkOut() - self._selector = Selector.default() self._attempts = 0 self._hosts = [(self.connection.host, self.connection.port)] + \ @@ -306,56 +303,9 @@ class Driver: self._retrying = False self._socket = None - self.reset() - - def reset(self): - self._closing = False - self._connected = False - self._attachments = {} - - self._channel_max = 65536 - self._channels = 0 - self._sessions = {} - - options = self.connection.options - - self.address_cache = Cache(options.get("address_ttl", 60)) - - self._engine_status = CLOSED - self._buf = "" - self._hdr = "" - self._op_enc = OpEncoder() - self._seg_enc = SegmentEncoder() - self._frame_enc = FrameEncoder() - self._frame_dec = FrameDecoder() - self._seg_dec = SegmentDecoder() - self._op_dec = OpDecoder() self._timeout = None - self._sasl = sasl.Client() - if self.connection.username: - self._sasl.setAttr("username", self.connection.username) - if self.connection.password: - self._sasl.setAttr("password", self.connection.password) - if self.connection.host: - self._sasl.setAttr("host", self.connection.host) - self._sasl.setAttr("service", options.get("service", "qpidd")) - if "min_ssf" in options: - self._sasl.setAttr("minssf", options["min_ssf"]) - if "max_ssf" in options: - self._sasl.setAttr("maxssf", options["max_ssf"]) - self._sasl.init() - self._sasl_encode = False - self._sasl_decode = False - - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for snd in ssn.senders: - snd.linked = False - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.linked = False + self.engine = None @synchronized def wakeup(self): @@ -374,7 +324,7 @@ class Driver: @synchronized def writing(self): - return self._socket is not None and self._buf + return self._socket is not None and self.engine.pending() @synchronized def timing(self): @@ -386,7 +336,7 @@ class Driver: data = self._socket.recv(64*1024) if data: rawlog.debug("READ[%s]: %r", self.log_id, data) - self.engine_write(data) + self.engine.write(data) else: self.close_engine() except socket.error, e: @@ -413,18 +363,19 @@ class Driver: if delay > 0: log.warn("sleeping %s seconds" % delay) self._retrying = True - self.engine_close() + self.engine.close() else: - self.engine_close(e) + self.engine.close(e) def update_status(self): - status = self.engine_status() + status = self.engine.status() return getattr(self, "st_%s" % status.lower())() def st_closed(self): rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) self._socket.close() self._socket = None + self.engine = None return True def st_open(self): @@ -434,8 +385,8 @@ class Driver: def writeable(self): notify = False try: - n = self._socket.send(self.engine_peek()) - sent = self.engine_read(n) + n = self._socket.send(self.engine.peek()) + sent = self.engine.read(n) rawlog.debug("SENT[%s]: %r", self.log_id, sent) except socket.error, e: self.close_engine(e) @@ -449,10 +400,102 @@ class Driver: self.dispatch() self.connection._waiter.notifyAll() - def engine_status(self): - return self._engine_status + def dispatch(self): + try: + if self._socket is None: + if self.connection._connected: + self.connect() + else: + self.engine.dispatch() + except: + # XXX: Does socket get leaked if this occurs? + msg = compat.format_exc() + self.connection.error = (msg,) - def engine_write(self, data): + def connect(self): + try: + # XXX: should make this non blocking + if self._host == 0: + self._attempts += 1 + host, port = self._hosts[self._host] + if self._retrying: + log.warn("trying: %s:%s", host, port) + rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) + self._socket = connect(host, port) + if self._retrying: + log.warn("reconnect succeeded: %s:%s", host, port) + self._timeout = None + self._attempts = 0 + self._host = 0 + self._retrying = False + self.engine = Engine(self.connection) + self.engine.open() + except socket.error, e: + self._host = (self._host + 1) % len(self._hosts) + self.close_engine(e) + +class Engine: + + def __init__(self, connection): + self.connection = connection + self.log_id = "%x" % id(self.connection) + self._closing = False + self._connected = False + self._attachments = {} + + self._in = LinkIn() + self._out = LinkOut() + + self._channel_max = 65536 + self._channels = 0 + self._sessions = {} + + options = self.connection.options + + self.address_cache = Cache(options.get("address_ttl", 60)) + + self._status = CLOSED + self._buf = "" + self._hdr = "" + self._op_enc = OpEncoder() + self._seg_enc = SegmentEncoder() + self._frame_enc = FrameEncoder() + self._frame_dec = FrameDecoder() + self._seg_dec = SegmentDecoder() + self._op_dec = OpDecoder() + + self._sasl = sasl.Client() + if self.connection.username: + self._sasl.setAttr("username", self.connection.username) + if self.connection.password: + self._sasl.setAttr("password", self.connection.password) + if self.connection.host: + self._sasl.setAttr("host", self.connection.host) + self._sasl.setAttr("service", options.get("service", "qpidd")) + if "min_ssf" in options: + self._sasl.setAttr("minssf", options["min_ssf"]) + if "max_ssf" in options: + self._sasl.setAttr("maxssf", options["max_ssf"]) + self._sasl.init() + self._sasl_encode = False + self._sasl_decode = False + + def _reset(self): + self.connection._transport_connected = False + + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for snd in ssn.senders: + snd.linked = False + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.linked = False + + def status(self): + return self._status + + def write(self, data): try: if self._sasl_decode: data = self._sasl.decode(data) @@ -472,16 +515,17 @@ class Driver: self.assign_id(op) opslog.debug("RCVD[%s]: %r", self.log_id, op) op.dispatch(self) - self.engine_dispatch() + self.dispatch() except VersionError, e: - self.engine_close(e) + self.close(e) except: - self.engine_close(compat.format_exc()) + self.close(compat.format_exc()) - def engine_close(self, e=None): - self.reset() + def close(self, e=None): + self._reset() if e: self.connection.error = (e,) + self._status = CLOSED def assign_id(self, op): if isinstance(op, Command): @@ -489,15 +533,15 @@ class Driver: op.id = sst.received sst.received += 1 - def engine_pending(self): + def pending(self): return len(self._buf) - def engine_read(self, n): + def read(self, n): result = self._buf[:n] self._buf = self._buf[n:] return result - def engine_peek(self): + def peek(self): return self._buf def write_op(self, op): @@ -543,6 +587,7 @@ class Driver: def do_connection_open_ok(self, open_ok): self._connected = True self._sasl_decode = True + self.connection._transport_connected = True def connection_heartbeat(self, hrt): self.write_op(ConnectionHeartbeat()) @@ -558,7 +603,7 @@ class Driver: # probably the right thing to do def do_connection_close_ok(self, close_ok): - self.reset() + self.close() def do_session_attached(self, atc): pass @@ -611,19 +656,7 @@ class Driver: sst.session.error = (ex,) def dispatch(self): - try: - if self._socket is None: - if self.connection._connected: - self.connect() - else: - self.engine_dispatch() - except: - # XXX: Does socket get leaked if this occurs? - msg = compat.format_exc() - self.connection.error = (msg,) - - def engine_dispatch(self): - if not self.connection._connected and not self._closing and self._engine_status != CLOSED: + if not self.connection._connected and not self._closing and self._status != CLOSED: self.disconnect() if self._connected and not self._closing: @@ -631,28 +664,9 @@ class Driver: self.attach(ssn) self.process(ssn) - def connect(self): - try: - # XXX: should make this non blocking - if self._host == 0: - self._attempts += 1 - host, port = self._hosts[self._host] - if self._retrying: - log.warn("trying: %s:%s", host, port) - self._socket = connect(host, port) - if self._retrying: - log.warn("reconnect succeeded: %s:%s", host, port) - self._timeout = None - self._attempts = 0 - self._host = 0 - self._retrying = False - self.engine_open() - except socket.error, e: - self._host = (self._host + 1) % len(self._hosts) - self.close_engine(e) - - def engine_open(self): - self._engine_status = OPEN + def open(self): + self._reset() + self._status = OPEN self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) def disconnect(self): diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 596866de66..004cee5f88 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -94,6 +94,7 @@ class Connection: self.session_counter = 0 self.sessions = {} self._connected = False + self._transport_connected = False self._lock = RLock() self._condition = Condition(self._lock) self._waiter = Waiter(self._condition) @@ -157,7 +158,7 @@ class Connection: """ self._connected = True self._wakeup() - self._ewait(lambda: self._driver._connected and not self._unlinked(), + self._ewait(lambda: self._transport_connected and not self._unlinked(), exc=ConnectError) def _unlinked(self): @@ -173,7 +174,7 @@ class Connection: """ self._connected = False self._wakeup() - self._ewait(lambda: not self._driver._connected) + self._ewait(lambda: not self._transport_connected) @synchronized def connected(self): |
