summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-14 08:21:27 -0700
committerDana Powers <dana.powers@gmail.com>2016-03-14 08:21:27 -0700
commit6e68ccd716775a05f7382fbefd8a39a6b748590a (patch)
tree593c1a3cc29e8f2e243fd3f18be0757f212df830
parentc3bc541a69d5b8469771dbde9172cad0a9d0d1ae (diff)
parent6ded42f3c4caf4c753f19776d9e2dfaceb484ebb (diff)
downloadkafka-python-6e68ccd716775a05f7382fbefd8a39a6b748590a.tar.gz
Merge pull request #590 from dpkp/accumulator_bugfix
Fix producer threading bug that can crash sender
-rw-r--r--kafka/producer/record_accumulator.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 958d207..19dc199 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -248,11 +248,12 @@ class RecordAccumulator(object):
expired_batches = []
to_remove = []
count = 0
- for tp, dq in six.iteritems(self._batches):
+ for tp in list(self._batches.keys()):
assert tp in self._tp_locks, 'TopicPartition not in locks dict'
with self._tp_locks[tp]:
# iterate over the batches and expire them if they have stayed
# in accumulator for more than request_timeout_ms
+ dq = self._batches[tp]
for batch in dq:
# check if the batch is expired
if batch.maybe_expire(request_timeout_ms,
@@ -367,8 +368,9 @@ class RecordAccumulator(object):
def has_unsent(self):
"""Return whether there is any unsent record in the accumulator."""
- for tp, dq in six.iteritems(self._batches):
+ for tp in list(self._batches.keys()):
with self._tp_locks[tp]:
+ dq = self._batches[tp]
if len(dq):
return True
return False