summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Kopaczewski <rk@23doors.com>2017-05-22 13:42:31 +0200
committerAsif Saifuddin Auvi <auvipy@users.noreply.github.com>2017-05-22 17:42:31 +0600
commitdbed5a4f8f81b0c09dbcaf6abfc90894238ea5f3 (patch)
tree8375a9103b26b78f243399e151acb94c76b2a88c
parent9e46363da07ae743babc3e4c1f45b0eea9c2da03 (diff)
downloadpy-amqp-dbed5a4f8f81b0c09dbcaf6abfc90894238ea5f3.tar.gz
Drain events should read until message is ready (#144)
* Drain events should read until message is ready * return true if we're done with frame=1 * freeze pydocstyle at 1.1.1
-rw-r--r--amqp/connection.py4
-rw-r--r--amqp/method_framing.py20
-rw-r--r--requirements/pkgutils.txt2
3 files changed, 16 insertions, 10 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index d25306b..f0c4540 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -479,7 +479,9 @@ class Connection(AbstractChannel):
raise NotImplementedError('Use AMQP heartbeats')
def drain_events(self, timeout=None):
- return self.blocking_read(timeout)
+ # read until message is ready
+ while not self.blocking_read(timeout):
+ pass
def blocking_read(self, timeout=None):
with self.transport.having_timeout(timeout):
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index 1d5d799..5ea46f9 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -64,21 +64,24 @@ def frame_handler(connection, callback,
frame_method=method_sig, frame_args=buf,
)
expected_types[channel] = 2
- else:
- callback(channel, method_sig, buf, None)
+ return False
+
+ callback(channel, method_sig, buf, None)
elif frame_type == 2:
msg = partial_messages[channel]
msg.inbound_header(buf)
- if msg.ready:
- # bodyless message, we're done
- expected_types[channel] = 1
- partial_messages.pop(channel, None)
- callback(channel, msg.frame_method, msg.frame_args, msg)
- else:
+ if not msg.ready:
# wait for the content-body
expected_types[channel] = 3
+ return False
+
+ # bodyless message, we're done
+ expected_types[channel] = 1
+ partial_messages.pop(channel, None)
+ callback(channel, msg.frame_method, msg.frame_args, msg)
+
elif frame_type == 3:
msg = partial_messages[channel]
msg.inbound_body(buf)
@@ -89,6 +92,7 @@ def frame_handler(connection, callback,
elif frame_type == 8:
# bytes_recv already updated
pass
+ return True
return on_frame
diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt
index c08306b..e39d1d8 100644
--- a/requirements/pkgutils.txt
+++ b/requirements/pkgutils.txt
@@ -5,4 +5,4 @@ flakeplus>=1.1
tox>=2.3.1
sphinx2rst>=1.0
bumpversion
-pydocstyle
+pydocstyle==1.1.1