diff options
Diffstat (limited to 'test/fixtures.py')
-rw-r--r-- | test/fixtures.py | 25 |
1 files changed, 3 insertions, 22 deletions
diff --git a/test/fixtures.py b/test/fixtures.py index ff6b687..c7748f1 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -4,9 +4,7 @@ import atexit import logging import os import os.path -import random import socket -import string import subprocess import time import uuid @@ -19,29 +17,12 @@ from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient from kafka.client_async import KafkaClient from kafka.protocol.admin import CreateTopicsRequest from kafka.protocol.metadata import MetadataRequest +from test.testutil import env_kafka_version, random_string from test.service import ExternalService, SpawnedService log = logging.getLogger(__name__) -def random_string(length): - return "".join(random.choice(string.ascii_letters) for i in range(length)) - - -def version_str_to_tuple(version_str): - """Transform a version string into a tuple. - - Example: '0.8.1.1' --> (0, 8, 1, 1) - """ - return tuple(map(int, version_str.split('.'))) - - -def version(): - if 'KAFKA_VERSION' not in os.environ: - return () - return version_str_to_tuple(os.environ['KAFKA_VERSION']) - - def get_open_port(): sock = socket.socket() sock.bind(("", 0)) @@ -477,7 +458,7 @@ class KafkaFixture(Fixture): num_partitions == self.partitions and \ replication_factor == self.replicas: self._send_request(MetadataRequest[0]([topic_name])) - elif version() >= (0, 10, 1, 0): + elif env_kafka_version() >= (0, 10, 1, 0): request = CreateTopicsRequest[0]([(topic_name, num_partitions, replication_factor, [], [])], timeout_ms) result = self._send_request(request, timeout=timeout_ms) @@ -497,7 +478,7 @@ class KafkaFixture(Fixture): '--replication-factor', self.replicas \ if replication_factor is None \ else replication_factor) - if version() >= (0, 10): + if env_kafka_version() >= (0, 10): args.append('--if-not-exists') env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |