diff options
author | Mark Roberts <markroberts@kixeye.com> | 2014-02-25 01:02:39 -0800 |
---|---|---|
committer | Mark Roberts <markroberts@kixeye.com> | 2014-02-25 11:25:36 -0800 |
commit | ee7e86ea712de0a0390e64752c5cf9180c1681b5 (patch) | |
tree | 17e945cad27737d371712303a80ca4b2ab467b18 | |
parent | e5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d (diff) | |
download | kafka-python-ee7e86ea712de0a0390e64752c5cf9180c1681b5.tar.gz |
Update example.py to compile, add friendly load_example.py
-rwxr-xr-x[-rw-r--r--] | example.py | 48 | ||||
-rwxr-xr-x | load_example.py | 57 |
2 files changed, 92 insertions, 13 deletions
diff --git a/example.py b/example.py index 3a2dc92..0eac0a5 100644..100755 --- a/example.py +++ b/example.py @@ -1,23 +1,45 @@ -import logging +#!/usr/bin/env python +import threading, logging, time -from kafka.client import KafkaClient, FetchRequest, ProduceRequest +from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer -def produce_example(client): - producer = SimpleProducer(client, "my-topic") - producer.send_messages("test") +class Producer(threading.Thread): + daemon = True -def consume_example(client): - consumer = SimpleConsumer(client, "test-group", "my-topic") - for message in consumer: - print(message) + def run(self): + client = KafkaClient("localhost", 9092) + producer = SimpleProducer(client) + + while True: + producer.send_messages('my-topic', "test") + producer.send_messages('my-topic', "\xc2Hola, mundo!") + + time.sleep(1) + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost", 9092) + consumer = SimpleConsumer(client, "test-group", "my-topic") + + for message in consumer: + print(message) def main(): - client = KafkaClient("localhost", 9092) - produce_example(client) - consume_example(client) + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(5) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.WARN) main() diff --git a/load_example.py b/load_example.py new file mode 100755 index 0000000..0ef07b6 --- /dev/null +++ b/load_example.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +import threading, logging, time, collections + +from kafka.client import KafkaClient +from kafka.consumer import SimpleConsumer +from kafka.producer import SimpleProducer + +msg_size = 524288 + +class Producer(threading.Thread): + daemon = True + big_msg = "1" * msg_size + + def run(self): + client = KafkaClient("localhost", 9092) + producer = SimpleProducer(client) + self.sent = 0 + + while True: + producer.send_messages('my-topic', self.big_msg) + self.sent += 1 + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost", 9092) + consumer = SimpleConsumer(client, "test-group", "my-topic", + max_buffer_size = None, + ) + self.valid = 0 + self.invalid = 0 + + for message in consumer: + if len(message.message.value) == msg_size: + self.valid += 1 + else: + self.invalid += 1 + +def main(): + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(10) + print 'Messages sent: %d' % threads[0].sent + print 'Messages recvd: %d' % threads[1].valid + print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() |