summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5f41bac..422d47c 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -9,6 +9,7 @@ from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_encode, snappy_encode,
lz4_encode, lz4_encode_old_kafka)
from .. import errors as Errors
+from ..metrics.stats import Rate
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -135,7 +136,7 @@ class MessageSetBuffer(object):
class SimpleBufferPool(object):
"""A simple pool of BytesIO objects with a weak memory ceiling."""
- def __init__(self, memory, poolable_size):
+ def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
"""Create a new buffer pool.
Arguments:
@@ -150,10 +151,13 @@ class SimpleBufferPool(object):
self._free = collections.deque([io.BytesIO() for _ in range(buffers)])
self._waiters = collections.deque()
- #self.metrics = metrics;
- #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
- #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
- #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+ self.wait_time = None
+ if metrics:
+ self.wait_time = metrics.sensor('bufferpool-wait-time')
+ self.wait_time.add(metrics.metric_name(
+ 'bufferpool-wait-ratio', metric_group_prefix,
+ 'The fraction of time an appender waits for space allocation.'),
+ Rate())
def allocate(self, size, max_time_to_block_ms):
"""
@@ -187,7 +191,8 @@ class SimpleBufferPool(object):
start_wait = time.time()
more_memory.wait(max_time_to_block_ms / 1000.0)
end_wait = time.time()
- #this.waitTime.record(endWait - startWait, time.milliseconds());
+ if self.wait_time:
+ self.wait_time.record(end_wait - start_wait)
if self._free:
buf = self._free.popleft()