summaryrefslogtreecommitdiff
path: root/load_example.py
diff options
context:
space:
mode:
authorMark Roberts <markroberts@kixeye.com>2014-02-25 01:02:39 -0800
committerMark Roberts <markroberts@kixeye.com>2014-02-25 11:25:36 -0800
commitee7e86ea712de0a0390e64752c5cf9180c1681b5 (patch)
tree17e945cad27737d371712303a80ca4b2ab467b18 /load_example.py
parente5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d (diff)
downloadkafka-python-ee7e86ea712de0a0390e64752c5cf9180c1681b5.tar.gz
Update example.py to compile, add friendly load_example.py
Diffstat (limited to 'load_example.py')
-rwxr-xr-xload_example.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/load_example.py b/load_example.py
new file mode 100755
index 0000000..0ef07b6
--- /dev/null
+++ b/load_example.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+import threading, logging, time, collections
+
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+msg_size = 524288
+
+class Producer(threading.Thread):
+ daemon = True
+ big_msg = "1" * msg_size
+
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ producer = SimpleProducer(client)
+ self.sent = 0
+
+ while True:
+ producer.send_messages('my-topic', self.big_msg)
+ self.sent += 1
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ consumer = SimpleConsumer(client, "test-group", "my-topic",
+ max_buffer_size = None,
+ )
+ self.valid = 0
+ self.invalid = 0
+
+ for message in consumer:
+ if len(message.message.value) == msg_size:
+ self.valid += 1
+ else:
+ self.invalid += 1
+
+def main():
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(10)
+ print 'Messages sent: %d' % threads[0].sent
+ print 'Messages recvd: %d' % threads[1].valid
+ print 'Messages invalid: %d' % threads[1].invalid
+
+if __name__ == "__main__":
+ logging.basicConfig(level=logging.DEBUG)
+ main()