summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x[-rw-r--r--]example.py48
-rwxr-xr-xload_example.py57
2 files changed, 92 insertions, 13 deletions
diff --git a/example.py b/example.py
index 3a2dc92..0eac0a5 100644..100755
--- a/example.py
+++ b/example.py
@@ -1,23 +1,45 @@
-import logging
+#!/usr/bin/env python
+import threading, logging, time
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
+from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
-def produce_example(client):
- producer = SimpleProducer(client, "my-topic")
- producer.send_messages("test")
+class Producer(threading.Thread):
+ daemon = True
-def consume_example(client):
- consumer = SimpleConsumer(client, "test-group", "my-topic")
- for message in consumer:
- print(message)
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ producer = SimpleProducer(client)
+
+ while True:
+ producer.send_messages('my-topic', "test")
+ producer.send_messages('my-topic', "\xc2Hola, mundo!")
+
+ time.sleep(1)
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ consumer = SimpleConsumer(client, "test-group", "my-topic")
+
+ for message in consumer:
+ print(message)
def main():
- client = KafkaClient("localhost", 9092)
- produce_example(client)
- consume_example(client)
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(5)
if __name__ == "__main__":
- logging.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(level=logging.WARN)
main()
diff --git a/load_example.py b/load_example.py
new file mode 100755
index 0000000..0ef07b6
--- /dev/null
+++ b/load_example.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+import threading, logging, time, collections
+
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+msg_size = 524288
+
+class Producer(threading.Thread):
+ daemon = True
+ big_msg = "1" * msg_size
+
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ producer = SimpleProducer(client)
+ self.sent = 0
+
+ while True:
+ producer.send_messages('my-topic', self.big_msg)
+ self.sent += 1
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ consumer = SimpleConsumer(client, "test-group", "my-topic",
+ max_buffer_size = None,
+ )
+ self.valid = 0
+ self.invalid = 0
+
+ for message in consumer:
+ if len(message.message.value) == msg_size:
+ self.valid += 1
+ else:
+ self.invalid += 1
+
+def main():
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(10)
+ print 'Messages sent: %d' % threads[0].sent
+ print 'Messages recvd: %d' % threads[1].valid
+ print 'Messages invalid: %d' % threads[1].invalid
+
+if __name__ == "__main__":
+ logging.basicConfig(level=logging.DEBUG)
+ main()