summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-03-24 14:08:25 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-03-24 14:08:25 +0300
commit88465f70ef75c13bd6317496f1f8a40d0455b091 (patch)
treed397da1bf873bc855697b9a97aa5f823d2a5330e
parent9641e9fa296a035e73838f07b77310cb5c9eb655 (diff)
downloadkafka-python-88465f70ef75c13bd6317496f1f8a40d0455b091.tar.gz
Using mp.manager to solve the issue with join for MPConsumer
-rw-r--r--kafka/consumer/multiprocess.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 5bc04cc..a63b090 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -4,7 +4,7 @@ import logging
import time
from collections import namedtuple
-from multiprocessing import Process, Queue as MPQueue, Event, Value
+from multiprocessing import Process, Manager as MPManager
try:
from Queue import Empty
@@ -121,12 +121,13 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = MPQueue(1024) # Child consumers dump messages into this
+ manager = MPManager()
+ self.queue = manager.Queue(1024) # Child consumers dump messages into this
self.events = Events(
- start = Event(), # Indicates the consumers to start fetch
- exit = Event(), # Requests the consumers to shutdown
- pause = Event()) # Requests the consumers to pause fetch
- self.size = Value('i', 0) # Indicator of number of messages to fetch
+ start = manager.Event(), # Indicates the consumers to start fetch
+ exit = manager.Event(), # Requests the consumers to shutdown
+ pause = manager.Event()) # Requests the consumers to pause fetch
+ self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
# dict.keys() returns a view in py3 + it's not a thread-safe operation
# http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3