summaryrefslogtreecommitdiff
path: root/benchmarks/load_example.py
diff options
context:
space:
mode:
Diffstat (limited to 'benchmarks/load_example.py')
-rwxr-xr-xbenchmarks/load_example.py65
1 files changed, 65 insertions, 0 deletions
diff --git a/benchmarks/load_example.py b/benchmarks/load_example.py
new file mode 100755
index 0000000..a3b09ba
--- /dev/null
+++ b/benchmarks/load_example.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+import threading, logging, time
+
+from kafka import KafkaConsumer, KafkaProducer
+
+msg_size = 524288
+
+producer_stop = threading.Event()
+consumer_stop = threading.Event()
+
+class Producer(threading.Thread):
+ big_msg = b'1' * msg_size
+
+ def run(self):
+ producer = KafkaProducer(bootstrap_servers='localhost:9092')
+ self.sent = 0
+
+ while not producer_stop.is_set():
+ producer.send('my-topic', self.big_msg)
+ self.sent += 1
+ producer.flush()
+
+
+class Consumer(threading.Thread):
+
+ def run(self):
+ consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ auto_offset_reset='earliest')
+ consumer.subscribe(['my-topic'])
+ self.valid = 0
+ self.invalid = 0
+
+ for message in consumer:
+ if len(message.value) == msg_size:
+ self.valid += 1
+ else:
+ self.invalid += 1
+
+ if consumer_stop.is_set():
+ break
+
+ consumer.close()
+
+def main():
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(10)
+ producer_stop.set()
+ consumer_stop.set()
+ print 'Messages sent: %d' % threads[0].sent
+ print 'Messages recvd: %d' % threads[1].valid
+ print 'Messages invalid: %d' % threads[1].invalid
+
+if __name__ == "__main__":
+ logging.basicConfig(
+ format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+ level=logging.INFO
+ )
+ main()