summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/queue.py37
1 files changed, 34 insertions, 3 deletions
diff --git a/kafka/queue.py b/kafka/queue.py
index b86b1db..d4f5b6c 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -9,12 +9,13 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
log = logging.getLogger("kafka")
class KafkaConsumerProcess(Process):
- def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200):
+ def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
self.client = copy(client)
self.topic = topic
self.partition = partition
self.out_queue = out_queue
self.barrier = barrier
+ self.consumer_fetch_size = consumer_fetch_size
self.consumer_sleep = consumer_sleep / 1000.
log.info("Initializing %s" % self)
Process.__init__(self)
@@ -26,7 +27,7 @@ class KafkaConsumerProcess(Process):
def run(self):
self.barrier.wait()
log.info("Starting %s" % self)
- fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
+ fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size)
while True:
if self.barrier.is_set() == False:
log.info("Shutdown %s" % self)
@@ -91,6 +92,10 @@ class KafkaQueue(object):
"""
KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers
+ Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size.
+ Messages are buffered in the producer thread until producer_flush_timeout or
+ producer_flush_buffer is reached.
+
Params
======
client: KafkaClient object
@@ -101,6 +106,8 @@ class KafkaQueue(object):
Consumer Config
===============
+ consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default
+ is 1024
consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
the end of a partition. Default is 200
@@ -133,12 +140,36 @@ class KafkaQueue(object):
self.barrier.set()
def get(self, block=True, timeout=None):
+ """
+ Consume a message from Kafka
+
+ Params
+ ======
+ block: boolean, default True
+ timeout: int, number of seconds to wait when blocking, default None
+
+ Returns
+ =======
+ msg: str, the payload from Kafka
+ """
return self.in_queue.get(block, timeout).payload
def put(self, msg, block=True, timeout=None):
- return self.out_queue.put(msg, block, timeout)
+ """
+ Send a message to Kafka
+
+ Params
+ ======
+ msg: std, the message to send
+ block: boolean, default True
+ timeout: int, number of seconds to wait when blocking, default None
+ """
+ self.out_queue.put(msg, block, timeout)
def close(self):
+ """
+ Close the internal queues and Kafka consumers/producer
+ """
self.in_queue.close()
self.out_queue.close()
self.barrier.clear()