diff options
| author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 12:40:15 -0700 | 
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 14:05:50 -0700 | 
| commit | bea1a2adacc662abe2b041bc38bfc452bb12caab (patch) | |
| tree | 0a8336444393e4724b962eabea6121ac1612ed3d /benchmarks | |
| parent | dbb0dae4f34a469bf04a4df751892b237b4707a9 (diff) | |
| download | kafka-python-bea1a2adacc662abe2b041bc38bfc452bb12caab.tar.gz | |
Move load_example.py to benchmarks/
Diffstat (limited to 'benchmarks')
| -rwxr-xr-x | benchmarks/load_example.py | 65 | 
1 files changed, 65 insertions, 0 deletions
| diff --git a/benchmarks/load_example.py b/benchmarks/load_example.py new file mode 100755 index 0000000..a3b09ba --- /dev/null +++ b/benchmarks/load_example.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +import threading, logging, time + +from kafka import KafkaConsumer, KafkaProducer + +msg_size = 524288 + +producer_stop = threading.Event() +consumer_stop = threading.Event() + +class Producer(threading.Thread): +    big_msg = b'1' * msg_size + +    def run(self): +        producer = KafkaProducer(bootstrap_servers='localhost:9092') +        self.sent = 0 + +        while not producer_stop.is_set(): +            producer.send('my-topic', self.big_msg) +            self.sent += 1 +        producer.flush() + + +class Consumer(threading.Thread): + +    def run(self): +        consumer = KafkaConsumer(bootstrap_servers='localhost:9092', +                                 auto_offset_reset='earliest') +        consumer.subscribe(['my-topic']) +        self.valid = 0 +        self.invalid = 0 + +        for message in consumer: +            if len(message.value) == msg_size: +                self.valid += 1 +            else: +                self.invalid += 1 + +            if consumer_stop.is_set(): +                break + +        consumer.close() + +def main(): +    threads = [ +        Producer(), +        Consumer() +    ] + +    for t in threads: +        t.start() + +    time.sleep(10) +    producer_stop.set() +    consumer_stop.set() +    print 'Messages sent: %d' % threads[0].sent +    print 'Messages recvd: %d' % threads[1].valid +    print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": +    logging.basicConfig( +        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', +        level=logging.INFO +        ) +    main() | 
