summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
blob: 08d2d0169902c5a82989ad3e353cc8d0599703d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
from __future__ import absolute_import

import logging
import struct

import six

from six.moves import xrange

import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
import kafka.structs

from kafka.codec import (
    gzip_encode, gzip_decode, snappy_encode, snappy_decode)
from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
    crc32, read_short_string, read_int_string, relative_unpack,
    write_short_string, write_int_string, group_by_topic_and_partition)


log = logging.getLogger(__name__)

# The low two bits of a Message's attributes byte select the compression
# codec applied to its value (see create_gzip_message / create_snappy_message,
# which AND the codec with this mask).
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
# Codecs handled by create_message_set; anything else raises
# UnsupportedCodecError.
ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)


class KafkaProtocol(object):
    """
    Class to encapsulate all of the protocol encoding/decoding.
    This class does not have any state associated with it, it is purely
    for organization.
    """
    PRODUCE_KEY = 0
    FETCH_KEY = 1
    OFFSET_KEY = 2
    METADATA_KEY = 3
    OFFSET_COMMIT_KEY = 8
    OFFSET_FETCH_KEY = 9
    CONSUMER_METADATA_KEY = 10

    ###################
    #   Private API   #
    ###################

    @classmethod
    def _encode_message_header(cls, client_id, correlation_id, request_key,
                               version=0):
        """
        Encode the common request envelope

        Arguments:
            client_id: bytes, client identifier echoed back by the broker
            correlation_id: int, id used to match a response to this request
            request_key: int, numeric api key identifying the request type
            version: int, api version of the request (default: 0)

        Returns: bytes, the packed request header
        """
        return struct.pack('>hhih%ds' % len(client_id),
                           request_key,          # ApiKey
                           version,              # ApiVersion
                           correlation_id,       # CorrelationId
                           len(client_id),       # ClientId size
                           client_id)            # ClientId

    @classmethod
    def _encode_message_set(cls, messages):
        """
        Encode a MessageSet. Unlike other arrays in the protocol,
        MessageSets are not length-prefixed

        Format
        ======
        MessageSet => [Offset MessageSize Message]
          Offset => int64
          MessageSize => int32
        """
        message_set = []
        for message in messages:
            encoded_message = KafkaProtocol._encode_message(message)
            # Offset is written as 0; the broker assigns real offsets on append
            message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
                                           len(encoded_message),
                                           encoded_message))
        return b''.join(message_set)

    @classmethod
    def _encode_message(cls, message):
        """
        Encode a single message.

        The magic number of a message is a format version number.
        The only supported magic number right now is zero

        Format
        ======
        Message => Crc MagicByte Attributes Key Value
          Crc => int32
          MagicByte => int8
          Attributes => int8
          Key => bytes
          Value => bytes

        Raises:
            ProtocolError: if message.magic is anything other than 0
        """
        if message.magic == 0:
            msg = b''.join([
                struct.pack('>BB', message.magic, message.attributes),
                write_int_string(message.key),
                write_int_string(message.value)
            ])
            # The CRC covers everything after the Crc field itself
            crc = crc32(msg)
            msg = struct.pack('>i%ds' % len(msg), crc, msg)
        else:
            raise ProtocolError("Unexpected magic number: %d" % message.magic)
        return msg

    ##################
    #   Public API   #
    ##################

    @classmethod
    def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
        """
        Encode a ProduceRequest struct

        Arguments:
            payloads: list of ProduceRequestPayload
            acks: How "acky" you want the request to be
                1: written to disk by the leader
                0: immediate response
                -1: waits for all replicas to be in sync
            timeout: Maximum time (in ms) the server will wait for replica acks.
                This is _not_ a socket timeout

        Returns: ProduceRequest

        Raises:
            ValueError: if acks is not one of (1, 0, -1)
        """
        if acks not in (1, 0, -1):
            raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)

        return kafka.protocol.produce.ProduceRequest[0](
            required_acks=acks,
            timeout=timeout,
            topics=[(
                topic,
                [(
                    partition,
                    # (offset, message_size, message) triples; offset and size
                    # placeholders are filled in during wire encoding
                    [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
                                                           magic=msg.magic,
                                                           attributes=msg.attributes))
                    for msg in payload.messages])
                for partition, payload in topic_payloads.items()])
            for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])

    @classmethod
    def decode_produce_response(cls, response):
        """
        Decode ProduceResponse to ProduceResponsePayload

        Arguments:
            response: ProduceResponse

        Return: list of ProduceResponsePayload
        """
        return [
            kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
            for topic, partitions in response.topics
            for partition, error, offset in partitions
        ]

    @classmethod
    def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
        """
        Encodes a FetchRequest struct

        Arguments:
            payloads: list of FetchRequestPayload
            max_wait_time (int, optional): ms to block waiting for min_bytes
                data. Defaults to 100.
            min_bytes (int, optional): minimum bytes required to return before
                max_wait_time. Defaults to 4096.

        Return: FetchRequest
        """
        return kafka.protocol.fetch.FetchRequest[0](
            replica_id=-1,  # -1 identifies this request as coming from a consumer
            max_wait_time=max_wait_time,
            min_bytes=min_bytes,
            topics=[(
                topic,
                [(
                    partition,
                    payload.offset,
                    payload.max_bytes)
                for partition, payload in topic_payloads.items()])
            for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])

    @classmethod
    def decode_fetch_response(cls, response):
        """
        Decode FetchResponse struct to FetchResponsePayloads

        Arguments:
            response: FetchResponse

        Returns: list of FetchResponsePayload
        """
        return [
            kafka.structs.FetchResponsePayload(
                topic, partition, error, highwater_offset, [
                    kafka.structs.OffsetAndMessage(offset, message)
                    for offset, _, message in messages])
            for topic, partitions in response.topics
            for partition, error, highwater_offset, messages in partitions
        ]

    @classmethod
    def encode_offset_request(cls, payloads=()):
        """
        Encode an OffsetRequest struct

        Arguments:
            payloads: list of OffsetRequestPayload

        Returns: OffsetRequest
        """
        return kafka.protocol.offset.OffsetRequest[0](
            replica_id=-1,  # -1 identifies this request as coming from a consumer
            topics=[(
                topic,
                [(
                    partition,
                    payload.time,
                    payload.max_offsets)
                for partition, payload in topic_payloads.items()])
            for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])

    @classmethod
    def decode_offset_response(cls, response):
        """
        Decode OffsetResponse into OffsetResponsePayloads

        Arguments:
            response: OffsetResponse

        Returns: list of OffsetResponsePayloads
        """
        return [
            kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
            for topic, partitions in response.topics
            for partition, error, offsets in partitions
        ]

    @classmethod
    def encode_metadata_request(cls, topics=(), payloads=None):
        """
        Encode a MetadataRequest

        Arguments:
            topics: list of strings
            payloads: if provided, overrides topics (legacy calling convention)
        """
        if payloads is not None:
            topics = payloads

        return kafka.protocol.metadata.MetadataRequest[0](topics)

    @classmethod
    def decode_metadata_response(cls, response):
        """
        Return the MetadataResponse unchanged.

        No transformation is applied; the method exists for interface
        symmetry with the other decode_* methods.
        """
        return response

    @classmethod
    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
        """
        Encode a ConsumerMetadataRequest

        Arguments:
            client_id: string
            correlation_id: int
            payloads: string (consumer group)
        """
        message = []
        message.append(cls._encode_message_header(client_id, correlation_id,
                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
        # Consumer group is encoded as a length-prefixed (int16) string
        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))

        msg = b''.join(message)
        # Whole request is framed by a 4-byte size prefix
        return write_int_string(msg)

    @classmethod
    def decode_consumer_metadata_response(cls, data):
        """
        Decode bytes to a ConsumerMetadataResponse

        Arguments:
            data: bytes to decode

        Returns: ConsumerMetadataResponse
        """
        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
        (host, cur) = read_short_string(data, cur)
        ((port,), cur) = relative_unpack('>i', data, cur)

        return ConsumerMetadataResponse(error, nodeId, host, port)

    @classmethod
    def encode_offset_commit_request(cls, group, payloads):
        """
        Encode an OffsetCommitRequest struct

        Arguments:
            group: string, the consumer group you are committing offsets for
            payloads: list of OffsetCommitRequestPayload

        Returns: OffsetCommitRequest
        """
        return kafka.protocol.commit.OffsetCommitRequest[0](
            consumer_group=group,
            topics=[(
                topic,
                [(
                    partition,
                    payload.offset,
                    payload.metadata)
                for partition, payload in topic_payloads.items()])
            for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])

    @classmethod
    def decode_offset_commit_response(cls, response):
        """
        Decode OffsetCommitResponse to an OffsetCommitResponsePayload

        Arguments:
            response: OffsetCommitResponse

        Returns: list of OffsetCommitResponsePayload
        """
        return [
            kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
            for topic, partitions in response.topics
            for partition, error in partitions
        ]

    @classmethod
    def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
        """
        Encode an OffsetFetchRequest struct. The request is encoded using
        version 0 if from_kafka is false, indicating a request for Zookeeper
        offsets. It is encoded using version 1 otherwise, indicating a request
        for Kafka offsets.

        Arguments:
            group: string, the consumer group you are fetching offsets for
            payloads: list of OffsetFetchRequestPayload
            from_kafka: bool, default False, set True for Kafka-committed offsets

        Returns: OffsetFetchRequest
        """
        version = 1 if from_kafka else 0
        return kafka.protocol.commit.OffsetFetchRequest[version](
            consumer_group=group,
            topics=[(
                topic,
                list(topic_payloads.keys()))
            for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])

    @classmethod
    def decode_offset_fetch_response(cls, response):
        """
        Decode OffsetFetchResponse to OffsetFetchResponsePayloads

        Arguments:
            response: OffsetFetchResponse

        Returns: list of OffsetFetchResponsePayload
        """
        return [
            kafka.structs.OffsetFetchResponsePayload(
                topic, partition, offset, metadata, error
            )
            for topic, partitions in response.topics
            for partition, offset, metadata, error in partitions
        ]


