diff options
author | David Arthur <mumrah@gmail.com> | 2012-11-19 11:42:09 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-11-19 20:47:27 -0500 |
commit | 1d0bc784d41364d6178dce452f2ac787b52749a8 (patch) | |
tree | 8a9fb036537cad9e7125fbb1fd67497ed237e837 /kafka/queue.py | |
parent | 330ddbca4464f32d31b662eb95b92113e7024323 (diff) | |
download | kafka-python-1d0bc784d41364d6178dce452f2ac787b52749a8.tar.gz |
Add some docs and KafkaQueue config
Ref #8
Diffstat (limited to 'kafka/queue.py')
-rw-r--r-- | kafka/queue.py | 66 |
1 files changed, 47 insertions, 19 deletions
diff --git a/kafka/queue.py b/kafka/queue.py index cefb381..b86b1db 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -9,24 +9,27 @@ from .client import KafkaClient, FetchRequest, ProduceRequest log = logging.getLogger("kafka") class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier): + def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200): self.client = copy(client) self.topic = topic self.partition = partition self.out_queue = out_queue self.barrier = barrier - self.consumer_sleep = 0.2 + self.consumer_sleep = consumer_sleep / 1000. + log.info("Initializing %s" % self) Process.__init__(self) - def config(self, consumer_sleep): - self.consumer_sleep = consumer_sleep / 1000. + def __str__(self): + return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % ( + self.topic, self.partition, self.consumer_sleep) def run(self): self.barrier.wait() - log.info("Starting Consumer") + log.info("Starting %s" % self) fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize) while True: if self.barrier.is_set() == False: + log.info("Shutdown %s" % self) self.client.close() break lastOffset = fetchRequest.offset @@ -39,24 +42,24 @@ class KafkaConsumerProcess(Process): self.out_queue.put(message) class KafkaProducerProcess(Process): - def __init__(self, client, topic, in_queue, barrier): + def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100): self.client = copy(client) self.topic = topic self.in_queue = in_queue self.barrier = barrier - self.producer_flush_buffer = 100 - self.producer_flush_timeout = 2.0 - self.producer_timeout = 0.1 - Process.__init__(self) - - def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout): self.producer_flush_buffer = producer_flush_buffer self.producer_flush_timeout = producer_flush_timeout / 1000. self.producer_timeout = producer_timeout / 1000. + log.info("Initializing %s" % self) + Process.__init__(self) + + def __str__(self): + return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % ( + self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout) def run(self): self.barrier.wait() - log.info("Starting Producer") + log.info("Starting %s" % self) messages = [] last_produce = time.time() @@ -66,13 +69,14 @@ class KafkaProducerProcess(Process): while True: if self.barrier.is_set() == False: - log.info("Producer shut down. Flushing messages") + log.info("Shutdown %s, flushing messages" % self) flush(messages) self.client.close() break if len(messages) > self.producer_flush_buffer: log.debug("Message count threashold reached. Flushing messages") flush(messages) + last_produce = time.time() elif (time.time() - last_produce) > self.producer_flush_timeout: log.debug("Producer timeout reached. Flushing messages") flush(messages) @@ -83,7 +87,33 @@ class KafkaProducerProcess(Process): continue class KafkaQueue(object): - def __init__(self, client, topic, partitions): + def __init__(self, client, topic, partitions, producer_config={}, consumer_config={}): + """ + KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers + + Params + ====== + client: KafkaClient object + topic: str, the topic name + partitions: list of ints, the partions to consume from + producer_config: dict, see below + consumer_config: dict, see below + + Consumer Config + =============== + consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches + the end of a partition. Default is 200 + + Producer Config + =============== + producer_timeout: int, time in milliseconds a producer should wait for messages to + enqueue for producing. Default is 100 + producer_flush_timeout: int, time in milliseconds a producer should allow messages + to accumulate before sending to Kafka. Default is 2000 + producer_flush_buffer: int, number of messages a producer should allow to accumulate. + Default is 500 + + """ self.in_queue = Queue() self.out_queue = Queue() self.consumers = [] @@ -91,14 +121,12 @@ class KafkaQueue(object): # Initialize and start consumer threads for partition in partitions: - consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier) - consumer.config(consumer_sleep=200) + consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config) consumer.start() self.consumers.append(consumer) # Initialize and start producer thread - self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier) - self.producer.config(producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100) + self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config) self.producer.start() # Trigger everything to start |