summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-26 00:02:33 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-26 00:02:33 +0000
commit5b98d7bb9d34362eda787829c8c308c8bd279ea1 (patch)
tree7e1ea543dc743d4e38d6f38aab6e1427df6f1c5c /python
parente7b97eac85d94d739cba9d9a324e9c7319271278 (diff)
downloadqpid-python-5b98d7bb9d34362eda787829c8c308c8bd279ea1.tar.gz
split engine into a separate class
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@916499 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging/driver.py224
-rw-r--r--python/qpid/messaging/endpoints.py5
2 files changed, 122 insertions, 107 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index a55ba3c360..10ff97418e 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -295,9 +295,6 @@ class Driver:
self.log_id = "%x" % id(self.connection)
self._lock = self.connection._lock
- self._in = LinkIn()
- self._out = LinkOut()
-
self._selector = Selector.default()
self._attempts = 0
self._hosts = [(self.connection.host, self.connection.port)] + \
@@ -306,56 +303,9 @@ class Driver:
self._retrying = False
self._socket = None
- self.reset()
-
- def reset(self):
- self._closing = False
- self._connected = False
- self._attachments = {}
-
- self._channel_max = 65536
- self._channels = 0
- self._sessions = {}
-
- options = self.connection.options
-
- self.address_cache = Cache(options.get("address_ttl", 60))
-
- self._engine_status = CLOSED
- self._buf = ""
- self._hdr = ""
- self._op_enc = OpEncoder()
- self._seg_enc = SegmentEncoder()
- self._frame_enc = FrameEncoder()
- self._frame_dec = FrameDecoder()
- self._seg_dec = SegmentDecoder()
- self._op_dec = OpDecoder()
self._timeout = None
- self._sasl = sasl.Client()
- if self.connection.username:
- self._sasl.setAttr("username", self.connection.username)
- if self.connection.password:
- self._sasl.setAttr("password", self.connection.password)
- if self.connection.host:
- self._sasl.setAttr("host", self.connection.host)
- self._sasl.setAttr("service", options.get("service", "qpidd"))
- if "min_ssf" in options:
- self._sasl.setAttr("minssf", options["min_ssf"])
- if "max_ssf" in options:
- self._sasl.setAttr("maxssf", options["max_ssf"])
- self._sasl.init()
- self._sasl_encode = False
- self._sasl_decode = False
-
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for snd in ssn.senders:
- snd.linked = False
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.linked = False
+ self.engine = None
@synchronized
def wakeup(self):
@@ -374,7 +324,7 @@ class Driver:
@synchronized
def writing(self):
- return self._socket is not None and self._buf
+ return self._socket is not None and self.engine.pending()
@synchronized
def timing(self):
@@ -386,7 +336,7 @@ class Driver:
data = self._socket.recv(64*1024)
if data:
rawlog.debug("READ[%s]: %r", self.log_id, data)
- self.engine_write(data)
+ self.engine.write(data)
else:
self.close_engine()
except socket.error, e:
@@ -413,18 +363,19 @@ class Driver:
if delay > 0:
log.warn("sleeping %s seconds" % delay)
self._retrying = True
- self.engine_close()
+ self.engine.close()
else:
- self.engine_close(e)
+ self.engine.close(e)
def update_status(self):
- status = self.engine_status()
+ status = self.engine.status()
return getattr(self, "st_%s" % status.lower())()
def st_closed(self):
rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
self._socket.close()
self._socket = None
+ self.engine = None
return True
def st_open(self):
@@ -434,8 +385,8 @@ class Driver:
def writeable(self):
notify = False
try:
- n = self._socket.send(self.engine_peek())
- sent = self.engine_read(n)
+ n = self._socket.send(self.engine.peek())
+ sent = self.engine.read(n)
rawlog.debug("SENT[%s]: %r", self.log_id, sent)
except socket.error, e:
self.close_engine(e)
@@ -449,10 +400,102 @@ class Driver:
self.dispatch()
self.connection._waiter.notifyAll()
- def engine_status(self):
- return self._engine_status
+ def dispatch(self):
+ try:
+ if self._socket is None:
+ if self.connection._connected:
+ self.connect()
+ else:
+ self.engine.dispatch()
+ except:
+ # XXX: Does socket get leaked if this occurs?
+ msg = compat.format_exc()
+ self.connection.error = (msg,)
- def engine_write(self, data):
+ def connect(self):
+ try:
+ # XXX: should make this non blocking
+ if self._host == 0:
+ self._attempts += 1
+ host, port = self._hosts[self._host]
+ if self._retrying:
+ log.warn("trying: %s:%s", host, port)
+ rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
+ self._socket = connect(host, port)
+ if self._retrying:
+ log.warn("reconnect succeeded: %s:%s", host, port)
+ self._timeout = None
+ self._attempts = 0
+ self._host = 0
+ self._retrying = False
+ self.engine = Engine(self.connection)
+ self.engine.open()
+ except socket.error, e:
+ self._host = (self._host + 1) % len(self._hosts)
+ self.close_engine(e)
+
+class Engine:
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.log_id = "%x" % id(self.connection)
+ self._closing = False
+ self._connected = False
+ self._attachments = {}
+
+ self._in = LinkIn()
+ self._out = LinkOut()
+
+ self._channel_max = 65536
+ self._channels = 0
+ self._sessions = {}
+
+ options = self.connection.options
+
+ self.address_cache = Cache(options.get("address_ttl", 60))
+
+ self._status = CLOSED
+ self._buf = ""
+ self._hdr = ""
+ self._op_enc = OpEncoder()
+ self._seg_enc = SegmentEncoder()
+ self._frame_enc = FrameEncoder()
+ self._frame_dec = FrameDecoder()
+ self._seg_dec = SegmentDecoder()
+ self._op_dec = OpDecoder()
+
+ self._sasl = sasl.Client()
+ if self.connection.username:
+ self._sasl.setAttr("username", self.connection.username)
+ if self.connection.password:
+ self._sasl.setAttr("password", self.connection.password)
+ if self.connection.host:
+ self._sasl.setAttr("host", self.connection.host)
+ self._sasl.setAttr("service", options.get("service", "qpidd"))
+ if "min_ssf" in options:
+ self._sasl.setAttr("minssf", options["min_ssf"])
+ if "max_ssf" in options:
+ self._sasl.setAttr("maxssf", options["max_ssf"])
+ self._sasl.init()
+ self._sasl_encode = False
+ self._sasl_decode = False
+
+ def _reset(self):
+ self.connection._transport_connected = False
+
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.linked = False
+
+ def status(self):
+ return self._status
+
+ def write(self, data):
try:
if self._sasl_decode:
data = self._sasl.decode(data)
@@ -472,16 +515,17 @@ class Driver:
self.assign_id(op)
opslog.debug("RCVD[%s]: %r", self.log_id, op)
op.dispatch(self)
- self.engine_dispatch()
+ self.dispatch()
except VersionError, e:
- self.engine_close(e)
+ self.close(e)
except:
- self.engine_close(compat.format_exc())
+ self.close(compat.format_exc())
- def engine_close(self, e=None):
- self.reset()
+ def close(self, e=None):
+ self._reset()
if e:
self.connection.error = (e,)
+ self._status = CLOSED
def assign_id(self, op):
if isinstance(op, Command):
@@ -489,15 +533,15 @@ class Driver:
op.id = sst.received
sst.received += 1
- def engine_pending(self):
+ def pending(self):
return len(self._buf)
- def engine_read(self, n):
+ def read(self, n):
result = self._buf[:n]
self._buf = self._buf[n:]
return result
- def engine_peek(self):
+ def peek(self):
return self._buf
def write_op(self, op):
@@ -543,6 +587,7 @@ class Driver:
def do_connection_open_ok(self, open_ok):
self._connected = True
self._sasl_decode = True
+ self.connection._transport_connected = True
def connection_heartbeat(self, hrt):
self.write_op(ConnectionHeartbeat())
@@ -558,7 +603,7 @@ class Driver:
# probably the right thing to do
def do_connection_close_ok(self, close_ok):
- self.reset()
+ self.close()
def do_session_attached(self, atc):
pass
@@ -611,19 +656,7 @@ class Driver:
sst.session.error = (ex,)
def dispatch(self):
- try:
- if self._socket is None:
- if self.connection._connected:
- self.connect()
- else:
- self.engine_dispatch()
- except:
- # XXX: Does socket get leaked if this occurs?
- msg = compat.format_exc()
- self.connection.error = (msg,)
-
- def engine_dispatch(self):
- if not self.connection._connected and not self._closing and self._engine_status != CLOSED:
+ if not self.connection._connected and not self._closing and self._status != CLOSED:
self.disconnect()
if self._connected and not self._closing:
@@ -631,28 +664,9 @@ class Driver:
self.attach(ssn)
self.process(ssn)
- def connect(self):
- try:
- # XXX: should make this non blocking
- if self._host == 0:
- self._attempts += 1
- host, port = self._hosts[self._host]
- if self._retrying:
- log.warn("trying: %s:%s", host, port)
- self._socket = connect(host, port)
- if self._retrying:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._timeout = None
- self._attempts = 0
- self._host = 0
- self._retrying = False
- self.engine_open()
- except socket.error, e:
- self._host = (self._host + 1) % len(self._hosts)
- self.close_engine(e)
-
- def engine_open(self):
- self._engine_status = OPEN
+ def open(self):
+ self._reset()
+ self._status = OPEN
self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
def disconnect(self):
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 596866de66..004cee5f88 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -94,6 +94,7 @@ class Connection:
self.session_counter = 0
self.sessions = {}
self._connected = False
+ self._transport_connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
self._waiter = Waiter(self._condition)
@@ -157,7 +158,7 @@ class Connection:
"""
self._connected = True
self._wakeup()
- self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ self._ewait(lambda: self._transport_connected and not self._unlinked(),
exc=ConnectError)
def _unlinked(self):
@@ -173,7 +174,7 @@ class Connection:
"""
self._connected = False
self._wakeup()
- self._ewait(lambda: not self._driver._connected)
+ self._ewait(lambda: not self._transport_connected)
@synchronized
def connected(self):