summaryrefslogtreecommitdiff
path: root/python/qpid/framer.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-06-16 23:27:54 +0000
committerRafael H. Schloming <rhs@apache.org>2008-06-16 23:27:54 +0000
commit64ecb4252be0f5cf84373bc78f833aad0d998095 (patch)
tree52eb3e2824d3b7eb2dc7dc234107adad08330080 /python/qpid/framer.py
parent3f23de2976509220a588be4a69469c9ee70e0789 (diff)
downloadqpid-python-64ecb4252be0f5cf84373bc78f833aad0d998095.tar.gz
QPID-1143: added buffering, we now only issue one write per assembly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@668345 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/framer.py')
-rw-r--r--python/qpid/framer.py20
1 files changed, 18 insertions, 2 deletions
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 27ea3287f0..f6363b2291 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -20,7 +20,7 @@
import struct, socket
from exceptions import Closed
from packer import Packer
-from threading import Lock
+from threading import RLock
from logging import getLogger
raw = getLogger("qpid.io.raw")
@@ -75,12 +75,25 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
- self.sock_lock = Lock()
+ self.sock_lock = RLock()
+ self._buf = ""
def aborted(self):
return False
def write(self, buf):
+ self._buf += buf
+
+ def flush(self):
+ self.sock_lock.acquire()
+ try:
+ self._write(self._buf)
+ self._buf = ""
+ frm.debug("FLUSHED")
+ finally:
+ self.sock_lock.release()
+
+ def _write(self, buf):
while buf:
try:
n = self.sock.send(buf)
@@ -120,6 +133,7 @@ class Framer(Packer):
self.sock_lock.acquire()
try:
self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.flush()
finally:
self.sock_lock.release()
@@ -130,6 +144,8 @@ class Framer(Packer):
track = frame.track & 0x0F
self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
self.write(frame.payload)
+ if frame.isLastSegment() and frame.isLastFrame():
+ self.flush()
frm.debug("SENT %s", frame)
finally:
self.sock_lock.release()