author     Mahendra M <mahendra.m@gmail.com>    2013-10-07 20:29:35 -0700
committer  Mahendra M <mahendra.m@gmail.com>    2013-10-07 20:29:35 -0700
commit     92d70e7310b5519a1be86f189a4ddb9d772a0434 (patch)
tree       b43b90fcdaaef0839329b20a02c79f8229773b26
parent     eb2c1735f26ce11540fb92ea94817f43b9b3a798 (diff)
parent     f9cf62816ff2c2255d414a2d9f3dd32d8c81418b (diff)
download   kafka-python-92d70e7310b5519a1be86f189a4ddb9d772a0434.tar.gz
Merge pull request #61 from mahendra/prod-windows
Ensure that the async producer works on Windows. Fixes #46
-rw-r--r--  kafka/client.py     11
-rw-r--r--  kafka/conn.py       15
-rw-r--r--  kafka/producer.py  112
3 files changed, 87 insertions, 51 deletions
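Why the restructuring below is needed: on Windows, multiprocessing cannot fork, so it spawns a fresh interpreter and pickles the Process target and its arguments into the child. A bound method of a producer holding live broker sockets cannot be pickled, hence the move to a module-level worker function that receives an inactive, socket-free copy of the client. A minimal sketch of that pattern, using only the standard library (the names here are illustrative, not part of kafka-python):

from multiprocessing import Process, Queue


def _worker(queue):
    # Runs in the child process; it received only picklable arguments.
    while True:
        item = queue.get()
        if item is None:      # sentinel, analogous to STOP_ASYNC_PRODUCER
            break
        print("processing", item)


if __name__ == '__main__':    # required on Windows: spawn re-imports this module
    q = Queue()
    p = Process(target=_worker, args=(q,))   # module-level target pickles cleanly
    p.daemon = True
    p.start()
    q.put("hello")
    q.put(None)
    p.join()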
diff --git a/kafka/client.py b/kafka/client.py
index c0a3cdb..71ededa 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,3 +1,4 @@
+import copy
 from collections import defaultdict
 from functools import partial
 from itertools import count
@@ -193,6 +194,16 @@ class KafkaClient(object):
         for conn in self.conns.values():
             conn.close()
 
+    def copy(self):
+        """
+        Create an inactive copy of the client object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        for k, v in c.conns.items():
+            c.conns[k] = v.copy()
+        return c
+
     def reinit(self):
         for conn in self.conns.values():
             conn.reinit()
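The new KafkaClient.copy() is the hand-off point: it deep-copies the client and replaces every connection with an inactive copy, so the object can cross the process boundary, and the receiving side must call reinit() before use. A sketch of the intended lifecycle, assuming a broker on localhost:9092 and the KafkaClient(host, port) constructor of this era:

from kafka.client import KafkaClient

client = KafkaClient("localhost", 9092)

inactive = client.copy()   # deep copy whose connections hold no sockets
# ... pass `inactive` across a process boundary, e.g. in Process(args=...) ...
inactive.reinit()          # reopen every connection before first use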
diff --git a/kafka/conn.py b/kafka/conn.py
index 9356731..14aebc6 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,3 +1,4 @@
+import copy
 import logging
 import socket
 import struct
@@ -103,17 +104,27 @@ class KafkaConnection(local):
         self.data = self._consume_response()
         return self.data
 
+    def copy(self):
+        """
+        Create an inactive copy of the connection object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        c._sock = None
+        return c
+
     def close(self):
         """
         Close this connection
         """
-        self._sock.close()
+        if self._sock:
+            self._sock.close()
 
     def reinit(self):
         """
         Re-initialize the socket connection
         """
-        self._sock.close()
+        self.close()
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
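KafkaConnection.copy() drops the socket because a live socket can be neither deep-copied nor pickled, and close()/reinit() are hardened to tolerate the resulting _sock = None. A quick standard-library sketch of the underlying constraint:

import pickle
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    pickle.dumps(s)           # fails: sockets cannot cross process boundaries
except Exception as e:
    print("cannot pickle a socket:", e)
finally:
    s.close()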
diff --git a/kafka/producer.py b/kafka/producer.py
index cceb584..7ef7896 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -19,6 +19,58 @@ BATCH_SEND_MSG_COUNT = 20
 
 STOP_ASYNC_PRODUCER = -1
 
+
+def _send_upstream(topic, queue, client, batch_time, batch_size,
+                   req_acks, ack_timeout):
+    """
+    Listen on the queue for a specified number of messages or till
+    a specified timeout and send them upstream to the brokers in one
+    request
+
+    NOTE: Ideally, this should have been a method inside the Producer
+    class. However, multiprocessing module has issues in windows. The
+    functionality breaks unless this function is kept outside of a class
+    """
+    stop = False
+    client.reinit()
+
+    while not stop:
+        timeout = batch_time
+        count = batch_size
+        send_at = datetime.now() + timedelta(seconds=timeout)
+        msgset = defaultdict(list)
+
+        # Keep fetching till we gather enough messages or a
+        # timeout is reached
+        while count > 0 and timeout >= 0:
+            try:
+                partition, msg = queue.get(timeout=timeout)
+            except Empty:
+                break
+
+            # Check if the controller has requested us to stop
+            if partition == STOP_ASYNC_PRODUCER:
+                stop = True
+                break
+
+            # Adjust the timeout to match the remaining period
+            count -= 1
+            timeout = (send_at - datetime.now()).total_seconds()
+            msgset[partition].append(msg)
+
+        # Send collected requests upstream
+        reqs = []
+        for partition, messages in msgset.items():
+            req = ProduceRequest(topic, partition, messages)
+            reqs.append(req)
+
+        try:
+            client.send_produce_request(reqs,
+                                        acks=req_acks,
+                                        timeout=ack_timeout)
+        except Exception as exp:
+            log.exception("Unable to send message")
+
+
 class Producer(object):
     """
     Base class to be used by producers
@@ -62,60 +114,22 @@ class Producer(object):
         self.async = async
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
-        self.batch_send = batch_send
-        self.batch_size = batch_send_every_n
-        self.batch_time = batch_send_every_t
 
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=self._send_upstream, args=(self.queue,))
-            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc = Process(target=_send_upstream,
+                                args=(self.topic,
+                                      self.queue,
+                                      self.client.copy(),
+                                      batch_send_every_t,
+                                      batch_send_every_n,
+                                      self.req_acks,
+                                      self.ack_timeout))
+
+            # Process will die if main thread exits
+            self.proc.daemon = True
            self.proc.start()
 
-    def _send_upstream(self, queue):
-        """
-        Listen on the queue for a specified number of messages or till
-        a specified timeout and send them upstream to the brokers in one
-        request
-        """
-        stop = False
-
-        while not stop:
-            timeout = self.batch_time
-            send_at = datetime.now() + timedelta(seconds=timeout)
-            count = self.batch_size
-            msgset = defaultdict(list)
-
-            # Keep fetching till we gather enough messages or a
-            # timeout is reached
-            while count > 0 and timeout >= 0:
-                try:
-                    partition, msg = queue.get(timeout=timeout)
-                except Empty:
-                    break
-
-                # Check if the controller has requested us to stop
-                if partition == STOP_ASYNC_PRODUCER:
-                    stop = True
-                    break
-
-                # Adjust the timeout to match the remaining period
-                count -= 1
-                timeout = (send_at - datetime.now()).total_seconds()
-                msgset[partition].append(msg)
-
-            # Send collected requests upstream
-            reqs = []
-            for partition, messages in msgset.items():
-                req = ProduceRequest(self.topic, partition, messages)
-                reqs.append(req)
-
-            try:
-                self.client.send_produce_request(reqs, acks=self.req_acks,
-                                                 timeout=self.ack_timeout)
-            except Exception:
-                log.exception("Unable to send message")
-
     def send_messages(self, partition, *msg):
         """
         Helper method to send produce requests
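With the worker at module level and a reinit-able client copy in its arguments, the async producer can be driven identically on Windows and POSIX. A hedged usage sketch against the kafka-python API of this era (KafkaClient(host, port) and SimpleProducer(client, topic, async=True); `async` was not yet a reserved word in the Python of the time):

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

client = KafkaClient("localhost", 9092)
producer = SimpleProducer(client, "my-topic", async=True)

producer.send_messages("one", "two")   # enqueued; the child process batches them
producer.stop()                        # sends STOP_ASYNC_PRODUCER through the queue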