From d22ac4bbbd52fc8cbf80f864c49c904b0b24a529 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 11 Aug 2009 15:40:19 +0000 Subject: - removed old and redundent tests - removed old test harness in favor of qpid-python-test - modified qpid-python-test to support "skipped" tests, these are tests that failed due to an anticipated environmental reason such as the broker is not running or it is the wrong version - modified the qpid-python-test harness to exit with appropriate error codes based on the test results - modified the python clients to report version mismatches rather than framing errors - made qpid_config provide variables for 0-8, 0-9, and 0-10 versions of the spec - modified the 0-10 client to directly codegen classes - added new 0-10 framing layer based on push parsing rather than pull parsing - added numerous framing tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@803168 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/connection.py | 65 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 17 deletions(-) (limited to 'python/qpid/connection.py') diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 5abab3802c..680f8f62e3 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -20,14 +20,14 @@ import datatypes, session from threading import Thread, Condition, RLock from util import wait, notify -from assembler import Assembler, Segment from codec010 import StringCodec +from framing import * from session import Session from generator import control_invoker from spec import SPEC from exceptions import * from logging import getLogger -import delegates +import delegates, socket class ChannelBusy(Exception): pass @@ -43,12 +43,12 @@ def client(*args, **kwargs): def server(*args, **kwargs): return delegates.Server(*args, **kwargs) -class Connection(Assembler): +from framer import Framer - def __init__(self, sock, spec=SPEC, delegate=client, **args): - Assembler.__init__(self, sock) - self.spec = spec +class Connection(Framer): + def __init__(self, sock, delegate=client, **args): + Framer.__init__(self, sock) self.lock = RLock() self.attached = {} self.sessions = {} @@ -66,6 +66,10 @@ class Connection(Assembler): self.channel_max = 65535 + self.op_enc = OpEncoder() + self.seg_enc = SegmentEncoder() + self.frame_enc = FrameEncoder() + self.delegate = delegate(self, **args) def attach(self, name, ch, delegate, force=False): @@ -145,15 +149,44 @@ class Connection(Assembler): raise ConnectionFailed(*self.close_code) def run(self): + frame_dec = FrameDecoder() + seg_dec = SegmentDecoder() + op_dec = OpDecoder() + while not self.closed: try: - seg = self.read_segment() - except Closed: + data = self.sock.recv(64*1024) + if not data: + self.detach_all() + break + except socket.timeout: + if self.aborted(): + self.detach_all() + raise Closed("connection timed out") + else: + continue + except socket.error, e: self.detach_all() - break - self.delegate.received(seg) + raise Closed(e) + frame_dec.write(data) + seg_dec.write(*frame_dec.read()) + op_dec.write(*seg_dec.read()) + for op in op_dec.read(): + self.delegate.received(op) self.sock.close() + def write_op(self, op): + self.sock_lock.acquire() + try: + self.op_enc.write(op) + self.seg_enc.write(*self.op_enc.read()) + self.frame_enc.write(*self.seg_enc.read()) + bytes = self.frame_enc.read() + self.write(bytes) + self.flush() + finally: + self.sock_lock.release() + def close(self, timeout=None): if not self.opened: return Channel(self, 0).connection_close(200) @@ -169,19 +202,17 @@ class Connection(Assembler): log = getLogger("qpid.io.ctl") -class Channel(control_invoker(SPEC)): +class Channel(control_invoker()): def __init__(self, connection, id): self.connection = connection self.id = id self.session = None - def invoke(self, type, args, kwargs): - ctl = type.new(args, kwargs) - sc = StringCodec(self.spec) - sc.write_control(ctl) - self.connection.write_segment(Segment(True, True, type.segment_type, - type.track, self.id, sc.encoded)) + def invoke(self, op, args, kwargs): + ctl = op(*args, **kwargs) + ctl.channel = self.id + self.connection.write_op(ctl) log.debug("SENT %s", ctl) def __str__(self): -- cgit v1.2.1