summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 22:07:33 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 22:07:33 -0700
commitcc9ed8b96f3cc96dd2712cc0dda123c6c24679d5 (patch)
tree0bec68b7f384fd9bc9f84c5e8f4a875bc58c2626
parentb000303045e7e4e7d65cf369f91661cad943992c (diff)
downloadkafka-python-cc9ed8b96f3cc96dd2712cc0dda123c6c24679d5.tar.gz
KAFKA-3388: Fix expiration of batches sitting in the accumulator (#699)
-rw-r--r--kafka/producer/record_accumulator.py46
1 files changed, 39 insertions, 7 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index d2ee823..566bf6f 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -40,7 +40,7 @@ class RecordBatch(object):
self.record_count = 0
#self.max_record_size = 0 # for metrics only
now = time.time()
- #self.created = now # for metrics only
+ self.created = now
self.drained = None
self.attempts = 0
self.last_attempt = now
@@ -76,10 +76,28 @@ class RecordBatch(object):
else:
self.produce_future.failure(exception)
- def maybe_expire(self, request_timeout_ms, linger_ms):
- since_append_ms = 1000 * (time.time() - self.last_append)
- if ((self.records.is_full() and request_timeout_ms < since_append_ms)
- or (request_timeout_ms < (since_append_ms + linger_ms))):
+ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
+ """Expire batches if metadata is not available
+
+ A batch whose metadata is not available should be expired if one
+ of the following is true:
+
+ * the batch is not in retry AND request timeout has elapsed after
+ it is ready (full, or linger.ms has elapsed).
+
+ * the batch is in retry AND request timeout has elapsed after the
+ backoff period ended.
+ """
+ now = time.time()
+ since_append = now - self.last_append
+ since_ready = now - (self.created + linger_ms / 1000.0)
+ since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
+ timeout = request_timeout_ms / 1000.0
+
+ if ((not self.in_retry() and is_full and timeout < since_append) or
+ (not self.in_retry() and timeout < since_ready) or
+ (self.in_retry() and timeout < since_backoff)):
+
self.records.close()
self.done(-1, None, Errors.KafkaTimeoutError(
"Batch containing %s record(s) expired due to timeout while"
@@ -259,19 +277,33 @@ class RecordAccumulator(object):
count = 0
for tp in list(self._batches.keys()):
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
+
+ # We only check if the batch should be expired if the partition
+ # does not have a batch in flight. This is to avoid later batches
+ # getting expired while an earlier batch is still in progress.
+ # This protection only takes effect when the user sets
+ # max.in.flight.requests.per.connection=1. Otherwise the expiration
+ # order is not guaranteed.
+ if tp in self.muted:
+ continue
+
with self._tp_locks[tp]:
# iterate over the batches and expire them if they have stayed
# in accumulator for more than request_timeout_ms
dq = self._batches[tp]
for batch in dq:
+ is_full = bool(bool(batch != dq[-1]) or batch.records.is_full())
# check if the batch is expired
if batch.maybe_expire(request_timeout_ms,
- self.config['linger_ms']):
+ self.config['retry_backoff_ms'],
+ self.config['linger_ms'],
+ is_full):
expired_batches.append(batch)
to_remove.append(batch)
count += 1
self.deallocate(batch)
- elif not batch.in_retry():
+ else:
+ # Stop at the first batch that has not expired.
break
# Python does not allow us to mutate the dq during iteration