diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-02-22 12:52:40 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-22 12:52:40 +0000 |
| commit | 21f855c232dcb20c0d00fc2770d01581f8e2478b (patch) | |
| tree | cbe8e033291147ac85f1c876a3451f1d928983fd /python | |
| parent | 359c22a5969a0f79b6e5673ba40e441b847c22ad (diff) | |
| download | qpid-python-21f855c232dcb20c0d00fc2770d01581f8e2478b.tar.gz | |
start to split engine logic from transport logic
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@912551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/messaging/driver.py | 105 |
1 files changed, 57 insertions, 48 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index a09686badd..0569d31f2a 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -377,66 +377,26 @@ class Driver: @synchronized def readable(self): - error = None - recoverable = False try: data = self._socket.recv(64*1024) if data: rawlog.debug("READ[%s]: %r", self.log_id, data) - if self._sasl_decode: - data = self._sasl.decode(data) + self.engine_write(data) else: - rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername()) - error = "connection aborted" - recoverable = True + rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername()) + self.engine_close() except socket.error, e: - error = e - recoverable = True - - if not error: - try: - if len(self._hdr) < 8: - r = 8 - len(self._hdr) - self._hdr += data[:r] - data = data[r:] - - if len(self._hdr) == 8: - self.do_header(self._hdr) - - self._frame_dec.write(data) - self._seg_dec.write(*self._frame_dec.read()) - self._op_dec.write(*self._seg_dec.read()) - for op in self._op_dec.read(): - self.assign_id(op) - opslog.debug("RCVD[%s]: %r", self.log_id, op) - op.dispatch(self) - except VersionError, e: - error = e - except: - msg = compat.format_exc() - error = msg - - if error: - self._error(error, recoverable) - else: - self.dispatch() - + self.engine_close(e) self.connection._waiter.notifyAll() - def assign_id(self, op): - if isinstance(op, Command): - sst = self.get_sst(op) - op.id = sst.received - sst.received += 1 - @synchronized def writeable(self): try: - n = self._socket.send(self._buf) - rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n]) - self._buf = self._buf[n:] + n = self._socket.send(self.engine_peek()) + sent = self.engine_read(n) + rawlog.debug("SENT[%s]: %r", self.log_id, sent) except socket.error, e: - self._error(e, True) + self.engine_close(e) self.connection._waiter.notifyAll() @synchronized @@ -444,6 +404,55 @@ class Driver: self.dispatch() self.connection._waiter.notifyAll() + def engine_write(self, data): + try: + if self._sasl_decode: + data = self._sasl.decode(data) + + if len(self._hdr) < 8: + r = 8 - len(self._hdr) + self._hdr += data[:r] + data = data[r:] + + if len(self._hdr) == 8: + self.do_header(self._hdr) + + self._frame_dec.write(data) + self._seg_dec.write(*self._frame_dec.read()) + self._op_dec.write(*self._seg_dec.read()) + for op in self._op_dec.read(): + self.assign_id(op) + opslog.debug("RCVD[%s]: %r", self.log_id, op) + op.dispatch(self) + self.dispatch() + except VersionError, e: + self._error(e, False) + except: + self._error(compat.format_exc(), False) + + def engine_close(self, e=None): + if e is None: + self._error("connection aborted", True) + else: + self._error(e, True) + + def assign_id(self, op): + if isinstance(op, Command): + sst = self.get_sst(op) + op.id = sst.received + sst.received += 1 + + def engine_pending(self): + return len(self._buf) + + def engine_read(self, n): + result = self._buf[:n] + self._buf = self._buf[n:] + return result + + def engine_peek(self): + return self._buf + def _error(self, err, recoverable): if self._socket is not None: self._socket.close() |
