author     Dana Powers <dana.powers@gmail.com>  2015-03-29 18:09:03 -0700
committer  Dana Powers <dana.powers@gmail.com>  2015-03-29 18:09:03 -0700
commit     bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc (patch)
tree       ccb16a62c4da6d1b6e33801289fdbcfdf3c0713f
parent     fd204dca174033e36899a0e20d2ce7ebccf11ddb (diff)
parent     35b8f5b5d8b0888806d5d6c9ec02910327c3a671 (diff)
download   kafka-python-bb1c11e199a91f50d227ac9d95ea0c81a3f1bbfc.tar.gz
Merge pull request #341 from dpkp/kafka_consumer_docs
KafkaConsumer documentation
-rw-r--r--  docs/requirements.txt             |   1
-rw-r--r--  docs/usage.rst                    | 103
-rw-r--r--  kafka/consumer/kafka.py           | 228
-rw-r--r--  test/test_consumer_integration.py |   2
4 files changed, 185 insertions(+), 149 deletions(-)
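Beyond the docs shuffle, note the config rename threaded through the examples and tests below: `metadata_broker_list` is out, `bootstrap_servers` is in. A minimal sketch of the two spellings (the broker address is a placeholder; if memory of this version serves, the old key is still remapped via DEPRECATED_CONFIG_KEYS with a deprecation warning, but the docs no longer show it):

```python
from kafka import KafkaConsumer

# Old spelling, removed from the docs by this PR:
#   consumer = KafkaConsumer('my-topic',
#                            metadata_broker_list=['localhost:9092'])

# New spelling used throughout the updated docs and tests:
consumer = KafkaConsumer('my-topic',
                         group_id='my_group',
                         bootstrap_servers=['localhost:9092'])
```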
diff --git a/docs/requirements.txt b/docs/requirements.txt
index 86b4f05..d32365f 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -1,5 +1,6 @@
 sphinx
 sphinxcontrib-napoleon
+sphinx_rtd_theme
 
 # Install kafka-python in editable mode
 # This allows the sphinx autodoc module
diff --git a/docs/usage.rst b/docs/usage.rst
index 141cf93..150d121 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,12 +1,12 @@
 Usage
 =====
 
-High level
-----------
+SimpleProducer
+--------------
 
 .. code:: python
 
-    from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+    from kafka import SimpleProducer, KafkaClient
 
     # To send messages synchronously
     kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
                               batch_send_every_n=20,
                               batch_send_every_t=60)
 
-    # To consume messages
-    consumer = KafkaConsumer("my-topic", group_id="my_group",
-                             metadata_broker_list=["localhost:9092"])
-    for message in consumer:
-        # message is raw byte string -- decode if necessary!
-        # e.g., for unicode: `message.decode('utf-8')`
-        print(message)
-
-    kafka.close()
-
-
 Keyed messages
 --------------
 
@@ -80,6 +69,92 @@ Keyed messages
 
     producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 
+
+KafkaConsumer
+-------------
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # To consume messages
+    consumer = KafkaConsumer("my-topic",
+                             group_id="my_group",
+                             bootstrap_servers=["localhost:9092"])
+    for message in consumer:
+        # message value is a raw byte string -- decode if necessary!
+        # e.g., for unicode: `message.value.decode('utf-8')`
+        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                                             message.offset, message.key,
+                                             message.value))
+
+
+Messages (m) are namedtuples with attributes:
+
+  * `m.topic`: topic name (str)
+  * `m.partition`: partition number (int)
+  * `m.offset`: message offset on topic-partition log (int)
+  * `m.key`: key (bytes - can be None)
+  * `m.value`: message (output of deserializer_class - default is raw bytes)
+
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # more advanced consumer -- multiple topics w/ auto commit offset
+    # management
+    consumer = KafkaConsumer('topic1', 'topic2',
+                             bootstrap_servers=['localhost:9092'],
+                             group_id='my_consumer_group',
+                             auto_commit_enable=True,
+                             auto_commit_interval_ms=30 * 1000,
+                             auto_offset_reset='smallest')
+
+    # Infinite iteration
+    for m in consumer:
+        do_some_work(m)
+
+        # Mark this message as fully consumed
+        # so it can be included in the next commit
+        #
+        # **messages that are not marked w/ task_done currently do not commit!
+        consumer.task_done(m)
+
+    # If auto_commit_enable is False, remember to commit() periodically
+    consumer.commit()
+
+    # Batch process interface
+    while True:
+        for m in consumer.fetch_messages():
+            process_message(m)
+            consumer.task_done(m)
+
+
+Configuration settings can be passed to the constructor,
+otherwise defaults will be used:
+
+.. code:: python
+
+    client_id='kafka.consumer.kafka',
+    group_id=None,
+    fetch_message_max_bytes=1024*1024,
+    fetch_min_bytes=1,
+    fetch_wait_max_ms=100,
+    refresh_leader_backoff_ms=200,
+    bootstrap_servers=[],
+    socket_timeout_ms=30*1000,
+    auto_offset_reset='largest',
+    deserializer_class=lambda msg: msg,
+    auto_commit_enable=False,
+    auto_commit_interval_ms=60 * 1000,
+    consumer_timeout_ms=-1
+
+Configuration parameters are described in more detail at
+http://kafka.apache.org/documentation.html#highlevelconsumerapi
+
 Multiprocess consumer
 ---------------------
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index f03d15e..6f5bcdd 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -52,114 +52,59 @@ DEPRECATED_CONFIG_KEYS = {
 }
 
 class KafkaConsumer(object):
-    """
-    A simpler kafka consumer
-
-    .. code:: python
-
-        # A very basic 'tail' consumer, with no stored offset management
-        kafka = KafkaConsumer('topic1',
-                              bootstrap_servers=['localhost:9092'])
-        for m in kafka:
-            print m
-
-        # Alternate interface: next()
-        print kafka.next()
-
-        # Alternate interface: batch iteration
-        while True:
-            for m in kafka.fetch_messages():
-                print m
-            print "Done with batch - let's do another!"
-
-
-    .. code:: python
-
-        # more advanced consumer -- multiple topics w/ auto commit offset
-        # management
-        kafka = KafkaConsumer('topic1', 'topic2',
-                              bootstrap_servers=['localhost:9092'],
-                              group_id='my_consumer_group',
-                              auto_commit_enable=True,
-                              auto_commit_interval_ms=30 * 1000,
-                              auto_offset_reset='smallest')
-
-        # Infinite iteration
-        for m in kafka:
-            process_message(m)
-            kafka.task_done(m)
-
-        # Alternate interface: next()
-        m = kafka.next()
-        process_message(m)
-        kafka.task_done(m)
-
-        # If auto_commit_enable is False, remember to commit() periodically
-        kafka.commit()
-
-        # Batch process interface
-        while True:
-            for m in kafka.fetch_messages():
-                process_message(m)
-                kafka.task_done(m)
-
-
-    messages (m) are namedtuples with attributes:
-
-      * `m.topic`: topic name (str)
-      * `m.partition`: partition number (int)
-      * `m.offset`: message offset on topic-partition log (int)
-      * `m.key`: key (bytes - can be None)
-      * `m.value`: message (output of deserializer_class - default is raw bytes)
-
-    Configuration settings can be passed to constructor,
-    otherwise defaults will be used:
-
-    .. code:: python
-
-        client_id='kafka.consumer.kafka',
-        group_id=None,
-        fetch_message_max_bytes=1024*1024,
-        fetch_min_bytes=1,
-        fetch_wait_max_ms=100,
-        refresh_leader_backoff_ms=200,
-        bootstrap_servers=[],
-        socket_timeout_ms=30*1000,
-        auto_offset_reset='largest',
-        deserializer_class=lambda msg: msg,
-        auto_commit_enable=False,
-        auto_commit_interval_ms=60 * 1000,
-        consumer_timeout_ms=-1
-
-    Configuration parameters are described in more detail at
-    http://kafka.apache.org/documentation.html#highlevelconsumerapi
-    """
+    """A simpler kafka consumer"""
 
     def __init__(self, *topics, **configs):
         self.configure(**configs)
         self.set_topic_partitions(*topics)
 
     def configure(self, **configs):
-        """
+        """Configure the consumer instance
+
         Configuration settings can be passed to constructor,
         otherwise defaults will be used:
 
-        .. code:: python
-
-            client_id='kafka.consumer.kafka',
-            group_id=None,
-            fetch_message_max_bytes=1024*1024,
-            fetch_min_bytes=1,
-            fetch_wait_max_ms=100,
-            refresh_leader_backoff_ms=200,
-            bootstrap_servers=[],
-            socket_timeout_ms=30*1000,
-            auto_offset_reset='largest',
-            deserializer_class=lambda msg: msg,
-            auto_commit_enable=False,
-            auto_commit_interval_ms=60 * 1000,
-            auto_commit_interval_messages=None,
-            consumer_timeout_ms=-1
+        Keyword Arguments:
+            bootstrap_servers (list): List of initial broker nodes the consumer
+                should contact to bootstrap initial cluster metadata. This does
+                not have to be the full node list. It just needs to have at
+                least one broker that will respond to a Metadata API Request.
+            client_id (str): a unique name for this client. Defaults to
+                'kafka.consumer.kafka'.
+            group_id (str): the name of the consumer group to join.
+                Offsets are fetched / committed to this group name.
+            fetch_message_max_bytes (int, optional): Maximum bytes for each
+                topic/partition fetch request. Defaults to 1024*1024.
+            fetch_min_bytes (int, optional): Minimum amount of data the server
+                should return for a fetch request, otherwise wait up to
+                fetch_wait_max_ms for more data to accumulate. Defaults to 1.
+            fetch_wait_max_ms (int, optional): Maximum time for the server to
+                block waiting for fetch_min_bytes messages to accumulate.
+                Defaults to 100.
+            refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
+                when refreshing metadata on errors (subject to random jitter).
+                Defaults to 200.
+            socket_timeout_ms (int, optional): TCP socket timeout in
+                milliseconds. Defaults to 30*1000.
+            auto_offset_reset (str, optional): A policy for resetting offsets on
+                OffsetOutOfRange errors. 'smallest' will move to the oldest
+                available message, 'largest' will move to the most recent. Any
+                other value will raise the exception. Defaults to 'largest'.
+            deserializer_class (callable, optional): Any callable that takes a
+                raw message value and returns a deserialized value. Defaults to
+                lambda msg: msg.
+            auto_commit_enable (bool, optional): Enabling auto-commit will cause
+                the KafkaConsumer to periodically commit offsets without an
+                explicit call to commit(). Defaults to False.
+            auto_commit_interval_ms (int, optional): If auto_commit_enable is
+                True, the milliseconds between automatic offset commits.
+                Defaults to 60 * 1000.
+            auto_commit_interval_messages (int, optional): If
+                auto_commit_enable is True, a number of messages consumed
+                between automatic offset commits. Defaults to None (disabled).
+            consumer_timeout_ms (int, optional): number of milliseconds to wait
+                for a message before raising a timeout exception if no message
+                is available for consumption. Defaults to -1 (don't raise an
+                exception).
 
         Configuration parameters are described in more detail at
         http://kafka.apache.org/documentation.html#highlevelconsumerapi
@@ -316,18 +261,18 @@ class KafkaConsumer(object):
         self._reset_message_iterator()
 
     def next(self):
-        """
-        Return a single message from the message iterator
-        If consumer_timeout_ms is set, will raise ConsumerTimeout
-        if no message is available
-        Otherwise blocks indefinitely
+        """Return the next available message
 
-        Note that this is also the method called internally during iteration:
+        Blocks indefinitely unless consumer_timeout_ms > 0
 
-        .. code:: python
+        Returns:
+            a single KafkaMessage from the message iterator
 
-            for m in consumer:
-                pass
+        Raises:
+            ConsumerTimeout after consumer_timeout_ms and no message
+
+        Note:
+            This is also the method called internally during iteration
         """
 
         self._set_consumer_timeout_start()
@@ -343,21 +288,24 @@ class KafkaConsumer(object):
             self._check_consumer_timeout()
 
     def fetch_messages(self):
-        """
-        Sends FetchRequests for all topic/partitions set for consumption
-        Returns a generator that yields KafkaMessage structs
-        after deserializing with the configured `deserializer_class`
+        """Sends FetchRequests for all topic/partitions set for consumption
+
+        Returns:
+            Generator that yields KafkaMessage structs
+            after deserializing with the configured `deserializer_class`
 
-        Refreshes metadata on errors, and resets fetch offset on
-        OffsetOutOfRange, per the configured `auto_offset_reset` policy
+        Note:
+            Refreshes metadata on errors, and resets fetch offset on
+            OffsetOutOfRange, per the configured `auto_offset_reset` policy
 
-        Key configuration parameters:
+        See Also:
+            Key KafkaConsumer configuration parameters:
+            * `fetch_message_max_bytes`
+            * `fetch_max_wait_ms`
+            * `fetch_min_bytes`
+            * `deserializer_class`
+            * `auto_offset_reset`
 
-        * `fetch_message_max_bytes`
-        * `fetch_max_wait_ms`
-        * `fetch_min_bytes`
-        * `deserializer_class`
-        * `auto_offset_reset`
         """
 
         max_bytes = self._config['fetch_message_max_bytes']
@@ -436,21 +384,22 @@ class KafkaConsumer(object):
             yield msg
 
     def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
-        """
-        Request available fetch offsets for a single topic/partition
+        """Request available fetch offsets for a single topic/partition
 
-        Arguments:
-            topic (str)
-            partition (int)
+        Keyword Arguments:
+            topic (str): topic for offset request
+            partition (int): partition for offset request
             request_time_ms (int): Used to ask for all messages before a
                 certain time (ms). There are two special values. Specify -1 to receive the latest
                 offset (i.e. the offset of the next coming message) and -2 to receive the earliest
                 available offset. Note that because offsets are pulled in descending order, asking for
                 the earliest offset will always return you a single element.
-            max_num_offsets (int)
+            max_num_offsets (int): Maximum offsets to include in the OffsetResponse
 
         Returns:
-            offsets (list)
+            a list of offsets in the OffsetResponse submitted for the provided
+            topic / partition. See:
+            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
         """
         reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
@@ -466,7 +415,8 @@ class KafkaConsumer(object):
         return resp.offsets
 
     def offsets(self, group=None):
-        """
+        """Get internal consumer offset values
+
         Keyword Arguments:
             group: Either "fetch", "commit", "task_done", or "highwater".
                 If no group specified, returns all groups.
@@ -485,10 +435,17 @@ class KafkaConsumer(object):
         return dict(deepcopy(getattr(self._offsets, group)))
 
     def task_done(self, message):
-        """
-        Mark a fetched message as consumed.
+        """Mark a fetched message as consumed.
+
         Offsets for messages marked as "task_done" will be stored back
         to the kafka cluster for this consumer group on commit()
 
+        Arguments:
+            message (KafkaMessage): the message to mark as complete
+
+        Returns:
+            Nothing
+
         """
         topic_partition = (message.topic, message.partition)
         offset = message.offset
@@ -516,12 +473,15 @@ class KafkaConsumer(object):
             self.commit()
 
     def commit(self):
-        """
-        Store consumed message offsets (marked via task_done())
+        """Store consumed message offsets (marked via task_done())
         to kafka cluster for this consumer_group.
 
-        **Note**: this functionality requires server version >=0.8.1.1
-        See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.
+        Returns:
+            True on success, or False if no offsets were found for commit
+
+        Note:
+            this functionality requires server version >=0.8.1.1
+            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
         """
         if not self._config['group_id']:
             logger.warning('Cannot commit without a group_id!')
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d3df56a..17a8ac9 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -69,7 +69,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     def kafka_consumer(self, **configs):
         brokers = '%s:%d' % (self.server.host, self.server.port)
         consumer = KafkaConsumer(self.topic,
-                                 metadata_broker_list=brokers,
+                                 bootstrap_servers=brokers,
                                  **configs)
         return consumer
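For readers trying the documented API end-to-end, here is a sketch of the task_done/commit flow against this version of KafkaConsumer. It assumes a broker on localhost:9092 and an existing topic 'my-topic' (both placeholders), and that ConsumerTimeout is importable from kafka.common in this era of the codebase (verify against your checkout); the deserializer and the printing loop are illustrative, not part of the PR.

```python
from kafka import KafkaConsumer
from kafka.common import ConsumerTimeout  # import location assumed for this version

consumer = KafkaConsumer(
    'my-topic',                                           # placeholder topic
    group_id='my_group',
    bootstrap_servers=['localhost:9092'],                 # placeholder broker
    deserializer_class=lambda msg: msg.decode('utf-8'),   # decode values at fetch time
    auto_commit_enable=False,                             # commit explicitly below
    consumer_timeout_ms=10 * 1000)                        # raise after 10s with no messages

# The two special request_time_ms values from get_partition_offsets():
# -2 asks for the earliest available offset, -1 for the latest.
earliest = consumer.get_partition_offsets('my-topic', 0, -2, 1)
print('earliest offset on partition 0: %r' % earliest)

try:
    for m in consumer:
        print('%s:%d:%d: %s' % (m.topic, m.partition, m.offset, m.value))
        consumer.task_done(m)  # only task_done offsets are stored on commit()
except ConsumerTimeout:
    pass  # idle for consumer_timeout_ms -- stop consuming

# Push the task_done offsets to the cluster (requires brokers >= 0.8.1.1)
consumer.commit()
```

Pairing an explicit commit() with task_done() gives at-least-once semantics: an offset is only stored once the message has actually been processed, at the cost of possible re-delivery after a crash between processing and commit.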