author | Mahendra M <mahendra.m@gmail.com> | 2013-10-07 15:57:44 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-10-07 15:57:44 +0530 |
commit | 75de0f00956eb7cf0394fcfabb6a7d63057409fe (patch) | |
tree | a5fd7bc136b5a3e84fdb2f3d193e89e48c096110 /kafka | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
download | kafka-python-75de0f00956eb7cf0394fcfabb6a7d63057409fe.tar.gz | |
Ensure that the async producer works on Windows. Fixes #46
As per the multiprocessing module's documentation, objects passed to the
Process() class must be picklable on Windows. As a result, the async
producer did not work on Windows.
To fix this, we have to ensure that code using multiprocessing follows
certain rules (the sketch after this list illustrates both failure modes):
* The target= function must not be a member function
* Objects that cannot be pickled, such as a socket(), must not be passed to the child process
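The failure mode is easy to reproduce outside kafka-python. Here is a minimal sketch, independent of this patch; the `send_loop` function and `BrokenProducer` class are hypothetical names used purely for illustration:

```python
import multiprocessing
import socket


def send_loop(queue):
    # A module-level function can be pickled, so it is a safe
    # Process() target on Windows.
    print("child got: %s" % queue.get())


class BrokenProducer(object):
    def __init__(self):
        self._sock = socket.socket()  # a socket cannot be pickled

    def start(self):
        # On Windows, multiprocessing pickles the target in order to
        # re-create it in the child. A bound method drags `self` (and
        # its socket) into the pickle, so this raises a pickling error.
        multiprocessing.Process(target=self._send_loop).start()

    def _send_loop(self):
        pass


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=send_loop, args=(q,))
    p.daemon = True
    p.start()
    q.put("hello")
    p.join()
```

On Unix the `BrokenProducer` pattern happens to work because the child is forked rather than spawned, which is why the bug only surfaced on Windows.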
This ticket fixes both issues. For the KafkaClient and KafkaConnection
objects, we make inactive copies of the objects and reinit() them inside
the child process.
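The copy-then-reinit() idiom is sketched below with a hypothetical `Conn` class; the real implementations are in kafka/conn.py and kafka/client.py in the diff that follows. One liberty taken here: the sketch detaches the live socket before deepcopy so the copy step itself never touches an unpicklable object, whereas the patch deepcopies first and then nulls the socket out.

```python
import copy
import socket


class Conn(object):
    """Hypothetical connection object holding an unpicklable socket."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = None

    def copy(self):
        # Make an inactive copy with no live socket; only such a copy
        # can be pickled and handed to multiprocessing.Process().
        sock, self._sock = self._sock, None
        try:
            c = copy.deepcopy(self)
        finally:
            self._sock = sock
        return c

    def reinit(self):
        # Called inside the child process to open a fresh socket.
        if self._sock:
            self._sock.close()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self.host, self.port))
        self._sock.settimeout(10)
```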
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 11 |
-rw-r--r-- | kafka/conn.py | 15 |
-rw-r--r-- | kafka/producer.py | 112 |

3 files changed, 87 insertions(+), 51 deletions(-)
```diff
diff --git a/kafka/client.py b/kafka/client.py
index 965cbc5..b7ceb2e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,3 +1,4 @@
+import copy
 from collections import defaultdict
 from functools import partial
 from itertools import count
@@ -181,6 +182,16 @@ class KafkaClient(object):
         for conn in self.conns.values():
             conn.close()
 
+    def copy(self):
+        """
+        Create an inactive copy of the client object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        for k, v in c.conns.items():
+            c.conns[k] = v.copy()
+        return c
+
     def reinit(self):
         for conn in self.conns.values():
             conn.reinit()
diff --git a/kafka/conn.py b/kafka/conn.py
index e85fd11..194a19c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,3 +1,4 @@
+import copy
 import logging
 import socket
 import struct
@@ -96,17 +97,27 @@ class KafkaConnection(local):
         self.data = self._consume_response()
         return self.data
 
+    def copy(self):
+        """
+        Create an inactive copy of the connection object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        c._sock = None
+        return c
+
     def close(self):
         """
         Close this connection
         """
-        self._sock.close()
+        if self._sock:
+            self._sock.close()
 
     def reinit(self):
         """
         Re-initialize the socket connection
         """
-        self._sock.close()
+        self.close()
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
diff --git a/kafka/producer.py b/kafka/producer.py
index 5f23285..a7bfe28 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -18,6 +18,58 @@ BATCH_SEND_MSG_COUNT = 20
 
 STOP_ASYNC_PRODUCER = -1
 
+def _send_upstream(topic, queue, client, batch_time, batch_size,
+                   req_acks, ack_timeout):
+    """
+    Listen on the queue for a specified number of messages or till
+    a specified timeout and send them upstream to the brokers in one
+    request
+
+    NOTE: Ideally, this should have been a method inside the Producer
+    class. However, multiprocessing module has issues in windows. The
+    functionality breaks unless this function is kept outside of a class
+    """
+    stop = False
+    client.reinit()
+
+    while not stop:
+        timeout = batch_time
+        count = batch_size
+        send_at = datetime.now() + timedelta(seconds=timeout)
+        msgset = defaultdict(list)
+
+        # Keep fetching till we gather enough messages or a
+        # timeout is reached
+        while count > 0 and timeout >= 0:
+            try:
+                partition, msg = queue.get(timeout=timeout)
+            except Empty:
+                break
+
+            # Check if the controller has requested us to stop
+            if partition == STOP_ASYNC_PRODUCER:
+                stop = True
+                break
+
+            # Adjust the timeout to match the remaining period
+            count -= 1
+            timeout = (send_at - datetime.now()).total_seconds()
+            msgset[partition].append(msg)
+
+        # Send collected requests upstream
+        reqs = []
+        for partition, messages in msgset.items():
+            req = ProduceRequest(topic, partition, messages)
+            reqs.append(req)
+
+        try:
+            client.send_produce_request(reqs,
+                                        acks=req_acks,
+                                        timeout=ack_timeout)
+        except Exception as exp:
+            log.error("Error sending message", exc_info=sys.exc_info())
+
+
 class Producer(object):
     """
     Base class to be used by producers
@@ -61,60 +113,22 @@ class Producer(object):
         self.async = async
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
-        self.batch_send = batch_send
-        self.batch_size = batch_send_every_n
-        self.batch_time = batch_send_every_t
 
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=self._send_upstream, args=(self.queue,))
-            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc = Process(target=_send_upstream,
+                                args=(self.topic,
+                                      self.queue,
+                                      self.client.copy(),
+                                      batch_send_every_t,
+                                      batch_send_every_n,
+                                      self.req_acks,
+                                      self.ack_timeout))
+
+            # Process will die if main thread exits
+            self.proc.daemon = True
             self.proc.start()
 
-    def _send_upstream(self, queue):
-        """
-        Listen on the queue for a specified number of messages or till
-        a specified timeout and send them upstream to the brokers in one
-        request
-        """
-        stop = False
-
-        while not stop:
-            timeout = self.batch_time
-            send_at = datetime.now() + timedelta(seconds=timeout)
-            count = self.batch_size
-            msgset = defaultdict(list)
-
-            # Keep fetching till we gather enough messages or a
-            # timeout is reached
-            while count > 0 and timeout >= 0:
-                try:
-                    partition, msg = queue.get(timeout=timeout)
-                except Empty:
-                    break
-
-                # Check if the controller has requested us to stop
-                if partition == STOP_ASYNC_PRODUCER:
-                    stop = True
-                    break
-
-                # Adjust the timeout to match the remaining period
-                count -= 1
-                timeout = (send_at - datetime.now()).total_seconds()
-                msgset[partition].append(msg)
-
-            # Send collected requests upstream
-            reqs = []
-            for partition, messages in msgset.items():
-                req = ProduceRequest(self.topic, partition, messages)
-                reqs.append(req)
-
-            try:
-                self.client.send_produce_request(reqs, acks=self.req_acks,
-                                                 timeout=self.ack_timeout)
-            except Exception:
-                log.error("Error sending message", exc_info=sys.exc_info())
-
     def send_messages(self, partition, *msg):
         """
         Helper method to send produce requests
```