diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-10-07 15:57:44 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-10-07 15:57:44 +0530 |
commit | 75de0f00956eb7cf0394fcfabb6a7d63057409fe (patch) | |
tree | a5fd7bc136b5a3e84fdb2f3d193e89e48c096110 /kafka/conn.py | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
download | kafka-python-75de0f00956eb7cf0394fcfabb6a7d63057409fe.tar.gz |
Ensure that async producer works in windows. Fixes #46
As per the multiprocessing module's documentation, the objects
passed to the Process() class must be pickle-able in Windows.
So, the Async producer did not work in windows.
To fix this we have to ensure that code which uses multiprocessing
has to follow certain rules
* The target=func should not be a member function
* We cannot pass objects like socket() to multiprocessing
This ticket fixes these issues. For KafkaClient and KafkaConnection
objects, we make copies of the object and reinit() them inside the
child processes.
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e85fd11..194a19c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,3 +1,4 @@ +import copy import logging import socket import struct @@ -96,17 +97,27 @@ class KafkaConnection(local): self.data = self._consume_response() return self.data + def copy(self): + """ + Create an inactive copy of the connection object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + c._sock = None + return c + def close(self): """ Close this connection """ - self._sock.close() + if self._sock: + self._sock.close() def reinit(self): """ Re-initialize the socket connection """ - self._sock.close() + self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) self._sock.settimeout(10) |