From 3631bfa009a28767a2057c9beee470acaa6597d5 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Fri, 11 Oct 2019 12:03:22 -0700 Subject: Remove SimpleClient, Producer, Consumer, Unittest (#1196) In the 2.0 release, we're removing: * `SimpleClient` * `SimpleConsumer` * `SimpleProducer` * Old partitioners used by `SimpleProducer`; these are superceded by the `DefaultPartitioner` These have been deprecated for several years in favor of `KafkaClient` / `KafkaConsumer` / `KafkaProducer`. Since 2.0 allows breaking changes, we are removing the deprecated classes. Additionally, since the only usage of `unittest` was in tests for these old Simple* clients, this also drops `unittest` from the library. All tests now run under `pytest`. --- kafka/util.py | 108 ---------------------------------------------------------- 1 file changed, 108 deletions(-) (limited to 'kafka/util.py') diff --git a/kafka/util.py b/kafka/util.py index 9354bd9..9f65b81 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -2,15 +2,10 @@ from __future__ import absolute_import import atexit import binascii -import collections -import struct -from threading import Thread, Event import weakref from kafka.vendor import six -from kafka.errors import BufferUnderflowError - if six.PY3: MAX_INT = 2 ** 31 @@ -28,109 +23,6 @@ else: from binascii import crc32 -def write_int_string(s): - if s is not None and not isinstance(s, six.binary_type): - raise TypeError('Expected "%s" to be bytes\n' - 'data=%s' % (type(s), repr(s))) - if s is None: - return struct.pack('>i', -1) - else: - return struct.pack('>i%ds' % len(s), len(s), s) - - -def read_short_string(data, cur): - if len(data) < cur + 2: - raise BufferUnderflowError("Not enough data left") - - (strlen,) = struct.unpack('>h', data[cur:cur + 2]) - if strlen == -1: - return None, cur + 2 - - cur += 2 - if len(data) < cur + strlen: - raise BufferUnderflowError("Not enough data left") - - out = data[cur:cur + strlen] - return out, cur + strlen - - -def relative_unpack(fmt, data, cur): - size = struct.calcsize(fmt) - if len(data) < cur + size: - raise BufferUnderflowError("Not enough data left") - - out = struct.unpack(fmt, data[cur:cur + size]) - return out, cur + size - - -def group_by_topic_and_partition(tuples): - out = collections.defaultdict(dict) - for t in tuples: - assert t.topic not in out or t.partition not in out[t.topic], \ - 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, - t.topic, t.partition) - out[t.topic][t.partition] = t - return out - - -class ReentrantTimer(object): - """ - A timer that can be restarted, unlike threading.Timer - (although this uses threading.Timer) - - Arguments: - - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function - """ - def __init__(self, t, fn, *args, **kwargs): - - if t <= 0: - raise ValueError('Invalid timeout value') - - if not callable(fn): - raise ValueError('fn must be callable') - - self.thread = None - self.t = t / 1000.0 - self.fn = fn - self.args = args - self.kwargs = kwargs - self.active = None - - def _timer(self, active): - # python2.6 Event.wait() always returns None - # python2.7 and greater returns the flag value (true/false) - # we want the flag value, so add an 'or' here for python2.6 - # this is redundant for later python versions (FLAG OR FLAG == FLAG) - while not (active.wait(self.t) or active.is_set()): - self.fn(*self.args, **self.kwargs) - - def start(self): - if self.thread is not None: - self.stop() - - self.active = Event() - self.thread = Thread(target=self._timer, args=(self.active,)) - self.thread.daemon = True # So the app exits when main thread exits - self.thread.start() - - def stop(self): - if self.thread is None: - return - - self.active.set() - self.thread.join(self.t + 1) - # noinspection PyAttributeOutsideInit - self.timer = None - self.fn = None - - def __del__(self): - self.stop() - - class WeakMethod(object): """ Callable that weakly references a method and the object it is bound to. It -- cgit v1.2.1