diff options
Diffstat (limited to 'python/qpid/connection.py')
-rw-r--r-- | python/qpid/connection.py | 236 |
1 files changed, 0 insertions, 236 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py deleted file mode 100644 index 7dbefb8778..0000000000 --- a/python/qpid/connection.py +++ /dev/null @@ -1,236 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import datatypes, session -from threading import Thread, Condition, RLock -from util import wait, notify -from codec010 import StringCodec -from framing import * -from session import Session -from generator import control_invoker -from exceptions import * -from logging import getLogger -import delegates, socket - -class ChannelBusy(Exception): pass - -class ChannelsBusy(Exception): pass - -class SessionBusy(Exception): pass - -class ConnectionFailed(Exception): pass - -def client(*args, **kwargs): - return delegates.Client(*args, **kwargs) - -def server(*args, **kwargs): - return delegates.Server(*args, **kwargs) - -from framer import Framer - -class Connection(Framer): - - def __init__(self, sock, delegate=client, **args): - Framer.__init__(self, sock) - self.lock = RLock() - self.attached = {} - self.sessions = {} - - self.condition = Condition() - # XXX: we should combine this into a single comprehensive state - # model (whatever that means) - self.opened = False - self.failed = False - self.closed = False - self.close_code = (None, "connection aborted") - - self.thread = Thread(target=self.run) - self.thread.setDaemon(True) - - self.channel_max = 65535 - self.user_id = None - - self.op_enc = OpEncoder() - self.seg_enc = SegmentEncoder() - self.frame_enc = FrameEncoder() - - self.delegate = delegate(self, **args) - - def attach(self, name, ch, delegate, force=False): - self.lock.acquire() - try: - ssn = self.attached.get(ch.id) - if ssn is not None: - if ssn.name != name: - raise ChannelBusy(ch, ssn) - else: - ssn = self.sessions.get(name) - if ssn is None: - ssn = Session(name, delegate=delegate) - self.sessions[name] = ssn - elif ssn.channel is not None: - if force: - del self.attached[ssn.channel.id] - ssn.channel = None - else: - raise SessionBusy(ssn) - self.attached[ch.id] = ssn - ssn.channel = ch - ch.session = ssn - return ssn - finally: - self.lock.release() - - def detach(self, name, ch): - self.lock.acquire() - try: - self.attached.pop(ch.id, None) - ssn = self.sessions.pop(name, None) - if ssn is not None: - ssn.channel = None - ssn.closed() - return ssn - finally: - self.lock.release() - - def __channel(self): - for i in xrange(1, self.channel_max): - if not self.attached.has_key(i): - return i - else: - raise ChannelsBusy() - - def session(self, name, timeout=None, delegate=session.client): - self.lock.acquire() - try: - ch = Channel(self, self.__channel()) - ssn = self.attach(name, ch, delegate) - ssn.channel.session_attach(name) - if wait(ssn.condition, lambda: ssn.channel is not None, timeout): - return ssn - else: - self.detach(name, ch) - raise Timeout() - finally: - self.lock.release() - - def detach_all(self): - self.lock.acquire() - self.failed = True - try: - for ssn in self.attached.values(): - if self.close_code[0] != 200: - ssn.exceptions.append(self.close_code) - self.detach(ssn.name, ssn.channel) - finally: - self.lock.release() - - def start(self, timeout=None): - self.delegate.start() - self.thread.start() - if not wait(self.condition, lambda: self.opened or self.failed, timeout): - self.thread.join() - raise Timeout() - if self.failed: - self.thread.join() - raise ConnectionFailed(*self.close_code) - - def run(self): - frame_dec = FrameDecoder() - seg_dec = SegmentDecoder() - op_dec = OpDecoder() - - while not self.closed: - try: - data = self.sock.recv(64*1024) - if self.security_layer_rx and data: - status, data = self.security_layer_rx.decode(data) - if not data: - self.detach_all() - break - except socket.timeout: - if self.aborted(): - self.close_code = (None, "connection timed out") - self.detach_all() - break - else: - continue - except socket.error, e: - self.close_code = (None, str(e)) - self.detach_all() - break - frame_dec.write(data) - seg_dec.write(*frame_dec.read()) - op_dec.write(*seg_dec.read()) - for op in op_dec.read(): - try: - self.delegate.received(op) - except Closed, e: - self.close_code = (None, str(e)) - if not self.opened: - self.failed = True - self.closed = True - notify(self.condition) - self.sock.close() - - def write_op(self, op): - self.sock_lock.acquire() - try: - self.op_enc.write(op) - self.seg_enc.write(*self.op_enc.read()) - self.frame_enc.write(*self.seg_enc.read()) - bytes = self.frame_enc.read() - self.write(bytes) - self.flush() - finally: - self.sock_lock.release() - - def close(self, timeout=None): - if not self.opened: return - Channel(self, 0).connection_close(200) - if not wait(self.condition, lambda: not self.opened, timeout): - raise Timeout() - self.thread.join(timeout=timeout) - - def __str__(self): - return "%s:%s" % self.sock.getsockname() - - def __repr__(self): - return str(self) - -log = getLogger("qpid.io.ctl") - -class Channel(control_invoker()): - - def __init__(self, connection, id): - self.connection = connection - self.id = id - self.session = None - - def invoke(self, op, args, kwargs): - ctl = op(*args, **kwargs) - ctl.channel = self.id - self.connection.write_op(ctl) - log.debug("SENT %s", ctl) - - def __str__(self): - return "%s[%s]" % (self.connection, self.id) - - def __repr__(self): - return str(self) |