diff options
Diffstat (limited to 'kafka/queue.py')
-rw-r--r-- | kafka/queue.py | 37 |
1 files changed, 34 insertions, 3 deletions
diff --git a/kafka/queue.py b/kafka/queue.py index b86b1db..d4f5b6c 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -9,12 +9,13 @@ from .client import KafkaClient, FetchRequest, ProduceRequest log = logging.getLogger("kafka") class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200): + def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200): self.client = copy(client) self.topic = topic self.partition = partition self.out_queue = out_queue self.barrier = barrier + self.consumer_fetch_size = consumer_fetch_size self.consumer_sleep = consumer_sleep / 1000. log.info("Initializing %s" % self) Process.__init__(self) @@ -26,7 +27,7 @@ class KafkaConsumerProcess(Process): def run(self): self.barrier.wait() log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize) + fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size) while True: if self.barrier.is_set() == False: log.info("Shutdown %s" % self) @@ -91,6 +92,10 @@ class KafkaQueue(object): """ KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers + Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size. + Messages are buffered in the producer thread until producer_flush_timeout or + producer_flush_buffer is reached. + Params ====== client: KafkaClient object @@ -101,6 +106,8 @@ class KafkaQueue(object): Consumer Config =============== + consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default + is 1024 consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches the end of a partition. 
Default is 200 @@ -133,12 +140,36 @@ class KafkaQueue(object): self.barrier.set() def get(self, block=True, timeout=None): + """ + Consume a message from Kafka + + Params + ====== + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None + + Returns + ======= + msg: str, the payload from Kafka + """ return self.in_queue.get(block, timeout).payload def put(self, msg, block=True, timeout=None): - return self.out_queue.put(msg, block, timeout) + """ + Send a message to Kafka + + Params + ====== + msg: str, the message to send + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None + """ + self.out_queue.put(msg, block, timeout) def close(self): + """ + Close the internal queues and Kafka consumers/producer + """ self.in_queue.close() self.out_queue.close() self.barrier.clear() |