diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 19:37:15 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 19:37:15 -0700 |
commit | b6d28b86941f53536d87aa4729c6c2f2a13d5a22 (patch) | |
tree | 0bec68b7f384fd9bc9f84c5e8f4a875bc58c2626 | |
parent | b000303045e7e4e7d65cf369f91661cad943992c (diff) | |
download | kafka-python-KAFKA-3388.tar.gz |
KAFKA-3388: Fix expiration of batches sitting in the accumulatorKAFKA-3388
-rw-r--r-- | kafka/producer/record_accumulator.py | 46 |
1 files changed, 39 insertions, 7 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index d2ee823..566bf6f 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -40,7 +40,7 @@ class RecordBatch(object): self.record_count = 0 #self.max_record_size = 0 # for metrics only now = time.time() - #self.created = now # for metrics only + self.created = now self.drained = None self.attempts = 0 self.last_attempt = now @@ -76,10 +76,28 @@ class RecordBatch(object): else: self.produce_future.failure(exception) - def maybe_expire(self, request_timeout_ms, linger_ms): - since_append_ms = 1000 * (time.time() - self.last_append) - if ((self.records.is_full() and request_timeout_ms < since_append_ms) - or (request_timeout_ms < (since_append_ms + linger_ms))): + def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full): + """Expire batches if metadata is not available + + A batch whose metadata is not available should be expired if one + of the following is true: + + * the batch is not in retry AND request timeout has elapsed after + it is ready (full or linger.ms has reached). + + * the batch is in retry AND request timeout has elapsed after the + backoff period ended. + """ + now = time.time() + since_append = now - self.last_append + since_ready = now - (self.created + linger_ms / 1000.0) + since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0) + timeout = request_timeout_ms / 1000.0 + + if ((not self.in_retry() and is_full and timeout < since_append) or + (not self.in_retry() and timeout < since_ready) or + (self.in_retry() and timeout < since_backoff)): + self.records.close() self.done(-1, None, Errors.KafkaTimeoutError( "Batch containing %s record(s) expired due to timeout while" @@ -259,19 +277,33 @@ class RecordAccumulator(object): count = 0 for tp in list(self._batches.keys()): assert tp in self._tp_locks, 'TopicPartition not in locks dict' + + # We only check if the batch should be expired if the partition + # does not have a batch in flight. This is to avoid the later + # batches get expired when an earlier batch is still in progress. + # This protection only takes effect when user sets + # max.in.flight.request.per.connection=1. Otherwise the expiration + # order is not guranteed. + if tp in self.muted: + continue + with self._tp_locks[tp]: # iterate over the batches and expire them if they have stayed # in accumulator for more than request_timeout_ms dq = self._batches[tp] for batch in dq: + is_full = bool(bool(batch != dq[-1]) or batch.records.is_full()) # check if the batch is expired if batch.maybe_expire(request_timeout_ms, - self.config['linger_ms']): + self.config['retry_backoff_ms'], + self.config['linger_ms'], + is_full): expired_batches.append(batch) to_remove.append(batch) count += 1 self.deallocate(batch) - elif not batch.in_retry(): + else: + # Stop at the first batch that has not expired. break # Python does not allow us to mutate the dq during iteration |