summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-18 21:54:12 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-18 21:54:12 -0800
commitf2d10f02d3f0bbecff2f9469dc477ccd6046ec59 (patch)
tree9715b7a004324756e382543c0689fac6834f9a30
parent2dd216b5acafb89d177a79ec779374c5a6f94dcf (diff)
downloadkafka-python-f2d10f02d3f0bbecff2f9469dc477ccd6046ec59.tar.gz
Fix concurrency bug in RecordAccumulator.ready()
-rw-r--r--kafka/producer/record_accumulator.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 1e692ee..70f45f2 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -320,8 +320,11 @@ class RecordAccumulator(object):
now = time.time()
exhausted = bool(self._free.queued() > 0)
- for tp, dq in six.iteritems(self._batches):
-
+ # several threads are accessing self._batches -- to simplify
+ # concurrent access, we iterate over a snapshot of partitions
+ # and lock each partition separately as needed
+ partitions = list(self._batches.keys())
+ for tp in partitions:
leader = cluster.leader_for_partition(tp)
if leader is None or leader == -1:
unknown_leaders_exist = True
@@ -330,6 +333,7 @@ class RecordAccumulator(object):
continue
with self._tp_locks[tp]:
+ dq = self._batches[tp]
if not dq:
continue
batch = dq[0]