diff options
-rw-r--r-- | kafka/client.py | 11 |
-rw-r--r-- | kafka/conn.py | 15 |
-rw-r--r-- | kafka/producer.py | 112 |
3 files changed, 87 insertions, 51 deletions
diff --git a/kafka/client.py b/kafka/client.py index c0a3cdb..71ededa 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,3 +1,4 @@ +import copy from collections import defaultdict from functools import partial from itertools import count @@ -193,6 +194,16 @@ class KafkaClient(object): for conn in self.conns.values(): conn.close() + def copy(self): + """ + Create an inactive copy of the client object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + for k, v in c.conns.items(): + c.conns[k] = v.copy() + return c + def reinit(self): for conn in self.conns.values(): conn.reinit() diff --git a/kafka/conn.py b/kafka/conn.py index 9356731..14aebc6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,3 +1,4 @@ +import copy import logging import socket import struct @@ -103,17 +104,27 @@ class KafkaConnection(local): self.data = self._consume_response() return self.data + def copy(self): + """ + Create an inactive copy of the connection object + A reinit() has to be done on the copy before it can be used again + """ + c = copy.deepcopy(self) + c._sock = None + return c + def close(self): """ Close this connection """ - self._sock.close() + if self._sock: + self._sock.close() def reinit(self): """ Re-initialize the socket connection """ - self._sock.close() + self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) self._sock.settimeout(10) diff --git a/kafka/producer.py b/kafka/producer.py index cceb584..7ef7896 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -19,6 +19,58 @@ BATCH_SEND_MSG_COUNT = 20 STOP_ASYNC_PRODUCER = -1 +def _send_upstream(topic, queue, client, batch_time, batch_size, + req_acks, ack_timeout): + """ + Listen on the queue for a specified number of messages or till + a specified timeout and send them upstream to the brokers in one + request + + NOTE: Ideally, this should have been a method inside the Producer + class. 
However, multiprocessing module has issues in windows. The + functionality breaks unless this function is kept outside of a class + """ + stop = False + client.reinit() + + while not stop: + timeout = batch_time + count = batch_size + send_at = datetime.now() + timedelta(seconds=timeout) + msgset = defaultdict(list) + + # Keep fetching till we gather enough messages or a + # timeout is reached + while count > 0 and timeout >= 0: + try: + partition, msg = queue.get(timeout=timeout) + except Empty: + break + + # Check if the controller has requested us to stop + if partition == STOP_ASYNC_PRODUCER: + stop = True + break + + # Adjust the timeout to match the remaining period + count -= 1 + timeout = (send_at - datetime.now()).total_seconds() + msgset[partition].append(msg) + + # Send collected requests upstream + reqs = [] + for partition, messages in msgset.items(): + req = ProduceRequest(topic, partition, messages) + reqs.append(req) + + try: + client.send_produce_request(reqs, + acks=req_acks, + timeout=ack_timeout) + except Exception as exp: + log.exception("Unable to send message") + + class Producer(object): """ Base class to be used by producers @@ -62,60 +114,22 @@ class Producer(object): self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout - self.batch_send = batch_send - self.batch_size = batch_send_every_n - self.batch_time = batch_send_every_t if self.async: self.queue = Queue() # Messages are sent through this queue - self.proc = Process(target=self._send_upstream, args=(self.queue,)) - self.proc.daemon = True # Process will die if main thread exits + self.proc = Process(target=_send_upstream, + args=(self.topic, + self.queue, + self.client.copy(), + batch_send_every_t, + batch_send_every_n, + self.req_acks, + self.ack_timeout)) + + # Process will die if main thread exits + self.proc.daemon = True self.proc.start() - def _send_upstream(self, queue): - """ - Listen on the queue for a specified number of messages or till - a specified 
timeout and send them upstream to the brokers in one - request - """ - stop = False - - while not stop: - timeout = self.batch_time - send_at = datetime.now() + timedelta(seconds=timeout) - count = self.batch_size - msgset = defaultdict(list) - - # Keep fetching till we gather enough messages or a - # timeout is reached - while count > 0 and timeout >= 0: - try: - partition, msg = queue.get(timeout=timeout) - except Empty: - break - - # Check if the controller has requested us to stop - if partition == STOP_ASYNC_PRODUCER: - stop = True - break - - # Adjust the timeout to match the remaining period - count -= 1 - timeout = (send_at - datetime.now()).total_seconds() - msgset[partition].append(msg) - - # Send collected requests upstream - reqs = [] - for partition, messages in msgset.items(): - req = ProduceRequest(self.topic, partition, messages) - reqs.append(req) - - try: - self.client.send_produce_request(reqs, acks=self.req_acks, - timeout=self.ack_timeout) - except Exception: - log.exception("Unable to send message") - def send_messages(self, partition, *msg): """ Helper method to send produce requests |