summaryrefslogtreecommitdiff
path: root/RC5/python/qpid/framer.py
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
commit38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch)
tree3599403c0c9690898f1e336c009a5564c587c732 /RC5/python/qpid/framer.py
parenta8960649bcd365ef70a5de7812f5910222388a6d (diff)
downloadqpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/python/qpid/framer.py')
-rw-r--r--RC5/python/qpid/framer.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/RC5/python/qpid/framer.py b/RC5/python/qpid/framer.py
new file mode 100644
index 0000000000..f6363b2291
--- /dev/null
+++ b/RC5/python/qpid/framer.py
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import struct, socket
+from exceptions import Closed
+from packer import Packer
+from threading import RLock
+from logging import getLogger
+
+raw = getLogger("qpid.io.raw")
+frm = getLogger("qpid.io.frm")
+
+FIRST_SEG = 0x08
+LAST_SEG = 0x04
+FIRST_FRM = 0x02
+LAST_FRM = 0x01
+
+class Frame:
+
+ HEADER = "!2BHxBH4x"
+ MAX_PAYLOAD = 65535 - struct.calcsize(HEADER)
+
+ def __init__(self, flags, type, track, channel, payload):
+ if len(payload) > Frame.MAX_PAYLOAD:
+ raise ValueError("max payload size exceeded: %s" % len(payload))
+ self.flags = flags
+ self.type = type
+ self.track = track
+ self.channel = channel
+ self.payload = payload
+
+ def isFirstSegment(self):
+ return bool(FIRST_SEG & self.flags)
+
+ def isLastSegment(self):
+ return bool(LAST_SEG & self.flags)
+
+ def isFirstFrame(self):
+ return bool(FIRST_FRM & self.flags)
+
+ def isLastFrame(self):
+ return bool(LAST_FRM & self.flags)
+
+ def __str__(self):
+ return "%s%s%s%s %s %s %s %r" % (int(self.isFirstSegment()),
+ int(self.isLastSegment()),
+ int(self.isFirstFrame()),
+ int(self.isLastFrame()),
+ self.type,
+ self.track,
+ self.channel,
+ self.payload)
+
+class FramingError(Exception): pass
+
+class Framer(Packer):
+
+ HEADER="!4s4B"
+
+ def __init__(self, sock):
+ self.sock = sock
+ self.sock_lock = RLock()
+ self._buf = ""
+
+ def aborted(self):
+ return False
+
+ def write(self, buf):
+ self._buf += buf
+
+ def flush(self):
+ self.sock_lock.acquire()
+ try:
+ self._write(self._buf)
+ self._buf = ""
+ frm.debug("FLUSHED")
+ finally:
+ self.sock_lock.release()
+
+ def _write(self, buf):
+ while buf:
+ try:
+ n = self.sock.send(buf)
+ except socket.timeout:
+ if self.aborted():
+ raise Closed()
+ else:
+ continue
+ raw.debug("SENT %r", buf[:n])
+ buf = buf[n:]
+
+ def read(self, n):
+ data = ""
+ while len(data) < n:
+ try:
+ s = self.sock.recv(n - len(data))
+ except socket.timeout:
+ if self.aborted():
+ raise Closed()
+ else:
+ continue
+ except socket.error, e:
+ if data != "":
+ raise e
+ else:
+ raise Closed()
+ if len(s) == 0:
+ raise Closed()
+ data += s
+ raw.debug("RECV %r", s)
+ return data
+
+ def read_header(self):
+ return self.unpack(Framer.HEADER)
+
+ def write_header(self, major, minor):
+ self.sock_lock.acquire()
+ try:
+ self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.flush()
+ finally:
+ self.sock_lock.release()
+
+ def write_frame(self, frame):
+ self.sock_lock.acquire()
+ try:
+ size = len(frame.payload) + struct.calcsize(Frame.HEADER)
+ track = frame.track & 0x0F
+ self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
+ self.write(frame.payload)
+ if frame.isLastSegment() and frame.isLastFrame():
+ self.flush()
+ frm.debug("SENT %s", frame)
+ finally:
+ self.sock_lock.release()
+
+ def read_frame(self):
+ flags, type, size, track, channel = self.unpack(Frame.HEADER)
+ if flags & 0xF0: raise FramingError()
+ payload = self.read(size - struct.calcsize(Frame.HEADER))
+ frame = Frame(flags, type, track, channel, payload)
+ frm.debug("RECV %s", frame)
+ return frame