summaryrefslogtreecommitdiff
path: root/RC5/python/qpid/connection.py
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
commit38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch)
tree3599403c0c9690898f1e336c009a5564c587c732 /RC5/python/qpid/connection.py
parenta8960649bcd365ef70a5de7812f5910222388a6d (diff)
downloadqpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/python/qpid/connection.py')
-rw-r--r--RC5/python/qpid/connection.py218
1 files changed, 218 insertions, 0 deletions
diff --git a/RC5/python/qpid/connection.py b/RC5/python/qpid/connection.py
new file mode 100644
index 0000000000..4c9c02822a
--- /dev/null
+++ b/RC5/python/qpid/connection.py
@@ -0,0 +1,218 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datatypes, session, socket
+from threading import Thread, Condition, RLock
+from util import wait, notify
+from assembler import Assembler, Segment
+from codec010 import StringCodec
+from session import Session
+from invoker import Invoker
+from spec010 import Control, Command, load
+from spec import default
+from exceptions import *
+from logging import getLogger
+import delegates
+
+class ChannelBusy(Exception): pass
+
+class ChannelsBusy(Exception): pass
+
+class SessionBusy(Exception): pass
+
+class ConnectionFailed(Exception): pass
+
+def client(*args, **kwargs):
+ return delegates.Client(*args, **kwargs)
+
+def server(*args, **kwargs):
+ return delegates.Server(*args, **kwargs)
+
+class SSLWrapper:
+
+ def __init__(self, ssl):
+ self.ssl = ssl
+
+ def recv(self, n):
+ return self.ssl.read(n)
+
+ def send(self, s):
+ return self.ssl.write(s)
+
+def sslwrap(sock):
+ if isinstance(sock, socket.SSLType):
+ return SSLWrapper(sock)
+ else:
+ return sock
+
+class Connection(Assembler):
+
+ def __init__(self, sock, spec=None, delegate=client, **args):
+ Assembler.__init__(self, sslwrap(sock))
+ if spec == None:
+ spec = load(default())
+ self.spec = spec
+ self.track = self.spec["track"]
+
+ self.lock = RLock()
+ self.attached = {}
+ self.sessions = {}
+
+ self.condition = Condition()
+ self.opened = False
+ self.failed = False
+ self.close_code = (None, "connection aborted")
+
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+
+ self.channel_max = 65535
+
+ self.delegate = delegate(self, **args)
+
+ def attach(self, name, ch, delegate, force=False):
+ self.lock.acquire()
+ try:
+ ssn = self.attached.get(ch.id)
+ if ssn is not None:
+ if ssn.name != name:
+ raise ChannelBusy(ch, ssn)
+ else:
+ ssn = self.sessions.get(name)
+ if ssn is None:
+ ssn = Session(name, self.spec, delegate=delegate)
+ self.sessions[name] = ssn
+ elif ssn.channel is not None:
+ if force:
+ del self.attached[ssn.channel.id]
+ ssn.channel = None
+ else:
+ raise SessionBusy(ssn)
+ self.attached[ch.id] = ssn
+ ssn.channel = ch
+ ch.session = ssn
+ return ssn
+ finally:
+ self.lock.release()
+
+ def detach(self, name, ch):
+ self.lock.acquire()
+ try:
+ self.attached.pop(ch.id, None)
+ ssn = self.sessions.pop(name, None)
+ if ssn is not None:
+ ssn.channel = None
+ ssn.closed()
+ return ssn
+ finally:
+ self.lock.release()
+
+ def __channel(self):
+ # XXX: ch 0?
+ for i in xrange(self.channel_max):
+ if not self.attached.has_key(i):
+ return i
+ else:
+ raise ChannelsBusy()
+
+ def session(self, name, timeout=None, delegate=session.client):
+ self.lock.acquire()
+ try:
+ ch = Channel(self, self.__channel())
+ ssn = self.attach(name, ch, delegate)
+ ssn.channel.session_attach(name)
+ if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
+ return ssn
+ else:
+ self.detach(name, ch)
+ raise Timeout()
+ finally:
+ self.lock.release()
+
+ def detach_all(self):
+ self.lock.acquire()
+ try:
+ for ssn in self.attached.values():
+ if self.close_code[0] != 200:
+ ssn.exceptions.append(self.close_code)
+ self.detach(ssn.name, ssn.channel)
+ finally:
+ self.lock.release()
+
+ def start(self, timeout=None):
+ self.delegate.start()
+ self.thread.start()
+ if not wait(self.condition, lambda: self.opened or self.failed, timeout):
+ raise Timeout()
+ if self.failed:
+ raise ConnectionFailed(*self.close_code)
+
+ def run(self):
+ # XXX: we don't really have a good way to exit this loop without
+ # getting the other end to kill the socket
+ while True:
+ try:
+ seg = self.read_segment()
+ except Closed:
+ self.detach_all()
+ break
+ self.delegate.received(seg)
+
+ def close(self, timeout=None):
+ if not self.opened: return
+ Channel(self, 0).connection_close(200)
+ if not wait(self.condition, lambda: not self.opened, timeout):
+ raise Timeout()
+ self.thread.join(timeout=timeout)
+
+ def __str__(self):
+ return "%s:%s" % self.sock.getsockname()
+
+ def __repr__(self):
+ return str(self)
+
+log = getLogger("qpid.io.ctl")
+
+class Channel(Invoker):
+
+ def __init__(self, connection, id):
+ self.connection = connection
+ self.id = id
+ self.session = None
+
+ def resolve_method(self, name):
+ inst = self.connection.spec.instructions.get(name)
+ if inst is not None and isinstance(inst, Control):
+ return self.METHOD, inst
+ else:
+ return self.ERROR, None
+
+ def invoke(self, type, args, kwargs):
+ ctl = type.new(args, kwargs)
+ sc = StringCodec(self.connection.spec)
+ sc.write_control(ctl)
+ self.connection.write_segment(Segment(True, True, type.segment_type,
+ type.track, self.id, sc.encoded))
+ log.debug("SENT %s", ctl)
+
+ def __str__(self):
+ return "%s[%s]" % (self.connection, self.id)
+
+ def __repr__(self):
+ return str(self)