summaryrefslogtreecommitdiff
path: root/docs/usage.rst
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-03-29 18:09:03 -0700
committerDana Powers <dana.powers@gmail.com>2015-03-29 18:09:03 -0700
commitbb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc (patch)
treeccb16a62c4da6d1b6e33801289fdbcfdf3c0713f /docs/usage.rst
parentfd204dca174033e36899a0e20d2ce7ebccf11ddb (diff)
parent35b8f5b5d8b0888806d5d6c9ec02910327c3a671 (diff)
downloadkafka-python-bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc.tar.gz
Merge pull request #341 from dpkp/kafka_consumer_docs
KafkaConsumer documentation
Diffstat (limited to 'docs/usage.rst')
-rw-r--r--docs/usage.rst103
1 file changed, 89 insertions, 14 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index 141cf93..150d121 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,12 +1,12 @@
Usage
=====
-High level
-----------
+SimpleProducer
+--------------
.. code:: python
- from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+ from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
batch_send_every_n=20,
batch_send_every_t=60)
- # To consume messages
- consumer = KafkaConsumer("my-topic", group_id="my_group",
- metadata_broker_list=["localhost:9092"])
- for message in consumer:
- # message is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.decode('utf-8')`
- print(message)
-
- kafka.close()
-
-
Keyed messages
--------------
@@ -80,6 +69,92 @@ Keyed messages
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+KafkaConsumer
+-------------
+
+.. code:: python
+
+ from kafka import KafkaConsumer
+
+ # To consume messages
+ consumer = KafkaConsumer("my-topic",
+ group_id="my_group",
+ bootstrap_servers=["localhost:9092"])
+ for message in consumer:
+ # message value is raw byte string -- decode if necessary!
+ # e.g., for unicode: `message.value.decode('utf-8')`
+ print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+ message.offset, message.key,
+ message.value))
+
+ consumer.close()
+
+
+messages (m) are namedtuples with attributes:
+
+ * `m.topic`: topic name (str)
+ * `m.partition`: partition number (int)
+ * `m.offset`: message offset on topic-partition log (int)
+ * `m.key`: key (bytes - can be None)
+ * `m.value`: message (output of deserializer_class - default is raw bytes)
+
+
+.. code:: python
+
+ from kafka import KafkaConsumer
+
+ # more advanced consumer -- multiple topics w/ auto commit offset
+ # management
+ consumer = KafkaConsumer('topic1', 'topic2',
+ bootstrap_servers=['localhost:9092'],
+ group_id='my_consumer_group',
+ auto_commit_enable=True,
+ auto_commit_interval_ms=30 * 1000,
+ auto_offset_reset='smallest')
+
+ # Infinite iteration
+ for m in consumer:
+ do_some_work(m)
+
+ # Mark this message as fully consumed
+ # so it can be included in the next commit
+ #
+ # **messages that are not marked w/ task_done currently do not commit!
+ consumer.task_done(m)
+
+ # If auto_commit_enable is False, remember to commit() periodically
+ consumer.commit()
+
+ # Batch process interface
+ while True:
+ for m in consumer.fetch_messages():
+ process_message(m)
+ consumer.task_done(m)
+
+
+ Configuration settings can be passed to constructor,
+ otherwise defaults will be used:
+
+.. code:: python
+
+ client_id='kafka.consumer.kafka',
+ group_id=None,
+ fetch_message_max_bytes=1024*1024,
+ fetch_min_bytes=1,
+ fetch_wait_max_ms=100,
+ refresh_leader_backoff_ms=200,
+ bootstrap_servers=[],
+ socket_timeout_ms=30*1000,
+ auto_offset_reset='largest',
+ deserializer_class=lambda msg: msg,
+ auto_commit_enable=False,
+ auto_commit_interval_ms=60 * 1000,
+ consumer_timeout_ms=-1
+
+ Configuration parameters are described in more detail at
+ http://kafka.apache.org/documentation.html#highlevelconsumerapi
+
Multiprocess consumer
---------------------