Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--  kafka/consumer/kafka.py  228
1 file changed, 94 insertions(+), 134 deletions(-)
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index f03d15e..5d98f81 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -52,114 +52,59 @@ DEPRECATED_CONFIG_KEYS = {
 }
 
 class KafkaConsumer(object):
-    """
-    A simpler kafka consumer
-
-    .. code:: python
-
-        # A very basic 'tail' consumer, with no stored offset management
-        kafka = KafkaConsumer('topic1',
-                              bootstrap_servers=['localhost:9092'])
-        for m in kafka:
-            print m
-
-        # Alternate interface: next()
-        print kafka.next()
-
-        # Alternate interface: batch iteration
-        while True:
-            for m in kafka.fetch_messages():
-                print m
-            print "Done with batch - let's do another!"
-
-
-    .. code:: python
-
-        # more advanced consumer -- multiple topics w/ auto commit offset
-        # management
-        kafka = KafkaConsumer('topic1', 'topic2',
-                              bootstrap_servers=['localhost:9092'],
-                              group_id='my_consumer_group',
-                              auto_commit_enable=True,
-                              auto_commit_interval_ms=30 * 1000,
-                              auto_offset_reset='smallest')
-
-        # Infinite iteration
-        for m in kafka:
-            process_message(m)
-            kafka.task_done(m)
-
-        # Alternate interface: next()
-        m = kafka.next()
-        process_message(m)
-        kafka.task_done(m)
-
-        # If auto_commit_enable is False, remember to commit() periodically
-        kafka.commit()
-
-        # Batch process interface
-        while True:
-            for m in kafka.fetch_messages():
-                process_message(m)
-                kafka.task_done(m)
-
-
-    messages (m) are namedtuples with attributes:
-
-    * `m.topic`: topic name (str)
-    * `m.partition`: partition number (int)
-    * `m.offset`: message offset on topic-partition log (int)
-    * `m.key`: key (bytes - can be None)
-    * `m.value`: message (output of deserializer_class - default is raw bytes)
-
-    Configuration settings can be passed to constructor,
-    otherwise defaults will be used:
-
-    .. code:: python
-
-        client_id='kafka.consumer.kafka',
-        group_id=None,
-        fetch_message_max_bytes=1024*1024,
-        fetch_min_bytes=1,
-        fetch_wait_max_ms=100,
-        refresh_leader_backoff_ms=200,
-        bootstrap_servers=[],
-        socket_timeout_ms=30*1000,
-        auto_offset_reset='largest',
-        deserializer_class=lambda msg: msg,
-        auto_commit_enable=False,
-        auto_commit_interval_ms=60 * 1000,
-        consumer_timeout_ms=-1
-
-    Configuration parameters are described in more detail at
-    http://kafka.apache.org/documentation.html#highlevelconsumerapi
-    """
+    """A simpler kafka consumer"""
 
     def __init__(self, *topics, **configs):
         self.configure(**configs)
         self.set_topic_partitions(*topics)
 
     def configure(self, **configs):
-        """
+        """Configure the consumer instance
+
         Configuration settings can be passed to constructor,
         otherwise defaults will be used:
 
-        .. code:: python
-
-            client_id='kafka.consumer.kafka',
-            group_id=None,
-            fetch_message_max_bytes=1024*1024,
-            fetch_min_bytes=1,
-            fetch_wait_max_ms=100,
-            refresh_leader_backoff_ms=200,
-            bootstrap_servers=[],
-            socket_timeout_ms=30*1000,
-            auto_offset_reset='largest',
-            deserializer_class=lambda msg: msg,
-            auto_commit_enable=False,
-            auto_commit_interval_ms=60 * 1000,
-            auto_commit_interval_messages=None,
-            consumer_timeout_ms=-1
+        Keyword Arguments:
+            bootstrap_servers (list): List of initial broker nodes the consumer
+                should contact to bootstrap initial cluster metadata. This does
+                not have to be the full node list. It just needs to have at
+                least one broker that will respond to a Metadata API Request.
+            client_id (str): a unique name for this client. Defaults to
+                'kafka.consumer.kafka'.
+            group_id (str): the name of the consumer group to join.
+                Offsets are fetched / committed to this group name.
+            fetch_message_max_bytes (int, optional): Maximum bytes for each
+                topic/partition fetch request. Defaults to 1024*1024.
+            fetch_min_bytes (int, optional): Minimum amount of data the server
+                should return for a fetch request, otherwise wait up to
+                fetch_wait_max_ms for more data to accumulate. Defaults to 1.
+            fetch_wait_max_ms (int, optional): Maximum time for the server to
+                block waiting for fetch_min_bytes messages to accumulate.
+                Defaults to 100.
+            refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
+                when refreshing metadata on errors (subject to random jitter).
+                Defaults to 200.
+            socket_timeout_ms (int, optional): TCP socket timeout in
+                milliseconds. Defaults to 30*1000.
+            auto_offset_reset (str, optional): A policy for resetting offsets on
+                OffsetOutOfRange errors. 'smallest' will move to the oldest
+                available message, 'largest' will move to the most recent. Any
+                other value will raise the exception. Defaults to 'largest'.
+            deserializer_class (callable, optional): Any callable that takes a
+                raw message value and returns a deserialized value. Defaults to
+                lambda msg: msg.
+            auto_commit_enable (bool, optional): Enabling auto-commit will cause
+                the KafkaConsumer to periodically commit offsets without an
+                explicit call to commit(). Defaults to False.
+            auto_commit_interval_ms (int, optional): If auto_commit_enable is
+                True, the milliseconds between automatic offset commits.
+                Defaults to 60 * 1000.
+            auto_commit_interval_messages (int, optional): If
+                auto_commit_enable is True, a number of messages consumed
+                between automatic offset commits. Defaults to None (disabled).
+            consumer_timeout_ms (int, optional): number of milliseconds to
+                wait before raising a ConsumerTimeout when no message is
+                available for consumption. Defaults to -1 (never raise).
 
         Configuration parameters are described in more detail at
         http://kafka.apache.org/documentation.html#highlevelconsumerapi
