 docs/usage.rst | 75 ++++++++++++++++++++++++++++++++++++++-----------------------------------
 1 file changed, 40 insertions(+), 35 deletions(-)
diff --git a/docs/usage.rst b/docs/usage.rst
index ca326d4..6417cd8 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -9,21 +9,24 @@ SimpleProducer
from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
- # Note that the application is responsible for encoding messages to type str
- producer.send_messages("my-topic", "some message")
- producer.send_messages("my-topic", "this method", "is variadic")
+ # Note that the application is responsible for encoding messages to type bytes
+ producer.send_messages(b'my-topic', b'some message')
+ producer.send_messages(b'my-topic', b'this method', b'is variadic')
# Send unicode message
- producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))
+ producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+
+Asynchronous Mode
+-----------------
+
+.. code:: python
# To send messages asynchronously
- # WARNING: current implementation does not guarantee message delivery on failure!
- # messages can get dropped! Use at your own risk! Or help us improve with a PR!
producer = SimpleProducer(kafka, async=True)
- producer.send_messages("my-topic", "async message")
+ producer.send_messages(b'my-topic', b'async message')
# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
@@ -32,13 +35,12 @@ SimpleProducer
    # by all in-sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=2000)
-
- response = producer.send_messages("my-topic", "another message")
+ ack_timeout=2000,
+ sync_fail_on_error=False)
- if response:
- print(response[0].error)
- print(response[0].offset)
+ responses = producer.send_messages(b'my-topic', b'another message')
+ for r in responses:
+ logging.info(r.offset)
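With ``sync_fail_on_error=False``, failed sends are assumed to come back as error
responses rather than raised exceptions, so the caller should inspect them; a
minimal sketch (behavior assumed, verify against your version):

.. code:: python

    import logging

    # sketch: check the error code on each response instead of relying
    # on an exception (sync_fail_on_error=False behavior assumed)
    responses = producer.send_messages(b'my-topic', b'another message')
    for r in responses:
        if r.error:
            logging.warning('produce failed with error code %d', r.error)
        else:
            logging.info('stored at offset %d', r.offset)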
# To send messages in batch. You can use any of the available
# producers for doing this. The following producer will collect
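A sketch of the batch collection described above; the ``batch_send_every_n``
and ``batch_send_every_t`` parameter names are assumptions here, so verify them
against your kafka-python version:

.. code:: python

    # sketch: buffer messages client-side and flush every 20 messages
    # or every 60 seconds, whichever comes first (parameter names assumed)
    producer = SimpleProducer(kafka, async=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)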
@@ -56,16 +58,21 @@ Keyed messages
.. code:: python
- from kafka import (KafkaClient, KeyedProducer, HashedPartitioner,
- RoundRobinPartitioner)
+ from kafka import (
+ KafkaClient, KeyedProducer,
+ Murmur2Partitioner, RoundRobinPartitioner)
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
- # HashedPartitioner is default
+ # HashedPartitioner is default (currently uses python hash())
producer = KeyedProducer(kafka)
- producer.send_messages("my-topic", "key1", "some message")
- producer.send_messages("my-topic", "key2", "this methode")
+ producer.send_messages(b'my-topic', b'key1', b'some message')
+ producer.send_messages(b'my-topic', b'key2', b'this method')
+ # Murmur2Partitioner attempts to mirror the java client hashing
+ producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+
+ # Or just produce round-robin (or just use SimpleProducer)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
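To illustrate why keyed production matters: messages that share a key hash to
the same partition, which preserves their relative order. A minimal sketch
using only the API shown above:

.. code:: python

    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
    # both messages hash to the same partition, so consumers see
    # event A before event B
    producer.send_messages(b'my-topic', b'user-1', b'event A')
    producer.send_messages(b'my-topic', b'user-1', b'event B')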
@@ -78,9 +85,9 @@ KafkaConsumer
from kafka import KafkaConsumer
# To consume messages
- consumer = KafkaConsumer("my-topic",
- group_id="my_group",
- bootstrap_servers=["localhost:9092"])
+ consumer = KafkaConsumer('my-topic',
+ group_id='my_group',
+ bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value is raw byte string -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
@@ -88,8 +95,6 @@ KafkaConsumer
message.offset, message.key,
message.value))
- kafka.close()
-
messages (m) are namedtuples with attributes:
@@ -121,16 +126,16 @@ messages (m) are namedtuples with attributes:
# so it can be included in the next commit
#
# **messages that are not marked w/ task_done currently do not commit!
- kafka.task_done(m)
+ consumer.task_done(m)
# If auto_commit_enable is False, remember to commit() periodically
- kafka.commit()
+ consumer.commit()
# Batch process interface
    while True:
-        for m in kafka.fetch_messages():
+        for m in consumer.fetch_messages():
            process_message(m)
-            kafka.task_done(m)
+            consumer.task_done(m)
Configuration settings can be passed to constructor,
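Putting the manual-commit pieces together, a sketch that disables auto-commit
and commits after processing; the ``auto_commit_enable`` flag and the
``process_message`` helper are assumptions, not verbatim API:

.. code:: python

    # sketch: manual offset management (auto_commit_enable name assumed)
    consumer = KafkaConsumer('my-topic',
                             group_id='my_group',
                             bootstrap_servers=['localhost:9092'],
                             auto_commit_enable=False)
    for m in consumer:
        process_message(m)     # hypothetical application callback
        consumer.task_done(m)
        consumer.commit()      # in practice, commit periodically, not per-message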
@@ -162,13 +167,13 @@ Multiprocess consumer
from kafka import KafkaClient, MultiProcessConsumer
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
# This will split the number of partitions among two processes
- consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
+ consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
# This will spawn processes such that each handles 2 partitions max
- consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
+ consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
partitions_per_proc=2)
for message in consumer:
@@ -186,14 +191,14 @@ Low level
from kafka.protocol import KafkaProtocol
from kafka.common import ProduceRequest
- kafka = KafkaClient("localhost:9092")
+ kafka = KafkaClient('localhost:9092')
- req = ProduceRequest(topic="my-topic", partition=1,
- messages=[create_message("some message")])
+ req = ProduceRequest(topic=b'my-topic', partition=1,
+ messages=[create_message(b'some message')])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
- resps[0].topic # "my-topic"
+ resps[0].topic # b'my-topic'
resps[0].partition # 1
resps[0].error # 0 (hopefully)
resps[0].offset # offset of the first message sent in this request
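The consume side of the low-level API mirrors the produce example; this sketch
assumes a ``FetchRequest`` namedtuple in ``kafka.common`` and a
``send_fetch_request`` client method, so check the request fields and response
attributes against your version:

.. code:: python

    from kafka import KafkaClient
    from kafka.common import FetchRequest

    kafka = KafkaClient('localhost:9092')

    # sketch: fetch raw messages from partition 1 starting at offset 0
    # (request fields and response attributes assumed)
    req = FetchRequest(topic=b'my-topic', partition=1, offset=0, max_bytes=4096)
    resps = kafka.send_fetch_request(payloads=[req], fail_on_error=True)
    for offset_and_message in resps[0].messages:
        print(offset_and_message.offset, offset_and_message.message.value)
    kafka.close()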