diff options
author: Dana Powers <> 2016-01-23 15:45:29 -0800
committer: Dana Powers <> 2016-01-24 17:33:08 -0800
commit: 434d1abb18a130f54682662909a891edcdb98f5f (patch)
parent: a250fe3418a56d0f72458c54078203ee0a65ef0e (diff)
Sender class to manage background IO for KafkaProducer
1 files changed, 272 insertions, 0 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
new file mode 100644
index 0000000..ac160fc
--- /dev/null
+++ b/kafka/producer/sender.py
@@ -0,0 +1,272 @@
+from __future__ import absolute_import
+import collections
+import copy
+import logging
+import threading
+import time
+import six
+from ..common import TopicPartition
+from ..version import __version__
+from ..protocol.produce import ProduceRequest
+import kafka.common as Errors
+log = logging.getLogger(__name__)  # module-level logger, named after this module
+class Sender(threading.Thread):
+ """
+ The background thread that handles the sending of produce requests to the
+ Kafka cluster. This thread makes metadata requests to renew its view of the
+ cluster and then sends produce requests to the appropriate nodes.
+ """
+ 'max_request_size': 1048576,
+ 'acks': 1,
+ 'retries': 0,
+ 'request_timeout_ms': 30000,
+ 'client_id': 'kafka-python-' + __version__,
+ }
+ def __init__(self, client, metadata, lock, accumulator, **configs):
+ super(Sender, self).__init__()
+ self.config = copy.copy(self._DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs.pop(key)
+ = self.config['client_id'] + '-network-thread'
+ self._client = client
+ self._accumulator = accumulator
+ self._metadata = client.cluster
+ self._lock = lock
+ self._running = True
+ self._force_close = False
+ self._topics_to_add = []
+ def run(self):
+ """The main run loop for the sender thread."""
+ log.debug("Starting Kafka producer I/O thread.")
+ # main loop, runs until close is called
+ while self._running:
+ try:
+ self.run_once()
+ except Exception:
+ log.exception("Uncaught error in kafka producer I/O thread")
+ log.debug("Beginning shutdown of Kafka producer I/O thread, sending"
+ " remaining records.")
+ # okay we stopped accepting requests but there may still be
+ # requests in the accumulator or waiting for acknowledgment,
+ # wait until these are completed.
+ while (not self._force_close
+ and (self._accumulator.has_unsent()
+ or self._client.in_flight_request_count() > 0)):
+ try:
+ self.run_once()
+ except Exception:
+ log.exception("Uncaught error in kafka producer I/O thread")
+ if self._force_close:
+ # We need to fail all the incomplete batches and wake up the
+ # threads waiting on the futures.
+ self._accumulator.abort_incomplete_batches()
+ try:
+ self._client.close()
+ except Exception:
+ log.exception("Failed to close network client")
+ log.debug("Shutdown of Kafka producer I/O thread has completed.")
+ def run_once(self):
+ """Run a single iteration of sending."""
+ while self._topics_to_add:
+ self._client.add_topic(self._topics_to_add.pop())
+ # get the list of partitions with data ready to send
+ result = self._accumulator.ready(self._metadata)
+ ready_nodes, next_ready_check_delay, unknown_leaders_exist = result
+ # if there are any partitions whose leaders are not known yet, force
+ # metadata update
+ if unknown_leaders_exist:
+ with self._lock:
+ self._metadata.request_update()
+ # remove any nodes we aren't ready to send to
+ not_ready_timeout = 999999999
+ for node in list(ready_nodes):
+ if not self._client.ready(node):
+ ready_nodes.remove(node)
+ not_ready_timeout = min(not_ready_timeout,
+ self._client.connection_delay(node))
+ # create produce requests
+ batches_by_node = self._accumulator.drain(
+ self._metadata, ready_nodes, self.config['max_request_size'])
+ expired_batches = self._accumulator.abort_expired_batches(
+ self.config['request_timeout_ms'], self._metadata)
+ requests = self._create_produce_requests(batches_by_node)
+ # If we have any nodes that are ready to send + have sendable data,
+ # poll with 0 timeout so this can immediately loop and try sending more
+ # data. Otherwise, the timeout is determined by nodes that have
+ # partitions with data that isn't yet sendable (e.g. lingering, backing
+ # off). Note that this specifically does not include nodes with
+ # sendable data that aren't ready to send since they would cause busy
+ # looping.
+ poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout)
+ if ready_nodes:
+ log.debug("Nodes with data ready to send: %s", ready_nodes) # trace
+ log.debug("Created %d produce requests: %s", len(requests), requests) # trace
+ poll_timeout_ms = 0
+ with self._lock:
+ for node_id, request in six.iteritems(requests):
+ batches = batches_by_node[node_id]
+ log.debug('Sending Produce Request: %r', request)
+ (self._client.send(node_id, request)
+ .add_callback(
+ self._handle_produce_response, batches)
+ .add_errback(
+ self._failed_produce, batches, node_id))
+ # if some partitions are already ready to be sent, the select time
+ # would be 0; otherwise if some partition already has some data
+ # accumulated but not ready yet, the select time will be the time
+ # difference between now and its linger expiry time; otherwise the
+ # select time will be the time difference between now and the
+ # metadata expiry time
+ self._client.poll(poll_timeout_ms, sleep=True)
+ def initiate_close(self):
+ """Start closing the sender (won't complete until all data is sent)."""
+ self._running = False
+ self._accumulator.close()
+ self.wakeup()
+ def force_close(self):
+ """Closes the sender without sending out any pending messages."""
+ self._force_close = True
+ self.initiate_close()
+ def add_topic(self, topic):
+ self._topics_to_add.append(topic)
+ self.wakeup()
+ def _failed_produce(self, batches, node_id, error):
+ log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
+ for batch in batches:
+ self._complete_batch(batch, error, -1)
+ def _handle_produce_response(self, batches, response):
+ """Handle a produce response."""
+ # if we have a response, parse it
+ log.debug('Parsing produce response: %r', response)
+ if response:
+ batches_by_partition = dict([(batch.topic_partition, batch)
+ for batch in batches])
+ for topic, partitions in response.topics:
+ for partition, error_code, offset in partitions:
+ tp = TopicPartition(topic, partition)
+ error = Errors.for_code(error_code)
+ batch = batches_by_partition[tp]
+ self._complete_batch(batch, error, offset)
+ else:
+ # this is the acks = 0 case, just complete all requests
+ for batch in batches:
+ self._complete_batch(batch, None, -1)
+ def _complete_batch(self, batch, error, base_offset):
+ """Complete or retry the given batch of records.
+ Arguments:
+ batch (RecordBatch): The record batch
+ error (Exception): The error (or None if none)
+ base_offset (int): The base offset assigned to the records if successful
+ """
+ # Standardize no-error to None
+ if error is Errors.NoError:
+ error = None
+ if error is not None and self._can_retry(batch, error):
+ # retry
+ log.warning("Got error produce response on topic-partition %s,"
+ " retrying (%d attempts left). Error: %s",
+ batch.topic_partition,
+ self.config['retries'] - batch.attempts - 1,
+ error)
+ self._accumulator.reenqueue(batch)
+ else:
+ if error is Errors.TopicAuthorizationFailedError:
+ error = error(batch.topic_partition.topic)
+ # tell the user the result of their request
+ batch.done(base_offset, error)
+ self._accumulator.deallocate(batch)
+ if getattr(error, 'invalid_metadata', False):
+ self._metadata.request_update()
+ def _can_retry(self, batch, error):
+ """
+ We can retry a send if the error is transient and the number of
+ attempts taken is fewer than the maximum allowed
+ """
+ return (batch.attempts < self.config['retries']
+ and getattr(error, 'retriable', False))
+ def _create_produce_requests(self, collated):
+ """
+ Transfer the record batches into a list of produce requests on a
+ per-node basis.
+ Arguments:
+ collated: {node_id: [RecordBatch]}
+ Returns:
+ dict: {node_id: ProduceRequest}
+ """
+ requests = {}
+ for node_id, batches in six.iteritems(collated):
+ requests[node_id] = self._produce_request(
+ node_id, self.config['acks'],
+ self.config['request_timeout_ms'], batches)
+ return requests
+ def _produce_request(self, node_id, acks, timeout, batches):
+ """Create a produce request from the given record batches.
+ Returns:
+ ProduceRequest
+ """
+ produce_records_by_partition = collections.defaultdict(dict)
+ for batch in batches:
+ topic = batch.topic_partition.topic
+ partition = batch.topic_partition.partition
+ # TODO: bytearray / memoryview
+ buf = batch.records.buffer()
+ produce_records_by_partition[topic][partition] = buf
+ return ProduceRequest(
+ required_acks=acks,
+ timeout=timeout,
+ topics=[(topic, list(partition_info.items()))
+ for topic, partition_info
+ in six.iteritems(produce_records_by_partition)]
+ )
+ def wakeup(self):
+ """Wake up the selector associated with this send thread."""
+ self._client.wakeup()