summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-22 12:52:40 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-22 12:52:40 +0000
commit21f855c232dcb20c0d00fc2770d01581f8e2478b (patch)
treecbe8e033291147ac85f1c876a3451f1d928983fd /python
parent359c22a5969a0f79b6e5673ba40e441b847c22ad (diff)
downloadqpid-python-21f855c232dcb20c0d00fc2770d01581f8e2478b.tar.gz
start to split engine logic from transport logic
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@912551 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging/driver.py105
1 files changed, 57 insertions, 48 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index a09686badd..0569d31f2a 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -377,66 +377,26 @@ class Driver:
@synchronized
def readable(self):
- error = None
- recoverable = False
try:
data = self._socket.recv(64*1024)
if data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
- if self._sasl_decode:
- data = self._sasl.decode(data)
+ self.engine_write(data)
else:
- rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
- error = "connection aborted"
- recoverable = True
+ rawlog.debug("CLOSED[%s]: %s", self.log_id, self._socket.getpeername())
+ self.engine_close()
except socket.error, e:
- error = e
- recoverable = True
-
- if not error:
- try:
- if len(self._hdr) < 8:
- r = 8 - len(self._hdr)
- self._hdr += data[:r]
- data = data[r:]
-
- if len(self._hdr) == 8:
- self.do_header(self._hdr)
-
- self._frame_dec.write(data)
- self._seg_dec.write(*self._frame_dec.read())
- self._op_dec.write(*self._seg_dec.read())
- for op in self._op_dec.read():
- self.assign_id(op)
- opslog.debug("RCVD[%s]: %r", self.log_id, op)
- op.dispatch(self)
- except VersionError, e:
- error = e
- except:
- msg = compat.format_exc()
- error = msg
-
- if error:
- self._error(error, recoverable)
- else:
- self.dispatch()
-
+ self.engine_close(e)
self.connection._waiter.notifyAll()
- def assign_id(self, op):
- if isinstance(op, Command):
- sst = self.get_sst(op)
- op.id = sst.received
- sst.received += 1
-
@synchronized
def writeable(self):
try:
- n = self._socket.send(self._buf)
- rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
- self._buf = self._buf[n:]
+ n = self._socket.send(self.engine_peek())
+ sent = self.engine_read(n)
+ rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
- self._error(e, True)
+ self.engine_close(e)
self.connection._waiter.notifyAll()
@synchronized
@@ -444,6 +404,55 @@ class Driver:
self.dispatch()
self.connection._waiter.notifyAll()
+ def engine_write(self, data):
+ try:
+ if self._sasl_decode:
+ data = self._sasl.decode(data)
+
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ opslog.debug("RCVD[%s]: %r", self.log_id, op)
+ op.dispatch(self)
+ self.dispatch()
+ except VersionError, e:
+ self._error(e, False)
+ except:
+ self._error(compat.format_exc(), False)
+
+ def engine_close(self, e=None):
+ if e is None:
+ self._error("connection aborted", True)
+ else:
+ self._error(e, True)
+
+ def assign_id(self, op):
+ if isinstance(op, Command):
+ sst = self.get_sst(op)
+ op.id = sst.received
+ sst.received += 1
+
+ def engine_pending(self):
+ return len(self._buf)
+
+ def engine_read(self, n):
+ result = self._buf[:n]
+ self._buf = self._buf[n:]
+ return result
+
+ def engine_peek(self):
+ return self._buf
+
def _error(self, err, recoverable):
if self._socket is not None:
self._socket.close()