author     Mahendra M <mahendra.m@gmail.com>  2013-10-07 15:57:44 +0530
committer  Mahendra M <mahendra.m@gmail.com>  2013-10-07 15:57:44 +0530
commit     75de0f00956eb7cf0394fcfabb6a7d63057409fe (patch)
tree       a5fd7bc136b5a3e84fdb2f3d193e89e48c096110 /kafka
parent     cfd9f86e60429d1f7af8bcac5849808354b8719e (diff)
download   kafka-python-75de0f00956eb7cf0394fcfabb6a7d63057409fe.tar.gz
Ensure that the async producer works on Windows. Fixes #46
As per the multiprocessing module's documentation, objects passed to the Process() class must be picklable on Windows, so the async producer did not work there. To fix this, code that uses multiprocessing must follow certain rules:

* The target= function must not be a member function
* Objects such as socket() cannot be passed to multiprocessing

This ticket fixes these issues. For the KafkaClient and KafkaConnection objects, we make copies of the object and reinit() them inside the child processes.
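A minimal, self-contained sketch of the copy-then-reinit pattern described above (illustrative only: DummyConn and _worker are hypothetical stand-ins, not the kafka-python API). On Windows, multiprocessing spawns a fresh interpreter and pickles the Process target and its arguments, so the target must be a module-level function and no live socket can cross the boundary; the copy carries only host/port, and the socket is rebuilt in the child.

import copy
import socket
from multiprocessing import Process

class DummyConn(object):
    """Hypothetical connection object, standing in for KafkaConnection."""
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = None

    def copy(self):
        # Return an inactive, picklable clone with no live socket attached
        c = copy.copy(self)
        c._sock = None
        return c

    def reinit(self):
        # Rebuild the socket inside the child process; a real
        # implementation would also connect((self.host, self.port))
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def _worker(conn):
    # Module-level target: picklable even under Windows' spawn model
    conn.reinit()

if __name__ == '__main__':
    conn = DummyConn('localhost', 9092)
    p = Process(target=_worker, args=(conn.copy(),))
    p.daemon = True
    p.start()
    p.join()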
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client.py     11
-rw-r--r--  kafka/conn.py       15
-rw-r--r--  kafka/producer.py  112
3 files changed, 87 insertions(+), 51 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 965cbc5..b7ceb2e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,3 +1,4 @@
+import copy
 from collections import defaultdict
 from functools import partial
 from itertools import count
@@ -181,6 +182,16 @@ class KafkaClient(object):
         for conn in self.conns.values():
             conn.close()
 
+    def copy(self):
+        """
+        Create an inactive copy of the client object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        for k, v in c.conns.items():
+            c.conns[k] = v.copy()
+        return c
+
     def reinit(self):
         for conn in self.conns.values():
             conn.reinit()
diff --git a/kafka/conn.py b/kafka/conn.py
index e85fd11..194a19c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,3 +1,4 @@
+import copy
 import logging
 import socket
 import struct
@@ -96,17 +97,27 @@ class KafkaConnection(local):
         self.data = self._consume_response()
         return self.data
 
+    def copy(self):
+        """
+        Create an inactive copy of the connection object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        c._sock = None
+        return c
+
     def close(self):
         """
         Close this connection
         """
-        self._sock.close()
+        if self._sock:
+            self._sock.close()
 
     def reinit(self):
        """
        Re-initialize the socket connection
        """
-        self._sock.close()
+        self.close()
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
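In practice, the two new methods work as a pair: copy() produces a picklable object with the socket detached, and reinit() rebuilds the socket on the other side of the process boundary. A hedged usage sketch (the KafkaClient('localhost', 9092) constructor form is assumed from this era's API; verify against your checkout):

from kafka.client import KafkaClient

client = KafkaClient('localhost', 9092)  # live, connected client
clone = client.copy()    # inactive copy: each connection has _sock = None
# ... `clone` is pickled across the process boundary here ...
clone.reinit()           # in the child: reopens every socket via connect()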
diff --git a/kafka/producer.py b/kafka/producer.py
index 5f23285..a7bfe28 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -18,6 +18,58 @@ BATCH_SEND_MSG_COUNT = 20
 
 STOP_ASYNC_PRODUCER = -1
 
+def _send_upstream(topic, queue, client, batch_time, batch_size,
+                   req_acks, ack_timeout):
+    """
+    Listen on the queue for a specified number of messages or till
+    a specified timeout and send them upstream to the brokers in one
+    request
+
+    NOTE: Ideally, this should have been a method inside the Producer
+    class. However, multiprocessing module has issues in windows. The
+    functionality breaks unless this function is kept outside of a class
+    """
+    stop = False
+    client.reinit()
+
+    while not stop:
+        timeout = batch_time
+        count = batch_size
+        send_at = datetime.now() + timedelta(seconds=timeout)
+        msgset = defaultdict(list)
+
+        # Keep fetching till we gather enough messages or a
+        # timeout is reached
+        while count > 0 and timeout >= 0:
+            try:
+                partition, msg = queue.get(timeout=timeout)
+            except Empty:
+                break
+
+            # Check if the controller has requested us to stop
+            if partition == STOP_ASYNC_PRODUCER:
+                stop = True
+                break
+
+            # Adjust the timeout to match the remaining period
+            count -= 1
+            timeout = (send_at - datetime.now()).total_seconds()
+            msgset[partition].append(msg)
+
+        # Send collected requests upstream
+        reqs = []
+        for partition, messages in msgset.items():
+            req = ProduceRequest(topic, partition, messages)
+            reqs.append(req)
+
+        try:
+            client.send_produce_request(reqs,
+                                        acks=req_acks,
+                                        timeout=ack_timeout)
+        except Exception as exp:
+            log.error("Error sending message", exc_info=sys.exc_info())
+
+
 class Producer(object):
     """
     Base class to be used by producers
@@ -61,60 +113,22 @@ class Producer(object):
         self.async = async
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
-        self.batch_send = batch_send
-        self.batch_size = batch_send_every_n
-        self.batch_time = batch_send_every_t
 
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=self._send_upstream, args=(self.queue,))
-            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc = Process(target=_send_upstream,
+                                args=(self.topic,
+                                      self.queue,
+                                      self.client.copy(),
+                                      batch_send_every_t,
+                                      batch_send_every_n,
+                                      self.req_acks,
+                                      self.ack_timeout))
+
+            # Process will die if main thread exits
+            self.proc.daemon = True
             self.proc.start()
 
-    def _send_upstream(self, queue):
-        """
-        Listen on the queue for a specified number of messages or till
-        a specified timeout and send them upstream to the brokers in one
-        request
-        """
-        stop = False
-
-        while not stop:
-            timeout = self.batch_time
-            send_at = datetime.now() + timedelta(seconds=timeout)
-            count = self.batch_size
-            msgset = defaultdict(list)
-
-            # Keep fetching till we gather enough messages or a
-            # timeout is reached
-            while count > 0 and timeout >= 0:
-                try:
-                    partition, msg = queue.get(timeout=timeout)
-                except Empty:
-                    break
-
-                # Check if the controller has requested us to stop
-                if partition == STOP_ASYNC_PRODUCER:
-                    stop = True
-                    break
-
-                # Adjust the timeout to match the remaining period
-                count -= 1
-                timeout = (send_at - datetime.now()).total_seconds()
-                msgset[partition].append(msg)
-
-            # Send collected requests upstream
-            reqs = []
-            for partition, messages in msgset.items():
-                req = ProduceRequest(self.topic, partition, messages)
-                reqs.append(req)
-
-            try:
-                self.client.send_produce_request(reqs, acks=self.req_acks,
-                                                 timeout=self.ack_timeout)
-            except Exception:
-                log.error("Error sending message", exc_info=sys.exc_info())
-
     def send_messages(self, partition, *msg):
         """
         Helper method to send produce requests
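Finally, a sketch of how the async path is exercised after this change (SimpleProducer is defined elsewhere in kafka/producer.py; its constructor signature here follows the 0.8-era API and is an assumption, as is the broker address):

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

if __name__ == '__main__':  # guard is mandatory on Windows with multiprocessing
    kafka = KafkaClient('localhost', 9092)
    # async=True now works on Windows: the child's target is the
    # module-level _send_upstream and its args carry client.copy()
    producer = SimpleProducer(kafka, 'my-topic', async=True)
    producer.send_messages('one message', 'another message')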