author     Dana Powers <dana.powers@gmail.com>    2016-05-22 19:21:35 -0700
committer  Dana Powers <dana.powers@gmail.com>    2016-05-22 19:21:35 -0700
commit     b000303045e7e4e7d65cf369f91661cad943992c
tree       3b0717a0dd09a77365cc9870e9ddae14f5a938b0
parent     42293725e5361fd6e6fd38b0ac58afda82e94d3a
download   kafka-python-b000303045e7e4e7d65cf369f91661cad943992c.tar.gz
KAFKA-3197: when max.in.flight.request.per.connection = 1, attempt to guarantee ordering (#698)
-rw-r--r--  kafka/producer/kafka.py               |  2
-rw-r--r--  kafka/producer/record_accumulator.py  | 31
-rw-r--r--  kafka/producer/sender.py              | 11
3 files changed, 33 insertions, 11 deletions
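
In brief, the patch ports KAFKA-3197 from the Java client: while a produce request for a partition is in flight, that partition is "muted" so the drain loop skips it, and it is unmuted once the response is handled. With max_in_flight_requests_per_connection set to 1 this means at most one batch per partition is ever in flight, so retries cannot reorder records. A minimal standalone sketch of that pattern follows; the muted set and the skip-while-muted drain check mirror the diff below, while the class name and method shapes are illustrative, not part of the patch.

# Illustrative sketch (not part of the patch): the mute/unmute pattern the
# diff below adds to RecordAccumulator and Sender. Only the `muted` set and
# the drain-time skip correspond to the real code; the rest is hypothetical.
from collections import defaultdict, deque

class TinyAccumulator(object):
    def __init__(self):
        # partitions with a request currently in flight;
        # accessed only from the sender thread, so no locking needed
        self.muted = set()
        self._batches = defaultdict(deque)

    def append(self, tp, record):
        self._batches[tp].append(record)

    def drain(self):
        """Drain at most one batch per partition, skipping muted partitions."""
        drained = {}
        for tp, dq in self._batches.items():
            if tp in self.muted or not dq:
                continue  # never two in-flight requests for one partition
            drained[tp] = dq.popleft()
            self.muted.add(tp)  # mute until the response is handled
        return drained

    def complete(self, tp):
        # response received for tp: unmute so it can be drained again
        self.muted.remove(tp)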
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index fc60e78..0793c80 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -286,7 +286,9 @@ class KafkaProducer(object):
         message_version = 1 if self.config['api_version'] >= (0, 10) else 0
         self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
+        guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
         self._sender = Sender(client, self._metadata, self._accumulator,
+                              guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True
         self._sender.start()
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 90cb386..d2ee823 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -150,7 +150,6 @@ class RecordAccumulator(object):
                 self.config[key] = configs.pop(key)
 
         self._closed = False
-        self._drain_index = 0
         self._flushes_in_progress = AtomicInteger()
         self._appends_in_progress = AtomicInteger()
         self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
@@ -158,6 +157,10 @@ class RecordAccumulator(object):
         self._free = SimpleBufferPool(self.config['buffer_memory'],
                                       self.config['batch_size'])
         self._incomplete = IncompleteRecordBatches()
+        # The following variables should only be accessed by the sender thread,
+        # so we don't need to protect them w/ locking.
+        self.muted = set()
+        self._drain_index = 0
 
     def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
         """Add a record to the accumulator, return the append result.
@@ -304,16 +307,20 @@ class RecordAccumulator(object):
         Also return the flag for whether there are any unknown leaders for the
         accumulated partition batches.
 
-        A destination node is ready to send data if ANY one of its partition is
-        not backing off the send and ANY of the following are true:
+        A destination node is ready to send if:
 
-        * The record set is full
-        * The record set has sat in the accumulator for at least linger_ms
-          milliseconds
-        * The accumulator is out of memory and threads are blocking waiting
-          for data (in this case all partitions are immediately considered
-          ready).
-        * The accumulator has been closed
+        * There is at least one partition that is not backing off its send
+        * and those partitions are not muted (to prevent reordering if
+          max_in_flight_connections is set to 1)
+        * and any of the following are true:
+
+          * The record set is full
+          * The record set has sat in the accumulator for at least linger_ms
+            milliseconds
+          * The accumulator is out of memory and threads are blocking waiting
+            for data (in this case all partitions are immediately considered
+            ready).
+          * The accumulator has been closed
 
         Arguments:
             cluster (ClusterMetadata):
@@ -341,6 +348,8 @@ class RecordAccumulator(object):
                 continue
             elif leader in ready_nodes:
                 continue
+            elif tp in self.muted:
+                continue
 
             with self._tp_locks[tp]:
                 dq = self._batches[tp]
@@ -410,7 +419,7 @@ class RecordAccumulator(object):
             start = self._drain_index
             while True:
                 tp = partitions[self._drain_index]
-                if tp in self._batches:
+                if tp in self._batches and tp not in self.muted:
                     with self._tp_locks[tp]:
                         dq = self._batches[tp]
                         if dq:
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index f10c34c..f0f77ee 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -26,6 +26,7 @@ class Sender(threading.Thread):
         'acks': 1,
         'retries': 0,
         'request_timeout_ms': 30000,
+        'guarantee_message_order': False,
         'client_id': 'kafka-python-' + __version__,
         'api_version': (0, 8, 0),
     }
@@ -110,6 +111,12 @@ class Sender(threading.Thread):
         batches_by_node = self._accumulator.drain(
             self._metadata, ready_nodes, self.config['max_request_size'])
 
+        if self.config['guarantee_message_order']:
+            # Mute all the partitions drained
+            for batch_list in six.itervalues(batches_by_node):
+                for batch in batch_list:
+                    self._accumulator.muted.add(batch.topic_partition)
+
         expired_batches = self._accumulator.abort_expired_batches(
             self.config['request_timeout_ms'], self._metadata)
 
@@ -222,6 +229,10 @@ class Sender(threading.Thread):
         if getattr(error, 'invalid_metadata', False):
             self._metadata.request_update()
 
+        # Unmute the completed partition.
+        if self.config['guarantee_message_order']:
+            self._accumulator.muted.remove(batch.topic_partition)
+
     def _can_retry(self, batch, error):
         """
         We can retry a send if the error is transient and the number of
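
For users, the practical upshot: with this patch, combining retries with max_in_flight_requests_per_connection=1 preserves per-partition ordering, because a retried batch can no longer be overtaken by a newer batch for the same partition. A hypothetical usage sketch, assuming a kafka-python build that includes this patch; the broker address and topic name are illustrative.

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',       # illustrative broker address
    retries=5,                                # retries alone may reorder records...
    max_in_flight_requests_per_connection=1)  # ...unless only one request is in flight

for i in range(10):
    # with the patch, these reach the partition in send order, even across retries
    producer.send('my-topic', key=b'same-key', value=str(i).encode())
producer.flush()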