Diffstat (limited to 'docs/usage.rst')
-rw-r--r--  docs/usage.rst  246
1 file changed, 88 insertions(+), 158 deletions(-)
diff --git a/docs/usage.rst b/docs/usage.rst
index 6417cd8..e74e5af 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,68 +1,126 @@
Usage
-=====
+*****
-SimpleProducer
---------------
+
+KafkaConsumer
+=============
.. code:: python
- from kafka import SimpleProducer, KafkaClient
+ from kafka import KafkaConsumer
- # To send messages synchronously
- kafka = KafkaClient('localhost:9092')
- producer = SimpleProducer(kafka)
+ # To consume latest messages and auto-commit offsets
+ consumer = KafkaConsumer('my-topic',
+ group_id='my-group',
+ bootstrap_servers=['localhost:9092'])
+ for message in consumer:
+ # message value and key are raw bytes -- decode if necessary!
+ # e.g., for unicode: `message.value.decode('utf-8')`
+ print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+ message.offset, message.key,
+ message.value))
- # Note that the application is responsible for encoding messages to type bytes
- producer.send_messages(b'my-topic', b'some message')
- producer.send_messages(b'my-topic', b'this method', b'is variadic')
+ # consume earliest available messages, don't commit offsets
+ KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
- # Send unicode message
- producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+ # consume json messages
+ KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+ # consume msgpack
+ KafkaConsumer(value_deserializer=msgpack.unpackb)
+
+ # StopIteration if no message after 1 second
+ KafkaConsumer(consumer_timeout_ms=1000)
+
+ # Subscribe to a regex topic pattern
+ consumer = KafkaConsumer()
+ consumer.subscribe(pattern='^awesome.*')
+
+ # Use multiple consumers in parallel w/ 0.9 Kafka brokers
+ # typically you would run each on a different server / process / CPU
+ consumer1 = KafkaConsumer('my-topic',
+ group_id='my-group',
+ bootstrap_servers='my.server.com')
+ consumer2 = KafkaConsumer('my-topic',
+ group_id='my-group',
+ bootstrap_servers='my.server.com')
+
+
+There are many configuration options for the consumer class. See
+:class:`~kafka.KafkaConsumer` API documentation for more details.
+
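+The consumer also supports manual partition assignment via ``assign()``
+and ``seek()``; a minimal sketch (the topic, partition and offset here
+are illustrative):
+
+.. code:: python
+
+    from kafka import KafkaConsumer, TopicPartition
+
+    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
+    partition = TopicPartition('my-topic', 0)
+    consumer.assign([partition])
+    consumer.seek(partition, 0)  # rewind to the start of the partition
+    for message in consumer:
+        print(message.offset, message.value)
+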
+
+SimpleProducer
+==============
Asynchronous Mode
-----------------
.. code:: python
+ from kafka import SimpleProducer, SimpleClient
+
# To send messages asynchronously
- producer = SimpleProducer(kafka, async=True)
- producer.send_messages(b'my-topic', b'async message')
+ client = SimpleClient('localhost:9092')
+ producer = SimpleProducer(client, async=True)
+ producer.send_messages('my-topic', b'async message')
+
+ # To send messages in batches. You can use any of the available
+ # producers for doing this. The following producer will collect
+ # messages in batches and send them to Kafka after 20 messages are
+ # collected or every 60 seconds
+ # Notes:
+ # * If the producer dies before the messages are sent, the messages will be lost
+ # * Call producer.stop() to send the pending messages and clean up
+ producer = SimpleProducer(client,
+ async=True,
+ batch_send_every_n=20,
+ batch_send_every_t=60)
+
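+A minimal usage sketch for the batching producer above (the message
+contents are illustrative); per the notes, call ``stop()`` to flush any
+buffered messages before cleanup:
+
+.. code:: python
+
+    for i in range(100):
+        producer.send_messages('my-topic', ('batched message %d' % i).encode('utf-8'))
+
+    # flush whatever is still buffered and stop the background thread
+    producer.stop()
+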
+Synchronous Mode
+----------------
+
+.. code:: python
+
+ from kafka import SimpleProducer, SimpleClient
+
+ # To send messages synchronously
+ client = SimpleClient('localhost:9092')
+ producer = SimpleProducer(client, async=False)
+
+ # Note that the application is responsible for encoding messages to type bytes
+ producer.send_messages('my-topic', b'some message')
+ producer.send_messages('my-topic', b'this method', b'is variadic')
+
+ # Send unicode message
+ producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait until the data is written to
# a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
- producer = SimpleProducer(kafka, async=False,
+ producer = SimpleProducer(client,
+ async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000,
sync_fail_on_error=False)
- responses = producer.send_messages(b'my-topic', b'another message')
+ responses = producer.send_messages('my-topic', b'another message')
for r in responses:
logging.info(r.offset)
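+ # With sync_fail_on_error=False, failed sends come back as error
+ # responses rather than raising. A sketch of inspecting them -- each
+ # response carries topic, partition, error code and offset, and a
+ # nonzero error code indicates a broker-side failure:
+ for r in responses:
+     if r.error != 0:
+         logging.error('send to %s:%d failed with error code %d',
+                       r.topic, r.partition, r.error)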
- # To send messages in batch. You can use any of the available
- # producers for doing this. The following producer will collect
- # messages in batch and send them to Kafka after 20 messages are
- # collected or every 60 seconds
- # Notes:
- # * If the producer dies before the messages are sent, there will be losses
- # * Call producer.stop() to send the messages and cleanup
- producer = SimpleProducer(kafka, async=True,
- batch_send_every_n=20,
- batch_send_every_t=60)
-Keyed messages
---------------
+KeyedProducer
+=============
.. code:: python
from kafka import (
- KafkaClient, KeyedProducer,
+ SimpleClient, KeyedProducer,
Murmur2Partitioner, RoundRobinPartitioner)
- kafka = KafkaClient('localhost:9092')
+ kafka = SimpleClient('localhost:9092')
# HashedPartitioner is default (currently uses python hash())
producer = KeyedProducer(kafka)
@@ -74,131 +132,3 @@ Keyed messages
# Or just produce round-robin (or just use SimpleProducer)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
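+ # Sketch: send a few keyed messages -- the partitioner maps each key
+ # to a partition, so messages sharing a key land on the same partition
+ # (topic and keys here are illustrative)
+ producer.send_messages(b'my-topic', b'key1', b'some message')
+ producer.send_messages(b'my-topic', b'key2', b'another message')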
-
-
-
-KafkaConsumer
--------------
-
-.. code:: python
-
- from kafka import KafkaConsumer
-
- # To consume messages
- consumer = KafkaConsumer('my-topic',
- group_id='my_group',
- bootstrap_servers=['localhost:9092'])
- for message in consumer:
- # message value is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.value.decode('utf-8')`
- print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
- message.offset, message.key,
- message.value))
-
-
-messages (m) are namedtuples with attributes:
-
- * `m.topic`: topic name (str)
- * `m.partition`: partition number (int)
- * `m.offset`: message offset on topic-partition log (int)
- * `m.key`: key (bytes - can be None)
- * `m.value`: message (output of deserializer_class - default is raw bytes)
-
-
-.. code:: python
-
- from kafka import KafkaConsumer
-
- # more advanced consumer -- multiple topics w/ auto commit offset
- # management
- consumer = KafkaConsumer('topic1', 'topic2',
- bootstrap_servers=['localhost:9092'],
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
-
- # Infinite iteration
- for m in consumer:
- do_some_work(m)
-
- # Mark this message as fully consumed
- # so it can be included in the next commit
- #
- # **messages that are not marked w/ task_done currently do not commit!
- consumer.task_done(m)
-
- # If auto_commit_enable is False, remember to commit() periodically
- consumer.commit()
-
- # Batch process interface
- while True:
- for m in kafka.fetch_messages():
- process_message(m)
- consumer.task_done(m)
-
-
- Configuration settings can be passed to constructor,
- otherwise defaults will be used:
-
-.. code:: python
-
- client_id='kafka.consumer.kafka',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- bootstrap_servers=[],
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=lambda msg: msg,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1
-
- Configuration parameters are described in more detail at
- http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
-Multiprocess consumer
----------------------
-
-.. code:: python
-
- from kafka import KafkaClient, MultiProcessConsumer
-
- kafka = KafkaClient('localhost:9092')
-
- # This will split the number of partitions among two processes
- consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
-
- # This will spawn processes such that each handles 2 partitions max
- consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
- partitions_per_proc=2)
-
- for message in consumer:
- print(message)
-
- for message in consumer.get_messages(count=5, block=True, timeout=4):
- print(message)
-
-Low level
----------
-
-.. code:: python
-
- from kafka import KafkaClient, create_message
- from kafka.protocol import KafkaProtocol
- from kafka.common import ProduceRequest
-
- kafka = KafkaClient('localhost:9092')
-
- req = ProduceRequest(topic=b'my-topic', partition=1,
- messages=[create_message(b'some message')])
- resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
- kafka.close()
-
- resps[0].topic # b'my-topic'
- resps[0].partition # 1
- resps[0].error # 0 (hopefully)
- resps[0].offset # offset of the first message sent in this request