diff options
-rw-r--r-- | kafka/producer/record_accumulator.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 958d207..19dc199 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -248,11 +248,12 @@ class RecordAccumulator(object): expired_batches = [] to_remove = [] count = 0 - for tp, dq in six.iteritems(self._batches): + for tp in list(self._batches.keys()): assert tp in self._tp_locks, 'TopicPartition not in locks dict' with self._tp_locks[tp]: # iterate over the batches and expire them if they have stayed # in accumulator for more than request_timeout_ms + dq = self._batches[tp] for batch in dq: # check if the batch is expired if batch.maybe_expire(request_timeout_ms, @@ -367,8 +368,9 @@ class RecordAccumulator(object): def has_unsent(self): """Return whether there is any unsent record in the accumulator.""" - for tp, dq in six.iteritems(self._batches): + for tp in list(self._batches.keys()): with self._tp_locks[tp]: + dq = self._batches[tp] if len(dq): return True return False |