diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-06-16 23:27:54 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-06-16 23:27:54 +0000 |
| commit | 032a0103e4d168f7856fc7cb26a49675a56b1ba6 (patch) | |
| tree | c0093db3cbb267afce6db851f28e0371e87120d5 /qpid/python | |
| parent | 4d9b122efd40ea20ceb4dd9927cb0e0af1a2ed96 (diff) | |
| download | qpid-python-032a0103e4d168f7856fc7cb26a49675a56b1ba6.tar.gz | |
QPID-1143: added buffering, we now only issue one write per assembly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@668345 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/framer.py | 20 | ||||
| -rw-r--r-- | qpid/python/tests/framer.py | 2 |
2 files changed, 20 insertions, 2 deletions
diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py index 27ea3287f0..f6363b2291 100644 --- a/qpid/python/qpid/framer.py +++ b/qpid/python/qpid/framer.py @@ -20,7 +20,7 @@ import struct, socket from exceptions import Closed from packer import Packer -from threading import Lock +from threading import RLock from logging import getLogger raw = getLogger("qpid.io.raw") @@ -75,12 +75,25 @@ class Framer(Packer): def __init__(self, sock): self.sock = sock - self.sock_lock = Lock() + self.sock_lock = RLock() + self._buf = "" def aborted(self): return False def write(self, buf): + self._buf += buf + + def flush(self): + self.sock_lock.acquire() + try: + self._write(self._buf) + self._buf = "" + frm.debug("FLUSHED") + finally: + self.sock_lock.release() + + def _write(self, buf): while buf: try: n = self.sock.send(buf) @@ -120,6 +133,7 @@ class Framer(Packer): self.sock_lock.acquire() try: self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor) + self.flush() finally: self.sock_lock.release() @@ -130,6 +144,8 @@ class Framer(Packer): track = frame.track & 0x0F self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel) self.write(frame.payload) + if frame.isLastSegment() and frame.isLastFrame(): + self.flush() frm.debug("SENT %s", frame) finally: self.sock_lock.release() diff --git a/qpid/python/tests/framer.py b/qpid/python/tests/framer.py index ea2e04e954..05bb467bbe 100644 --- a/qpid/python/tests/framer.py +++ b/qpid/python/tests/framer.py @@ -37,6 +37,7 @@ class FramerTest(TestCase): while True: frame = conn.read_frame() conn.write_frame(frame) + conn.flush() except Closed: pass @@ -60,6 +61,7 @@ class FramerTest(TestCase): c.write_frame(Frame(0, 1, 2, 3, "IS")) c.write_frame(Frame(0, 1, 2, 3, "A")) c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST")) + c.flush() f = c.read_frame() assert f.flags & FIRST_FRM |
