summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 18:10:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 18:10:37 -0700
commitb2a91ab8d8c45a47f018ae951eef9cf0bcc91961 (patch)
treed486942405ba1ce2473cf599b4c632a2ec602bc4
parent96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff)
downloadkafka-python-KAFKA-3197.tar.gz
KAFKA-3197: when max.in.flight.request.per.connection = 1, attempt to guarantee orderingKAFKA-3197
-rw-r--r--kafka/producer/kafka.py2
-rw-r--r--kafka/producer/record_accumulator.py31
-rw-r--r--kafka/producer/sender.py11
3 files changed, 33 insertions, 11 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index fc60e78..0793c80 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -286,7 +286,9 @@ class KafkaProducer(object):
message_version = 1 if self.config['api_version'] >= (0, 10) else 0
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
+ guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata, self._accumulator,
+ guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
self._sender.start()
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 4434b18..0782d80 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -150,7 +150,6 @@ class RecordAccumulator(object):
self.config[key] = configs.pop(key)
self._closed = False
- self._drain_index = 0
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [RecordBatch]
@@ -158,6 +157,10 @@ class RecordAccumulator(object):
self._free = SimpleBufferPool(self.config['buffer_memory'],
self.config['batch_size'])
self._incomplete = IncompleteRecordBatches()
+ # The following variables should only be accessed by the sender thread,
+ # so we don't need to protect them w/ locking.
+ self.muted = set()
+ self._drain_index = 0
def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
"""Add a record to the accumulator, return the append result.
@@ -304,16 +307,20 @@ class RecordAccumulator(object):
Also return the flag for whether there are any unknown leaders for the
accumulated partition batches.
- A destination node is ready to send data if ANY one of its partition is
- not backing off the send and ANY of the following are true:
+ A destination node is ready to send if:
- * The record set is full
- * The record set has sat in the accumulator for at least linger_ms
- milliseconds
- * The accumulator is out of memory and threads are blocking waiting
- for data (in this case all partitions are immediately considered
- ready).
- * The accumulator has been closed
+ * There is at least one partition that is not backing off its send
+ * and those partitions are not muted (to prevent reordering if
+ max_in_flight_connections is set to 1)
+ * and any of the following are true:
+
+ * The record set is full
+ * The record set has sat in the accumulator for at least linger_ms
+ milliseconds
+ * The accumulator is out of memory and threads are blocking waiting
+ for data (in this case all partitions are immediately considered
+ ready).
+ * The accumulator has been closed
Arguments:
cluster (ClusterMetadata):
@@ -341,6 +348,8 @@ class RecordAccumulator(object):
continue
elif leader in ready_nodes:
continue
+ elif tp in self.muted:
+ continue
with self._tp_locks[tp]:
dq = self._batches[tp]
@@ -410,7 +419,7 @@ class RecordAccumulator(object):
start = self._drain_index
while True:
tp = partitions[self._drain_index]
- if tp in self._batches:
+ if tp in self._batches and tp not in self.muted:
with self._tp_locks[tp]:
dq = self._batches[tp]
if dq:
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index f10c34c..f0f77ee 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -26,6 +26,7 @@ class Sender(threading.Thread):
'acks': 1,
'retries': 0,
'request_timeout_ms': 30000,
+ 'guarantee_message_order': False,
'client_id': 'kafka-python-' + __version__,
'api_version': (0, 8, 0),
}
@@ -110,6 +111,12 @@ class Sender(threading.Thread):
batches_by_node = self._accumulator.drain(
self._metadata, ready_nodes, self.config['max_request_size'])
+ if self.config['guarantee_message_order']:
+ # Mute all the partitions drained
+ for batch_list in six.itervalues(batches_by_node):
+ for batch in batch_list:
+ self._accumulator.muted.add(batch.topic_partition)
+
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
@@ -222,6 +229,10 @@ class Sender(threading.Thread):
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
+ # Unmute the completed partition.
+ if self.config['guarantee_message_order']:
+ self._accumulator.muted.remove(batch.topic_partition)
+
def _can_retry(self, batch, error):
"""
We can retry a send if the error is transient and the number of