-rw-r--r--   kafka/queue.py      | 66
-rw-r--r--   test/integration.py |  1
2 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/kafka/queue.py b/kafka/queue.py
index cefb381..b86b1db 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -9,24 +9,27 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
 log = logging.getLogger("kafka")
 
 class KafkaConsumerProcess(Process):
-    def __init__(self, client, topic, partition, out_queue, barrier):
+    def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200):
         self.client = copy(client)
         self.topic = topic
         self.partition = partition
         self.out_queue = out_queue
         self.barrier = barrier
-        self.consumer_sleep = 0.2
+        self.consumer_sleep = consumer_sleep / 1000.
+        log.info("Initializing %s" % self)
         Process.__init__(self)
 
-    def config(self, consumer_sleep):
-        self.consumer_sleep = consumer_sleep / 1000.
+    def __str__(self):
+        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % (
+                self.topic, self.partition, self.consumer_sleep)
 
     def run(self):
         self.barrier.wait()
-        log.info("Starting Consumer")
+        log.info("Starting %s" % self)
         fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
         while True:
             if self.barrier.is_set() == False:
+                log.info("Shutdown %s" % self)
                 self.client.close()
                 break
             lastOffset = fetchRequest.offset
@@ -39,24 +42,24 @@ class KafkaConsumerProcess(Process):
                 self.out_queue.put(message)
 
 class KafkaProducerProcess(Process):
-    def __init__(self, client, topic, in_queue, barrier):
+    def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100):
         self.client = copy(client)
         self.topic = topic
         self.in_queue = in_queue
         self.barrier = barrier
-        self.producer_flush_buffer = 100
-        self.producer_flush_timeout = 2.0
-        self.producer_timeout = 0.1
-        Process.__init__(self)
-
-    def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
         self.producer_flush_buffer = producer_flush_buffer
         self.producer_flush_timeout = producer_flush_timeout / 1000.
         self.producer_timeout = producer_timeout / 1000.
+        log.info("Initializing %s" % self)
+        Process.__init__(self)
+
+    def __str__(self):
+        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % (
+                self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout)
 
     def run(self):
         self.barrier.wait()
-        log.info("Starting Producer")
+        log.info("Starting %s" % self)
         messages = []
         last_produce = time.time()
 
@@ -66,13 +69,14 @@ class KafkaProducerProcess(Process):
 
         while True:
             if self.barrier.is_set() == False:
-                log.info("Producer shut down. Flushing messages")
+                log.info("Shutdown %s, flushing messages" % self)
                 flush(messages)
                 self.client.close()
                 break
             if len(messages) > self.producer_flush_buffer:
                 log.debug("Message count threashold reached. Flushing messages")
                 flush(messages)
+                last_produce = time.time()
             elif (time.time() - last_produce) > self.producer_flush_timeout:
                 log.debug("Producer timeout reached. Flushing messages")
                 flush(messages)
@@ -83,7 +87,33 @@ class KafkaProducerProcess(Process):
                 continue
 
 class KafkaQueue(object):
-    def __init__(self, client, topic, partitions):
+    def __init__(self, client, topic, partitions, producer_config={}, consumer_config={}):
+        """
+        KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers
+
+        Params
+        ======
+        client: KafkaClient object
+        topic: str, the topic name
+        partitions: list of ints, the partions to consume from
+        producer_config: dict, see below
+        consumer_config: dict, see below
+
+        Consumer Config
+        ===============
+        consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
+            the end of a partition. Default is 200
+
+        Producer Config
+        ===============
+        producer_timeout: int, time in milliseconds a producer should wait for messages to
+            enqueue for producing. Default is 100
+        producer_flush_timeout: int, time in milliseconds a producer should allow messages
+            to accumulate before sending to Kafka. Default is 2000
+        producer_flush_buffer: int, number of messages a producer should allow to accumulate.
+            Default is 500
+
+        """
         self.in_queue = Queue()
         self.out_queue = Queue()
         self.consumers = []
@@ -91,14 +121,12 @@ class KafkaQueue(object):
 
         # Initialize and start consumer threads
         for partition in partitions:
-            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier)
-            consumer.config(consumer_sleep=200)
+            consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config)
             consumer.start()
             self.consumers.append(consumer)
 
         # Initialize and start producer thread
-        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier)
-        self.producer.config(producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100)
+        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config)
         self.producer.start()
 
         # Trigger everything to start
diff --git a/test/integration.py b/test/integration.py
index e11b33b..3971d3f 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -258,4 +258,5 @@ class IntegrationTest(unittest.TestCase):
         q.close()
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
     unittest.main()
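For context, a minimal usage sketch of the constructor-based configuration this commit introduces (not part of the diff). The broker host/port and topic name are hypothetical; put()/get() are assumed from KafkaQueue's Queue-like interface, while close() is the call that appears in the integration test above.

from kafka.client import KafkaClient
from kafka.queue import KafkaQueue

# Hypothetical broker address; KafkaClient(host, port) as imported by kafka/queue.py
client = KafkaClient("localhost", 9092)

# Per-process tuning is now passed as plain dicts of keyword arguments instead of
# calling .config() after construction; all times are in milliseconds
q = KafkaQueue(client, "my-topic", [0, 1],
               consumer_config={"consumer_sleep": 200},
               producer_config={"producer_flush_buffer": 500,
                                "producer_flush_timeout": 2000,
                                "producer_timeout": 100})

q.put("a message")   # handed to the producer process (assumed Queue-like API)
print(q.get())       # blocks until a consumer process delivers a message (assumed)
q.close()            # signals the shutdown barrier checked in the run() loops above

Omitting producer_config or consumer_config falls back to the defaults documented in the new KafkaQueue docstring (sleep 200 ms, flush buffer 500 messages, flush timeout 2000 ms, producer timeout 100 ms).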
