author    Dana Powers <dana.powers@gmail.com>  2016-07-16 20:47:15 -0700
committer GitHub <noreply@github.com>          2016-07-16 20:47:15 -0700
commit    947625bfa4b6462e3f7c0fdad0a0cd69708beb2c (patch)
tree      aeae9decba9e1eba0827bcc5dc97c3b85d6f358b
parent    3666b66a21776d620f68d2f7ff2fed1bc18b94e5 (diff)
parent    7a2ec3332b0a83dcaaab4a402db13ed9d56d89e8 (diff)
download  kafka-python-947625bfa4b6462e3f7c0fdad0a0cd69708beb2c.tar.gz
Merge pull request #754 from dpkp/benchmarks
Producer metrics + consumer/producer benchmark scripts
-rwxr-xr-x  benchmarks/consumer_performance.py                           179
-rwxr-xr-x  benchmarks/load_example.py (renamed from load_example.py)      0
-rwxr-xr-x  benchmarks/producer_performance.py                           158
-rw-r--r--  kafka/consumer/fetcher.py                                      8
-rw-r--r--  kafka/consumer/group.py                                       22
-rw-r--r--  kafka/metrics/stats/sensor.py                                  6
-rw-r--r--  kafka/producer/buffer.py                                      20
-rw-r--r--  kafka/producer/kafka.py                                       40
-rw-r--r--  kafka/producer/record_accumulator.py                           6
-rw-r--r--  kafka/producer/sender.py                                     212
-rw-r--r--  test/test_sender.py                                           16
11 files changed, 640 insertions, 27 deletions
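
The headline change is a new metrics() accessor on both KafkaProducer and KafkaConsumer, backed by the metric_reporters, metrics_num_samples and metrics_sample_window_ms config keys added below. A minimal sketch of the producer side, assuming a broker reachable at localhost:9092 (the diff itself flags metrics() as an unstable interface):

from kafka import KafkaProducer

# New config keys from this PR; the values here are illustrative only.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         metrics_num_samples=2,
                         metrics_sample_window_ms=5000)
for _ in range(1000):
    producer.send('kafka-python-benchmark-test', b'x' * 100)
producer.flush()

# metrics() returns {group: {metric-name: value}}; metrics(raw=True) returns
# the underlying MetricName -> metric mapping instead.
stats = producer.metrics()
print(stats['producer-metrics']['record-send-rate'])
print(stats['producer-metrics']['batch-size-avg'])
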
diff --git a/benchmarks/consumer_performance.py b/benchmarks/consumer_performance.py
new file mode 100755
index 0000000..3e879ae
--- /dev/null
+++ b/benchmarks/consumer_performance.py
@@ -0,0 +1,179 @@
+#!/usr/bin/env python
+# Adapted from https://github.com/mrafayaleem/kafka-jython
+
+from __future__ import absolute_import, print_function
+
+import argparse
+import logging
+import pprint
+import sys
+import threading
+import traceback
+
+from kafka import KafkaConsumer, KafkaProducer
+from test.fixtures import KafkaFixture, ZookeeperFixture
+
+logging.basicConfig(level=logging.ERROR)
+
+
+def start_brokers(n):
+ print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n))
+ print('-> 1 Zookeeper')
+ zk = ZookeeperFixture.instance()
+ print('---> {0}:{1}'.format(zk.host, zk.port))
+ print()
+
+ partitions = min(n, 3)
+ replicas = min(n, 3)
+ print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas))
+ brokers = [
+ KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='',
+ partitions=partitions, replicas=replicas)
+ for i in range(n)
+ ]
+ for broker in brokers:
+ print('---> {0}:{1}'.format(broker.host, broker.port))
+ print()
+ return brokers
+
+
+class ConsumerPerformance(object):
+
+ @staticmethod
+ def run(args):
+ try:
+ props = {}
+ for prop in args.consumer_config:
+ k, v = prop.split('=')
+ try:
+ v = int(v)
+ except ValueError:
+ pass
+ if v == 'None':
+ v = None
+ props[k] = v
+
+ if args.brokers:
+ brokers = start_brokers(args.brokers)
+ props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port)
+ for broker in brokers]
+ print('---> bootstrap_servers={0}'.format(props['bootstrap_servers']))
+ print()
+
+ print('-> Producing records')
+ record = bytes(bytearray(args.record_size))
+ producer = KafkaProducer(compression_type=args.fixture_compression,
+ **props)
+ for i in xrange(args.num_records):
+ producer.send(topic=args.topic, value=record)
+ producer.flush()
+ producer.close()
+ print('-> OK!')
+ print()
+
+ print('Initializing Consumer...')
+ props['auto_offset_reset'] = 'earliest'
+ if 'consumer_timeout_ms' not in props:
+ props['consumer_timeout_ms'] = 10000
+ props['metrics_sample_window_ms'] = args.stats_interval * 1000
+ for k, v in props.items():
+ print('---> {0}={1}'.format(k, v))
+ consumer = KafkaConsumer(args.topic, **props)
+ print('---> group_id={0}'.format(consumer.config['group_id']))
+ print('---> report stats every {0} secs'.format(args.stats_interval))
+ print('---> raw metrics? {0}'.format(args.raw_metrics))
+ timer_stop = threading.Event()
+ timer = StatsReporter(args.stats_interval, consumer,
+ event=timer_stop,
+ raw_metrics=args.raw_metrics)
+ timer.start()
+ print('-> OK!')
+ print()
+
+ records = 0
+ for msg in consumer:
+ records += 1
+ if records >= args.num_records:
+ break
+ print('Consumed {0} records'.format(records))
+
+ timer_stop.set()
+
+ except Exception:
+ exc_info = sys.exc_info()
+ traceback.print_exception(*exc_info)
+ sys.exit(1)
+
+
+class StatsReporter(threading.Thread):
+ def __init__(self, interval, consumer, event=None, raw_metrics=False):
+ super(StatsReporter, self).__init__()
+ self.interval = interval
+ self.consumer = consumer
+ self.event = event
+ self.raw_metrics = raw_metrics
+
+ def print_stats(self):
+ metrics = self.consumer.metrics()
+ if self.raw_metrics:
+ pprint.pprint(metrics)
+ else:
+ print('{records-consumed-rate} records/sec ({bytes-consumed-rate} B/sec),'
+ ' {fetch-latency-avg} latency,'
+ ' {fetch-rate} fetch/s,'
+ ' {fetch-size-avg} fetch size,'
+ ' {records-lag-max} max record lag,'
+ ' {records-per-request-avg} records/req'
+ .format(**metrics['consumer-fetch-manager-metrics']))
+
+
+ def print_final(self):
+ self.print_stats()
+
+ def run(self):
+ while self.event and not self.event.wait(self.interval):
+ self.print_stats()
+ else:
+ self.print_final()
+
+
+def get_args_parser():
+ parser = argparse.ArgumentParser(
+ description='This tool is used to verify the consumer performance.')
+
+ parser.add_argument(
+ '--topic', type=str,
+ help='Topic for consumer test',
+ default='kafka-python-benchmark-test')
+ parser.add_argument(
+ '--num-records', type=long,
+ help='number of messages to consume',
+ default=1000000)
+ parser.add_argument(
+ '--record-size', type=int,
+ help='message size in bytes',
+ default=100)
+ parser.add_argument(
+ '--consumer-config', type=str, nargs='+', default=(),
+        help='kafka consumer related configuration properties like '
+             'bootstrap_servers, client_id, etc.')
+ parser.add_argument(
+ '--fixture-compression', type=str,
+ help='specify a compression type for use with broker fixtures / producer')
+ parser.add_argument(
+ '--brokers', type=int,
+ help='Number of kafka brokers to start',
+ default=0)
+ parser.add_argument(
+ '--stats-interval', type=int,
+ help='Interval in seconds for stats reporting to console',
+ default=5)
+ parser.add_argument(
+ '--raw-metrics', action='store_true',
+ help='Enable this flag to print full metrics dict on each interval')
+ return parser
+
+
+if __name__ == '__main__':
+ args = get_args_parser().parse_args()
+ ConsumerPerformance.run(args)
diff --git a/load_example.py b/benchmarks/load_example.py
index a3b09ba..a3b09ba 100755
--- a/load_example.py
+++ b/benchmarks/load_example.py
diff --git a/benchmarks/producer_performance.py b/benchmarks/producer_performance.py
new file mode 100755
index 0000000..e958735
--- /dev/null
+++ b/benchmarks/producer_performance.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+# Adapted from https://github.com/mrafayaleem/kafka-jython
+
+from __future__ import absolute_import, print_function
+
+import argparse
+import pprint
+import sys
+import threading
+import traceback
+
+from kafka import KafkaProducer
+from test.fixtures import KafkaFixture, ZookeeperFixture
+
+
+def start_brokers(n):
+ print('Starting {0} {1}-node cluster...'.format(KafkaFixture.kafka_version, n))
+ print('-> 1 Zookeeper')
+ zk = ZookeeperFixture.instance()
+ print('---> {0}:{1}'.format(zk.host, zk.port))
+ print()
+
+ partitions = min(n, 3)
+ replicas = min(n, 3)
+ print('-> {0} Brokers [{1} partitions / {2} replicas]'.format(n, partitions, replicas))
+ brokers = [
+ KafkaFixture.instance(i, zk.host, zk.port, zk_chroot='',
+ partitions=partitions, replicas=replicas)
+ for i in range(n)
+ ]
+ for broker in brokers:
+ print('---> {0}:{1}'.format(broker.host, broker.port))
+ print()
+ return brokers
+
+
+class ProducerPerformance(object):
+
+ @staticmethod
+ def run(args):
+ try:
+ props = {}
+ for prop in args.producer_config:
+ k, v = prop.split('=')
+ try:
+ v = int(v)
+ except ValueError:
+ pass
+ if v == 'None':
+ v = None
+ props[k] = v
+
+ if args.brokers:
+ brokers = start_brokers(args.brokers)
+ props['bootstrap_servers'] = ['{0}:{1}'.format(broker.host, broker.port)
+ for broker in brokers]
+ print("---> bootstrap_servers={0}".format(props['bootstrap_servers']))
+ print()
+ print('-> OK!')
+ print()
+
+ print('Initializing producer...')
+ record = bytes(bytearray(args.record_size))
+ props['metrics_sample_window_ms'] = args.stats_interval * 1000
+
+ producer = KafkaProducer(**props)
+ for k, v in props.items():
+ print('---> {0}={1}'.format(k, v))
+ print('---> send {0} byte records'.format(args.record_size))
+ print('---> report stats every {0} secs'.format(args.stats_interval))
+ print('---> raw metrics? {0}'.format(args.raw_metrics))
+ timer_stop = threading.Event()
+ timer = StatsReporter(args.stats_interval, producer,
+ event=timer_stop,
+ raw_metrics=args.raw_metrics)
+ timer.start()
+ print('-> OK!')
+ print()
+
+ for i in xrange(args.num_records):
+ producer.send(topic=args.topic, value=record)
+ producer.flush()
+
+ timer_stop.set()
+
+ except Exception:
+ exc_info = sys.exc_info()
+ traceback.print_exception(*exc_info)
+ sys.exit(1)
+
+
+class StatsReporter(threading.Thread):
+ def __init__(self, interval, producer, event=None, raw_metrics=False):
+ super(StatsReporter, self).__init__()
+ self.interval = interval
+ self.producer = producer
+ self.event = event
+ self.raw_metrics = raw_metrics
+
+ def print_stats(self):
+ metrics = self.producer.metrics()
+ if self.raw_metrics:
+ pprint.pprint(metrics)
+ else:
+ print('{record-send-rate} records/sec ({byte-rate} B/sec),'
+ ' {request-latency-avg} latency,'
+ ' {record-size-avg} record size,'
+ ' {batch-size-avg} batch size,'
+ ' {records-per-request-avg} records/req'
+ .format(**metrics['producer-metrics']))
+
+ def print_final(self):
+ self.print_stats()
+
+ def run(self):
+ while self.event and not self.event.wait(self.interval):
+ self.print_stats()
+ else:
+ self.print_final()
+
+
+def get_args_parser():
+ parser = argparse.ArgumentParser(
+ description='This tool is used to verify the producer performance.')
+
+ parser.add_argument(
+ '--topic', type=str,
+ help='Topic name for test',
+ default='kafka-python-benchmark-test')
+ parser.add_argument(
+ '--num-records', type=long,
+ help='number of messages to produce',
+ default=1000000)
+ parser.add_argument(
+ '--record-size', type=int,
+ help='message size in bytes',
+ default=100)
+ parser.add_argument(
+ '--producer-config', type=str, nargs='+', default=(),
+        help='kafka producer related configuration properties like '
+             'bootstrap_servers, client_id, etc.')
+ parser.add_argument(
+ '--brokers', type=int,
+ help='Number of kafka brokers to start',
+ default=0)
+ parser.add_argument(
+ '--stats-interval', type=int,
+ help='Interval in seconds for stats reporting to console',
+ default=5)
+ parser.add_argument(
+ '--raw-metrics', action='store_true',
+ help='Enable this flag to print full metrics dict on each interval')
+ return parser
+
+
+if __name__ == '__main__':
+ args = get_args_parser().parse_args()
+ ProducerPerformance.run(args)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 34ff4cb..d615848 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -729,6 +729,8 @@ class Fetcher(six.Iterator):
else:
raise error_type('Unexpected error while fetching data')
+ # Because we are currently decompressing messages lazily, the sensors here
+ # will get compressed bytes / message set stats when compression is enabled
self._sensors.bytes_fetched.record(total_bytes)
self._sensors.records_fetched.record(total_count)
if response.API_VERSION >= 1:
@@ -774,12 +776,12 @@ class FetchManagerMetrics(object):
'The maximum throttle time in ms'), Max())
def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
- metric_tags = {'topic': topic.replace('.', '_')}
-
# record bytes fetched
name = '.'.join(['topic', topic, 'bytes-fetched'])
bytes_fetched = self.metrics.get_sensor(name)
if not bytes_fetched:
+ metric_tags = {'topic': topic.replace('.', '_')}
+
bytes_fetched = self.metrics.sensor(name)
bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
self.group_name,
@@ -799,6 +801,8 @@ class FetchManagerMetrics(object):
name = '.'.join(['topic', topic, 'records-fetched'])
records_fetched = self.metrics.get_sensor(name)
if not records_fetched:
+ metric_tags = {'topic': topic.replace('.', '_')}
+
records_fetched = self.metrics.sensor(name)
records_fetched.add(self.metrics.metric_name('records-per-request-avg',
self.group_name,
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8fa43bc..982cd7b 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.metrics import DictReporter, MetricConfig, Metrics
+from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -171,8 +171,8 @@ class KafkaConsumer(six.Iterator):
in classes that will be notified of new metric creation. Default: []
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
- metrics_sample_window_ms (int): The number of samples maintained to
- compute metrics. Default: 30000
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator):
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
- reporters.append(DictReporter('kafka.consumer'))
self._metrics = Metrics(metric_config, reporters)
metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
@@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator):
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
+ def metrics(self, raw=False):
+ """Warning: this is an unstable interface.
+ It may change in future releases without warning"""
+ if raw:
+ return self._metrics.metrics
+
+ metrics = {}
+ for k, v in self._metrics.metrics.items():
+ if k.group not in metrics:
+ metrics[k.group] = {}
+ if k.name not in metrics[k.group]:
+ metrics[k.group][k.name] = {}
+ metrics[k.group][k.name] = v.value()
+ return metrics
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):
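
For reference, the new KafkaConsumer.metrics() above flattens the metrics registry into a plain {group: {metric-name: value}} dict. A short sketch, assuming an already-constructed consumer that has fetched records; the metric names are the fetch-manager names read by benchmarks/consumer_performance.py:

# Sketch only: 'consumer' is an existing KafkaConsumer that has fetched records.
stats = consumer.metrics()
fetch = stats['consumer-fetch-manager-metrics']
print(fetch['records-consumed-rate'], 'records/sec')
print(fetch['fetch-latency-avg'], 'avg fetch latency (ms)')
raw = consumer.metrics(raw=True)  # unflattened MetricName -> metric objects
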
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py
index b0bf4db..72bacfc 100644
--- a/kafka/metrics/stats/sensor.py
+++ b/kafka/metrics/stats/sensor.py
@@ -55,15 +55,15 @@ class Sensor(object):
Record a value at a known time.
Arguments:
value (double): The value we are recording
- time_ms (int): The current POSIX time in milliseconds
+ time_ms (int): A POSIX timestamp in milliseconds.
+ Default: The time when record() is evaluated (now)
Raises:
QuotaViolationException: if recording this value moves a
metric beyond its configured maximum or minimum bound
"""
- now = time.time() * 1000
if time_ms is None:
- time_ms = now
+ time_ms = time.time() * 1000
self._last_record_time = time_ms
with self._lock: # XXX high volume, might be performance issue
# increment all the stats
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5fcb35f..de5f0e7 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import io
@@ -55,6 +55,8 @@ class MessageSetBuffer(object):
self._batch_size = batch_size
self._closed = False
self._messages = 0
+ self._bytes_written = 4 # Int32 header is 4 bytes
+ self._final_size = None
def append(self, offset, message):
"""Apend a Message to the MessageSet.
@@ -62,6 +64,8 @@ class MessageSetBuffer(object):
Arguments:
offset (int): offset of the message
message (Message or bytes): message struct or encoded bytes
+
+ Returns: bytes written
"""
if isinstance(message, Message):
encoded = message.encode()
@@ -70,6 +74,8 @@ class MessageSetBuffer(object):
msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
self._buffer.write(msg)
self._messages += 1
+ self._bytes_written += len(msg)
+ return len(msg)
def has_room_for(self, key, value):
if self._closed:
@@ -107,16 +113,20 @@ class MessageSetBuffer(object):
self._buffer.write(Int32.encode(len(encoded)))
self._buffer.write(encoded)
- # Update the message set size, and return ready for full read()
- size = self._buffer.tell() - 4
+ # Update the message set size (less the 4 byte header),
+ # and return with buffer ready for full read()
+ self._final_size = self._buffer.tell()
self._buffer.seek(0)
- self._buffer.write(Int32.encode(size))
+ self._buffer.write(Int32.encode(self._final_size - 4))
self._buffer.seek(0)
self._closed = True
def size_in_bytes(self):
- return self._buffer.tell()
+ return self._final_size or self._buffer.tell()
+
+ def compression_rate(self):
+ return self.size_in_bytes() / self._bytes_written
def buffer(self):
return self._buffer
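
The new compression_rate() above is just the ratio of the closed (possibly compressed) buffer size to the raw bytes appended, so values below 1.0 mean the codec saved space; the sender records it into the compression-rate sensors added later in this diff. A worked example with hypothetical sizes:

# Hypothetical sizes, for illustration only.
bytes_written = 10004   # raw bytes appended: 4-byte size header + encoded messages
final_size = 4210       # buffer size after close() with a compression codec enabled
print(round(final_size / float(bytes_written), 2))   # 0.42 -> ~42% of the raw size
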
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f5c5d19..70c0cd0 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -9,6 +9,7 @@ import weakref
from .. import errors as Errors
from ..client_async import KafkaClient
+from ..metrics import MetricConfig, Metrics
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
from ..structs import TopicPartition
@@ -220,6 +221,13 @@ class KafkaProducer(object):
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version set to 'auto'
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The maximum age in milliseconds of
+ samples used to compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -255,7 +263,10 @@ class KafkaProducer(object):
'ssl_keyfile': None,
'ssl_crlfile': None,
'api_version': None,
- 'api_version_auto_timeout_ms': 2000
+ 'api_version_auto_timeout_ms': 2000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, **configs):
@@ -285,6 +296,14 @@ class KafkaProducer(object):
log.warning('use api_version=%s (%s is deprecated)',
str(self.config['api_version']), deprecated)
+ # Configure metrics
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ self._metrics = Metrics(metric_config, reporters)
+
client = KafkaClient(**self.config)
# Get auto-discovered version from client if necessary
@@ -298,7 +317,8 @@ class KafkaProducer(object):
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
- self._sender = Sender(client, self._metadata, self._accumulator,
+ self._sender = Sender(client, self._metadata,
+ self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -382,6 +402,7 @@ class KafkaProducer(object):
if not invoked_from_callback:
self._sender.join()
+ self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError:
@@ -581,3 +602,18 @@ class KafkaProducer(object):
return self.config['partitioner'](serialized_key,
all_partitions,
available)
+
+ def metrics(self, raw=False):
+ """Warning: this is an unstable interface.
+ It may change in future releases without warning"""
+ if raw:
+ return self._metrics.metrics
+
+ metrics = {}
+ for k, v in self._metrics.metrics.items():
+ if k.group not in metrics:
+ metrics[k.group] = {}
+ if k.name not in metrics[k.group]:
+ metrics[k.group][k.name] = {}
+ metrics[k.group][k.name] = v.value()
+ return metrics
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 566bf6f..7ea579a 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -38,7 +38,7 @@ class AtomicInteger(object):
class RecordBatch(object):
def __init__(self, tp, records, message_version=0):
self.record_count = 0
- #self.max_record_size = 0 # for metrics only
+ self.max_record_size = 0
now = time.time()
self.created = now
self.drained = None
@@ -56,8 +56,8 @@ class RecordBatch(object):
return None
msg = Message(value, key=key, magic=self.message_version)
- self.records.append(self.record_count, msg)
- # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
+ record_size = self.records.append(self.record_count, msg)
+ self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,
timestamp_ms)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 958e165..c1d0905 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
import collections
import copy
@@ -8,9 +8,11 @@ import threading
import six
from .. import errors as Errors
+from ..metrics.measurable import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.produce import ProduceRequest
from ..structs import TopicPartition
from ..version import __version__
-from ..protocol.produce import ProduceRequest
log = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class Sender(threading.Thread):
'api_version': (0, 8, 0),
}
- def __init__(self, client, metadata, accumulator, **configs):
+ def __init__(self, client, metadata, accumulator, metrics, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self._DEFAULT_CONFIG)
for key in self.config:
@@ -45,6 +47,7 @@ class Sender(threading.Thread):
self._running = True
self._force_close = False
self._topics_to_add = set()
+ self._sensors = SenderMetrics(metrics, self._client, self._metadata)
def run(self):
"""The main run loop for the sender thread."""
@@ -119,7 +122,10 @@ class Sender(threading.Thread):
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
+ for expired_batch in expired_batches:
+ self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
+ self._sensors.update_produce_request_metrics(batches_by_node)
requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
@@ -223,6 +229,7 @@ class Sender(threading.Thread):
self.config['retries'] - batch.attempts - 1,
error)
self._accumulator.reenqueue(batch)
+ self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)
@@ -230,6 +237,8 @@ class Sender(threading.Thread):
# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
+ if error is not None:
+ self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
@@ -296,3 +305,200 @@ class Sender(threading.Thread):
def wakeup(self):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
+
+
+class SenderMetrics(object):
+
+ def __init__(self, metrics, client, metadata):
+ self.metrics = metrics
+ self._client = client
+ self._metadata = metadata
+
+ sensor_name = 'batch-size'
+ self.batch_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('batch-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per partition per-request.')
+ self.add_metric('batch-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The max number of bytes sent per partition per-request.')
+
+ sensor_name = 'compression-rate'
+ self.compression_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('compression-rate-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average compression rate of record batches.')
+
+ sensor_name = 'queue-time'
+ self.queue_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-queue-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average time in ms record batches spent in the record accumulator.')
+ self.add_metric('record-queue-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum time in ms record batches spent in the record accumulator.')
+
+ sensor_name = 'request-time'
+ self.request_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('request-latency-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average request latency in ms')
+ self.add_metric('request-latency-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum request latency in ms')
+
+ sensor_name = 'produce-throttle-time'
+ self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('produce-throttle-time-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average throttle time in ms')
+ self.add_metric('produce-throttle-time-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum throttle time in ms')
+
+ sensor_name = 'records-per-request'
+ self.records_per_request_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of records sent per second.')
+ self.add_metric('records-per-request-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average number of records per request.')
+
+ sensor_name = 'bytes'
+ self.byte_rate_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average number of bytes sent per second.')
+
+ sensor_name = 'record-retries'
+ self.retry_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of retried record sends')
+
+ sensor_name = 'errors'
+ self.error_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name,
+ description='The average per-second number of record sends that resulted in errors')
+
+ sensor_name = 'record-size-max'
+ self.max_record_size_sensor = self.metrics.sensor(sensor_name)
+ self.add_metric('record-size-max', Max(),
+ sensor_name=sensor_name,
+ description='The maximum record size across all batches')
+ self.add_metric('record-size-avg', Avg(),
+ sensor_name=sensor_name,
+ description='The average maximum record size per batch')
+
+ self.add_metric('requests-in-flight',
+ AnonMeasurable(lambda *_: self._client.in_flight_request_count()),
+ description='The current number of in-flight requests awaiting a response.')
+
+ self.add_metric('metadata-age',
+ AnonMeasurable(lambda _, now: (now - self._metadata._last_successful_refresh_ms) / 1000),
+ description='The age in seconds of the current producer metadata being used.')
+
+ def add_metric(self, metric_name, measurable, group_name='producer-metrics',
+ description=None, tags=None,
+ sensor_name=None):
+ m = self.metrics
+ metric = m.metric_name(metric_name, group_name, description, tags)
+ if sensor_name:
+ sensor = m.sensor(sensor_name)
+ sensor.add(metric, measurable)
+ else:
+ m.add_metric(metric, measurable)
+
+ def maybe_register_topic_metrics(self, topic):
+
+ def sensor_name(name):
+ return 'topic.{0}.{1}'.format(topic, name)
+
+ # if one sensor of the metrics has been registered for the topic,
+ # then all other sensors should have been registered; and vice versa
+ if not self.metrics.get_sensor(sensor_name('records-per-batch')):
+
+ self.add_metric('record-send-rate', Rate(),
+ sensor_name=sensor_name('records-per-batch'),
+ group_name='producer-topic-metrics.' + topic,
+ description= 'Records sent per second for topic ' + topic)
+
+ self.add_metric('byte-rate', Rate(),
+ sensor_name=sensor_name('bytes'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Bytes per second for topic ' + topic)
+
+ self.add_metric('compression-rate', Avg(),
+ sensor_name=sensor_name('compression-rate'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Average Compression ratio for topic ' + topic)
+
+ self.add_metric('record-retry-rate', Rate(),
+ sensor_name=sensor_name('record-retries'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record retries per second for topic ' + topic)
+
+ self.add_metric('record-error-rate', Rate(),
+ sensor_name=sensor_name('record-errors'),
+ group_name='producer-topic-metrics.' + topic,
+ description='Record errors per second for topic ' + topic)
+
+ def update_produce_request_metrics(self, batches_map):
+ for node_batch in batches_map.values():
+ records = 0
+ total_bytes = 0
+ for batch in node_batch:
+ # register all per-topic metrics at once
+ topic = batch.topic_partition.topic
+ self.maybe_register_topic_metrics(topic)
+
+ # per-topic record send rate
+ topic_records_count = self.metrics.get_sensor(
+ 'topic.' + topic + '.records-per-batch')
+ topic_records_count.record(batch.record_count)
+
+ # per-topic bytes send rate
+ topic_byte_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.bytes')
+ topic_byte_rate.record(batch.records.size_in_bytes())
+
+ # per-topic compression rate
+ topic_compression_rate = self.metrics.get_sensor(
+ 'topic.' + topic + '.compression-rate')
+ topic_compression_rate.record(batch.records.compression_rate())
+
+ # global metrics
+ self.batch_size_sensor.record(batch.records.size_in_bytes())
+ if batch.drained:
+ self.queue_time_sensor.record(batch.drained - batch.created)
+ self.compression_rate_sensor.record(batch.records.compression_rate())
+ self.max_record_size_sensor.record(batch.max_record_size)
+ records += batch.record_count
+ total_bytes += batch.records.size_in_bytes()
+
+ self.records_per_request_sensor.record(records)
+ self.byte_rate_sensor.record(total_bytes)
+
+ def record_retries(self, topic, count):
+ self.retry_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-retries')
+ if sensor:
+ sensor.record(count)
+
+ def record_errors(self, topic, count):
+ self.error_sensor.record(count)
+ sensor = self.metrics.get_sensor('topic.' + topic + '.record-errors')
+ if sensor:
+ sensor.record(count)
+
+ def record_latency(self, latency, node=None):
+ self.request_time_sensor.record(latency)
+ if node:
+ sensor = self.metrics.get_sensor('node-' + node + '.latency')
+ if sensor:
+ sensor.record(latency)
+
+ def record_throttle_time(self, throttle_time_ms, node=None):
+ self.produce_throttle_time_sensor.record(throttle_time_ms)
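
Once batches for a topic are drained, maybe_register_topic_metrics() above registers a per-topic group named 'producer-topic-metrics.<topic>'. A sketch of reading those back through KafkaProducer.metrics(), where 'my-topic' stands in for whatever topic was actually produced to:

# Illustrative only; assumes records have already been sent to 'my-topic'.
stats = producer.metrics()
topic_stats = stats['producer-topic-metrics.my-topic']
print(topic_stats['record-send-rate'])   # records/sec for this topic
print(topic_stats['byte-rate'])          # bytes/sec for this topic
print(topic_stats['compression-rate'])   # average message-set compression ratio
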
diff --git a/test/test_sender.py b/test/test_sender.py
index 44105e2..cf911e1 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -7,12 +7,13 @@ import pytest
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
-from kafka.producer.buffer import MessageSetBuffer
-from kafka.producer.sender import Sender
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics import Metrics
+from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
+from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.sender import Sender
from kafka.structs import TopicPartition, OffsetAndMetadata
@@ -29,8 +30,13 @@ def accumulator():
@pytest.fixture
-def sender(client, accumulator):
- return Sender(client, client.cluster, accumulator)
+def metrics():
+ return Metrics()
+
+
+@pytest.fixture
+def sender(client, accumulator, metrics):
+ return Sender(client, client.cluster, accumulator, metrics)
@pytest.mark.parametrize(("api_version", "produce_version"), [