diff options
author | Robert Kopaczewski <rk@23doors.com> | 2017-05-22 13:42:31 +0200 |
---|---|---|
committer | Asif Saifuddin Auvi <auvipy@users.noreply.github.com> | 2017-05-22 17:42:31 +0600 |
commit | dbed5a4f8f81b0c09dbcaf6abfc90894238ea5f3 (patch) | |
tree | 8375a9103b26b78f243399e151acb94c76b2a88c | |
parent | 9e46363da07ae743babc3e4c1f45b0eea9c2da03 (diff) | |
download | py-amqp-dbed5a4f8f81b0c09dbcaf6abfc90894238ea5f3.tar.gz |
Drain events should read until message is ready (#144)
* Drain events should read until message is ready
* return true if we're done with frame=1
* freeze pydocstyle at 1.1.1
-rw-r--r-- | amqp/connection.py | 4 | ||||
-rw-r--r-- | amqp/method_framing.py | 20 | ||||
-rw-r--r-- | requirements/pkgutils.txt | 2 |
3 files changed, 16 insertions, 10 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index d25306b..f0c4540 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -479,7 +479,9 @@ class Connection(AbstractChannel): raise NotImplementedError('Use AMQP heartbeats') def drain_events(self, timeout=None): - return self.blocking_read(timeout) + # read until message is ready + while not self.blocking_read(timeout): + pass def blocking_read(self, timeout=None): with self.transport.having_timeout(timeout): diff --git a/amqp/method_framing.py b/amqp/method_framing.py index 1d5d799..5ea46f9 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -64,21 +64,24 @@ def frame_handler(connection, callback, frame_method=method_sig, frame_args=buf, ) expected_types[channel] = 2 - else: - callback(channel, method_sig, buf, None) + return False + + callback(channel, method_sig, buf, None) elif frame_type == 2: msg = partial_messages[channel] msg.inbound_header(buf) - if msg.ready: - # bodyless message, we're done - expected_types[channel] = 1 - partial_messages.pop(channel, None) - callback(channel, msg.frame_method, msg.frame_args, msg) - else: + if not msg.ready: # wait for the content-body expected_types[channel] = 3 + return False + + # bodyless message, we're done + expected_types[channel] = 1 + partial_messages.pop(channel, None) + callback(channel, msg.frame_method, msg.frame_args, msg) + elif frame_type == 3: msg = partial_messages[channel] msg.inbound_body(buf) @@ -89,6 +92,7 @@ def frame_handler(connection, callback, elif frame_type == 8: # bytes_recv already updated pass + return True return on_frame diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt index c08306b..e39d1d8 100644 --- a/requirements/pkgutils.txt +++ b/requirements/pkgutils.txt @@ -5,4 +5,4 @@ flakeplus>=1.1 tox>=2.3.1 sphinx2rst>=1.0 bumpversion -pydocstyle +pydocstyle==1.1.1 |