diff options
-rw-r--r-- | kafka/consumer/multiprocess.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index db59f7b..bec3100 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -123,7 +123,10 @@ class MultiProcessConsumer(Consumer): self.pause = Event() # Requests the consumers to pause fetch self.size = Value('i', 0) # Indicator of number of messages to fetch - partitions = list(self.offsets.keys()) + # dict.keys() returns a view in py3 + it's not a thread-safe operation + # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3 + # It's safer to copy dict as it only runs during the init. + partitions = list(self.offsets.copy().keys()) # By default, start one consumer process for all partitions # The logic below ensures that |