summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2019-08-21 18:47:22 -0700
committerGitHub <noreply@github.com>2019-08-21 18:47:22 -0700
commite49caeb3ebdd36eb4d18a517bc402f8e89dfdbee (patch)
treee29f121c7979c96b9850cd3633d2ecb54d340657
parent5bc25292b8bb5b20ba2fff481fdc77b9909d0831 (diff)
downloadkafka-python-e49caeb3ebdd36eb4d18a517bc402f8e89dfdbee.tar.gz
Minor test cleanup (#1885)
Remove unused import, whitespace, etc. No functional changes, just cleaning it up so the diffs of later changes are cleaner.
-rw-r--r--test/conftest.py11
-rw-r--r--test/fixtures.py9
-rw-r--r--test/test_assignors.py3
-rw-r--r--test/test_codec.py2
-rw-r--r--test/test_conn.py1
-rw-r--r--test/test_consumer_group.py5
-rw-r--r--test/test_protocol.py1
-rw-r--r--test/testutil.py4
8 files changed, 25 insertions, 11 deletions
diff --git a/test/conftest.py b/test/conftest.py
index ffaae03..b6d3e3e 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -10,6 +10,7 @@ def version():
"""Return the Kafka version set in the OS environment"""
return kafka_version()
+
@pytest.fixture(scope="module")
def zookeeper():
"""Return a Zookeeper fixture"""
@@ -17,11 +18,13 @@ def zookeeper():
yield zk_instance
zk_instance.close()
+
@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
"""Return a Kafka broker fixture"""
return kafka_broker_factory()[0]
+
@pytest.fixture(scope="module")
def kafka_broker_factory(version, zookeeper):
"""Return a Kafka broker fixture factory"""
@@ -42,6 +45,7 @@ def kafka_broker_factory(version, zookeeper):
for broker in _brokers:
broker.close()
+
@pytest.fixture
def simple_client(kafka_broker, request, topic):
"""Return a SimpleClient fixture"""
@@ -50,6 +54,7 @@ def simple_client(kafka_broker, request, topic):
yield client
client.close()
+
@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
@@ -57,11 +62,13 @@ def kafka_client(kafka_broker, request):
yield client
client.close()
+
@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
"""Return a KafkaConsumer fixture"""
return kafka_consumer_factory()
+
@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
@@ -79,11 +86,13 @@ def kafka_consumer_factory(kafka_broker, topic, request):
if _consumer[0]:
_consumer[0].close()
+
@pytest.fixture
def kafka_producer(kafka_producer_factory):
"""Return a KafkaProducer fixture"""
yield kafka_producer_factory()
+
@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
@@ -100,6 +109,7 @@ def kafka_producer_factory(kafka_broker, request):
if _producer[0]:
_producer[0].close()
+
@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
@@ -107,6 +117,7 @@ def topic(kafka_broker, request):
kafka_broker.create_topics([topic_name])
return topic_name
+
@pytest.fixture
def conn(mocker):
"""Return a connection mocker fixture"""
diff --git a/test/fixtures.py b/test/fixtures.py
index 3e59e94..ff6b687 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -23,9 +23,11 @@ from test.service import ExternalService, SpawnedService
log = logging.getLogger(__name__)
+
def random_string(length):
return "".join(random.choice(string.ascii_letters) for i in range(length))
+
def version_str_to_tuple(version_str):
"""Transform a version string into a tuple.
@@ -33,11 +35,13 @@ def version_str_to_tuple(version_str):
"""
return tuple(map(int, version_str.split('.')))
+
def version():
if 'KAFKA_VERSION' not in os.environ:
return ()
return version_str_to_tuple(os.environ['KAFKA_VERSION'])
+
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
@@ -45,6 +49,7 @@ def get_open_port():
sock.close()
return port
+
def gen_ssl_resources(directory):
os.system("""
cd {0}
@@ -74,6 +79,7 @@ def gen_ssl_resources(directory):
-file cert-signed -storepass foobar -noprompt
""".format(directory))
+
class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
@@ -158,6 +164,7 @@ class Fixture(object):
def dump_logs(self):
self.child.dump_logs()
+
class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
@@ -496,7 +503,7 @@ class KafkaFixture(Fixture):
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
- if not 'kafka.common.TopicExistsException' in stdout:
+ if 'kafka.common.TopicExistsException' not in stdout:
self.out("Failed to create topic %s" % (topic_name,))
self.out(stdout)
self.out(stderr)
diff --git a/test/test_assignors.py b/test/test_assignors.py
index e2a1d4f..0821caf 100644
--- a/test/test_assignors.py
+++ b/test/test_assignors.py
@@ -5,8 +5,7 @@ import pytest
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.coordinator.protocol import (
- ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
+from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment
@pytest.fixture
diff --git a/test/test_codec.py b/test/test_codec.py
index 0fefe6f..3c4d2df 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -7,7 +7,7 @@ import pytest
from kafka.vendor.six.moves import range
from kafka.codec import (
- has_snappy, has_gzip, has_lz4,
+ has_snappy, has_lz4,
gzip_encode, gzip_decode,
snappy_encode, snappy_decode,
lz4_encode, lz4_decode,
diff --git a/test/test_conn.py b/test/test_conn.py
index 7a6588b..6412cb6 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -3,7 +3,6 @@ from __future__ import absolute_import
from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET
import socket
-import time
import mock
import pytest
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index ec26857..ecc6d38 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -8,7 +8,7 @@ from kafka.vendor import six
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
-from kafka.coordinator.base import MemberState, Generation
+from kafka.coordinator.base import MemberState
from kafka.structs import TopicPartition
from test.fixtures import random_string, version
@@ -34,8 +34,7 @@ def test_consumer_topics(kafka_broker, topic, version):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
# Necessary to drive the IO
consumer.poll(500)
- consumer_topics = consumer.topics()
- assert topic in consumer_topics
+ assert topic in consumer.topics()
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 7abcefb..e295174 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -3,7 +3,6 @@ import io
import struct
import pytest
-from kafka.vendor import six
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorRequest
diff --git a/test/testutil.py b/test/testutil.py
index b7b4513..781c364 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -44,10 +44,10 @@ def kafka_versions(*versions):
op_str = '='
v_str = s
elif s[1].isdigit():
- op_str = s[0] # ! < > =
+ op_str = s[0] # ! < > =
v_str = s[1:]
elif s[2].isdigit():
- op_str = s[0:2] # >= <=
+ op_str = s[0:2] # >= <=
v_str = s[2:]
else:
raise ValueError('Unrecognized kafka version / operator: %s' % (s,))