summaryrefslogtreecommitdiff
path: root/test/fixtures.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/fixtures.py')
-rw-r--r--test/fixtures.py25
1 files changed, 3 insertions, 22 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index ff6b687..c7748f1 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,9 +4,7 @@ import atexit
import logging
import os
import os.path
-import random
import socket
-import string
import subprocess
import time
import uuid
@@ -19,29 +17,12 @@ from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
from kafka.client_async import KafkaClient
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
+from test.testutil import env_kafka_version, random_string
from test.service import ExternalService, SpawnedService
log = logging.getLogger(__name__)
-def random_string(length):
- return "".join(random.choice(string.ascii_letters) for i in range(length))
-
-
-def version_str_to_tuple(version_str):
- """Transform a version string into a tuple.
-
- Example: '0.8.1.1' --> (0, 8, 1, 1)
- """
- return tuple(map(int, version_str.split('.')))
-
-
-def version():
- if 'KAFKA_VERSION' not in os.environ:
- return ()
- return version_str_to_tuple(os.environ['KAFKA_VERSION'])
-
-
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
@@ -477,7 +458,7 @@ class KafkaFixture(Fixture):
num_partitions == self.partitions and \
replication_factor == self.replicas:
self._send_request(MetadataRequest[0]([topic_name]))
- elif version() >= (0, 10, 1, 0):
+ elif env_kafka_version() >= (0, 10, 1, 0):
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
replication_factor, [], [])], timeout_ms)
result = self._send_request(request, timeout=timeout_ms)
@@ -497,7 +478,7 @@ class KafkaFixture(Fixture):
'--replication-factor', self.replicas \
if replication_factor is None \
else replication_factor)
- if version() >= (0, 10):
+ if env_kafka_version() >= (0, 10):
args.append('--if-not-exists')
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)