summary refs log tree commit diff
diff options
context:
space:
mode:
author    Dana Powers <dana.powers@gmail.com>  2016-08-04 12:54:53 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-08-04 13:11:20 -0700
commit    025b69ef4ae22d1677904e99f924b9ef5a096e75 (patch)
tree      38d12fc11f82c492c68a4e04dbac26664862f541
parent    460f0784a30f303b4543763ca330cce52d6054eb (diff)
download  kafka-python-conn_metrics.tar.gz
Instrument bufferpool-wait-ratio metric in KafkaProducer (branch: conn_metrics)
-rw-r--r--  kafka/producer/buffer.py              17
-rw-r--r--  kafka/producer/kafka.py                2
-rw-r--r--  kafka/producer/record_accumulator.py   6
3 files changed, 17 insertions, 8 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5f41bac..422d47c 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -9,6 +9,7 @@ from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_encode, snappy_encode,
lz4_encode, lz4_encode_old_kafka)
from .. import errors as Errors
+from ..metrics.stats import Rate
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -135,7 +136,7 @@ class MessageSetBuffer(object):
class SimpleBufferPool(object):
"""A simple pool of BytesIO objects with a weak memory ceiling."""
- def __init__(self, memory, poolable_size):
+ def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
"""Create a new buffer pool.
Arguments:
@@ -150,10 +151,13 @@ class SimpleBufferPool(object):
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
self._waiters = collections.deque()
- #self.metrics = metrics;
- #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
- #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
- #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+ self.wait_time = None
+ if metrics:
+ self.wait_time = metrics.sensor('bufferpool-wait-time')
+ self.wait_time.add(metrics.metric_name(
+ 'bufferpool-wait-ratio', metric_group_prefix,
+ 'The fraction of time an appender waits for space allocation.'),
+ Rate())
def allocate(self, size, max_time_to_block_ms):
"""
@@ -187,7 +191,8 @@ class SimpleBufferPool(object):
start_wait = time.time()
more_memory.wait(max_time_to_block_ms / 1000.0)
end_wait = time.time()
- #this.waitTime.record(endWait - startWait, time.milliseconds());
+ if self.wait_time:
+ self.wait_time.record(end_wait - start_wait)
if self._free:
buf = self._free.popleft()
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e3b0d69..84039f6 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -335,7 +335,7 @@ class KafkaProducer(object):
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
message_version = 1 if self.config['api_version'] >= (0, 10) else 0
- self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
+ self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata,
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 3e2d903..8fe6abb 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -162,6 +162,8 @@ class RecordAccumulator(object):
'linger_ms': 0,
'retry_backoff_ms': 100,
'message_version': 0,
+ 'metrics': None,
+ 'metric_group_prefix': 'producer-metrics',
}
def __init__(self, **configs):
@@ -176,7 +178,9 @@ class RecordAccumulator(object):
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
self._tp_locks = {None: threading.Lock()} # TopicPartition: Lock, plus a lock to add entries
self._free = SimpleBufferPool(self.config['buffer_memory'],
- self.config['batch_size'])
+ self.config['batch_size'],
+ metrics=self.config['metrics'],
+ metric_group_prefix=self.config['metric_group_prefix'])
self._incomplete = IncompleteRecordBatches()
# The following variables should only be accessed by the sender thread,
# so we don't need to protect them w/ locking.