diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:54:12 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-18 21:54:12 -0800 |
commit | f2d10f02d3f0bbecff2f9469dc477ccd6046ec59 (patch) | |
tree | 9715b7a004324756e382543c0689fac6834f9a30 | |
parent | 2dd216b5acafb89d177a79ec779374c5a6f94dcf (diff) | |
download | kafka-python-f2d10f02d3f0bbecff2f9469dc477ccd6046ec59.tar.gz |
Fix concurrency bug in RecordAccumulator.ready()
-rw-r--r-- | kafka/producer/record_accumulator.py | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 1e692ee..70f45f2 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -320,8 +320,11 @@ class RecordAccumulator(object):
         now = time.time()
 
         exhausted = bool(self._free.queued() > 0)
-        for tp, dq in six.iteritems(self._batches):
-
+        # several threads are accessing self._batches -- to simplify
+        # concurrent access, we iterate over a snapshot of partitions
+        # and lock each partition separately as needed
+        partitions = list(self._batches.keys())
+        for tp in partitions:
             leader = cluster.leader_for_partition(tp)
             if leader is None or leader == -1:
                 unknown_leaders_exist = True
@@ -330,6 +333,7 @@ class RecordAccumulator(object):
                 continue
 
             with self._tp_locks[tp]:
+                dq = self._batches[tp]
                 if not dq:
                     continue
                 batch = dq[0]