From 8cc77b154715bbe94e0bd93708292551c46adc98 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Mar 2017 10:07:56 -0700 Subject: Avoid re-encoding for message crc check --- kafka/protocol/message.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) (limited to 'kafka/protocol/message.py') diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index ec5ee6c..efdf4fc 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -48,6 +48,7 @@ class Message(Struct): timestamp = int(time.time() * 1000) self.timestamp = timestamp self.crc = crc + self._validated_crc = None self.magic = magic self.attributes = attributes self.key = key @@ -85,7 +86,9 @@ class Message(Struct): @classmethod def decode(cls, data): + _validated_crc = None if isinstance(data, bytes): + _validated_crc = crc32(data[4:]) data = io.BytesIO(data) # Partial decode required to determine message version base_fields = cls.SCHEMAS[0].fields[0:3] @@ -96,14 +99,17 @@ class Message(Struct): timestamp = fields[0] else: timestamp = None - return cls(fields[-1], key=fields[-2], - magic=magic, attributes=attributes, crc=crc, - timestamp=timestamp) + msg = cls(fields[-1], key=fields[-2], + magic=magic, attributes=attributes, crc=crc, + timestamp=timestamp) + msg._validated_crc = _validated_crc + return msg def validate_crc(self): - raw_msg = self._encode_self(recalc_crc=False) - crc = crc32(raw_msg[4:]) - if crc == self.crc: + if self._validated_crc is None: + raw_msg = self._encode_self(recalc_crc=False) + self._validated_crc = crc32(raw_msg[4:]) + if self.crc == self._validated_crc: return True return False -- cgit v1.2.1