summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarson Ip <carsonip715@gmail.com>2020-01-17 15:05:18 +0800
committerJeff Widman <jeff@jeffwidman.com>2020-02-05 15:22:51 -0800
commitd54aaf6a46dbd981a0bb08570d94b4d8c4c59aef (patch)
tree3dc8a269d3772487e665c21dbfcb93fa148894e8
parentbb1c13e9d3ef609fc456b4b6fd4cc075bd100b1b (diff)
downloadkafka-python-d54aaf6a46dbd981a0bb08570d94b4d8c4c59aef.tar.gz
Fix slots usage and use more slots
Use empty slots for ABC classes, otherwise classes which inherit from them will still have __dict__. Also use __slots__ for more classes.
-rw-r--r--kafka/record/abc.py4
-rw-r--r--kafka/record/default_records.py10
-rw-r--r--kafka/record/legacy_records.py7
-rw-r--r--kafka/record/memory_records.py5
4 files changed, 26 insertions, 0 deletions
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 83121c6..d5c172a 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -4,6 +4,7 @@ import abc
class ABCRecord(object):
__metaclass__ = abc.ABCMeta
+ __slots__ = ()
@abc.abstractproperty
def offset(self):
@@ -45,6 +46,7 @@ class ABCRecord(object):
class ABCRecordBatchBuilder(object):
__metaclass__ = abc.ABCMeta
+ __slots__ = ()
@abc.abstractmethod
def append(self, offset, timestamp, key, value, headers=None):
@@ -87,6 +89,7 @@ class ABCRecordBatch(object):
compressed) message.
"""
__metaclass__ = abc.ABCMeta
+ __slots__ = ()
@abc.abstractmethod
def __iter__(self):
@@ -97,6 +100,7 @@ class ABCRecordBatch(object):
class ABCRecords(object):
__metaclass__ = abc.ABCMeta
+ __slots__ = ()
@abc.abstractmethod
def __init__(self, buffer):
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 7f0e2b3..07368bb 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -70,6 +70,8 @@ import kafka.codec as codecs
class DefaultRecordBase(object):
+ __slots__ = ()
+
HEADER_STRUCT = struct.Struct(
">q" # BaseOffset => Int64
"i" # Length => Int32
@@ -116,6 +118,9 @@ class DefaultRecordBase(object):
class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
+ __slots__ = ("_buffer", "_header_data", "_pos", "_num_records",
+ "_next_record_index", "_decompressed")
+
def __init__(self, buffer):
self._buffer = bytearray(buffer)
self._header_data = self.HEADER_STRUCT.unpack_from(self._buffer)
@@ -358,6 +363,11 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
# 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
MAX_RECORD_OVERHEAD = 21
+ __slots__ = ("_magic", "_compression_type", "_batch_size", "_is_transactional",
+ "_producer_id", "_producer_epoch", "_base_sequence",
+ "_first_timestamp", "_max_timestamp", "_last_offset", "_num_records",
+ "_buffer")
+
def __init__(
self, magic, compression_type, is_transactional,
producer_id, producer_epoch, base_sequence, batch_size):
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index bb6c21c..e2ee549 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -57,6 +57,8 @@ from kafka.errors import CorruptRecordException, UnsupportedCodecError
class LegacyRecordBase(object):
+ __slots__ = ()
+
HEADER_STRUCT_V0 = struct.Struct(
">q" # BaseOffset => Int64
"i" # Length => Int32
@@ -127,6 +129,9 @@ class LegacyRecordBase(object):
class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
+ __slots__ = ("_buffer", "_magic", "_offset", "_crc", "_timestamp",
+ "_attributes", "_decompressed")
+
def __init__(self, buffer, magic):
self._buffer = memoryview(buffer)
self._magic = magic
@@ -336,6 +341,8 @@ class LegacyRecord(ABCRecord):
class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
+ __slots__ = ("_magic", "_compression_type", "_batch_size", "_buffer")
+
def __init__(self, magic, compression_type, batch_size):
self._magic = magic
self._compression_type = compression_type
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index f67c4fe..a6c4b51 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -37,6 +37,8 @@ class MemoryRecords(ABCRecords):
# Minimum space requirements for Record V0
MIN_SLICE = LOG_OVERHEAD + LegacyRecordBatch.RECORD_OVERHEAD_V0
+ __slots__ = ("_buffer", "_pos", "_next_slice", "_remaining_bytes")
+
def __init__(self, bytes_data):
self._buffer = bytes_data
self._pos = 0
@@ -110,6 +112,9 @@ class MemoryRecords(ABCRecords):
class MemoryRecordsBuilder(object):
+ __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
+ "_bytes_written")
+
def __init__(self, magic, compression_type, batch_size):
assert magic in [0, 1, 2], "Not supported magic"
assert compression_type in [0, 1, 2, 3], "Not valid compression type"