diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-23 15:45:29 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-24 17:33:08 -0800 |
commit | 434d1abb18a130f54682662909a891edcdb98f5f (patch) | |
tree | 23ef8562dddd48ffdb154184ab3c2ff3e33592a2 | |
parent | a250fe3418a56d0f72458c54078203ee0a65ef0e (diff) | |
download | kafka-python-434d1abb18a130f54682662909a891edcdb98f5f.tar.gz |
Sender class to manage background IO for KafkaProducer
-rw-r--r-- | kafka/producer/sender.py | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py new file mode 100644 index 0000000..ac160fc --- /dev/null +++ b/kafka/producer/sender.py @@ -0,0 +1,272 @@ +from __future__ import absolute_import + +import collections +import copy +import logging +import threading +import time + +import six + +from ..common import TopicPartition +from ..version import __version__ +from ..protocol.produce import ProduceRequest + +import kafka.common as Errors + + +log = logging.getLogger(__name__) + + +class Sender(threading.Thread): + """ + The background thread that handles the sending of produce requests to the + Kafka cluster. This thread makes metadata requests to renew its view of the + cluster and then sends produce requests to the appropriate nodes. + """ + _DEFAULT_CONFIG = { + 'max_request_size': 1048576, + 'acks': 1, + 'retries': 0, + 'request_timeout_ms': 30000, + 'client_id': 'kafka-python-' + __version__, + } + + def __init__(self, client, metadata, lock, accumulator, **configs): + super(Sender, self).__init__() + self.config = copy.copy(self._DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs.pop(key) + + self.name = self.config['client_id'] + '-network-thread' + self._client = client + self._accumulator = accumulator + self._metadata = client.cluster + self._lock = lock + self._running = True + self._force_close = False + self._topics_to_add = [] + + def run(self): + """The main run loop for the sender thread.""" + log.debug("Starting Kafka producer I/O thread.") + + # main loop, runs until close is called + while self._running: + try: + self.run_once() + except Exception: + log.exception("Uncaught error in kafka producer I/O thread") + + log.debug("Beginning shutdown of Kafka producer I/O thread, sending" + " remaining records.") + + # okay we stopped accepting requests but there may still be + # requests in the accumulator or waiting for acknowledgment, + # wait until these are completed. + while (not self._force_close + and (self._accumulator.has_unsent() + or self._client.in_flight_request_count() > 0)): + try: + self.run_once() + except Exception: + log.exception("Uncaught error in kafka producer I/O thread") + + if self._force_close: + # We need to fail all the incomplete batches and wake up the + # threads waiting on the futures. + self._accumulator.abort_incomplete_batches() + + try: + self._client.close() + except Exception: + log.exception("Failed to close network client") + + log.debug("Shutdown of Kafka producer I/O thread has completed.") + + def run_once(self): + """Run a single iteration of sending.""" + while self._topics_to_add: + self._client.add_topic(self._topics_to_add.pop()) + + # get the list of partitions with data ready to send + result = self._accumulator.ready(self._metadata) + ready_nodes, next_ready_check_delay, unknown_leaders_exist = result + + # if there are any partitions whose leaders are not known yet, force + # metadata update + if unknown_leaders_exist: + with self._lock: + self._metadata.request_update() + + # remove any nodes we aren't ready to send to + not_ready_timeout = 999999999 + for node in list(ready_nodes): + if not self._client.ready(node): + ready_nodes.remove(node) + not_ready_timeout = min(not_ready_timeout, + self._client.connection_delay(node)) + + # create produce requests + batches_by_node = self._accumulator.drain( + self._metadata, ready_nodes, self.config['max_request_size']) + + expired_batches = self._accumulator.abort_expired_batches( + self.config['request_timeout_ms'], self._metadata) + + requests = self._create_produce_requests(batches_by_node) + # If we have any nodes that are ready to send + have sendable data, + # poll with 0 timeout so this can immediately loop and try sending more + # data. Otherwise, the timeout is determined by nodes that have + # partitions with data that isn't yet sendable (e.g. lingering, backing + # off). Note that this specifically does not include nodes with + # sendable data that aren't ready to send since they would cause busy + # looping. + poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout) + if ready_nodes: + log.debug("Nodes with data ready to send: %s", ready_nodes) # trace + log.debug("Created %d produce requests: %s", len(requests), requests) # trace + poll_timeout_ms = 0 + + with self._lock: + for node_id, request in six.iteritems(requests): + batches = batches_by_node[node_id] + log.debug('Sending Produce Request: %r', request) + (self._client.send(node_id, request) + .add_callback( + self._handle_produce_response, batches) + .add_errback( + self._failed_produce, batches, node_id)) + + # if some partitions are already ready to be sent, the select time + # would be 0; otherwise if some partition already has some data + # accumulated but not ready yet, the select time will be the time + # difference between now and its linger expiry time; otherwise the + # select time will be the time difference between now and the + # metadata expiry time + self._client.poll(poll_timeout_ms, sleep=True) + + def initiate_close(self): + """Start closing the sender (won't complete until all data is sent).""" + self._running = False + self._accumulator.close() + self.wakeup() + + def force_close(self): + """Closes the sender without sending out any pending messages.""" + self._force_close = True + self.initiate_close() + + def add_topic(self, topic): + self._topics_to_add.append(topic) + self.wakeup() + + def _failed_produce(self, batches, node_id, error): + log.debug("Error sending produce request to node %d: %s", node_id, error) # trace + for batch in batches: + self._complete_batch(batch, error, -1) + + def _handle_produce_response(self, batches, response): + """Handle a produce response.""" + # if we have a response, parse it + log.debug('Parsing produce response: %r', response) + if response: + batches_by_partition = dict([(batch.topic_partition, batch) + for batch in batches]) + + for topic, partitions in response.topics: + for partition, error_code, offset in partitions: + tp = TopicPartition(topic, partition) + error = Errors.for_code(error_code) + batch = batches_by_partition[tp] + self._complete_batch(batch, error, offset) + + else: + # this is the acks = 0 case, just complete all requests + for batch in batches: + self._complete_batch(batch, None, -1) + + def _complete_batch(self, batch, error, base_offset): + """Complete or retry the given batch of records. + + Arguments: + batch (RecordBatch): The record batch + error (Exception): The error (or None if none) + base_offset (int): The base offset assigned to the records if successful + """ + # Standardize no-error to None + if error is Errors.NoError: + error = None + + if error is not None and self._can_retry(batch, error): + # retry + log.warning("Got error produce response on topic-partition %s," + " retrying (%d attempts left). Error: %s", + batch.topic_partition, + self.config['retries'] - batch.attempts - 1, + error) + self._accumulator.reenqueue(batch) + else: + if error is Errors.TopicAuthorizationFailedError: + error = error(batch.topic_partition.topic) + + # tell the user the result of their request + batch.done(base_offset, error) + self._accumulator.deallocate(batch) + + if getattr(error, 'invalid_metadata', False): + self._metadata.request_update() + + def _can_retry(self, batch, error): + """ + We can retry a send if the error is transient and the number of + attempts taken is fewer than the maximum allowed + """ + return (batch.attempts < self.config['retries'] + and getattr(error, 'retriable', False)) + + def _create_produce_requests(self, collated): + """ + Transfer the record batches into a list of produce requests on a + per-node basis. + + Arguments: + collated: {node_id: [RecordBatch]} + + Returns: + dict: {node_id: ProduceRequest} + """ + requests = {} + for node_id, batches in six.iteritems(collated): + requests[node_id] = self._produce_request( + node_id, self.config['acks'], + self.config['request_timeout_ms'], batches) + return requests + + def _produce_request(self, node_id, acks, timeout, batches): + """Create a produce request from the given record batches. + + Returns: + ProduceRequest + """ + produce_records_by_partition = collections.defaultdict(dict) + for batch in batches: + topic = batch.topic_partition.topic + partition = batch.topic_partition.partition + + # TODO: bytearray / memoryview + buf = batch.records.buffer() + produce_records_by_partition[topic][partition] = buf + + return ProduceRequest( + required_acks=acks, + timeout=timeout, + topics=[(topic, list(partition_info.items())) + for topic, partition_info + in six.iteritems(produce_records_by_partition)] + ) + + def wakeup(self): + """Wake up the selector associated with this send thread.""" + self._client.wakeup() |