summaryrefslogtreecommitdiff
path: root/kafka/codec.py
blob: c7d39920c4617f38cbb16e4795d14ca6d245f405 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from cStringIO import StringIO
import gzip
import logging

log = logging.getLogger("kafka.codec")

try:
    import snappy
    hasSnappy = True
except ImportError:
    log.warn("Snappy codec not available")
    hasSnappy = False


def gzip_encode(payload):
    buf = StringIO()
    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
    f.write(payload)
    f.close()
    buf.seek(0)
    out = buf.read()
    buf.close()
    return out


def gzip_decode(payload):
    buf = StringIO(payload)
    f = gzip.GzipFile(fileobj=buf, mode='r')
    out = f.read()
    f.close()
    buf.close()
    return out


def snappy_encode(payload):
    if not hasSnappy:
        raise NotImplementedError("Snappy codec not available")
    return snappy.compress(payload)


def snappy_decode(payload):
    if not hasSnappy:
        raise NotImplementedError("Snappy codec not available")
    return snappy.decompress(payload)