diff options
author | Ask Solem <ask@celeryproject.org> | 2013-07-31 15:49:01 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-07-31 15:49:01 +0100 |
commit | f79929c535d8f45721c31db1c4eed7c5476f2d91 (patch) | |
tree | 2dd32d0d425ac8dfaa3b5b7369bae9eba74f857f | |
parent | 34a108c3e911bee09a20a9c188c0afec9a91383f (diff) | |
download | py-amqp-optimizations.tar.gz |
Experimental optimizationsoptimizations
-rw-r--r-- | amqp/basic_message.py | 1 | ||||
-rw-r--r-- | amqp/method_framing.py | 188 | ||||
-rw-r--r-- | amqp/serialization.py | 37 |
3 files changed, 120 insertions, 106 deletions
diff --git a/amqp/basic_message.py b/amqp/basic_message.py index 192ede9..833f1f0 100644 --- a/amqp/basic_message.py +++ b/amqp/basic_message.py @@ -23,6 +23,7 @@ __all__ = ['Message'] class Message(GenericContent): """A Message for use with the Channnel.basic_* methods.""" + complete = False #: Instances of this class have these attributes, which #: are passed back and forth as message properties between diff --git a/amqp/method_framing.py b/amqp/method_framing.py index daa5733..083c62f 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -43,6 +43,36 @@ _CONTENT_METHODS = [ ] +def _partial_message(msg, method_sig, args): + body_parts = [] + add_part = body_parts.append + received = 0 + + header = (yield) + class_id, weight, body_size = unpack('>HHQ', header[:12]) + msg._load_properties(header[12:]) + if body_size != 0: + while 1: + part = (yield) + add_part(part) + received += len(part) + + if received >= body_size: + if len(body_parts) > 1: + msg.body = bytes().join(body_parts) + else: + msg.body = body_parts[0] + break + msg.complete = True + + +def PartialMessage(method_sig, args): + msg = Message() + it = _partial_message(msg, method_sig, args) + next(it) + return msg, it, method_sig, args + + class _PartialMessage(object): """Helper class to build up a multi-frame method.""" @@ -61,11 +91,15 @@ class _PartialMessage(object): self.complete = (self.body_size == 0) def add_payload(self, payload): - self.body_parts.append(payload) + parts = self.body_parts + parts.append(payload) self.body_received += len(payload) if self.body_received == self.body_size: - self.msg.body = bytes().join(self.body_parts) + if len(parts) > 1: + self.msg.body = bytes().join(parts) + else: + self.msg.body = parts[0] self.complete = True @@ -96,101 +130,79 @@ class MethodReader(object): self.bytes_recv = 0 self._quick_put = self.queue.append self._quick_get = self.queue.popleft + self.it = self.iter() - def _next_method(self): - """Read the next method from the source, once one complete method has - been assembled it is placed in the internal queue.""" - queue = self.queue - put = self._quick_put + def iter(self, unpack=unpack): read_frame = self.source.read_frame - while not queue: - try: + partials = self.partial_messages + expected_types = self.expected_types + ITEM = None + while 1: + while ITEM is None: frame_type, channel, payload = read_frame() - except Exception as exc: - # - # Connection was closed? Framing Error? - # - put(exc) - break - - self.bytes_recv += 1 + self.bytes_recv += 1 - if frame_type not in (self.expected_types[channel], 8): - put(( - channel, - UnexpectedFrame( + if frame_type not in (expected_types[channel], 8): + raise UnexpectedFrame( 'Received frame {0} while expecting type: {1}'.format( - frame_type, self.expected_types[channel]), + frame_type, expected_types[channel]), ) - )) - elif frame_type == 1: - self._process_method_frame(channel, payload) - elif frame_type == 2: - self._process_content_header(channel, payload) - elif frame_type == 3: - self._process_content_body(channel, payload) - elif frame_type == 8: - self._process_heartbeat(channel, payload) - - def _process_heartbeat(self, channel, payload): - self.heartbeats += 1 - - def _process_method_frame(self, channel, payload): - """Process Method frames""" - method_sig = unpack('>HH', payload[:4]) - args = AMQPReader(payload[4:]) - - if method_sig in _CONTENT_METHODS: - # - # Save what we've got so far and wait for the content-header - # - self.partial_messages[channel] = _PartialMessage(method_sig, args) - self.expected_types[channel] = 2 - else: - self._quick_put((channel, method_sig, args, None)) - - def _process_content_header(self, channel, payload): - """Process Content Header frames""" - partial = self.partial_messages[channel] - partial.add_header(payload) - - if partial.complete: - # - # a bodyless message, we're done - # - self._quick_put((channel, partial.method_sig, - partial.args, partial.msg)) - self.partial_messages.pop(channel, None) - self.expected_types[channel] = 1 - else: - # - # wait for the content-body - # - self.expected_types[channel] = 3 - - def _process_content_body(self, channel, payload): - """Process Content Body frames""" - partial = self.partial_messages[channel] - partial.add_payload(payload) - if partial.complete: - # - # Stick the message in the queue and go back to - # waiting for method frames - # - self._quick_put((channel, partial.method_sig, - partial.args, partial.msg)) - self.partial_messages.pop(channel, None) - self.expected_types[channel] = 1 + elif frame_type == 1: + method_sig = unpack('>HH', payload[:4]) + args = AMQPReader(payload[4:]) + if method_sig in _CONTENT_METHODS: + # Save what we've got so far and wait for the content-header + partials[channel] = PartialMessage(method_sig, args) + expected_types[channel] = 2 + else: + ITEM = channel, method_sig, args, None + elif frame_type == 2: + msg, it, msig, margs = partials[channel] + it.send(payload) + + if msg.complete: + # a bodyless message, we're done + ITEM = channel, msig, margs, msg + partials.pop(channel, None) + expected_types[channel] = 1 + else: + # wait for the content-body + expected_types[channel] = 3 + elif frame_type == 3: + msg, it, msig, margs = partials[channel] + try: + it.send(payload) + except StopIteration: + pass + if msg.complete: + # + # Stick the message in the queue and go back to + # waiting for method frames + # + ITEM = channel, msig, margs, msg + partials.pop(channel, None) + expected_types[channel] = 1 + elif frame_type == 8: + self.heartbeats += 1 + yield ITEM + ITEM = None + + + def _next_method(self): + """Read the next method from the source, once one complete method has + been assembled it is placed in the internal queue.""" + return next(self.it) def read_method(self): """Read a method from the peer.""" - self._next_method() - m = self._quick_get() - if isinstance(m, Exception): - raise m - if isinstance(m, tuple) and isinstance(m[1], AMQPError): - raise m[1] - return m + try: + X = next(self.it) + except StopIteration: + self.it = self.iter() + except Exception: + self.it = self.iter() + raise + return X class MethodWriter(object): diff --git a/amqp/serialization.py b/amqp/serialization.py index 6a74702..db4b9c6 100644 --- a/amqp/serialization.py +++ b/amqp/serialization.py @@ -76,6 +76,7 @@ class AMQPReader(object): 'AMQPReader needs a file-like object or plain string') self.bitcount = self.bits = 0 + self._quick_read = self.input.read def close(self): self.input.close() @@ -83,12 +84,12 @@ class AMQPReader(object): def read(self, n): """Read n bytes.""" self.bitcount = self.bits = 0 - return self.input.read(n) + return self._quick_read(n) def read_bit(self): """Read a single boolean value.""" if not self.bitcount: - self.bits = ord(self.input.read(1)) + self.bits = ord(self._quick_read(1)) self.bitcount = 8 result = (self.bits & 1) == 1 self.bits >>= 1 @@ -98,27 +99,27 @@ class AMQPReader(object): def read_octet(self): """Read one byte, return as an integer""" self.bitcount = self.bits = 0 - return unpack('B', self.input.read(1))[0] + return unpack('B', self._quick_read(1))[0] def read_short(self): """Read an unsigned 16-bit integer""" self.bitcount = self.bits = 0 - return unpack('>H', self.input.read(2))[0] + return unpack('>H', self._quick_read(2))[0] def read_long(self): """Read an unsigned 32-bit integer""" self.bitcount = self.bits = 0 - return unpack('>I', self.input.read(4))[0] + return unpack('>I', self._quick_read(4))[0] def read_longlong(self): """Read an unsigned 64-bit integer""" self.bitcount = self.bits = 0 - return unpack('>Q', self.input.read(8))[0] + return unpack('>Q', self._quick_read(8))[0] def read_float(self): """Read float value.""" self.bitcount = self.bits = 0 - return unpack('>d', self.input.read(8))[0] + return unpack('>d', self._quick_read(8))[0] def read_shortstr(self): """Read a short string that's stored in up to 255 bytes. @@ -128,8 +129,8 @@ class AMQPReader(object): """ self.bitcount = self.bits = 0 - slen = unpack('B', self.input.read(1))[0] - return self.input.read(slen).decode('utf-8') + slen = unpack('B', self._quick_read(1))[0] + return self._quick_read(slen).decode('utf-8') def read_longstr(self): """Read a string that's up to 2**32 bytes. @@ -139,14 +140,14 @@ class AMQPReader(object): """ self.bitcount = self.bits = 0 - slen = unpack('>I', self.input.read(4))[0] - return self.input.read(slen).decode('utf-8') + slen = unpack('>I', self._quick_read(4))[0] + return self._quick_read(slen).decode('utf-8') def read_table(self): """Read an AMQP table, and return as a Python dictionary.""" self.bitcount = self.bits = 0 - tlen = unpack('>I', self.input.read(4))[0] - table_data = AMQPReader(self.input.read(tlen)) + tlen = unpack('>I', self._quick_read(4))[0] + table_data = AMQPReader(self._quick_read(tlen)) result = {} while table_data.input.tell() < tlen: name = table_data.read_shortstr() @@ -155,14 +156,14 @@ class AMQPReader(object): return result def read_item(self): - ftype = ord(self.input.read(1)) + ftype = ord(self._quick_read(1)) if ftype == 83: # 'S' val = self.read_longstr() elif ftype == 73: # 'I' - val = unpack('>i', self.input.read(4))[0] + val = unpack('>i', self._quick_read(4))[0] elif ftype == 68: # 'D' d = self.read_octet() - n = unpack('>i', self.input.read(4))[0] + n = unpack('>i', self._quick_read(4))[0] val = Decimal(n) / Decimal(10 ** d) elif ftype == 84: # 'T' val = self.read_timestamp() @@ -181,8 +182,8 @@ class AMQPReader(object): return val def read_array(self): - array_length = unpack('>I', self.input.read(4))[0] - array_data = AMQPReader(self.input.read(array_length)) + array_length = unpack('>I', self._quick_read(4))[0] + array_data = AMQPReader(self._quick_read(array_length)) result = [] while array_data.input.tell() < array_length: val = array_data.read_item() |