author    Jeff Widman <jeff@jeffwidman.com>  2019-08-22 01:58:28 -0700
committer GitHub <noreply@github.com>        2019-08-22 01:58:28 -0700
commit    98c005852e36fde0ef44a7b9c60a54f4686651af (patch)
tree      82f0f218064cd8163addc43042a71207e86cadc9
parent    e49caeb3ebdd36eb4d18a517bc402f8e89dfdbee (diff)
download  kafka-python-98c005852e36fde0ef44a7b9c60a54f4686651af.tar.gz
Cleanup handling of KAFKA_VERSION env var in tests (#1887)
Now that we are using `pytest`, there is no need for a custom decorator because we can use `pytest.mark.skipif()`. This makes the code significantly simpler.

In particular, dropping the custom `@kafka_versions()` decorator is necessary because it uses `functools.wraps()`, which doesn't play nice with `pytest` fixtures:
- https://github.com/pytest-dev/pytest/issues/677
- https://stackoverflow.com/a/19614807/770425

So this is a prerequisite to migrating some of those tests to using pytest fixtures.
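For context, the swap looks roughly like this — a minimal sketch, where `test_kafka_consumer_max_bytes_simple` stands in for any of the converted tests and the helper body is copied from the `test/testutil.py` hunk below:

```python
import os
import pytest


def env_kafka_version():
    """Return the Kafka version set in the OS environment as a tuple.

    Example: '0.8.1.1' --> (0, 8, 1, 1)
    """
    if 'KAFKA_VERSION' not in os.environ:
        return ()
    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))


# Before: the custom decorator wrapped the test function via functools.wraps(),
# which confused pytest's fixture introspection:
#
#     @kafka_versions('>=0.10.1')
#     def test_kafka_consumer_max_bytes_simple(self): ...
#
# After: a plain mark -- nothing wraps the function, so fixtures still resolve.
# Note that when KAFKA_VERSION is unset, env_kafka_version() returns (), and
# () < (0, 10, 1) is True, so the test is skipped just as the old decorator
# skipped when no version was set.
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1),
                    reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_max_bytes_simple():
    pass
```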
-rw-r--r--  test/conftest.py                    14
-rw-r--r--  test/fixtures.py                    25
-rw-r--r--  test/test_client_integration.py      6
-rw-r--r--  test/test_codec.py                   2
-rw-r--r--  test/test_consumer_group.py         18
-rw-r--r--  test/test_consumer_integration.py   42
-rw-r--r--  test/test_failover_integration.py    4
-rw-r--r--  test/test_producer.py               10
-rw-r--r--  test/test_producer_integration.py    8
-rw-r--r--  test/testutil.py                    78
10 files changed, 65 insertions, 142 deletions
diff --git a/test/conftest.py b/test/conftest.py
index b6d3e3e..5015cc7 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -2,14 +2,8 @@ from __future__ import absolute_import
import pytest
-from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
-
-
-@pytest.fixture(scope="module")
-def version():
- """Return the Kafka version set in the OS environment"""
- return kafka_version()
-
+from test.testutil import env_kafka_version, random_string
+from test.fixtures import KafkaFixture, ZookeeperFixture
@pytest.fixture(scope="module")
def zookeeper():
@@ -26,9 +20,9 @@ def kafka_broker(kafka_broker_factory):
@pytest.fixture(scope="module")
-def kafka_broker_factory(version, zookeeper):
+def kafka_broker_factory(zookeeper):
"""Return a Kafka broker fixture factory"""
- assert version, 'KAFKA_VERSION must be specified to run integration tests'
+ assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'
_brokers = []
def factory(**broker_params):
diff --git a/test/fixtures.py b/test/fixtures.py
index ff6b687..c7748f1 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,9 +4,7 @@ import atexit
import logging
import os
import os.path
-import random
import socket
-import string
import subprocess
import time
import uuid
@@ -19,29 +17,12 @@ from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
from kafka.client_async import KafkaClient
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
+from test.testutil import env_kafka_version, random_string
from test.service import ExternalService, SpawnedService
log = logging.getLogger(__name__)
-def random_string(length):
- return "".join(random.choice(string.ascii_letters) for i in range(length))
-
-
-def version_str_to_tuple(version_str):
- """Transform a version string into a tuple.
-
- Example: '0.8.1.1' --> (0, 8, 1, 1)
- """
- return tuple(map(int, version_str.split('.')))
-
-
-def version():
- if 'KAFKA_VERSION' not in os.environ:
- return ()
- return version_str_to_tuple(os.environ['KAFKA_VERSION'])
-
-
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
@@ -477,7 +458,7 @@ class KafkaFixture(Fixture):
num_partitions == self.partitions and \
replication_factor == self.replicas:
self._send_request(MetadataRequest[0]([topic_name]))
- elif version() >= (0, 10, 1, 0):
+ elif env_kafka_version() >= (0, 10, 1, 0):
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
replication_factor, [], [])], timeout_ms)
result = self._send_request(request, timeout=timeout_ms)
@@ -497,7 +478,7 @@ class KafkaFixture(Fixture):
'--replication-factor', self.replicas \
if replication_factor is None \
else replication_factor)
- if version() >= (0, 10):
+ if env_kafka_version() >= (0, 10):
args.append('--if-not-exists')
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index df0faef..a983ce1 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,5 +1,7 @@
import os
+import pytest
+
from kafka.errors import KafkaTimeoutError
from kafka.protocol import create_message
from kafka.structs import (
@@ -7,7 +9,7 @@ from kafka.structs import (
ProduceRequestPayload)
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, kafka_versions
+from test.testutil import KafkaIntegrationTestCase, env_kafka_version
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@@ -80,7 +82,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
# Offset Tests #
####################
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_commit_fetch_offsets(self):
req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
(resp,) = self.client.send_offset_commit_request('group', [req])
diff --git a/test/test_codec.py b/test/test_codec.py
index 3c4d2df..9eff888 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -14,7 +14,7 @@ from kafka.codec import (
lz4_encode_old_kafka, lz4_decode_old_kafka,
)
-from test.fixtures import random_string
+from test.testutil import random_string
def test_gzip():
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index ecc6d38..3367617 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -11,15 +11,15 @@ from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState
from kafka.structs import TopicPartition
-from test.fixtures import random_string, version
+from test.testutil import env_kafka_version, random_string
def get_connect_str(kafka_broker):
return kafka_broker.host + ':' + str(kafka_broker.port)
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_consumer(kafka_broker, topic, version):
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_consumer(kafka_broker, topic):
# The `topic` fixture is included because
# 0.8.2 brokers need a topic to function well
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
@@ -29,8 +29,8 @@ def test_consumer(kafka_broker, topic, version):
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_consumer_topics(kafka_broker, topic, version):
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
# Necessary to drive the IO
consumer.poll(500)
@@ -38,8 +38,7 @@ def test_consumer_topics(kafka_broker, topic, version):
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()
-@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
connect_str = get_connect_str(kafka_broker)
@@ -129,7 +128,7 @@ def test_group(kafka_broker, topic):
threads[c] = None
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_paused(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
topics = [TopicPartition(topic, 1)]
@@ -148,8 +147,7 @@ def test_paused(kafka_broker, topic):
consumer.close()
-@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_heartbeat_thread(kafka_broker, topic):
group_id = 'test-group-' + random_string(6)
consumer = KafkaConsumer(topic,
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index fdffd05..cb05242 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,19 +1,18 @@
import logging
import os
import time
-from mock import patch
-import pytest
-import kafka.codec
+from mock import patch
import pytest
-from kafka.vendor.six.moves import range
from kafka.vendor import six
+from kafka.vendor.six.moves import range
from . import unittest
from kafka import (
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
create_gzip_message, KafkaProducer
)
+import kafka.codec
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
@@ -23,11 +22,11 @@ from kafka.structs import (
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
)
-from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version
-from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
"""Test KafkaConsumer"""
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
@@ -54,7 +53,7 @@ def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
kafka_consumer.close()
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer_unsupported_encoding(
topic, kafka_producer_factory, kafka_consumer_factory):
# Send a compressed message
@@ -211,7 +210,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_simple_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -388,7 +387,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
@unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_multi_process_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -459,7 +458,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
big_consumer.stop()
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -491,7 +490,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer2.stop()
@unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_multi_process_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -548,6 +547,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
@@ -586,7 +586,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
consumer.close()
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1")
def test_kafka_consumer__offset_commit_resume(self):
GROUP_ID = random_string(10)
@@ -605,7 +605,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
output_msgs1 = []
for _ in range(180):
m = next(consumer1)
- output_msgs1.append(m)
+ output_msgs1.append((m.key, m.value))
self.assert_message_count(output_msgs1, 180)
consumer1.close()
@@ -621,12 +621,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
output_msgs2 = []
for _ in range(20):
m = next(consumer2)
- output_msgs2.append(m)
+ output_msgs2.append((m.key, m.value))
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
consumer2.close()
- @kafka_versions('>=0.10.1')
+ @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_max_bytes_simple(self):
self.send_messages(0, range(100, 200))
self.send_messages(1, range(200, 300))
@@ -647,7 +647,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
consumer.close()
- @kafka_versions('>=0.10.1')
+ @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_max_bytes_one_msg(self):
# We send to only 1 partition so we don't have parallel requests to 2
# nodes for data.
@@ -673,7 +673,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(fetched_msgs), 10)
consumer.close()
- @kafka_versions('>=0.10.1')
+ @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_offsets_for_time(self):
late_time = int(time.time()) * 1000
middle_time = late_time - 1000
@@ -727,7 +727,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
})
consumer.close()
- @kafka_versions('>=0.10.1')
+ @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_offsets_search_many_partitions(self):
tp0 = TopicPartition(self.topic, 0)
tp1 = TopicPartition(self.topic, 1)
@@ -766,7 +766,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
})
consumer.close()
- @kafka_versions('<0.10.1')
+ @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1")
def test_kafka_consumer_offsets_for_time_old(self):
consumer = self.kafka_consumer()
tp = TopicPartition(self.topic, 0)
@@ -774,7 +774,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(UnsupportedVersionError):
consumer.offsets_for_times({tp: int(time.time())})
- @kafka_versions('>=0.10.1')
+ @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_offsets_for_times_errors(self):
consumer = self.kafka_consumer(fetch_max_wait_ms=200,
request_timeout_ms=500)
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 48021a4..ad7dcb9 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -9,8 +9,8 @@ from kafka.errors import (
from kafka.producer.base import Producer
from kafka.structs import TopicPartition
-from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
-from test.testutil import KafkaIntegrationTestCase
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, random_string
log = logging.getLogger(__name__)
diff --git a/test/test_producer.py b/test/test_producer.py
index 60b19bf..9605adf 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,7 +7,7 @@ import pytest
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
-from test.fixtures import random_string, version
+from test.testutil import env_kafka_version, random_string
def test_buffer_pool():
@@ -22,13 +22,13 @@ def test_buffer_pool():
assert buf2.read() == b''
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_end_to_end(kafka_broker, compression):
if compression == 'lz4':
# LZ4 requires 0.8.2
- if version() < (0, 8, 2):
+ if env_kafka_version() < (0, 8, 2):
return
# python-lz4 crashes on older versions of pypy
elif platform.python_implementation() == 'PyPy':
@@ -80,7 +80,7 @@ def test_kafka_producer_gc_cleanup():
assert threading.active_count() == threads
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
@@ -91,7 +91,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
magic = producer._max_usable_produce_magic()
# record headers are supported in 0.11.0
- if version() < (0, 11, 0):
+ if env_kafka_version() < (0, 11, 0):
headers = None
else:
headers = [("Header Key", b"Header Value")]
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 7109886..e0939a6 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -15,8 +15,8 @@ from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
from kafka.producer.base import Producer
from kafka.structs import FetchRequestPayload, ProduceRequestPayload
-from test.fixtures import ZookeeperFixture, KafkaFixture, version
-from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
@@ -43,7 +43,7 @@ def assert_produce_response(resp, initial_offset):
assert resp[0].offset == initial_offset
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_produce_many_simple(simple_client, topic):
"""Test multiple produces using the SimpleClient
"""
@@ -353,7 +353,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# KeyedProducer Tests #
############################
- @kafka_versions('>=0.8.1')
+ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_keyedproducer_null_payload(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
diff --git a/test/testutil.py b/test/testutil.py
index 781c364..3272262 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,8 +1,8 @@
from __future__ import absolute_import
-import functools
-import operator
import os
+import random
+import string
import time
import uuid
@@ -16,72 +16,20 @@ from kafka.errors import (
FailedPayloadsError
)
from kafka.structs import OffsetRequestPayload
-from test.fixtures import random_string, version_str_to_tuple, version as kafka_version #pylint: disable=wrong-import-order
-def kafka_versions(*versions):
- """
- Describe the Kafka versions this test is relevant to.
-
- The versions are passed in as strings, for example:
- '0.11.0'
- '>=0.10.1.0'
- '>0.9', '<1.0' # since this accepts multiple versions args
-
- The current KAFKA_VERSION will be evaluated against this version. If the
- result is False, then the test is skipped. Similarly, if KAFKA_VERSION is
- not set the test is skipped.
-
- Note: For simplicity, this decorator accepts Kafka versions as strings even
- though the similarly functioning `api_version` only accepts tuples. Trying
- to convert it to tuples quickly gets ugly due to mixing operator strings
- alongside version tuples. While doable when one version is passed in, it
- isn't pretty when multiple versions are passed in.
- """
+def random_string(length):
+ return "".join(random.choice(string.ascii_letters) for i in range(length))
- def construct_lambda(s):
- if s[0].isdigit():
- op_str = '='
- v_str = s
- elif s[1].isdigit():
- op_str = s[0] # ! < > =
- v_str = s[1:]
- elif s[2].isdigit():
- op_str = s[0:2] # >= <=
- v_str = s[2:]
- else:
- raise ValueError('Unrecognized kafka version / operator: %s' % (s,))
-
- op_map = {
- '=': operator.eq,
- '!': operator.ne,
- '>': operator.gt,
- '<': operator.lt,
- '>=': operator.ge,
- '<=': operator.le
- }
- op = op_map[op_str]
- version = version_str_to_tuple(v_str)
- return lambda a: op(a, version)
-
- validators = map(construct_lambda, versions)
-
- def real_kafka_versions(func):
- @functools.wraps(func)
- def wrapper(func, *args, **kwargs):
- version = kafka_version()
-
- if not version:
- pytest.skip("no kafka version set in KAFKA_VERSION env var")
-
- for f in validators:
- if not f(version):
- pytest.skip("unsupported kafka version")
-
- return func(*args, **kwargs)
- return wrapper
-
- return real_kafka_versions
+
+def env_kafka_version():
+ """Return the Kafka version set in the OS environment as a tuple.
+
+ Example: '0.8.1.1' --> (0, 8, 1, 1)
+ """
+ if 'KAFKA_VERSION' not in os.environ:
+ return ()
+ return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
def current_offset(client, topic, partition, kafka_broker=None):
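A quick sketch of how the new helper behaves, assuming the dotted version format its docstring describes; the empty-tuple fallback is what makes both skipif styles above work without a separate "is KAFKA_VERSION set" check:

```python
import os

os.environ['KAFKA_VERSION'] = '0.10.1.1'
assert env_kafka_version() == (0, 10, 1, 1)
assert env_kafka_version() >= (0, 10, 1)   # tuple comparison handles extra components

# With the variable unset, the empty tuple is falsy and sorts below every
# real version, so both kinds of markers skip as expected:
del os.environ['KAFKA_VERSION']
assert env_kafka_version() == ()
assert not env_kafka_version()             # -> "No KAFKA_VERSION set" skips fire
assert env_kafka_version() < (0, 9)        # -> version-gated skips also fire
```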