summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/framer.py20
1 files changed, 18 insertions, 2 deletions
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 27ea3287f0..f6363b2291 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -20,7 +20,7 @@
import struct, socket
from exceptions import Closed
from packer import Packer
-from threading import Lock
+from threading import RLock
from logging import getLogger
raw = getLogger("qpid.io.raw")
@@ -75,12 +75,25 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
- self.sock_lock = Lock()
+ self.sock_lock = RLock()
+ self._buf = ""
def aborted(self):
return False
def write(self, buf):
+ self._buf += buf
+
+ def flush(self):
+ self.sock_lock.acquire()
+ try:
+ self._write(self._buf)
+ self._buf = ""
+ frm.debug("FLUSHED")
+ finally:
+ self.sock_lock.release()
+
+ def _write(self, buf):
while buf:
try:
n = self.sock.send(buf)
@@ -120,6 +133,7 @@ class Framer(Packer):
self.sock_lock.acquire()
try:
self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.flush()
finally:
self.sock_lock.release()
@@ -130,6 +144,8 @@ class Framer(Packer):
track = frame.track & 0x0F
self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
self.write(frame.payload)
+ if frame.isLastSegment() and frame.isLastFrame():
+ self.flush()
frm.debug("SENT %s", frame)
finally:
self.sock_lock.release()