-rw-r--r--  docs/usage.rst | 75 ++++++++++++++++++++++++++++++++++++++++-----------------------------------
1 file changed, 40 insertions(+), 35 deletions(-)
diff --git a/docs/usage.rst b/docs/usage.rst
index ca326d4..6417cd8 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -9,21 +9,24 @@ SimpleProducer
     from kafka import SimpleProducer, KafkaClient
 
     # To send messages synchronously
-    kafka = KafkaClient("localhost:9092")
+    kafka = KafkaClient('localhost:9092')
     producer = SimpleProducer(kafka)
 
-    # Note that the application is responsible for encoding messages to type str
-    producer.send_messages("my-topic", "some message")
-    producer.send_messages("my-topic", "this method", "is variadic")
+    # Note that the application is responsible for encoding messages to type bytes
+    producer.send_messages(b'my-topic', b'some message')
+    producer.send_messages(b'my-topic', b'this method', b'is variadic')
 
     # Send unicode message
-    producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
+    producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+
+Asynchronous Mode
+-----------------
+
+.. code:: python
 
     # To send messages asynchronously
-    # WARNING: current implementation does not guarantee message delivery on failure!
-    # messages can get dropped! Use at your own risk! Or help us improve with a PR!
     producer = SimpleProducer(kafka, async=True)
-    producer.send_messages("my-topic", "async message")
+    producer.send_messages(b'my-topic', b'async message')
 
     # To wait for acknowledgements
     # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
@@ -32,13 +35,12 @@ SimpleProducer
     #                         by all in sync replicas before sending a response
     producer = SimpleProducer(kafka, async=False,
                               req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                              ack_timeout=2000)
-
-    response = producer.send_messages("my-topic", "another message")
+                              ack_timeout=2000,
+                              sync_fail_on_error=False)
 
-    if response:
-        print(response[0].error)
-        print(response[0].offset)
+    responses = producer.send_messages(b'my-topic', b'another message')
+    for r in responses:
+        logging.info(r.offset)
 
     # To send messages in batch. You can use any of the available
     # producers for doing this. The following producer will collect
@@ -56,16 +58,21 @@ Keyed messages
 
 .. code:: python
 
-    from kafka import (KafkaClient, KeyedProducer, HashedPartitioner,
-                       RoundRobinPartitioner)
+    from kafka import (
+        KafkaClient, KeyedProducer,
+        Murmur2Partitioner, RoundRobinPartitioner)
 
-    kafka = KafkaClient("localhost:9092")
+    kafka = KafkaClient('localhost:9092')
 
-    # HashedPartitioner is default
+    # HashedPartitioner is default (currently uses python hash())
     producer = KeyedProducer(kafka)
-    producer.send_messages("my-topic", "key1", "some message")
-    producer.send_messages("my-topic", "key2", "this methode")
+    producer.send_messages(b'my-topic', b'key1', b'some message')
+    producer.send_messages(b'my-topic', b'key2', b'this methode')
 
+    # Murmur2Partitioner attempts to mirror the java client hashing
+    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+
+    # Or just produce round-robin (or just use SimpleProducer)
     producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 
 
@@ -78,9 +85,9 @@ KafkaConsumer
     from kafka import KafkaConsumer
 
     # To consume messages
-    consumer = KafkaConsumer("my-topic",
-                             group_id="my_group",
-                             bootstrap_servers=["localhost:9092"])
+    consumer = KafkaConsumer('my-topic',
+                             group_id='my_group',
+                             bootstrap_servers=['localhost:9092'])
     for message in consumer:
         # message value is raw byte string -- decode if necessary!
         # e.g., for unicode: `message.value.decode('utf-8')`
@@ -88,8 +95,6 @@ KafkaConsumer
                               message.offset, message.key,
                               message.value))
 
-    kafka.close()
-
 
 
 messages (m) are namedtuples with attributes:
@@ -121,16 +126,16 @@ messages (m) are namedtuples with attributes:
         # so it can be included in the next commit
         #
         # **messages that are not marked w/ task_done currently do not commit!
-        kafka.task_done(m)
+        consumer.task_done(m)
 
     # If auto_commit_enable is False, remember to commit() periodically
-    kafka.commit()
+    consumer.commit()
 
     # Batch process interface
     while True:
       for m in kafka.fetch_messages():
           process_message(m)
-          kafka.task_done(m)
+          consumer.task_done(m)
 
 
 Configuration settings can be passed to constructor,
@@ -162,13 +167,13 @@ Multiprocess consumer
 
     from kafka import KafkaClient, MultiProcessConsumer
 
-    kafka = KafkaClient("localhost:9092")
+    kafka = KafkaClient('localhost:9092')
 
     # This will split the number of partitions among two processes
-    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
+    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
 
     # This will spawn processes such that each handles 2 partitions max
-    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
+    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
                                     partitions_per_proc=2)
 
     for message in consumer:
@@ -186,14 +191,14 @@ Low level
     from kafka.protocol import KafkaProtocol
     from kafka.common import ProduceRequest
 
-    kafka = KafkaClient("localhost:9092")
+    kafka = KafkaClient('localhost:9092')
 
-    req = ProduceRequest(topic="my-topic", partition=1,
-        messages=[create_message("some message")])
+    req = ProduceRequest(topic=b'my-topic', partition=1,
+        messages=[create_message(b'some message')])
     resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
     kafka.close()
 
-    resps[0].topic      # "my-topic"
+    resps[0].topic      # b'my-topic'
     resps[0].partition  # 1
     resps[0].error      # 0 (hopefully)
     resps[0].offset     # offset of the first message sent in this request
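
For reference, a minimal end-to-end sketch of the bytes-based API this patch
documents. It assumes a broker on localhost:9092 and the pre-1.0 kafka-python
interfaces shown above (SimpleProducer, KafkaConsumer, send_messages); the
topic b'demo-topic' and group 'demo_group' are illustrative names, not part of
the patch. The ``import logging`` line is included here because the patched
docs call ``logging.info`` without showing it.

.. code:: python

    import logging

    from kafka import KafkaClient, KafkaConsumer, SimpleProducer

    logging.basicConfig(level=logging.INFO)

    # Hypothetical topic/group names for illustration only.
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)

    # The client does no encoding for you: topics, keys, and payloads
    # must already be bytes, so encode unicode text explicitly.
    for text in [u'first message', u'你怎么样?']:
        responses = producer.send_messages(b'demo-topic', text.encode('utf-8'))
        for r in responses:
            logging.info('stored at offset %d', r.offset)

    kafka.close()

    # Consume the same topic, decoding values on the way out.
    consumer = KafkaConsumer('demo-topic',
                             group_id='demo_group',
                             bootstrap_servers=['localhost:9092'])
    for message in consumer:
        logging.info('%s:%d:%d: %s', message.topic, message.partition,
                     message.offset, message.value.decode('utf-8'))

Keeping encode/decode at the application boundary matches the patch's note
that the client treats topics, keys, and payloads as opaque bytes.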