summary refs log tree commit diff
path: root/docs
diff options
context:
space:
mode:
author Dana Powers <dana.powers@rd.io> 2015-03-09 00:00:49 -0700
committer Dana Powers <dana.powers@rd.io> 2015-03-29 16:36:37 -0700
commit c1ca3bf97ed91f25bd3d0489ca4f3a7d61ab95b8 (patch)
tree b559c6b7d49ff958f3f81bcd56fcbb235a3e9f5e /docs
parent 9be4146532e7f7c52b5a47caa3d3c5fe625ed69c (diff)
download kafka-python-c1ca3bf97ed91f25bd3d0489ca4f3a7d61ab95b8.tar.gz
Updates to KafkaConsumer usage docs
Diffstat (limited to 'docs')
-rw-r--r-- docs/usage.rst 68
1 files changed, 26 insertions, 42 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index acd52dc..150d121 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -80,32 +80,24 @@ KafkaConsumer
# To consume messages
consumer = KafkaConsumer("my-topic",
group_id="my_group",
- metadata_broker_list=["localhost:9092"])
+ bootstrap_servers=["localhost:9092"])
for message in consumer:
- # message is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.decode('utf-8')`
- print(message)
+ # message value is raw byte string -- decode if necessary!
+ # e.g., for unicode: `message.value.decode('utf-8')`
+ print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+ message.offset, message.key,
+ message.value))
kafka.close()
-.. code:: python
-
- from kafka import KafkaConsumer
- # A very basic 'tail' consumer, with no stored offset management
- kafka = KafkaConsumer('topic1',
- metadata_broker_list=['localhost:9092'])
- for m in kafka:
- print m
+messages (m) are namedtuples with attributes:
- # Alternate interface: next()
- print kafka.next()
-
- # Alternate interface: batch iteration
- while True:
- for m in kafka.fetch_messages():
- print m
- print "Done with batch - let's do another!"
+ * `m.topic`: topic name (str)
+ * `m.partition`: partition number (int)
+ * `m.offset`: message offset on topic-partition log (int)
+ * `m.key`: key (bytes - can be None)
+ * `m.value`: message (output of deserializer_class - default is raw bytes)
.. code:: python
@@ -114,22 +106,22 @@ KafkaConsumer
# more advanced consumer -- multiple topics w/ auto commit offset
# management
- kafka = KafkaConsumer('topic1', 'topic2',
- metadata_broker_list=['localhost:9092'],
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
+ consumer = KafkaConsumer('topic1', 'topic2',
+ bootstrap_servers=['localhost:9092'],
+ group_id='my_consumer_group',
+ auto_commit_enable=True,
+ auto_commit_interval_ms=30 * 1000,
+ auto_offset_reset='smallest')
# Infinite iteration
- for m in kafka:
- process_message(m)
- kafka.task_done(m)
+ for m in consumer:
+ do_some_work(m)
- # Alternate interface: next()
- m = kafka.next()
- process_message(m)
- kafka.task_done(m)
+ # Mark this message as fully consumed
+ # so it can be included in the next commit
+ #
+ # **messages that are not marked w/ task_done currently do not commit!
+ consumer.task_done(m)
# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()
@@ -141,14 +133,6 @@ KafkaConsumer
kafka.task_done(m)
- messages (m) are namedtuples with attributes:
-
- * `m.topic`: topic name (str)
- * `m.partition`: partition number (int)
- * `m.offset`: message offset on topic-partition log (int)
- * `m.key`: key (bytes - can be None)
- * `m.value`: message (output of deserializer_class - default is raw bytes)
-
Configuration settings can be passed to constructor,
otherwise defaults will be used:
@@ -160,7 +144,7 @@ KafkaConsumer
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
- metadata_broker_list=None,
+ bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,