summaryrefslogtreecommitdiff
path: root/load_example.py
diff options
context:
space:
mode:
Diffstat (limited to 'load_example.py')
-rwxr-xr-xload_example.py65
1 files changed, 0 insertions, 65 deletions
diff --git a/load_example.py b/load_example.py
deleted file mode 100755
index a3b09ba..0000000
--- a/load_example.py
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/usr/bin/env python
-import threading, logging, time
-
-from kafka import KafkaConsumer, KafkaProducer
-
-msg_size = 524288
-
-producer_stop = threading.Event()
-consumer_stop = threading.Event()
-
-class Producer(threading.Thread):
- big_msg = b'1' * msg_size
-
- def run(self):
- producer = KafkaProducer(bootstrap_servers='localhost:9092')
- self.sent = 0
-
- while not producer_stop.is_set():
- producer.send('my-topic', self.big_msg)
- self.sent += 1
- producer.flush()
-
-
-class Consumer(threading.Thread):
-
- def run(self):
- consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- auto_offset_reset='earliest')
- consumer.subscribe(['my-topic'])
- self.valid = 0
- self.invalid = 0
-
- for message in consumer:
- if len(message.value) == msg_size:
- self.valid += 1
- else:
- self.invalid += 1
-
- if consumer_stop.is_set():
- break
-
- consumer.close()
-
-def main():
- threads = [
- Producer(),
- Consumer()
- ]
-
- for t in threads:
- t.start()
-
- time.sleep(10)
- producer_stop.set()
- consumer_stop.set()
- print 'Messages sent: %d' % threads[0].sent
- print 'Messages recvd: %d' % threads[1].valid
- print 'Messages invalid: %d' % threads[1].invalid
-
-if __name__ == "__main__":
- logging.basicConfig(
- format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
- level=logging.INFO
- )
- main()