summaryrefslogtreecommitdiff
path: root/kafka/util.py
blob: 181f67f3d798e07eb6422ab1e9b4c33d113dc11d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from __future__ import absolute_import

import atexit
import binascii
import collections
import struct
from threading import Thread, Event
import weakref

from kafka.vendor import six

from kafka.errors import BufferUnderflowError


if not six.PY3:
    # On Python 2 the stdlib function is re-exported unchanged.
    from binascii import crc32
else:
    MAX_INT = 2 ** 31
    TO_SIGNED = 2 ** 32

    def crc32(data):
        """Return the CRC-32 of ``data`` as a signed 32-bit integer.

        The kafka protocol encodes the CRC as a signed int, while
        ``binascii.crc32`` on Python 3 yields an unsigned value, so results
        at or above ``MAX_INT`` are shifted down by ``TO_SIGNED``.
        """
        unsigned = binascii.crc32(data)
        return unsigned - TO_SIGNED if unsigned >= MAX_INT else unsigned


def write_int_string(s):
    """Pack ``s`` as a byte string prefixed with its 32-bit length.

    Arguments:

        s: bytes to encode, or None

    Returns:
        The big-endian 4-byte length followed by the raw bytes. None is
        encoded as a length of -1 with no payload.

    Raises:
        TypeError: if ``s`` is neither None nor a bytes object
    """
    if s is None:
        return struct.pack('>i', -1)

    if not isinstance(s, six.binary_type):
        raise TypeError('Expected "%s" to be bytes\n'
                        'data=%s' % (type(s), repr(s)))

    size = len(s)
    return struct.pack('>i%ds' % size, size, s)


def read_short_string(data, cur):
    """Read a byte string prefixed with a 16-bit length from ``data``.

    Arguments:

        data: buffer to read from
        cur: offset in ``data`` at which the length prefix starts

    Returns:
        A ``(value, next_offset)`` tuple. ``value`` is None when the length
        prefix is -1, in which case only the prefix is consumed.

    Raises:
        BufferUnderflowError: if ``data`` is too short to hold the length
            prefix or the announced number of bytes
    """
    body_start = cur + 2
    if len(data) < body_start:
        raise BufferUnderflowError("Not enough data left")

    (length,) = struct.unpack('>h', data[cur:body_start])
    if length == -1:
        # A length of -1 marks a null string
        return None, body_start

    body_end = body_start + length
    if len(data) < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end


def relative_unpack(fmt, data, cur):
    """Unpack ``fmt`` from ``data`` starting at offset ``cur``.

    Arguments:

        fmt: a ``struct`` format string
        data: buffer to read from
        cur: offset in ``data`` at which to start unpacking

    Returns:
        A ``(values, next_offset)`` tuple, where ``values`` is the tuple
        produced by ``struct.unpack`` and ``next_offset`` points just past
        the bytes that were consumed.

    Raises:
        BufferUnderflowError: if fewer than ``struct.calcsize(fmt)`` bytes
            are available at ``cur``
    """
    end = cur + struct.calcsize(fmt)
    if len(data) < end:
        raise BufferUnderflowError("Not enough data left")

    return struct.unpack(fmt, data[cur:end]), end


def group_by_topic_and_partition(tuples):
    """Index ``tuples`` by topic and then by partition.

    Arguments:

        tuples: iterable of objects exposing ``topic`` and ``partition``
            attributes

    Returns:
        A ``collections.defaultdict(dict)`` of the form
        ``{topic: {partition: item}}``.

    Raises:
        AssertionError: if two items share the same topic and partition
    """
    out = collections.defaultdict(dict)
    for t in tuples:
        by_partition = out[t.topic]
        # Raised explicitly instead of through an ``assert`` statement so
        # the duplicate check still runs when Python is started with -O.
        if t.partition in by_partition:
            raise AssertionError(
                'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
                                                    t.topic, t.partition))
        by_partition[t.partition] = t
    return out


