author    David Arthur <mumrah@gmail.com>  2012-11-19 11:42:09 -0500
committer David Arthur <mumrah@gmail.com>  2012-11-19 20:47:27 -0500
commit    1d0bc784d41364d6178dce452f2ac787b52749a8 (patch)
tree      8a9fb036537cad9e7125fbb1fd67497ed237e837 /kafka/queue.py
parent    330ddbca4464f32d31b662eb95b92113e7024323 (diff)
download  kafka-python-1d0bc784d41364d6178dce452f2ac787b52749a8.tar.gz
Add some docs and KafkaQueue config
Ref #8
Diffstat (limited to 'kafka/queue.py')
-rw-r--r--  kafka/queue.py | 66
1 file changed, 47 insertions(+), 19 deletions(-)
diff --git a/kafka/queue.py b/kafka/queue.py
index cefb381..b86b1db 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -9,24 +9,27 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
log = logging.getLogger("kafka")
class KafkaConsumerProcess(Process):
- def __init__(self, client, topic, partition, out_queue, barrier):
+ def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200):
self.client = copy(client)
self.topic = topic
self.partition = partition
self.out_queue = out_queue
self.barrier = barrier
- self.consumer_sleep = 0.2
+ self.consumer_sleep = consumer_sleep / 1000.
+ log.info("Initializing %s" % self)
Process.__init__(self)
- def config(self, consumer_sleep):
- self.consumer_sleep = consumer_sleep / 1000.
+ def __str__(self):
+ return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % (
+ self.topic, self.partition, self.consumer_sleep)
def run(self):
self.barrier.wait()
- log.info("Starting Consumer")
+ log.info("Starting %s" % self)
fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
while True:
if self.barrier.is_set() == False:
+ log.info("Shutdown %s" % self)
self.client.close()
break
lastOffset = fetchRequest.offset
@@ -39,24 +42,24 @@ class KafkaConsumerProcess(Process):
self.out_queue.put(message)
class KafkaProducerProcess(Process):
- def __init__(self, client, topic, in_queue, barrier):
+ def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100):
self.client = copy(client)
self.topic = topic
self.in_queue = in_queue
self.barrier = barrier
- self.producer_flush_buffer = 100
- self.producer_flush_timeout = 2.0
- self.producer_timeout = 0.1
- Process.__init__(self)
-
- def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
self.producer_flush_buffer = producer_flush_buffer
self.producer_flush_timeout = producer_flush_timeout / 1000.
self.producer_timeout = producer_timeout / 1000.
+ log.info("Initializing %s" % self)
+ Process.__init__(self)
+
+ def __str__(self):
+ return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % (
+ self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout)
def run(self):
self.barrier.wait()
- log.info("Starting Producer")
+ log.info("Starting %s" % self)
messages = []
last_produce = time.time()
@@ -66,13 +69,14 @@ class KafkaProducerProcess(Process):
while True:
if self.barrier.is_set() == False:
- log.info("Producer shut down. Flushing messages")
+ log.info("Shutdown %s, flushing messages" % self)
flush(messages)
self.client.close()
break
if len(messages) > self.producer_flush_buffer:
log.debug("Message count threashold reached. Flushing messages")
flush(messages)
+ last_produce = time.time()
elif (time.time() - last_produce) > self.producer_flush_timeout:
log.debug("Producer timeout reached. Flushing messages")
flush(messages)
@@ -83,7 +87,33 @@ class KafkaProducerProcess(Process):
continue
class KafkaQueue(object):
- def __init__(self, client, topic, partitions):
+ def __init__(self, client, topic, partitions, producer_config={}, consumer_config={}):
+ """
+ KafkaQueue is a Queue-like object backed by a Kafka producer and some number of consumers
+
+ Params
+ ======
+ client: KafkaClient object
+ topic: str, the topic name
+ partitions: list of ints, the partitions to consume from
+ producer_config: dict, see below
+ consumer_config: dict, see below
+
+ Consumer Config
+ ===============
+ consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
+ the end of a partition. Default is 200
+
+ Producer Config
+ ===============
+ producer_timeout: int, time in milliseconds a producer should wait for messages to
+ enqueue for producing. Default is 100
+ producer_flush_timeout: int, time in milliseconds a producer should allow messages
+ to accumulate before sending to Kafka. Default is 2000
+ producer_flush_buffer: int, number of messages a producer should allow to accumulate.
+ Default is 500
+
+ """
self.in_queue = Queue()
self.out_queue = Queue()
self.consumers = []
@@ -91,14 +121,12 @@ class KafkaQueue(object):
# Initialize and start consumer threads
for partition in partitions:
- consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier)
- consumer.config(consumer_sleep=200)
+ consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config)
consumer.start()
self.consumers.append(consumer)
# Initialize and start producer thread
- self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier)
- self.producer.config(producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100)
+ self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config)
self.producer.start()
# Trigger everything to start
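
For reference, a minimal usage sketch of the configuration surface this commit introduces. The broker address, topic name, and partition list are illustrative, and the put/get/close calls assume the Queue-like interface the rest of kafka/queue.py exposes at this revision (not shown in the hunks above).

from kafka.client import KafkaClient
from kafka.queue import KafkaQueue

client = KafkaClient("localhost", 9092)

# Every key is optional; omitted keys fall back to the defaults documented
# in the new KafkaQueue docstring (all times are given in milliseconds).
consumer_config = {"consumer_sleep": 200}
producer_config = {
    "producer_flush_buffer": 500,     # flush once this many messages have accumulated
    "producer_flush_timeout": 2000,   # or once this much time has passed since the last flush
    "producer_timeout": 100,          # how long the producer waits for messages to enqueue
}

q = KafkaQueue(client, "my-topic", [0, 1],
               producer_config=producer_config,
               consumer_config=consumer_config)

q.put("hello world")   # handed to the producer process for sending
msg = q.get()          # blocks until a consumer process delivers a message
q.close()              # lowers the barrier so the worker processes shut down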