@@ -316,18 +261,18 @@ class KafkaConsumer(object):
         self._reset_message_iterator()
 
     def next(self):
-        """
-        Return a single message from the message iterator
-        If consumer_timeout_ms is set, will raise ConsumerTimeout
-        if no message is available
-        Otherwise blocks indefinitely
+        """Return the next available message
 
-        Note that this is also the method called internally during iteration:
+        Blocks indefinitely unless consumer_timeout_ms > 0
 
-        .. code:: python
+        Returns:
+            a single KafkaMessage from the message iterator
 
-            for m in consumer:
-                pass
+        Raises:
+            ConsumerTimeout after consumer_timeout_ms and no message
+
+        Note:
+            This is also the method called internally during iteration
         """
 
         self._set_consumer_timeout_start()
@@ -343,21 +288,24 @@ class KafkaConsumer(object):
             self._check_consumer_timeout()
 
     def fetch_messages(self):
-        """
-        Sends FetchRequests for all topic/partitions set for consumption
-        Returns a generator that yields KafkaMessage structs
-        after deserializing with the configured `deserializer_class`
+        """Sends FetchRequests for all topic/partitions set for consumption
+
+        Returns:
+            Generator that yields KafkaMessage structs
+            after deserializing with the configured `deserializer_class`
 
-        Refreshes metadata on errors, and resets fetch offset on
-        OffsetOutOfRange, per the configured `auto_offset_reset` policy
+        Note:
+            Refreshes metadata on errors, and resets fetch offset on
+            OffsetOutOfRange, per the configured `auto_offset_reset` policy
 
-        Key configuration parameters:
+        See Also:
+            Key KafkaConsumer configuration parameters:
+            * `fetch_message_max_bytes`
+            * `fetch_max_wait_ms`
+            * `fetch_min_bytes`
+            * `deserializer_class`
+            * `auto_offset_reset`
 
-        * `fetch_message_max_bytes`
-        * `fetch_max_wait_ms`
-        * `fetch_min_bytes`
-        * `deserializer_class`
-        * `auto_offset_reset`
         """
 
         max_bytes = self._config['fetch_message_max_bytes']
@@ -436,21 +384,22 @@ class KafkaConsumer(object):
             yield msg
 
     def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
-        """
-        Request available fetch offsets for a single topic/partition
+        """Request available fetch offsets for a single topic/partition
 
-        Arguments:
-            topic (str)
-            partition (int)
+        Keyword Arguments:
+            topic (str): topic for offset request
+            partition (int): partition for offset request
             request_time_ms (int): Used to ask for all messages before a
                 certain time (ms). There are two special values. Specify -1 to receive the latest
                 offset (i.e. the offset of the next coming message) and -2 to receive the earliest
                 available offset. Note that because offsets are pulled in descending order, asking for
                 the earliest offset will always return you a single element.
-            max_num_offsets (int)
+            max_num_offsets (int): Maximum offsets to include in the OffsetResponse
 
         Returns:
-            offsets (list)
+            a list of offsets in the OffsetResponse submitted for the provided
+            topic / partition. See:
+            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
         """
         reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
@@ -466,7 +415,8 @@ class KafkaConsumer(object):
         return resp.offsets
 
     def offsets(self, group=None):
-        """
+        """Get internal consumer offset values
+
         Keyword Arguments:
             group: Either "fetch", "commit", "task_done", or "highwater".
                 If no group specified, returns all groups.
@@ -485,10 +435,17 @@ class KafkaConsumer(object):
         return dict(deepcopy(getattr(self._offsets, group)))
 
     def task_done(self, message):
-        """
-        Mark a fetched message as consumed.
+        """Mark a fetched message as consumed.
+
         Offsets for messages marked as "task_done" will be stored back
         to the kafka cluster for this consumer group on commit()
+
+        Arguments:
+            message (KafkaMessage): the message to mark as complete
+
+        Returns:
+            Nothing
+
         """
         topic_partition = (message.topic, message.partition)
         offset = message.offset
@@ -516,12 +473,15 @@ class KafkaConsumer(object):
             self.commit()
 
     def commit(self):
-        """
-        Store consumed message offsets (marked via task_done())
+        """Store consumed message offsets (marked via task_done())
         to kafka cluster for this consumer_group.
 
-        **Note**: this functionality requires server version >=0.8.1.1
-        See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.
+        Returns:
+            True on success, or False if no offsets were found for commit
+
+        Note:
+            this functionality requires server version >=0.8.1.1
+            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
         """
         if not self._config['group_id']:
             logger.warning('Cannot commit without a group_id!')