author     David Arthur <mumrah@gmail.com>    2013-07-11 05:58:54 -0700
committer  David Arthur <mumrah@gmail.com>    2013-07-11 05:58:54 -0700
commit     5684af438e6cf871540aa8ea8b556737f56e9798
tree       f9bb1f6046943946236ceae61d9266a8c14bdcfe /kafka
parent     ffdc08aeec040862d522914a480c135626a19e69
parent     d2df8f54637490b1dbe858066b74710b57186016
download   kafka-python-5684af438e6cf871540aa8ea8b556737f56e9798.tar.gz
Merge pull request #33 from mahendra/asyncproducer

Support for async producer. Merged locally, tests pass, +1.
Diffstat (limited to 'kafka')

-rw-r--r--  kafka/__init__.py  |   6
-rw-r--r--  kafka/client.py    |  13
-rw-r--r--  kafka/conn.py      |   2
-rw-r--r--  kafka/producer.py  | 207

4 files changed, 202 insertions, 26 deletions
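
For orientation before the diff: a minimal sketch of the API this merge enables, assuming the Python 2-era kafka-python of this commit; the broker address and topic name are placeholders:

    from kafka import KafkaClient, SimpleProducer

    kafka = KafkaClient("localhost", 9092)

    # async=True hands messages to a queue consumed by a background
    # process; send_messages() returns immediately, with no broker response
    producer = SimpleProducer(kafka, "my-topic", async=True)
    producer.send_messages("message one", "message two")

    # Flush pending messages and tear down the background process
    producer.stop()

Batch mode (batch_send=True, tuned by batch_send_every_n and batch_send_every_t) implies async and groups queued messages into single produce requests.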
diff --git a/kafka/__init__.py b/kafka/__init__.py
index d229169..e352c03 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,10 +9,12 @@ from kafka.conn import KafkaConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
 )
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
+from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer

 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
+    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
     'create_message', 'create_gzip_message', 'create_snappy_message'
 ]
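
With the exports above in place, everything can be imported from the package root, e.g.:

    from kafka import (
        KafkaClient, KafkaConnection, SimpleProducer, KeyedProducer,
        RoundRobinPartitioner, HashedPartitioner, SimpleConsumer,
    )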
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
             # Send the request, recv the response
             conn.send(requestId, request)
+
+            if decoder_fn is None:
+                continue
+
             response = conn.recv(requestId)
             for response in decoder_fn(response):
                 acc[(response.topic, response.partition)] = response

         # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
+        return (acc[k] for k in original_keys) if acc else ()

     #################
     #   Public API  #
     #################
@@ -201,7 +205,12 @@ class KafkaClient(object):
         encoder = partial(KafkaProtocol.encode_produce_request,
                           acks=acks, timeout=timeout)
-        decoder = KafkaProtocol.decode_produce_response
+
+        if acks == 0:
+            decoder = None
+        else:
+            decoder = KafkaProtocol.decode_produce_response
+
         resps = self._send_broker_aware_request(payloads, encoder, decoder)

         out = []
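
The net effect of this client change: with acks=0 there is no decoder, _send_broker_aware_request() skips conn.recv() entirely, and send_produce_request() returns an empty result instead of blocking on a response. A hedged sketch of that fire-and-forget path (topic and payload are placeholders):

    from kafka import KafkaClient, SimpleProducer

    kafka = KafkaClient("localhost", 9092)

    # ACK_NOT_REQUIRED maps to acks=0: the request is written to the
    # socket and no response is ever read back
    producer = SimpleProducer(kafka, "my-topic",
                              req_acks=SimpleProducer.ACK_NOT_REQUIRED)
    resp = producer.send_messages("no ack expected")  # resp == []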
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..aba3ada 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -76,11 +76,11 @@ class KafkaConnection(local):
         sent = self._sock.sendall(payload)
         if sent != None:
             raise RuntimeError("Kafka went away")
-        self.data = self._consume_response()

     def recv(self, requestId):
         "Get a response from Kafka"
         log.debug("Reading response %d from Kafka" % requestId)
+        self.data = self._consume_response()
         return self.data

     def close(self):
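
The conn.py hunk moves the socket read out of send() and into recv(), which is what lets the acks=0 path above skip reading a response altogether. Roughly, the resulting contract (the request id and payload bytes are illustrative, and the constructor signature is assumed from this era of the code):

    conn = KafkaConnection("localhost", 9092)
    conn.send(1, request_bytes)   # write-only; the socket is not read here
    data = conn.recv(1)           # the response is consumed only on recv()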
diff --git a/kafka/producer.py b/kafka/producer.py
index 69c3830..06e468d 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,5 +1,10 @@
+from collections import defaultdict
+from datetime import datetime, timedelta
 from itertools import cycle
+from multiprocessing import Queue, Process
+from Queue import Empty
 import logging
+import sys

 from kafka.common import ProduceRequest
 from kafka.protocol import create_message
@@ -7,26 +12,175 @@ from kafka.partitioner import HashedPartitioner
 log = logging.getLogger("kafka")

+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20

-class SimpleProducer(object):
+STOP_ASYNC_PRODUCER = -1
+
+
+class Producer(object):
     """
-    A simple, round-robbin producer. Each message goes to exactly one partition
+    Base class to be used by producers
+
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If set to True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic):
+
+    ACK_NOT_REQUIRED = 0            # No ack is required
+    ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+    ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
+
+    DEFAULT_ACK_TIMEOUT = 1000
+
+    def __init__(self, client, async=False,
+                 req_acks=ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+        if batch_send:
+            async = True
+            assert batch_send_every_n > 0
+            assert batch_send_every_t > 0
+        else:
+            batch_send_every_n = 1
+            batch_send_every_t = 3600
+
         self.client = client
+        self.async = async
+        self.req_acks = req_acks
+        self.ack_timeout = ack_timeout
+        self.batch_send = batch_send
+        self.batch_size = batch_send_every_n
+        self.batch_time = batch_send_every_t
+
+        if self.async:
+            self.queue = Queue()  # Messages are sent through this queue
+            self.proc = Process(target=self._send_upstream, args=(self.queue,))
+            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc.start()
+
+    def _send_upstream(self, queue):
+        """
+        Listen on the queue for a specified number of messages or until
+        a specified timeout is reached, and send them upstream to the
+        brokers in one request
+        """
+        stop = False
+
+        while not stop:
+            timeout = self.batch_time
+            send_at = datetime.now() + timedelta(seconds=timeout)
+            count = self.batch_size
+            msgset = defaultdict(list)
+
+            # Keep fetching until we gather enough messages or the
+            # timeout is reached
+            while count > 0 and timeout >= 0:
+                try:
+                    partition, msg = queue.get(timeout=timeout)
+                except Empty:
+                    break
+
+                # Check if the controller has requested us to stop
+                if partition == STOP_ASYNC_PRODUCER:
+                    stop = True
+                    break
+
+                # Adjust the timeout to match the remaining period
+                count -= 1
+                timeout = (send_at - datetime.now()).total_seconds()
+                msgset[partition].append(msg)
+
+            # Send collected requests upstream
+            reqs = []
+            for partition, messages in msgset.items():
+                req = ProduceRequest(self.topic, partition, messages)
+                reqs.append(req)
+
+            try:
+                self.client.send_produce_request(reqs, acks=self.req_acks,
+                                                 timeout=self.ack_timeout)
+            except Exception:
+                log.error("Error sending message", exc_info=sys.exc_info())
+
+    def send_messages(self, partition, *msg):
+        """
+        Helper method to send produce requests
+        """
+        if self.async:
+            for m in msg:
+                self.queue.put((partition, create_message(m)))
+            resp = []
+        else:
+            messages = [create_message(m) for m in msg]
+            req = ProduceRequest(self.topic, partition, messages)
+            resp = self.client.send_produce_request([req], acks=self.req_acks,
+                                                    timeout=self.ack_timeout)
+        return resp
+
+    def stop(self, timeout=1):
+        """
+        Stop the producer. Optionally wait for the specified timeout before
+        forcefully cleaning up.
+        """
+        if self.async:
+            self.queue.put((STOP_ASYNC_PRODUCER, None))
+            self.proc.join(timeout)
+
+            if self.proc.is_alive():
+                self.proc.terminate()
+
+
+class SimpleProducer(Producer):
+    """
+    A simple, round-robin producer. Each message goes to exactly one partition
+
+    Params:
+    client - The Kafka client instance to use
+    topic - The topic for sending messages to
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    req_acks - A value indicating the acknowledgements that the server must
+               receive before responding to the request
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
+    """
+    def __init__(self, client, topic, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
-        self.next_partition = cycle(self.client.topic_partitions[topic])
+        client._load_metadata_for_topics(topic)
+        self.next_partition = cycle(client.topic_partitions[topic])

-    def send_messages(self, *msg):
-        req = ProduceRequest(self.topic, self.next_partition.next(),
-                             messages=[create_message(m) for m in msg])
+        super(SimpleProducer, self).__init__(client, async, req_acks,
+                                             ack_timeout, batch_send,
+                                             batch_send_every_n,
+                                             batch_send_every_t)

-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+    def send_messages(self, *msg):
+        partition = self.next_partition.next()
+        return super(SimpleProducer, self).send_messages(partition, *msg)


-class KeyedProducer(object):
+class KeyedProducer(Producer):
     """
     A producer which distributes messages to partitions based on the key
@@ -35,23 +189,34 @@ class KeyedProducer(object):
     topic - The kafka topic to send messages to
     partitioner - A partitioner class that will be used to get the partition
         to send the message to. Must be derived from Partitioner
+    async - If True, the messages are sent asynchronously via another
+            thread (process). We will not wait for a response to these
+    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+                  for an acknowledgement
+    batch_send - If True, messages are sent in batches
+    batch_send_every_n - If set, messages are sent in batches of this size
+    batch_send_every_t - If set, messages are sent after this timeout
     """
-    def __init__(self, client, topic, partitioner=None):
-        self.client = client
+    def __init__(self, client, topic, partitioner=None, async=False,
+                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+                 batch_send=False,
+                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
         self.topic = topic
-        self.client._load_metadata_for_topics(topic)
+        client._load_metadata_for_topics(topic)

         if not partitioner:
             partitioner = HashedPartitioner
-        self.partitioner = partitioner(self.client.topic_partitions[topic])
+        self.partitioner = partitioner(client.topic_partitions[topic])
+
+        super(KeyedProducer, self).__init__(client, async, req_acks,
+                                            ack_timeout, batch_send,
+                                            batch_send_every_n,
+                                            batch_send_every_t)

     def send(self, key, msg):
         partitions = self.client.topic_partitions[self.topic]
         partition = self.partitioner.partition(key, partitions)
-
-        req = ProduceRequest(self.topic, partition,
-                             messages=[create_message(msg)])
-
-        resp = self.client.send_produce_request([req])[0]
-        assert resp.error == 0
+        return self.send_messages(partition, msg)
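
Finally, a sketch of the keyed path after this merge (topic, keys, and payloads are placeholders; HashedPartitioner is also the default when no partitioner is passed):

    from kafka import KafkaClient, KeyedProducer, HashedPartitioner

    kafka = KafkaClient("localhost", 9092)
    producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner)

    # Messages sharing a key are always routed to the same partition
    producer.send("user-1", "event a")
    producer.send("user-1", "event b")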