summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-27 17:47:26 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-27 17:47:26 +0530
commitb3fece508dff6a4fe8c31bc7c2282c114676646b (patch)
treeeca2dcb39888fa9d2d826ac5cab165562ba202b6 /kafka/consumer.py
parentd6d7299b7dc00f852014e34df060de9268eddfae (diff)
downloadkafka-python-b3fece508dff6a4fe8c31bc7c2282c114676646b.tar.gz
Re-init the sockets in the new process
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 67e1b10..5ca90de 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -445,7 +445,6 @@ class MultiProcessConsumer(Consumer):
proc = Process(target=self._consume, args=(chunk,))
proc.daemon = True
proc.start()
- time.sleep(0.2)
self.procs.append(proc)
def _consume(self, partitions):
@@ -454,6 +453,9 @@ class MultiProcessConsumer(Consumer):
notifications given by the controller process
"""
+ # Make the child processes open separate socket connections
+ self.client.reinit()
+
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(self.client, self.group, self.topic,