diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-03-14 00:29:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-03-14 00:29:08 -0700 |
commit | 6ded42f3c4caf4c753f19776d9e2dfaceb484ebb (patch) | |
tree | d1963fd24664ed909d31e6c4a6ba442f89a45c1f | |
parent | 0330036bef996815c5ef384ab6803697816e4189 (diff) | |
download | kafka-python-accumulator_bugfix.tar.gz |
Fix producer threading bug that could crash sender (dict changing during iteration)accumulator_bugfix
-rw-r--r-- | kafka/producer/record_accumulator.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 958d207..19dc199 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -248,11 +248,12 @@ class RecordAccumulator(object): expired_batches = [] to_remove = [] count = 0 - for tp, dq in six.iteritems(self._batches): + for tp in list(self._batches.keys()): assert tp in self._tp_locks, 'TopicPartition not in locks dict' with self._tp_locks[tp]: # iterate over the batches and expire them if they have stayed # in accumulator for more than request_timeout_ms + dq = self._batches[tp] for batch in dq: # check if the batch is expired if batch.maybe_expire(request_timeout_ms, @@ -367,8 +368,9 @@ class RecordAccumulator(object): def has_unsent(self): """Return whether there is any unsent record in the accumulator.""" - for tp, dq in six.iteritems(self._batches): + for tp in list(self._batches.keys()): with self._tp_locks[tp]: + dq = self._batches[tp] if len(dq): return True return False |