class ReentrantTimer(object):
    """
    A timer that can be restarted, unlike threading.Timer

    The callable is invoked once per interval on a daemon thread, driven by
    a threading.Event, until the timer is stopped. Calling start() on a
    running timer restarts it. stop() is final: it releases the callable,
    so the timer cannot be started again afterwards.

    Arguments:

        t: timer interval in milliseconds
        fn: a callable to invoke
        args: tuple of args to be passed to function
        kwargs: keyword arguments to be passed to function

    Raises:
        ValueError: if t is not positive or fn is not callable
    """
    def __init__(self, t, fn, *args, **kwargs):
        # Assigned before validation so that __del__ -> stop() finds these
        # attributes even when construction fails with ValueError.
        self.thread = None
        self.active = None

        if t <= 0:
            raise ValueError('Invalid timeout value')

        if not callable(fn):
            raise ValueError('fn must be callable')

        self.t = t / 1000.0  # interval in seconds
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def _timer(self, active):
        """Invoke fn every self.t seconds until the active event is set."""
        # python2.6 Event.wait() always returns None
        # python2.7 and greater returns the flag value (true/false)
        # we want the flag value, so add an 'or' here for python2.6
        # this is redundant for later python versions (FLAG OR FLAG == FLAG)
        while not (active.wait(self.t) or active.is_set()):
            self.fn(*self.args, **self.kwargs)

    def _halt(self):
        """Signal the worker thread, if any, to exit and wait for it."""
        if self.thread is None:
            return

        self.active.set()
        # Bounded wait: fn may still be running when the event is set
        self.thread.join(self.t + 1)
        self.thread = None
        self.active = None

    def start(self):
        """
        Start the timer, restarting it if it is already running.

        Raises:
            RuntimeError: if stop() has already released the callable
        """
        if self.fn is None:
            raise RuntimeError('ReentrantTimer cannot be started after '
                               'stop() has been called')

        # Restart without going through stop(), which would discard fn
        self._halt()

        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()

    def stop(self):
        """
        Stop the timer and release the callable.

        Does nothing if the timer is not running.
        """
        if self.thread is None:
            return

        self._halt()
        # NOTE(review): presumably fn is dropped so that a stopped timer
        # does not keep the callback's owner alive -- confirm with callers.
        self.fn = None

    def __del__(self):
        self.stop()


class WeakMethod(object):
    """
    Callable that weakly references a method and the object it is bound to. It
    is based on http://stackoverflow.com/a/24287465.

    Two instances compare equal, and hash equally, when they were created
    from the same function bound to the same object. Both operations use
    the ids recorded at construction time, so they keep working when the
    target is unhashable or has already been garbage collected.

    Arguments:

        object_dot_method: A bound instance method (i.e. 'object.method').
    """
    def __init__(self, object_dot_method):
        # Python 3 exposes __self__/__func__ on bound methods; the im_*
        # names are the fallback spelling.
        try:
            self.target = weakref.ref(object_dot_method.__self__)
        except AttributeError:
            self.target = weakref.ref(object_dot_method.im_self)
        self._target_id = id(self.target())
        try:
            self.method = weakref.ref(object_dot_method.__func__)
        except AttributeError:
            self.method = weakref.ref(object_dot_method.im_func)
        self._method_id = id(self.method())

    def __call__(self, *args, **kwargs):
        """
        Calls the method on target with args and kwargs.

        If the target has been garbage collected, the weak reference yields
        None and the function receives None as its first argument.
        """
        return self.method()(self.target(), *args, **kwargs)

    def __hash__(self):
        # Built from the same ids that __eq__ compares. Hashing the weak
        # references themselves raises TypeError for unhashable targets and
        # for referents that died before being hashed.
        return hash((self._target_id, self._method_id))

    def __eq__(self, other):
        if not isinstance(other, WeakMethod):
            return False
        return self._target_id == other._target_id and self._method_id == other._method_id

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)


def try_method_on_system_exit(obj, method, *args, **kwargs):
    """
    Register an atexit hook that calls a method of obj by name.

    Only a weak proxy to obj is handed to atexit. A ReferenceError or
    AttributeError raised while looking up or running the method is
    silently ignored.

    Arguments:

        obj: object whose method should be invoked at interpreter exit
        method: name of the method to look up on obj
        args: positional arguments passed to the method
        kwargs: keyword arguments passed to the method
    """
    def _invoke_quietly(_obj, _meth, *call_args, **call_kwargs):
        try:
            bound = getattr(_obj, _meth)
            bound(*call_args, **call_kwargs)
        except (ReferenceError, AttributeError):
            pass

    obj_proxy = weakref.proxy(obj)
    atexit.register(_invoke_quietly, obj_proxy, method, *args, **kwargs)