summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-10-07 15:57:44 +0530
committerMahendra M <mahendra.m@gmail.com>2013-10-07 15:57:44 +0530
commit75de0f00956eb7cf0394fcfabb6a7d63057409fe (patch)
treea5fd7bc136b5a3e84fdb2f3d193e89e48c096110 /kafka/client.py
parentcfd9f86e60429d1f7af8bcac5849808354b8719e (diff)
downloadkafka-python-75de0f00956eb7cf0394fcfabb6a7d63057409fe.tar.gz
Ensure that async producer works in windows. Fixes #46
As per the multiprocessing module's documentation, the objects passed to the Process() class must be pickle-able in Windows. So, the Async producer did not work in windows. To fix this we have to ensure that code which uses multiprocessing has to follow certain rules * The target=func should not be a member function * We cannot pass objects like socket() to multiprocessing This ticket fixes these issues. For KafkaClient and KafkaConnection objects, we make copies of the object and reinit() them inside the child processes.
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py11
1 files changed, 11 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 965cbc5..b7ceb2e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,3 +1,4 @@
+import copy
from collections import defaultdict
from functools import partial
from itertools import count
@@ -181,6 +182,16 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.close()
+ def copy(self):
+ """
+ Create an inactive copy of the client object
+ A reinit() has to be done on the copy before it can be used again
+ """
+ c = copy.deepcopy(self)
+ for k, v in c.conns.items():
+ c.conns[k] = v.copy()
+ return c
+
def reinit(self):
for conn in self.conns.values():
conn.reinit()