diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-15 20:18:04 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-15 20:21:16 -0800 |
commit | 432c0fe7d7a881a1e796b2dcf2e4889089d69017 (patch) | |
tree | 6b4cd7b692b3d4601a5cc2bee47b7e2688b3099b /example.py | |
parent | 405b1a5d66133fa97f671a16af6fb07af791b716 (diff) | |
download | kafka-python-432c0fe7d7a881a1e796b2dcf2e4889089d69017.tar.gz |
Use KafkaProducer / KafkaConsumer in example.py
Diffstat (limited to 'example.py')
-rwxr-xr-x | example.py | 25 |
1 files changed, 12 insertions, 13 deletions
@@ -1,21 +1,18 @@ #!/usr/bin/env python import threading, logging, time -from kafka.client import KafkaClient -from kafka.consumer import SimpleConsumer -from kafka.producer import SimpleProducer +from kafka import KafkaConsumer, KafkaProducer + class Producer(threading.Thread): daemon = True def run(self): - client = KafkaClient("localhost:9092") - producer = SimpleProducer(client) + producer = KafkaProducer(bootstrap_servers='localhost:9092') while True: - producer.send_messages('my-topic', "test") - producer.send_messages('my-topic', "\xc2Hola, mundo!") - + producer.send('my-topic', b"test") + producer.send('my-topic', b"\xc2Hola, mundo!") time.sleep(1) @@ -23,11 +20,13 @@ class Consumer(threading.Thread): daemon = True def run(self): - client = KafkaClient("localhost:9092") - consumer = SimpleConsumer(client, "test-group", "my-topic") + consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + auto_offset_reset='earliest') + consumer.subscribe(['my-topic']) for message in consumer: - print(message) + print (message) + def main(): threads = [ @@ -38,11 +37,11 @@ def main(): for t in threads: t.start() - time.sleep(5) + time.sleep(10) if __name__ == "__main__": logging.basicConfig( format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.DEBUG + level=logging.INFO ) main() |