diff options
Diffstat (limited to 'load_example.py')
| -rwxr-xr-x | load_example.py | 57 | 
1 files changed, 57 insertions, 0 deletions
| diff --git a/load_example.py b/load_example.py new file mode 100755 index 0000000..0ef07b6 --- /dev/null +++ b/load_example.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +import threading, logging, time, collections + +from kafka.client import KafkaClient +from kafka.consumer import SimpleConsumer +from kafka.producer import SimpleProducer + +msg_size = 524288 + +class Producer(threading.Thread): +    daemon = True +    big_msg = "1" * msg_size + +    def run(self): +        client = KafkaClient("localhost", 9092) +        producer = SimpleProducer(client) +        self.sent = 0 + +        while True: +            producer.send_messages('my-topic', self.big_msg) +            self.sent += 1 + + +class Consumer(threading.Thread): +    daemon = True + +    def run(self): +        client = KafkaClient("localhost", 9092) +        consumer = SimpleConsumer(client, "test-group", "my-topic", +            max_buffer_size = None, +        ) +        self.valid = 0 +        self.invalid = 0 + +        for message in consumer: +            if len(message.message.value) == msg_size: +                self.valid += 1 +            else: +                self.invalid += 1 + +def main(): +    threads = [ +        Producer(), +        Consumer() +    ] + +    for t in threads: +        t.start() + +    time.sleep(10) +    print 'Messages sent: %d' % threads[0].sent +    print 'Messages recvd: %d' % threads[1].valid +    print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": +    logging.basicConfig(level=logging.DEBUG) +    main() | 
