diff options
Diffstat (limited to 'test/fixtures.py')
-rw-r--r-- | test/fixtures.py | 299 |
1 files changed, 237 insertions, 62 deletions
diff --git a/test/fixtures.py b/test/fixtures.py index 1c418fd..493a664 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -4,29 +4,55 @@ import atexit import logging import os import os.path -import shutil +import random +import socket +import string import subprocess -import tempfile import time import uuid -from six.moves import urllib +import py +from six.moves import urllib, xrange from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 +from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient +from kafka.client_async import KafkaClient +from kafka.protocol.admin import CreateTopicsRequest +from kafka.protocol.metadata import MetadataRequest from test.service import ExternalService, SpawnedService -from test.testutil import get_open_port - log = logging.getLogger(__name__) +def random_string(length): + return "".join(random.choice(string.ascii_letters) for i in xrange(length)) + +def version_str_to_list(version_str): + return tuple(map(int, version_str.split('.'))) # e.g., (0, 8, 1, 1) + +def version(): + if 'KAFKA_VERSION' not in os.environ: + return () + return version_str_to_list(os.environ['KAFKA_VERSION']) + +def get_open_port(): + sock = socket.socket() + sock.bind(("", 0)) + port = sock.getsockname()[1] + sock.close() + return port class Fixture(object): kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2') scala_version = os.environ.get("SCALA_VERSION", '2.8.0') - project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) + project_root = os.environ.get('PROJECT_ROOT', + os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + kafka_root = os.environ.get("KAFKA_ROOT", + os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache")) + def __init__(self): + self.child = None + @classmethod def download_official_distribution(cls, kafka_version=None, @@ -71,31 +97,34 @@ class Fixture(object): @classmethod def kafka_run_class_args(cls, *args): result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')] - result.extend(args) + result.extend([str(arg) for arg in args]) return result def kafka_run_class_env(self): env = os.environ.copy() - env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties") + env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \ + self.test_resource("log4j.properties") return env @classmethod def render_template(cls, source_file, target_file, binding): - log.info('Rendering %s from template %s', target_file, source_file) + log.info('Rendering %s from template %s', target_file.strpath, source_file) with open(source_file, "r") as handle: template = handle.read() assert len(template) > 0, 'Empty template %s' % source_file - with open(target_file, "w") as handle: + with open(target_file.strpath, "w") as handle: handle.write(template.format(**binding)) handle.flush() os.fsync(handle) # fsync directory for durability # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/ - dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY) + dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + def dump_logs(self): + self.child.dump_logs() class ZookeeperFixture(Fixture): @classmethod @@ -111,32 +140,36 @@ class ZookeeperFixture(Fixture): fixture.open() return fixture - def __init__(self, host, port): + def __init__(self, host, port, tmp_dir=None): + super(ZookeeperFixture, self).__init__() self.host = host self.port = port - self.tmp_dir = None - self.child = None + self.tmp_dir = tmp_dir def kafka_run_class_env(self): env = super(ZookeeperFixture, self).kafka_run_class_env() - env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + env['LOG_DIR'] = self.tmp_dir.join('logs').strpath return env def out(self, message): log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message) def open(self): - self.tmp_dir = tempfile.mkdtemp() + if self.tmp_dir is None: + self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member + self.tmp_dir.ensure(dir=True) + self.out("Running local instance...") log.info(" host = %s", self.host) log.info(" port = %s", self.port or '(auto)') - log.info(" tmp_dir = %s", self.tmp_dir) + log.info(" tmp_dir = %s", self.tmp_dir.strpath) # Configure Zookeeper child process template = self.test_resource("zookeeper.properties") - properties = os.path.join(self.tmp_dir, "zookeeper.properties") - args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties) + properties = self.tmp_dir.join("zookeeper.properties") + args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", + properties.strpath) env = self.kafka_run_class_env() # Party! @@ -174,7 +207,7 @@ class ZookeeperFixture(Fixture): self.child.stop() self.child = None self.out("Done!") - shutil.rmtree(self.tmp_dir) + self.tmp_dir.remove() def __del__(self): self.close() @@ -182,9 +215,11 @@ class ZookeeperFixture(Fixture): class KafkaFixture(Fixture): @classmethod - def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, + def instance(cls, broker_id, zookeeper, zk_chroot=None, host=None, port=None, - transport='PLAINTEXT', replicas=1, partitions=2): + transport='PLAINTEXT', replicas=1, partitions=2, + sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None): + if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") if "KAFKA_URI" in os.environ: @@ -195,19 +230,29 @@ class KafkaFixture(Fixture): if host is None: host = "localhost" fixture = KafkaFixture(host, port, broker_id, - zk_host, zk_port, zk_chroot, + zookeeper, zk_chroot, transport=transport, - replicas=replicas, partitions=partitions) + replicas=replicas, partitions=partitions, + sasl_mechanism=sasl_mechanism, + auto_create_topic=auto_create_topic, + tmp_dir=tmp_dir) + fixture.open() return fixture - def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, - replicas=1, partitions=2, transport='PLAINTEXT'): + def __init__(self, host, port, broker_id, zookeeper, zk_chroot, + replicas=1, partitions=2, transport='PLAINTEXT', + sasl_mechanism='PLAIN', auto_create_topic=True, + tmp_dir=None): + super(KafkaFixture, self).__init__() + self.host = host self.port = port self.broker_id = broker_id + self.auto_create_topic = auto_create_topic self.transport = transport.upper() + self.sasl_mechanism = sasl_mechanism.upper() self.ssl_dir = self.test_resource('ssl') # TODO: checking for port connection would be better than scanning logs @@ -215,67 +260,55 @@ class KafkaFixture(Fixture): # The logging format changed slightly in 1.0.0 self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id - self.zk_host = zk_host - self.zk_port = zk_port + self.zookeeper = zookeeper self.zk_chroot = zk_chroot + # Add the attributes below for the template binding + self.zk_host = self.zookeeper.host + self.zk_port = self.zookeeper.port self.replicas = replicas self.partitions = partitions - self.tmp_dir = None - self.child = None + self.tmp_dir = tmp_dir self.running = False + self._client = None + + def bootstrap_server(self): + return '%s:%d' % (self.host, self.port) + def kafka_run_class_env(self): env = super(KafkaFixture, self).kafka_run_class_env() - env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + env['LOG_DIR'] = self.tmp_dir.join('logs').strpath return env def out(self, message): log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message) - def open(self): - if self.running: - self.out("Instance already running") - return - - self.tmp_dir = tempfile.mkdtemp() - self.out("Running local instance...") - log.info(" host = %s", self.host) - log.info(" port = %s", self.port or '(auto)') - log.info(" transport = %s", self.transport) - log.info(" broker_id = %s", self.broker_id) - log.info(" zk_host = %s", self.zk_host) - log.info(" zk_port = %s", self.zk_port) - log.info(" zk_chroot = %s", self.zk_chroot) - log.info(" replicas = %s", self.replicas) - log.info(" partitions = %s", self.partitions) - log.info(" tmp_dir = %s", self.tmp_dir) - - # Create directories - os.mkdir(os.path.join(self.tmp_dir, "logs")) - os.mkdir(os.path.join(self.tmp_dir, "data")) - + def _create_zk_chroot(self): self.out("Creating Zookeeper chroot node...") args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", - "-server", "%s:%d" % (self.zk_host, self.zk_port), + "-server", + "%s:%d" % (self.zookeeper.host, + self.zookeeper.port), "create", "/%s" % self.zk_chroot, "kafka-python") env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if proc.wait() != 0: + if proc.wait() != 0 or proc.returncode != 0: self.out("Failed to create Zookeeper chroot node") self.out(proc.stdout.read()) self.out(proc.stderr.read()) raise RuntimeError("Failed to create Zookeeper chroot node") - self.out("Done!") + self.out("Kafka chroot created in Zookeeper!") + def start(self): # Configure Kafka child process - properties = os.path.join(self.tmp_dir, "kafka.properties") + properties = self.tmp_dir.join("kafka.properties") template = self.test_resource("kafka.properties") - args = self.kafka_run_class_args("kafka.Kafka", properties) + args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() timeout = 5 @@ -305,14 +338,45 @@ class KafkaFixture(Fixture): backoff += 1 else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') + + (self._client,) = self.get_clients(1, '_internal_client') + self.out("Done!") self.running = True + + def open(self): + if self.running: + self.out("Instance already running") + return + + # Create directories + if self.tmp_dir is None: + self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member + self.tmp_dir.ensure(dir=True) + self.tmp_dir.ensure('logs', dir=True) + self.tmp_dir.ensure('data', dir=True) + + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port or '(auto)') + log.info(" transport = %s", self.transport) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zookeeper.host) + log.info(" zk_port = %s", self.zookeeper.port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir.strpath) + + self._create_zk_chroot() + self.start() + atexit.register(self.close) def __del__(self): self.close() - def close(self): + def stop(self): if not self.running: self.out("Instance already stopped") return @@ -320,6 +384,117 @@ class KafkaFixture(Fixture): self.out("Stopping...") self.child.stop() self.child = None - self.out("Done!") - shutil.rmtree(self.tmp_dir) self.running = False + self.out("Stopped!") + + def close(self): + self.stop() + if self.tmp_dir is not None: + self.tmp_dir.remove() + self.tmp_dir = None + self.out("Done!") + + def dump_logs(self): + super(KafkaFixture, self).dump_logs() + self.zookeeper.dump_logs() + + def _send_request(self, request, timeout=None): + def _failure(error): + raise error + retries = 10 + while True: + node_id = self._client.least_loaded_node() + for ready_retry in range(40): + if self._client.ready(node_id, False): + break + time.sleep(.1) + else: + raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) + + try: + future = self._client.send(node_id, request) + future.error_on_callbacks = True + future.add_errback(_failure) + return self._client.poll(future=future, timeout_ms=timeout) + except Exception as exc: + time.sleep(1) + retries -= 1 + if retries == 0: + raise exc + else: + pass # retry + + def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + if num_partitions is None: + num_partitions = self.partitions + if replication_factor is None: + replication_factor = self.replicas + + # Try different methods to create a topic, from the fastest to the slowest + if self.auto_create_topic and \ + num_partitions == self.partitions and \ + replication_factor == self.replicas: + self._send_request(MetadataRequest[0]([topic_name])) + elif version() >= (0, 10, 1, 0): + request = CreateTopicsRequest[0]([(topic_name, num_partitions, + replication_factor, [], [])], timeout_ms) + result = self._send_request(request, timeout=timeout_ms) + for topic_result in result[0].topic_error_codes: + error_code = topic_result[1] + if error_code != 0: + raise errors.for_code(error_code) + else: + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--create', + '--topic', topic_name, + '--partitions', self.partitions \ + if num_partitions is None else num_partitions, + '--replication-factor', self.replicas \ + if replication_factor is None \ + else replication_factor) + if version() >= (0, 10): + args.append('--if-not-exists') + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + ret = proc.wait() + if ret != 0 or proc.returncode != 0: + output = proc.stdout.read() + if not 'kafka.common.TopicExistsException' in output: + self.out("Failed to create topic %s" % (topic_name,)) + self.out(output) + self.out(proc.stderr.read()) + raise RuntimeError("Failed to create topic %s" % (topic_name,)) + + def create_topics(self, topic_names, num_partitions=None, replication_factor=None): + for topic_name in topic_names: + self._create_topic(topic_name, num_partitions, replication_factor) + + def get_clients(self, cnt=1, client_id=None): + if client_id is None: + client_id = 'client' + return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)), + bootstrap_servers=self.bootstrap_server()) for x in range(cnt)) + + def get_consumers(self, cnt, topics, **params): + params.setdefault('client_id', 'consumer') + params.setdefault('heartbeat_interval_ms', 500) + params['bootstrap_servers'] = self.bootstrap_server() + client_id = params['client_id'] + for x in range(cnt): + params['client_id'] = '%s_%s' % (client_id, random_string(4)) + yield KafkaConsumer(*topics, **params) + + def get_producers(self, cnt, **params): + params.setdefault('client_id', 'producer') + params['bootstrap_servers'] = self.bootstrap_server() + client_id = params['client_id'] + for x in range(cnt): + params['client_id'] = '%s_%s' % (client_id, random_string(4)) + yield KafkaProducer(**params) + + def get_simple_client(self, **params): + params.setdefault('client_id', 'simple_client') + return SimpleClient(self.bootstrap_server(), **params) |