def create_message(payload, key=None):
    """Build a single, uncompressed Kafka Message struct.

    Arguments:
        payload: bytes, the message body to send to Kafka
        key: bytes, optional key used for partition routing
    """
    # magic=0 (message format v0) and attributes=0 (no compression codec)
    magic, attributes = 0, 0
    return kafka.structs.Message(magic, attributes, key, payload)


def create_gzip_message(payloads, key=None, compresslevel=None):
    """Wrap several Messages into one gzip-compressed Message.

    Each (payload, key) pair is encoded as an inner message; the resulting
    MessageSet is gzip-compressed and carried as the value of a single outer
    message, so Kafka delivers the batch atomically.

    Arguments:
        payloads: list of (payload, key) tuples for the inner messages
        key: bytes, optional partition-routing key for the outer message
        compresslevel: optional gzip compression level passed to gzip_encode
    """
    inner_messages = [create_message(value, msg_key)
                      for value, msg_key in payloads]
    encoded_set = KafkaProtocol._encode_message_set(inner_messages)

    compressed = gzip_encode(encoded_set, compresslevel=compresslevel)
    # Stamp the gzip codec into the attributes byte of the wrapper message
    attributes = ATTRIBUTE_CODEC_MASK & CODEC_GZIP

    return kafka.structs.Message(0, attributes, key, compressed)


def create_snappy_message(payloads, key=None):
    """Wrap several Messages into one Snappy-compressed Message.

    Each (payload, key) pair is encoded as an inner message; the resulting
    MessageSet is Snappy-compressed and carried as the value of a single
    outer message, so Kafka delivers the batch atomically.

    Arguments:
        payloads: list of (payload, key) tuples for the inner messages
        key: bytes, optional partition-routing key for the outer message
    """
    inner_messages = [create_message(value, msg_key)
                      for value, msg_key in payloads]
    encoded_set = KafkaProtocol._encode_message_set(inner_messages)

    compressed = snappy_encode(encoded_set)
    # Stamp the snappy codec into the attributes byte of the wrapper message
    attributes = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY

    return kafka.structs.Message(0, attributes, key, compressed)


def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
    """Create a message set using the given codec.

    If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
    return a list containing a single codec-encoded message.

    Raises:
        UnsupportedCodecError: if codec is not one of ALL_CODECS
    """
    if codec == CODEC_NONE:
        return [create_message(value, msg_key) for value, msg_key in messages]
    if codec == CODEC_GZIP:
        return [create_gzip_message(messages, key, compresslevel)]
    if codec == CODEC_SNAPPY:
        return [create_snappy_message(messages, key)]
    raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)