summary | refs | log | tree | commit | diff
path: root/example.py
blob: 93293128f88917901b3656ad4a4d39ff3e2db403 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import logging

from kafka.client import KafkaClient, FetchRequest, ProduceRequest
from kafka.consumer import SimpleConsumer

def produce_example(kafka):
    """Send a single message to "my-topic" through *kafka*.

    The message is built by ``kafka.create_message`` from the string
    "testing", wrapped in a one-element list inside a ``ProduceRequest``,
    and passed to ``kafka.send_message_set``.

    Args:
        kafka: Client object providing ``create_message`` and
            ``send_message_set``.
    """
    payload = kafka.create_message("testing")
    # NOTE(review): the second argument (-1) is presumably the partition,
    # with -1 meaning "no specific partition" -- confirm against kafka.client.
    produce_request = ProduceRequest("my-topic", -1, [payload])
    kafka.send_message_set(produce_request)

def consume_example(kafka):
    """Fetch a message set for "my-topic" and print its contents.

    A ``FetchRequest`` is passed to ``kafka.get_message_set``, whose result
    is unpacked into an iterable of messages and a follow-up request. Each
    message is printed on its own line, followed by the follow-up request.

    Args:
        kafka: Client object providing ``get_message_set``.
    """
    # NOTE(review): the numeric arguments (0, 0, 1024) are presumably
    # partition, offset and maximum fetch size -- confirm against kafka.client.
    fetch_request = FetchRequest("my-topic", 0, 0, 1024)
    fetched = kafka.get_message_set(fetch_request)
    messages, next_request = fetched
    for msg in messages:
        print("Got Message: %s" % (msg,))
    print(next_request)

def produce_gz_example(kafka):
    """Send one gzip message built from two strings to "my-topic".

    ``kafka.create_gzip_message`` is called with two payload strings; the
    single object it returns is wrapped in a one-element list inside a
    ``ProduceRequest`` and passed to ``kafka.send_message_set``.

    Args:
        kafka: Client object providing ``create_gzip_message`` and
            ``send_message_set``.
    """
    gz_message = kafka.create_gzip_message(
        "this message was gzipped",
        "along with this one"
    )
    # NOTE(review): the second argument (0) is presumably the target
    # partition -- confirm against kafka.client.
    produce_request = ProduceRequest("my-topic", 0, [gz_message])
    kafka.send_message_set(produce_request)

def main():
    """Create a Kafka client and a consumer attached to it.

    Builds a ``KafkaClient`` with the arguments ("localhost", 9092), then a
    ``SimpleConsumer`` from that client plus the strings "test-group" and
    "my-topic". None of the example functions in this file are called here.
    """
    # NOTE(review): arguments are presumably broker host and port -- confirm.
    kafka_client = KafkaClient("localhost", 9092)
    # The consumer is constructed but not used further in this function.
    # NOTE(review): "test-group" / "my-topic" are presumably the consumer
    # group and the topic -- confirm against kafka.consumer.
    _consumer = SimpleConsumer(kafka_client, "test-group", "my-topic")

# Script entry point: only runs when the file is executed directly, not when
# it is imported.
if __name__ == "__main__":
    # Configure the root logger at DEBUG level before running the example.
    logging.basicConfig(level=logging.DEBUG)
    main()