diff options
-rwxr-xr-x | benchmarks/consumer_performance.py | 179 | ||||
-rwxr-xr-x | benchmarks/load_example.py (renamed from load_example.py) | 0 | ||||
-rwxr-xr-x | benchmarks/producer_performance.py | 158 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 8 | ||||
-rw-r--r-- | kafka/consumer/group.py | 22 | ||||
-rw-r--r-- | kafka/metrics/stats/sensor.py | 6 | ||||
-rw-r--r-- | kafka/producer/buffer.py | 20 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 40 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 6 | ||||
-rw-r--r-- | kafka/producer/sender.py | 212 | ||||
-rw-r--r-- | test/test_sender.py | 16 |
11 files changed, 640 insertions, 27 deletions
diff --git a/benchmarks/consumer_performance.py b/benchmarks/consumer_performance.py new file mode 100755 index 0000000..3e879ae --- /dev/null +++ b/benchmarks/consumer_performance.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# Adapted from https://github.com/mrafayaleem/kafka-jython + +from __future__ import absolute_import, print_function + +import argparse +import logging +import pprint +import sys +import threading +import traceback + +from kafka import KafkaConsumer, KafkaProducer +from test.fixtures import KafkaFixture, ZookeeperFixture + +logging.basicConfig(level=logging.ERROR) + + +def start_brokers(n): + print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) + print('-> 1 Zookeeper') + zk = ZookeeperFixture.instance() + print('---> {0}:{1}'.format(zk.host, zk.port)) + print() + + partitions = min(n, 3) + replicas = min(n, 3) + print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) + brokers = [ + KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='', + partitions=partitions, replicas=replicas) + for i in range(n) + ] + for broker in brokers: + print('---> {0}:{1}'.format(broker.host, broker.port)) + print() + return brokers + + +class ConsumerPerformance(object): + + @staticmethod + def run(args): + try: + props = {} + for prop in args.consumer_config: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + props[k] = v + + if args.brokers: + brokers = start_brokers(args.brokers) + props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) + for broker in brokers] + print('---> bootstrap_servers={0}'.format(props['bootstrap_servers'])) + print() + + print('-> Producing records') + record = bytes(bytearray(args.record_size)) + producer = KafkaProducer(compression_type=args.fixture_compression, + **props) + for i in xrange(args.num_records): + producer.send(topic=args.topic, value=record) + producer.flush() + producer.close() + print('-> OK!') + print() + + print('Initializing Consumer...') + props['auto_offset_reset'] = 'earliest' + if 'consumer_timeout_ms' not in props: + props['consumer_timeout_ms'] = 10000 + props['metrics_sample_window_ms'] = args.stats_interval * 1000 + for k, v in props.items(): + print('---> {0}={1}'.format(k, v)) + consumer = KafkaConsumer(args.topic, **props) + print('---> group_id={0}'.format(consumer.config['group_id'])) + print('---> report stats every {0} secs'.format(args.stats_interval)) + print('---> raw metrics? {0}'.format(args.raw_metrics)) + timer_stop = threading.Event() + timer = StatsReporter(args.stats_interval, consumer, + event=timer_stop, + raw_metrics=args.raw_metrics) + timer.start() + print('-> OK!') + print() + + records = 0 + for msg in consumer: + records += 1 + if records >= args.num_records: + break + print('Consumed {0} records'.format(records)) + + timer_stop.set() + + except Exception: + exc_info = sys.exc_info() + traceback.print_exception(*exc_info) + sys.exit(1) + + +class StatsReporter(threading.Thread): + def __init__(self, interval, consumer, event=None, raw_metrics=False): + super(StatsReporter, self).__init__() + self.interval = interval + self.consumer = consumer + self.event = event + self.raw_metrics = raw_metrics + + def print_stats(self): + metrics = self.consumer.metrics() + if self.raw_metrics: + pprint.pprint(metrics) + else: + print('{records-consumed-rate} records/sec ({bytes-consumed-rate} B/sec),' + ' {fetch-latency-avg} latency,' + ' {fetch-rate} fetch/s,' + ' {fetch-size-avg} fetch size,' + ' {records-lag-max} max record lag,' + ' {records-per-request-avg} records/req' + .format(**metrics['consumer-fetch-manager-metrics'])) + + + def print_final(self): + self.print_stats() + + def run(self): + while self.event and not self.event.wait(self.interval): + self.print_stats() + else: + self.print_final() + + +def get_args_parser(): + parser = argparse.ArgumentParser( + description='This tool is used to verify the consumer performance.') + + parser.add_argument( + '--topic', type=str, + help='Topic for consumer test', + default='kafka-python-benchmark-test') + parser.add_argument( + '--num-records', type=long, + help='number of messages to consume', + default=1000000) + parser.add_argument( + '--record-size', type=int, + help='message size in bytes', + default=100) + parser.add_argument( + '--consumer-config', type=str, nargs='+', default=(), + help='kafka consumer related configuaration properties like ' + 'bootstrap_servers,client_id etc..') + parser.add_argument( + '--fixture-compression', type=str, + help='specify a compression type for use with broker fixtures / producer') + parser.add_argument( + '--brokers', type=int, + help='Number of kafka brokers to start', + default=0) + parser.add_argument( + '--stats-interval', type=int, + help='Interval in seconds for stats reporting to console', + default=5) + parser.add_argument( + '--raw-metrics', action='store_true', + help='Enable this flag to print full metrics dict on each interval') + return parser + + +if __name__ == '__main__': + args = get_args_parser().parse_args() + ConsumerPerformance.run(args) diff --git a/load_example.py b/benchmarks/load_example.py index a3b09ba..a3b09ba 100755 --- a/load_example.py +++ b/benchmarks/load_example.py diff --git a/benchmarks/producer_performance.py b/benchmarks/producer_performance.py new file mode 100755 index 0000000..e958735 --- /dev/null +++ b/benchmarks/producer_performance.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +# Adapted from https://github.com/mrafayaleem/kafka-jython + +from __future__ import absolute_import, print_function + +import argparse +import pprint +import sys +import threading +import traceback + +from kafka import KafkaProducer +from test.fixtures import KafkaFixture, ZookeeperFixture + + +def start_brokers(n): + print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n)) + print('-> 1 Zookeeper') + zk = ZookeeperFixture.instance() + print('---> {0}:{1}'.format(zk.host, zk.port)) + print() + + partitions = min(n, 3) + replicas = min(n, 3) + print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas)) + brokers = [ + KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='', + partitions=partitions, replicas=replicas) + for i in range(n) + ] + for broker in brokers: + print('---> {0}:{1}'.format(broker.host, broker.port)) + print() + return brokers + + +class ProducerPerformance(object): + + @staticmethod + def run(args): + try: + props = {} + for prop in args.producer_config: + k, v = prop.split('=') + try: + v = int(v) + except ValueError: + pass + if v == 'None': + v = None + props[k] = v + + if args.brokers: + brokers = start_brokers(args.brokers) + props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port) + for broker in brokers] + print("---> bootstrap_servers={0}".format(props['bootstrap_servers'])) + print() + print('-> OK!') + print() + + print('Initializing producer...') + record = bytes(bytearray(args.record_size)) + props['metrics_sample_window_ms'] = args.stats_interval * 1000 + + producer = KafkaProducer(**props) + for k, v in props.items(): + print('---> {0}={1}'.format(k, v)) + print('---> send {0} byte records'.format(args.record_size)) + print('---> report stats every {0} secs'.format(args.stats_interval)) + print('---> raw metrics? {0}'.format(args.raw_metrics)) + timer_stop = threading.Event() + timer = StatsReporter(args.stats_interval, producer, + event=timer_stop, + raw_metrics=args.raw_metrics) + timer.start() + print('-> OK!') + print() + + for i in xrange(args.num_records): + producer.send(topic=args.topic, value=record) + producer.flush() + + timer_stop.set() + + except Exception: + exc_info = sys.exc_info() + traceback.print_exception(*exc_info) + sys.exit(1) + + +class StatsReporter(threading.Thread): + def __init__(self, interval, producer, event=None, raw_metrics=False): + super(StatsReporter, self).__init__() + self.interval = interval + self.producer = producer + self.event = event + self.raw_metrics = raw_metrics + + def print_stats(self): + metrics = self.producer.metrics() + if self.raw_metrics: + pprint.pprint(metrics) + else: + print('{record-send-rate} records/sec ({byte-rate} B/sec),' + ' {request-latency-avg} latency,' + ' {record-size-avg} record size,' + ' {batch-size-avg} batch size,' + ' {records-per-request-avg} records/req' + .format(**metrics['producer-metrics'])) + + def print_final(self): + self.print_stats() + + def run(self): + while self.event and not self.event.wait(self.interval): + self.print_stats() + else: + self.print_final() + + +def get_args_parser(): + parser = argparse.ArgumentParser( + description='This tool is used to verify the producer performance.') + + parser.add_argument( + '--topic', type=str, + help='Topic name for test', + default='kafka-python-benchmark-test') + parser.add_argument( + '--num-records', type=long, + help='number of messages to produce', + default=1000000) + parser.add_argument( + '--record-size', type=int, + help='message size in bytes', + default=100) + parser.add_argument( + '--producer-config', type=str, nargs='+', default=(), + help='kafka producer related configuaration properties like ' + 'bootstrap_servers,client_id etc..') + parser.add_argument( + '--brokers', type=int, + help='Number of kafka brokers to start', + default=0) + parser.add_argument( + '--stats-interval', type=int, + help='Interval in seconds for stats reporting to console', + default=5) + parser.add_argument( + '--raw-metrics', action='store_true', + help='Enable this flag to print full metrics dict on each interval') + return parser + + +if __name__ == '__main__': + args = get_args_parser().parse_args() + ProducerPerformance.run(args) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 34ff4cb..d615848 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -729,6 +729,8 @@ class Fetcher(six.Iterator): else: raise error_type('Unexpected error while fetching data') + # Because we are currently decompressing messages lazily, the sensors here + # will get compressed bytes / message set stats when compression is enabled self._sensors.bytes_fetched.record(total_bytes) self._sensors.records_fetched.record(total_count) if response.API_VERSION >= 1: @@ -774,12 +776,12 @@ class FetchManagerMetrics(object): 'The maximum throttle time in ms'), Max()) def record_topic_fetch_metrics(self, topic, num_bytes, num_records): - metric_tags = {'topic': topic.replace('.', '_')} - # record bytes fetched name = '.'.join(['topic', topic, 'bytes-fetched']) bytes_fetched = self.metrics.get_sensor(name) if not bytes_fetched: + metric_tags = {'topic': topic.replace('.', '_')} + bytes_fetched = self.metrics.sensor(name) bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', self.group_name, @@ -799,6 +801,8 @@ class FetchManagerMetrics(object): name = '.'.join(['topic', topic, 'records-fetched']) records_fetched = self.metrics.get_sensor(name) if not records_fetched: + metric_tags = {'topic': topic.replace('.', '_')} + records_fetched = self.metrics.sensor(name) records_fetched.add(self.metrics.metric_name('records-per-request-avg', self.group_name, diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8fa43bc..982cd7b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.metrics import DictReporter, MetricConfig, Metrics +from kafka.metrics import MetricConfig, Metrics from kafka.protocol.offset import OffsetResetStrategy from kafka.structs import TopicPartition from kafka.version import __version__ @@ -171,8 +171,8 @@ class KafkaConsumer(six.Iterator): in classes that will be notified of new metric creation. Default: [] metrics_num_samples (int): The number of samples maintained to compute metrics. Default: 2 - metrics_sample_window_ms (int): The number of samples maintained to - compute metrics. Default: 30000 + metrics_sample_window_ms (int): The maximum age in milliseconds of + samples used to compute metrics. Default: 30000 Note: Configuration parameters are described in more detail at @@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator): time_window_ms=self.config['metrics_sample_window_ms'], tags=metrics_tags) reporters = [reporter() for reporter in self.config['metric_reporters']] - reporters.append(DictReporter('kafka.consumer')) self._metrics = Metrics(metric_config, reporters) metric_group_prefix = 'consumer' # TODO _metrics likely needs to be passed to KafkaClient, etc. @@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator): self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") + def metrics(self, raw=False): + """Warning: this is an unstable interface. + It may change in future releases without warning""" + if raw: + return self._metrics.metrics + + metrics = {} + for k, v in self._metrics.metrics.items(): + if k.group not in metrics: + metrics[k.group] = {} + if k.name not in metrics[k.group]: + metrics[k.group][k.name] = {} + metrics[k.group][k.name] = v.value() + return metrics + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py index b0bf4db..72bacfc 100644 --- a/kafka/metrics/stats/sensor.py +++ b/kafka/metrics/stats/sensor.py @@ -55,15 +55,15 @@ class Sensor(object): Record a value at a known time. Arguments: value (double): The value we are recording - time_ms (int): The current POSIX time in milliseconds + time_ms (int): A POSIX timestamp in milliseconds. + Default: The time when record() is evaluated (now) Raises: QuotaViolationException: if recording this value moves a metric beyond its configured maximum or minimum bound """ - now = time.time() * 1000 if time_ms is None: - time_ms = now + time_ms = time.time() * 1000 self._last_record_time = time_ms with self._lock: # XXX high volume, might be performance issue # increment all the stats diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index 5fcb35f..de5f0e7 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import collections import io @@ -55,6 +55,8 @@ class MessageSetBuffer(object): self._batch_size = batch_size self._closed = False self._messages = 0 + self._bytes_written = 4 # Int32 header is 4 bytes + self._final_size = None def append(self, offset, message): """Apend a Message to the MessageSet. @@ -62,6 +64,8 @@ class MessageSetBuffer(object): Arguments: offset (int): offset of the message message (Message or bytes): message struct or encoded bytes + + Returns: bytes written """ if isinstance(message, Message): encoded = message.encode() @@ -70,6 +74,8 @@ class MessageSetBuffer(object): msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded self._buffer.write(msg) self._messages += 1 + self._bytes_written += len(msg) + return len(msg) def has_room_for(self, key, value): if self._closed: @@ -107,16 +113,20 @@ class MessageSetBuffer(object): self._buffer.write(Int32.encode(len(encoded))) self._buffer.write(encoded) - # Update the message set size, and return ready for full read() - size = self._buffer.tell() - 4 + # Update the message set size (less the 4 byte header), + # and return with buffer ready for full read() + self._final_size = self._buffer.tell() self._buffer.seek(0) - self._buffer.write(Int32.encode(size)) + self._buffer.write(Int32.encode(self._final_size - 4)) self._buffer.seek(0) self._closed = True def size_in_bytes(self): - return self._buffer.tell() + return self._final_size or self._buffer.tell() + + def compression_rate(self): + return self.size_in_bytes() / self._bytes_written def buffer(self): return self._buffer diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f5c5d19..70c0cd0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -9,6 +9,7 @@ import weakref from .. import errors as Errors from ..client_async import KafkaClient +from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet from ..structs import TopicPartition @@ -220,6 +221,13 @@ class KafkaProducer(object): api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version set to 'auto' + metric_reporters (list): A list of classes to use as metrics reporters. + Implementing the AbstractMetricsReporter interface allows plugging + in classes that will be notified of new metric creation. Default: [] + metrics_num_samples (int): The number of samples maintained to compute + metrics. Default: 2 + metrics_sample_window_ms (int): The maximum age in milliseconds of + samples used to compute metrics. Default: 30000 Note: Configuration parameters are described in more detail at @@ -255,7 +263,10 @@ class KafkaProducer(object): 'ssl_keyfile': None, 'ssl_crlfile': None, 'api_version': None, - 'api_version_auto_timeout_ms': 2000 + 'api_version_auto_timeout_ms': 2000, + 'metric_reporters': [], + 'metrics_num_samples': 2, + 'metrics_sample_window_ms': 30000, } def __init__(self, **configs): @@ -285,6 +296,14 @@ class KafkaProducer(object): log.warning('use api_version=%s (%s is deprecated)', str(self.config['api_version']), deprecated) + # Configure metrics + metrics_tags = {'client-id': self.config['client_id']} + metric_config = MetricConfig(samples=self.config['metrics_num_samples'], + time_window_ms=self.config['metrics_sample_window_ms'], + tags=metrics_tags) + reporters = [reporter() for reporter in self.config['metric_reporters']] + self._metrics = Metrics(metric_config, reporters) + client = KafkaClient(**self.config) # Get auto-discovered version from client if necessary @@ -298,7 +317,8 @@ class KafkaProducer(object): self._accumulator = RecordAccumulator(message_version=message_version, **self.config) self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(client, self._metadata, self._accumulator, + self._sender = Sender(client, self._metadata, + self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) self._sender.daemon = True @@ -382,6 +402,7 @@ class KafkaProducer(object): if not invoked_from_callback: self._sender.join() + self._metrics.close() try: self.config['key_serializer'].close() except AttributeError: @@ -581,3 +602,18 @@ class KafkaProducer(object): return self.config['partitioner'](serialized_key, all_partitions, available) + + def metrics(self, raw=False): + """Warning: this is an unstable interface. + It may change in future releases without warning""" + if raw: + return self._metrics.metrics + + metrics = {} + for k, v in self._metrics.metrics.items(): + if k.group not in metrics: + metrics[k.group] = {} + if k.name not in metrics[k.group]: + metrics[k.group][k.name] = {} + metrics[k.group][k.name] = v.value() + return metrics diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 566bf6f..7ea579a 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -38,7 +38,7 @@ class AtomicInteger(object): class RecordBatch(object): def __init__(self, tp, records, message_version=0): self.record_count = 0 - #self.max_record_size = 0 # for metrics only + self.max_record_size = 0 now = time.time() self.created = now self.drained = None @@ -56,8 +56,8 @@ class RecordBatch(object): return None msg = Message(value, key=key, magic=self.message_version) - self.records.append(self.record_count, msg) - # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only + record_size = self.records.append(self.record_count, msg) + self.max_record_size = max(self.max_record_size, record_size) self.last_append = time.time() future = FutureRecordMetadata(self.produce_future, self.record_count, timestamp_ms) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 958e165..c1d0905 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import collections import copy @@ -8,9 +8,11 @@ import threading import six from .. import errors as Errors +from ..metrics.measurable import AnonMeasurable +from ..metrics.stats import Avg, Count, Max, Rate +from ..protocol.produce import ProduceRequest from ..structs import TopicPartition from ..version import __version__ -from ..protocol.produce import ProduceRequest log = logging.getLogger(__name__) @@ -31,7 +33,7 @@ class Sender(threading.Thread): 'api_version': (0, 8, 0), } - def __init__(self, client, metadata, accumulator, **configs): + def __init__(self, client, metadata, accumulator, metrics, **configs): super(Sender, self).__init__() self.config = copy.copy(self._DEFAULT_CONFIG) for key in self.config: @@ -45,6 +47,7 @@ class Sender(threading.Thread): self._running = True self._force_close = False self._topics_to_add = set() + self._sensors = SenderMetrics(metrics, self._client, self._metadata) def run(self): """The main run loop for the sender thread.""" @@ -119,7 +122,10 @@ class Sender(threading.Thread): expired_batches = self._accumulator.abort_expired_batches( self.config['request_timeout_ms'], self._metadata) + for expired_batch in expired_batches: + self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count) + self._sensors.update_produce_request_metrics(batches_by_node) requests = self._create_produce_requests(batches_by_node) # If we have any nodes that are ready to send + have sendable data, # poll with 0 timeout so this can immediately loop and try sending more @@ -223,6 +229,7 @@ class Sender(threading.Thread): self.config['retries'] - batch.attempts - 1, error) self._accumulator.reenqueue(batch) + self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: if error is Errors.TopicAuthorizationFailedError: error = error(batch.topic_partition.topic) @@ -230,6 +237,8 @@ class Sender(threading.Thread): # tell the user the result of their request batch.done(base_offset, timestamp_ms, error) self._accumulator.deallocate(batch) + if error is not None: + self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) if getattr(error, 'invalid_metadata', False): self._metadata.request_update() @@ -296,3 +305,200 @@ class Sender(threading.Thread): def wakeup(self): """Wake up the selector associated with this send thread.""" self._client.wakeup() + + +class SenderMetrics(object): + + def __init__(self, metrics, client, metadata): + self.metrics = metrics + self._client = client + self._metadata = metadata + + sensor_name = 'batch-size' + self.batch_size_sensor = self.metrics.sensor(sensor_name) + self.add_metric('batch-size-avg', Avg(), + sensor_name=sensor_name, + description='The average number of bytes sent per partition per-request.') + self.add_metric('batch-size-max', Max(), + sensor_name=sensor_name, + description='The max number of bytes sent per partition per-request.') + + sensor_name = 'compression-rate' + self.compression_rate_sensor = self.metrics.sensor(sensor_name) + self.add_metric('compression-rate-avg', Avg(), + sensor_name=sensor_name, + description='The average compression rate of record batches.') + + sensor_name = 'queue-time' + self.queue_time_sensor = self.metrics.sensor(sensor_name) + self.add_metric('record-queue-time-avg', Avg(), + sensor_name=sensor_name, + description='The average time in ms record batches spent in the record accumulator.') + self.add_metric('record-queue-time-max', Max(), + sensor_name=sensor_name, + description='The maximum time in ms record batches spent in the record accumulator.') + + sensor_name = 'request-time' + self.request_time_sensor = self.metrics.sensor(sensor_name) + self.add_metric('request-latency-avg', Avg(), + sensor_name=sensor_name, + description='The average request latency in ms') + self.add_metric('request-latency-max', Max(), + sensor_name=sensor_name, + description='The maximum request latency in ms') + + sensor_name = 'produce-throttle-time' + self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name) + self.add_metric('produce-throttle-time-avg', Avg(), + sensor_name=sensor_name, + description='The average throttle time in ms') + self.add_metric('produce-throttle-time-max', Max(), + sensor_name=sensor_name, + description='The maximum throttle time in ms') + + sensor_name = 'records-per-request' + self.records_per_request_sensor = self.metrics.sensor(sensor_name) + self.add_metric('record-send-rate', Rate(), + sensor_name=sensor_name, + description='The average number of records sent per second.') + self.add_metric('records-per-request-avg', Avg(), + sensor_name=sensor_name, + description='The average number of records per request.') + + sensor_name = 'bytes' + self.byte_rate_sensor = self.metrics.sensor(sensor_name) + self.add_metric('byte-rate', Rate(), + sensor_name=sensor_name, + description='The average number of bytes sent per second.') + + sensor_name = 'record-retries' + self.retry_sensor = self.metrics.sensor(sensor_name) + self.add_metric('record-retry-rate', Rate(), + sensor_name=sensor_name, + description='The average per-second number of retried record sends') + + sensor_name = 'errors' + self.error_sensor = self.metrics.sensor(sensor_name) + self.add_metric('record-error-rate', Rate(), + sensor_name=sensor_name, + description='The average per-second number of record sends that resulted in errors') + + sensor_name = 'record-size-max' + self.max_record_size_sensor = self.metrics.sensor(sensor_name) + self.add_metric('record-size-max', Max(), + sensor_name=sensor_name, + description='The maximum record size across all batches') + self.add_metric('record-size-avg', Avg(), + sensor_name=sensor_name, + description='The average maximum record size per batch') + + self.add_metric('requests-in-flight', + AnonMeasurable(lambda *_: self._client.in_flight_request_count()), + description='The current number of in-flight requests awaiting a response.') + + self.add_metric('metadata-age', + AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000), + description='The age in seconds of the current producer metadata being used.') + + def add_metric(self, metric_name, measurable, group_name='producer-metrics', + description=None, tags=None, + sensor_name=None): + m = self.metrics + metric = m.metric_name(metric_name, group_name, description, tags) + if sensor_name: + sensor = m.sensor(sensor_name) + sensor.add(metric, measurable) + else: + m.add_metric(metric, measurable) + + def maybe_register_topic_metrics(self, topic): + + def sensor_name(name): + return 'topic.{0}.{1}'.format(topic, name) + + # if one sensor of the metrics has been registered for the topic, + # then all other sensors should have been registered; and vice versa + if not self.metrics.get_sensor(sensor_name('records-per-batch')): + + self.add_metric('record-send-rate', Rate(), + sensor_name=sensor_name('records-per-batch'), + group_name='producer-topic-metrics.' + topic, + description= 'Records sent per second for topic ' + topic) + + self.add_metric('byte-rate', Rate(), + sensor_name=sensor_name('bytes'), + group_name='producer-topic-metrics.' + topic, + description='Bytes per second for topic ' + topic) + + self.add_metric('compression-rate', Avg(), + sensor_name=sensor_name('compression-rate'), + group_name='producer-topic-metrics.' + topic, + description='Average Compression ratio for topic ' + topic) + + self.add_metric('record-retry-rate', Rate(), + sensor_name=sensor_name('record-retries'), + group_name='producer-topic-metrics.' + topic, + description='Record retries per second for topic ' + topic) + + self.add_metric('record-error-rate', Rate(), + sensor_name=sensor_name('record-errors'), + group_name='producer-topic-metrics.' + topic, + description='Record errors per second for topic ' + topic) + + def update_produce_request_metrics(self, batches_map): + for node_batch in batches_map.values(): + records = 0 + total_bytes = 0 + for batch in node_batch: + # register all per-topic metrics at once + topic = batch.topic_partition.topic + self.maybe_register_topic_metrics(topic) + + # per-topic record send rate + topic_records_count = self.metrics.get_sensor( + 'topic.' + topic + '.records-per-batch') + topic_records_count.record(batch.record_count) + + # per-topic bytes send rate + topic_byte_rate = self.metrics.get_sensor( + 'topic.' + topic + '.bytes') + topic_byte_rate.record(batch.records.size_in_bytes()) + + # per-topic compression rate + topic_compression_rate = self.metrics.get_sensor( + 'topic.' + topic + '.compression-rate') + topic_compression_rate.record(batch.records.compression_rate()) + + # global metrics + self.batch_size_sensor.record(batch.records.size_in_bytes()) + if batch.drained: + self.queue_time_sensor.record(batch.drained - batch.created) + self.compression_rate_sensor.record(batch.records.compression_rate()) + self.max_record_size_sensor.record(batch.max_record_size) + records += batch.record_count + total_bytes += batch.records.size_in_bytes() + + self.records_per_request_sensor.record(records) + self.byte_rate_sensor.record(total_bytes) + + def record_retries(self, topic, count): + self.retry_sensor.record(count) + sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries') + if sensor: + sensor.record(count) + + def record_errors(self, topic, count): + self.error_sensor.record(count) + sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors') + if sensor: + sensor.record(count) + + def record_latency(self, latency, node=None): + self.request_time_sensor.record(latency) + if node: + sensor = self.metrics.get_sensor('node-' + node + '.latency') + if sensor: + sensor.record(latency) + + def record_throttle_time(self, throttle_time_ms, node=None): + self.produce_throttle_time_sensor.record(throttle_time_ms) diff --git a/test/test_sender.py b/test/test_sender.py index 44105e2..cf911e1 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -7,12 +7,13 @@ import pytest from kafka.client_async import KafkaClient from kafka.cluster import ClusterMetadata -from kafka.producer.buffer import MessageSetBuffer -from kafka.producer.sender import Sender -from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch import kafka.errors as Errors from kafka.future import Future +from kafka.metrics import Metrics +from kafka.producer.buffer import MessageSetBuffer from kafka.protocol.produce import ProduceRequest +from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch +from kafka.producer.sender import Sender from kafka.structs import TopicPartition, OffsetAndMetadata @@ -29,8 +30,13 @@ def accumulator(): @pytest.fixture -def sender(client, accumulator): - return Sender(client, client.cluster, accumulator) +def metrics(): + return Metrics() + + +@pytest.fixture +def sender(client, accumulator, metrics): + return Sender(client, client.cluster, accumulator, metrics) @pytest.mark.parametrize(("api_version", "produce_version"), [ |