diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-03-29 18:09:03 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-03-29 18:09:03 -0700 |
commit | bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc (patch) | |
tree | ccb16a62c4da6d1b6e33801289fdbcfdf3c0713f /docs/usage.rst | |
parent | fd204dca174033e36899a0e20d2ce7ebccf11ddb (diff) | |
parent | 35b8f5b5d8b0888806d5d6c9ec02910327c3a671 (diff) | |
download | kafka-python-bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc.tar.gz |
Merge pull request #341 from dpkp/kafka_consumer_docs
KafkaConsumer documentation
Diffstat (limited to 'docs/usage.rst')
-rw-r--r-- | docs/usage.rst | 103 |
1 file changed, 89 insertions, 14 deletions
diff --git a/docs/usage.rst b/docs/usage.rst index 141cf93..150d121 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -1,12 +1,12 @@ Usage ===== -High level ---------- +SimpleProducer +-------------- .. code:: python - from kafka import SimpleProducer, KafkaClient, KafkaConsumer + from kafka import SimpleProducer, KafkaClient # To send messages synchronously kafka = KafkaClient("localhost:9092") @@ -51,17 +51,6 @@ High level batch_send_every_n=20, batch_send_every_t=60) - # To consume messages - consumer = KafkaConsumer("my-topic", group_id="my_group", - metadata_broker_list=["localhost:9092"]) - for message in consumer: - # message is raw byte string -- decode if necessary! - # e.g., for unicode: `message.decode('utf-8')` - print(message) - - kafka.close() - - Keyed messages -------------- @@ -80,6 +69,92 @@ Keyed messages producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) + +KafkaConsumer +------------- + +.. code:: python + + from kafka import KafkaConsumer + + # To consume messages + consumer = KafkaConsumer("my-topic", + group_id="my_group", + bootstrap_servers=["localhost:9092"]) + for message in consumer: + # message value is raw byte string -- decode if necessary! + # e.g., for unicode: `message.value.decode('utf-8')` + print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, + message.offset, message.key, + message.value)) + + kafka.close() + + +messages (m) are namedtuples with attributes: + + * `m.topic`: topic name (str) + * `m.partition`: partition number (int) + * `m.offset`: message offset on topic-partition log (int) + * `m.key`: key (bytes - can be None) + * `m.value`: message (output of deserializer_class - default is raw bytes) + + +.. code:: python + + from kafka import KafkaConsumer + + # more advanced consumer -- multiple topics w/ auto commit offset + # management + consumer = KafkaConsumer('topic1', 'topic2', + bootstrap_servers=['localhost:9092'], + group_id='my_consumer_group', + auto_commit_enable=True, + auto_commit_interval_ms=30 * 1000, + auto_offset_reset='smallest') + + # Infinite iteration + for m in consumer: + do_some_work(m) + + # Mark this message as fully consumed + # so it can be included in the next commit + # + # **messages that are not marked w/ task_done currently do not commit! + kafka.task_done(m) + + # If auto_commit_enable is False, remember to commit() periodically + kafka.commit() + + # Batch process interface + while True: + for m in kafka.fetch_messages(): + process_message(m) + kafka.task_done(m) + + + Configuration settings can be passed to constructor, + otherwise defaults will be used: + +.. code:: python + + client_id='kafka.consumer.kafka', + group_id=None, + fetch_message_max_bytes=1024*1024, + fetch_min_bytes=1, + fetch_wait_max_ms=100, + refresh_leader_backoff_ms=200, + bootstrap_servers=[], + socket_timeout_ms=30*1000, + auto_offset_reset='largest', + deserializer_class=lambda msg: msg, + auto_commit_enable=False, + auto_commit_interval_ms=60 * 1000, + consumer_timeout_ms=-1 + + Configuration parameters are described in more detail at + http://kafka.apache.org/documentation.html#highlevelconsumerapi + Multiprocess consumer --------------------- |