diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 20:47:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-16 20:47:15 -0700 |
commit | 947625bfa4b6462e3f7c0fdad0a0cd69708beb2c (patch) | |
tree | aeae9decba9e1eba0827bcc5dc97c3b85d6f358b /load_example.py | |
parent | 3666b66a21776d620f68d2f7ff2fed1bc18b94e5 (diff) | |
parent | 7a2ec3332b0a83dcaaab4a402db13ed9d56d89e8 (diff) | |
download | kafka-python-947625bfa4b6462e3f7c0fdad0a0cd69708beb2c.tar.gz |
Merge pull request #754 from dpkp/benchmarks
Producer metrics + consumer/producer benchmark scripts
Diffstat (limited to 'load_example.py')
-rwxr-xr-x | load_example.py | 65 |
1 files changed, 0 insertions, 65 deletions
diff --git a/load_example.py b/load_example.py deleted file mode 100755 index a3b09ba..0000000 --- a/load_example.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -import threading, logging, time - -from kafka import KafkaConsumer, KafkaProducer - -msg_size = 524288 - -producer_stop = threading.Event() -consumer_stop = threading.Event() - -class Producer(threading.Thread): - big_msg = b'1' * msg_size - - def run(self): - producer = KafkaProducer(bootstrap_servers='localhost:9092') - self.sent = 0 - - while not producer_stop.is_set(): - producer.send('my-topic', self.big_msg) - self.sent += 1 - producer.flush() - - -class Consumer(threading.Thread): - - def run(self): - consumer = KafkaConsumer(bootstrap_servers='localhost:9092', - auto_offset_reset='earliest') - consumer.subscribe(['my-topic']) - self.valid = 0 - self.invalid = 0 - - for message in consumer: - if len(message.value) == msg_size: - self.valid += 1 - else: - self.invalid += 1 - - if consumer_stop.is_set(): - break - - consumer.close() - -def main(): - threads = [ - Producer(), - Consumer() - ] - - for t in threads: - t.start() - - time.sleep(10) - producer_stop.set() - consumer_stop.set() - print 'Messages sent: %d' % threads[0].sent - print 'Messages recvd: %d' % threads[1].valid - print 'Messages invalid: %d' % threads[1].invalid - -if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) - main() |