diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-10-07 15:57:44 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-10-07 15:57:44 +0530 |
commit | 75de0f00956eb7cf0394fcfabb6a7d63057409fe (patch) | |
tree | a5fd7bc136b5a3e84fdb2f3d193e89e48c096110 /kafka/client.py | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
download | kafka-python-75de0f00956eb7cf0394fcfabb6a7d63057409fe.tar.gz |
Ensure that async producer works in windows. Fixes #46
As per the multiprocessing module's documentation, the objects
passed to the Process() class must be pickle-able in Windows.
So, the Async producer did not work in windows.
To fix this we have to ensure that code which uses multiprocessing
has to follow certain rules
* The target=func should not be a member function
* We cannot pass objects like socket() to multiprocessing
This ticket fixes these issues. For KafkaClient and KafkaConnection
objects, we make copies of the object and reinit() them inside the
child processes.
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py index 965cbc5..b7ceb2e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,3 +1,4 @@ +import copy from collections import defaultdict from functools import partial from itertools import count @@ -181,6 +182,16 @@ class KafkaClient(object): for conn in self.conns.values(): conn.close() + def copy(self): + """ + Create an inactive copy of the client object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + for k, v in c.conns.items(): + c.conns[k] = v.copy() + return c + def reinit(self): for conn in self.conns.values(): conn.reinit() |