summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/framer.py20
-rw-r--r--qpid/python/tests/framer.py2
2 files changed, 20 insertions, 2 deletions
diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py
index 27ea3287f0..f6363b2291 100644
--- a/qpid/python/qpid/framer.py
+++ b/qpid/python/qpid/framer.py
@@ -20,7 +20,7 @@
import struct, socket
from exceptions import Closed
from packer import Packer
-from threading import Lock
+from threading import RLock
from logging import getLogger
raw = getLogger("qpid.io.raw")
@@ -75,12 +75,25 @@ class Framer(Packer):
def __init__(self, sock):
self.sock = sock
- self.sock_lock = Lock()
+ self.sock_lock = RLock()
+ self._buf = ""
def aborted(self):
return False
def write(self, buf):
+ self._buf += buf
+
+ def flush(self):
+ self.sock_lock.acquire()
+ try:
+ self._write(self._buf)
+ self._buf = ""
+ frm.debug("FLUSHED")
+ finally:
+ self.sock_lock.release()
+
+ def _write(self, buf):
while buf:
try:
n = self.sock.send(buf)
@@ -120,6 +133,7 @@ class Framer(Packer):
self.sock_lock.acquire()
try:
self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.flush()
finally:
self.sock_lock.release()
@@ -130,6 +144,8 @@ class Framer(Packer):
track = frame.track & 0x0F
self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
self.write(frame.payload)
+ if frame.isLastSegment() and frame.isLastFrame():
+ self.flush()
frm.debug("SENT %s", frame)
finally:
self.sock_lock.release()
diff --git a/qpid/python/tests/framer.py b/qpid/python/tests/framer.py
index ea2e04e954..05bb467bbe 100644
--- a/qpid/python/tests/framer.py
+++ b/qpid/python/tests/framer.py
@@ -37,6 +37,7 @@ class FramerTest(TestCase):
while True:
frame = conn.read_frame()
conn.write_frame(frame)
+ conn.flush()
except Closed:
pass
@@ -60,6 +61,7 @@ class FramerTest(TestCase):
c.write_frame(Frame(0, 1, 2, 3, "IS"))
c.write_frame(Frame(0, 1, 2, 3, "A"))
c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST"))
+ c.flush()
f = c.read_frame()
assert f.flags & FIRST_FRM