summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-27 17:47:26 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-27 17:47:26 +0530
commitb3fece508dff6a4fe8c31bc7c2282c114676646b (patch)
treeeca2dcb39888fa9d2d826ac5cab165562ba202b6
parentd6d7299b7dc00f852014e34df060de9268eddfae (diff)
downloadkafka-python-b3fece508dff6a4fe8c31bc7c2282c114676646b.tar.gz
Re-init the sockets in the new process
-rw-r--r--kafka/client.py4
-rw-r--r--kafka/conn.py9
-rw-r--r--kafka/consumer.py4
3 files changed, 16 insertions, 1 deletions
diff --git a/kafka/client.py b/kafka/client.py
index a1c2133..525551e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -176,6 +176,10 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.close()
+ def reinit(self):
+ for conn in self.conns.values():
+ conn.reinit()
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..01975e4 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -86,3 +86,12 @@ class KafkaConnection(local):
def close(self):
"Close this connection"
self._sock.close()
+
+ def reinit(self):
+ """
+ Re-initialize the socket connection
+ """
+ self._sock.close()
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((self.host, self.port))
+ self._sock.settimeout(10)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 67e1b10..5ca90de 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -445,7 +445,6 @@ class MultiProcessConsumer(Consumer):
proc = Process(target=self._consume, args=(chunk,))
proc.daemon = True
proc.start()
- time.sleep(0.2)
self.procs.append(proc)
def _consume(self, partitions):
@@ -454,6 +453,9 @@ class MultiProcessConsumer(Consumer):
notifications given by the controller process
"""
+ # Make the child processes open separate socket connections
+ self.client.reinit()
+
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(self.client, self.group, self.topic,