 kafka/producer/sender.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index c1d0905..e0381d5 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -4,6 +4,7 @@
 import collections
 import copy
 import logging
 import threading
+import time
 
 import six
 
@@ -145,7 +146,7 @@ class Sender(threading.Thread):
             log.debug('Sending Produce Request: %r', request)
             (self._client.send(node_id, request)
                 .add_callback(
-                    self._handle_produce_response, batches)
+                    self._handle_produce_response, node_id, time.time(), batches)
                 .add_errback(
                     self._failed_produce, batches, node_id))
 
@@ -183,7 +184,7 @@ class Sender(threading.Thread):
         for batch in batches:
             self._complete_batch(batch, error, -1, None)
 
-    def _handle_produce_response(self, batches, response):
+    def _handle_produce_response(self, node_id, send_time, batches, response):
         """Handle a produce response."""
         # if we have a response, parse it
         log.debug('Parsing produce response: %r', response)
@@ -203,6 +204,10 @@ class Sender(threading.Thread):
                     batch = batches_by_partition[tp]
                     self._complete_batch(batch, error, offset, ts)
 
+            self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
+            if response.API_VERSION > 0:
+                self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
+
         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
@@ -495,8 +500,8 @@ class SenderMetrics(object):
 
     def record_latency(self, latency, node=None):
         self.request_time_sensor.record(latency)
-        if node:
-            sensor = self.metrics.get_sensor('node-' + node + '.latency')
+        if node is not None:
+            sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
             if sensor:
                 sensor.record(latency)
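
In short, the patch threads the destination node id and a send timestamp through the produce-request callback so that, when the response arrives, the handler can record per-node round-trip latency (and, for produce API versions above 0, the broker-reported throttle time). The sketch below illustrates that callback-threading pattern in isolation; the Future class and handler here are simplified stand-ins, not the actual kafka-python classes.

import time


class Future(object):
    """Minimal stand-in for a request future with a success-callback chain."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, fn, *args):
        # Extra positional arguments are bound at send time and passed to the
        # callback ahead of the eventual result -- this is how the patch feeds
        # node_id and time.time() into _handle_produce_response.
        self._callbacks.append((fn, args))
        return self

    def success(self, response):
        for fn, args in self._callbacks:
            fn(*(args + (response,)))


def handle_produce_response(node_id, send_time, response):
    # Round-trip latency in milliseconds, measured from just before the send.
    latency_ms = (time.time() - send_time) * 1000
    print('node %s latency: %.1f ms' % (node_id, latency_ms))


future = Future()
future.add_callback(handle_produce_response, 0, time.time())
# ... the request goes out; when the broker replies:
future.success(object())

The record_latency change follows from the same callsite: node ids are typically small integers (0 is a valid broker id), so the old `if node:` check silently skipped the per-node sensor for node 0, and `'node-' + node` would raise a TypeError once an integer id was passed; `if node is not None:` together with `str(node)` handles both cases.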