diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-09 00:00:49 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-29 16:36:37 -0700 |
commit | c1ca3bf97ed91f25bd3d0489ca4f3a7d61ab95b8 (patch) | |
tree | b559c6b7d49ff958f3f81bcd56fcbb235a3e9f5e /docs | |
parent | 9be4146532e7f7c52b5a47caa3d3c5fe625ed69c (diff) | |
download | kafka-python-c1ca3bf97ed91f25bd3d0489ca4f3a7d61ab95b8.tar.gz |
Updates to KafkaConsumer usage docs
Diffstat (limited to 'docs')
-rw-r--r-- | docs/usage.rst | 68 |
1 file changed, 26 insertions, 42 deletions
diff --git a/docs/usage.rst b/docs/usage.rst index acd52dc..150d121 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -80,32 +80,24 @@ KafkaConsumer # To consume messages consumer = KafkaConsumer("my-topic", group_id="my_group", - metadata_broker_list=["localhost:9092"]) + bootstrap_servers=["localhost:9092"]) for message in consumer: - # message is raw byte string -- decode if necessary! - # e.g., for unicode: `message.decode('utf-8')` - print(message) + # message value is raw byte string -- decode if necessary! + # e.g., for unicode: `message.value.decode('utf-8')` + print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, + message.offset, message.key, + message.value)) kafka.close() -.. code:: python - - from kafka import KafkaConsumer - # A very basic 'tail' consumer, with no stored offset management - kafka = KafkaConsumer('topic1', - metadata_broker_list=['localhost:9092']) - for m in kafka: - print m +messages (m) are namedtuples with attributes: - # Alternate interface: next() - print kafka.next() - - # Alternate interface: batch iteration - while True: - for m in kafka.fetch_messages(): - print m - print "Done with batch - let's do another!" + * `m.topic`: topic name (str) + * `m.partition`: partition number (int) + * `m.offset`: message offset on topic-partition log (int) + * `m.key`: key (bytes - can be None) + * `m.value`: message (output of deserializer_class - default is raw bytes) .. 
code:: python @@ -114,22 +106,22 @@ KafkaConsumer # more advanced consumer -- multiple topics w/ auto commit offset # management - kafka = KafkaConsumer('topic1', 'topic2', - metadata_broker_list=['localhost:9092'], - group_id='my_consumer_group', - auto_commit_enable=True, - auto_commit_interval_ms=30 * 1000, - auto_offset_reset='smallest') + consumer = KafkaConsumer('topic1', 'topic2', + bootstrap_servers=['localhost:9092'], + group_id='my_consumer_group', + auto_commit_enable=True, + auto_commit_interval_ms=30 * 1000, + auto_offset_reset='smallest') # Infinite iteration - for m in kafka: - process_message(m) - kafka.task_done(m) + for m in consumer: + do_some_work(m) - # Alternate interface: next() - m = kafka.next() - process_message(m) - kafka.task_done(m) + # Mark this message as fully consumed + # so it can be included in the next commit + # + # **messages that are not marked w/ task_done currently do not commit! + kafka.task_done(m) # If auto_commit_enable is False, remember to commit() periodically kafka.commit() @@ -141,14 +133,6 @@ KafkaConsumer kafka.task_done(m) - messages (m) are namedtuples with attributes: - - * `m.topic`: topic name (str) - * `m.partition`: partition number (int) - * `m.offset`: message offset on topic-partition log (int) - * `m.key`: key (bytes - can be None) - * `m.value`: message (output of deserializer_class - default is raw bytes) - Configuration settings can be passed to constructor, otherwise defaults will be used: @@ -160,7 +144,7 @@ KafkaConsumer fetch_min_bytes=1, fetch_wait_max_ms=100, refresh_leader_backoff_ms=200, - metadata_broker_list=None, + bootstrap_servers=[], socket_timeout_ms=30*1000, auto_offset_reset='largest', deserializer_class=lambda msg: msg, |