summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/sender.py13
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index c1d0905..e0381d5 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -4,6 +4,7 @@ import collections
import copy
import logging
import threading
+import time
import six
@@ -145,7 +146,7 @@ class Sender(threading.Thread):
log.debug('Sending Produce Request: %r', request)
(self._client.send(node_id, request)
.add_callback(
- self._handle_produce_response, batches)
+ self._handle_produce_response, node_id, time.time(), batches)
.add_errback(
self._failed_produce, batches, node_id))
@@ -183,7 +184,7 @@ class Sender(threading.Thread):
for batch in batches:
self._complete_batch(batch, error, -1, None)
- def _handle_produce_response(self, batches, response):
+ def _handle_produce_response(self, node_id, send_time, batches, response):
"""Handle a produce response."""
# if we have a response, parse it
log.debug('Parsing produce response: %r', response)
@@ -203,6 +204,10 @@ class Sender(threading.Thread):
batch = batches_by_partition[tp]
self._complete_batch(batch, error, offset, ts)
+ self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
+ if response.API_VERSION > 0:
+ self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
+
else:
# this is the acks = 0 case, just complete all requests
for batch in batches:
@@ -495,8 +500,8 @@ class SenderMetrics(object):
def record_latency(self, latency, node=None):
self.request_time_sensor.record(latency)
- if node:
- sensor = self.metrics.get_sensor('node-' + node + '.latency')
+ if node is not None:
+ sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
if sensor:
sensor.record(latency)