summaryrefslogtreecommitdiff
path: root/kafka/util.py
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2019-10-11 12:03:22 -0700
committerGitHub <noreply@github.com>2019-10-11 12:03:22 -0700
commit3631bfa009a28767a2057c9beee470acaa6597d5 (patch)
treee10b73861a33d83a95b6496ef3074ee3caeaae41 /kafka/util.py
parent6d3800ca9f45fd953689a1787fc90a5e566e34ea (diff)
downloadkafka-python-3631bfa009a28767a2057c9beee470acaa6597d5.tar.gz
Remove SimpleClient, Producer, Consumer, Unittest (#1196)
In the 2.0 release, we're removing: * `SimpleClient` * `SimpleConsumer` * `SimpleProducer` * Old partitioners used by `SimpleProducer`; these are superceded by the `DefaultPartitioner` These have been deprecated for several years in favor of `KafkaClient` / `KafkaConsumer` / `KafkaProducer`. Since 2.0 allows breaking changes, we are removing the deprecated classes. Additionally, since the only usage of `unittest` was in tests for these old Simple* clients, this also drops `unittest` from the library. All tests now run under `pytest`.
Diffstat (limited to 'kafka/util.py')
-rw-r--r--kafka/util.py108
1 files changed, 0 insertions, 108 deletions
diff --git a/kafka/util.py b/kafka/util.py
index 9354bd9..9f65b81 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -2,15 +2,10 @@ from __future__ import absolute_import
import atexit
import binascii
-import collections
-import struct
-from threading import Thread, Event
import weakref
from kafka.vendor import six
-from kafka.errors import BufferUnderflowError
-
if six.PY3:
MAX_INT = 2 ** 31
@@ -28,109 +23,6 @@ else:
from binascii import crc32
-def write_int_string(s):
- if s is not None and not isinstance(s, six.binary_type):
- raise TypeError('Expected "%s" to be bytes\n'
- 'data=%s' % (type(s), repr(s)))
- if s is None:
- return struct.pack('>i', -1)
- else:
- return struct.pack('>i%ds' % len(s), len(s), s)
-
-
-def read_short_string(data, cur):
- if len(data) < cur + 2:
- raise BufferUnderflowError("Not enough data left")
-
- (strlen,) = struct.unpack('>h', data[cur:cur + 2])
- if strlen == -1:
- return None, cur + 2
-
- cur += 2
- if len(data) < cur + strlen:
- raise BufferUnderflowError("Not enough data left")
-
- out = data[cur:cur + strlen]
- return out, cur + strlen
-
-
-def relative_unpack(fmt, data, cur):
- size = struct.calcsize(fmt)
- if len(data) < cur + size:
- raise BufferUnderflowError("Not enough data left")
-
- out = struct.unpack(fmt, data[cur:cur + size])
- return out, cur + size
-
-
-def group_by_topic_and_partition(tuples):
- out = collections.defaultdict(dict)
- for t in tuples:
- assert t.topic not in out or t.partition not in out[t.topic], \
- 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
- t.topic, t.partition)
- out[t.topic][t.partition] = t
- return out
-
-
-class ReentrantTimer(object):
- """
- A timer that can be restarted, unlike threading.Timer
- (although this uses threading.Timer)
-
- Arguments:
-
- t: timer interval in milliseconds
- fn: a callable to invoke
- args: tuple of args to be passed to function
- kwargs: keyword arguments to be passed to function
- """
- def __init__(self, t, fn, *args, **kwargs):
-
- if t <= 0:
- raise ValueError('Invalid timeout value')
-
- if not callable(fn):
- raise ValueError('fn must be callable')
-
- self.thread = None
- self.t = t / 1000.0
- self.fn = fn
- self.args = args
- self.kwargs = kwargs
- self.active = None
-
- def _timer(self, active):
- # python2.6 Event.wait() always returns None
- # python2.7 and greater returns the flag value (true/false)
- # we want the flag value, so add an 'or' here for python2.6
- # this is redundant for later python versions (FLAG OR FLAG == FLAG)
- while not (active.wait(self.t) or active.is_set()):
- self.fn(*self.args, **self.kwargs)
-
- def start(self):
- if self.thread is not None:
- self.stop()
-
- self.active = Event()
- self.thread = Thread(target=self._timer, args=(self.active,))
- self.thread.daemon = True # So the app exits when main thread exits
- self.thread.start()
-
- def stop(self):
- if self.thread is None:
- return
-
- self.active.set()
- self.thread.join(self.t + 1)
- # noinspection PyAttributeOutsideInit
- self.timer = None
- self.fn = None
-
- def __del__(self):
- self.stop()
-
-
class WeakMethod(object):
"""
Callable that weakly references a method and the object it is bound to. It