diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-30 12:51:34 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-30 12:51:34 -0800 |
commit | e093ffefaecb59c26f2e480214f72a03ba5a49fc (patch) | |
tree | 88b4bd434b29a751b7e0adebbe4e7514a4c7e844 /kafka/consumer | |
parent | 1dd9e8bb05b6efc2888ac4cae8e7199b35dd633f (diff) | |
download | kafka-python-e093ffefaecb59c26f2e480214f72a03ba5a49fc.tar.gz |
More Docstring Improvements
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 133 | ||||
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 113 |
3 files changed, 194 insertions, 62 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 39e1244..a4be7ae 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -39,6 +39,33 @@ class Fetcher(object): } def __init__(self, client, subscriptions, **configs): + """Initialize a Kafka Message Fetcher. + + Keyword Arguments: + key_deserializer (callable): Any callable that takes a + raw message key and returns a deserialized key. + value_deserializer (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. + fetch_min_bytes (int): Minimum amount of data the server should + return for a fetch request, otherwise wait up to + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds + the server will block before answering the fetch request if + there isn't sufficient data to immediately satisfy the + requirement given by fetch_min_bytes. Default: 500. + max_partition_fetch_bytes (int): The maximum amount of data + per-partition the server will return. The maximum total memory + used for a request = #partitions * max_partition_fetch_bytes. + This size must be at least as large as the maximum message size + the server allows or else it is possible for the producer to + send messages larger than the consumer can fetch. If that + happens, the consumer can get stuck trying to fetch a large + message on a certain partition. Default: 1048576. + check_crcs (bool): Automatically check the CRC32 of the records + consumed. This ensures no on-the-wire or on-disk corruption to + the messages occurred. This check adds some overhead, so it may + be disabled in cases seeking extreme performance. 
Default: True + """ #metrics=None, #metric_group_prefix='consumer', self.config = copy.copy(self.DEFAULT_CONFIG) @@ -56,7 +83,11 @@ class Fetcher(object): #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) def init_fetches(self): - """Send FetchRequests asynchronously for all assigned partitions""" + """Send FetchRequests asynchronously for all assigned partitions. + + Returns: + List of Futures: each future resolves to a FetchResponse + """ futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -70,8 +101,11 @@ class Fetcher(object): def update_fetch_positions(self, partitions): """Update the fetch positions for the provided partitions. - @param partitions: iterable of TopicPartitions - @raises NoOffsetForPartitionError If no offset is stored for a given + Arguments: + partitions (list of TopicPartitions): partitions to update + + Raises: + NoOffsetForPartitionError: if no offset is stored for a given partition and no reset policy is available """ # reset the fetch position to the committed position @@ -104,8 +138,11 @@ class Fetcher(object): def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. - @param partition The given partition that needs reset offset - @raises NoOffsetForPartitionError If no offset reset strategy is defined + Arguments: + partition (TopicPartition): the partition that needs reset offset + + Raises: + NoOffsetForPartitionError: if no offset reset strategy is defined """ timestamp = self._subscriptions.assignment[partition].reset_strategy if timestamp is OffsetResetStrategy.EARLIEST: @@ -129,11 +166,14 @@ class Fetcher(object): Blocks until offset is obtained, or a non-retriable exception is raised - @param partition The partition that needs fetching offset. - @param timestamp The timestamp for fetching offset. 
- @raises exceptions - @return The offset of the message that is published before the given - timestamp + Arguments: + partition (TopicPartition): partition that needs fetching offset. + timestamp (int): timestamp for fetching offset. -1 for the latest + available, -2 for the earliest available. Otherwise timestamp + is treated as epoch seconds. + + Returns: + int: message offset """ while True: future = self._send_offset_request(partition, timestamp) @@ -150,10 +190,12 @@ class Fetcher(object): self._client.poll(future=refresh_future) def _raise_if_offset_out_of_range(self): - """ - If any partition from previous FetchResponse contains - OffsetOutOfRangeError and the default_reset_policy is None, - raise OffsetOutOfRangeError + """Check FetchResponses for offset out of range. + + Raises: + OffsetOutOfRangeError: if any partition from previous FetchResponse + contains OffsetOutOfRangeError and the default_reset_policy is + None """ current_out_of_range_partitions = {} @@ -174,11 +216,10 @@ class Fetcher(object): raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions) def _raise_if_unauthorized_topics(self): - """ - If any topic from previous FetchResponse contains an Authorization - error, raise an exception + """Check FetchResponses for topic authorization failures. - @raise TopicAuthorizationFailedError + Raises: + TopicAuthorizationFailedError """ if self._unauthorized_topics: topics = set(self._unauthorized_topics) @@ -186,12 +227,10 @@ class Fetcher(object): raise Errors.TopicAuthorizationFailedError(topics) def _raise_if_record_too_large(self): - """ - If any partition from previous FetchResponse gets a RecordTooLarge - error, raise RecordTooLargeError + """Check FetchResponses for messages larger than the max per partition. 
- @raise RecordTooLargeError If there is a message larger than fetch size - and hence cannot be ever returned + Raises: + RecordTooLargeError: if there is a message larger than fetch size """ copied_record_too_large_partitions = dict(self._record_too_large_partitions) self._record_too_large_partitions.clear() @@ -207,12 +246,21 @@ class Fetcher(object): self.config['max_partition_fetch_bytes']) def fetched_records(self): - """Returns previously fetched records and updates consumed offsets + """Returns previously fetched records and updates consumed offsets. NOTE: returning empty records guarantees the consumed position are NOT updated. - @return {TopicPartition: deque([messages])} - @raises OffsetOutOfRangeError if no subscription offset_reset_strategy + Raises: + OffsetOutOfRangeError: if no subscription offset_reset_strategy + InvalidMessageError: if message crc validation fails (check_crcs + must be set to True) + RecordTooLargeError: if a message is larger than the currently + configured max_partition_fetch_bytes + TopicAuthorizationFailedError: if consumer is not authorized to fetch + messages from the topic + + Returns: + dict: {TopicPartition: deque([messages])} """ if self._subscriptions.needs_partition_assignment: return {} @@ -280,12 +328,14 @@ class Fetcher(object): return key, value def _send_offset_request(self, partition, timestamp): - """ - Fetch a single offset before the given timestamp for the partition. + """Fetch a single offset before the given timestamp for the partition. - @param partition The TopicPartition that needs fetching offset. - @param timestamp The timestamp for fetching offset. - @return A future which can be polled to obtain the corresponding offset. 
+ Arguments: + partition (TopicPartition): partition that needs fetching offset + timestamp (int): timestamp for fetching offset + + Returns: + Future: resolves to the corresponding offset """ node_id = self._client.cluster.leader_for_partition(partition) if node_id is None: @@ -315,11 +365,13 @@ class Fetcher(object): def _handle_offset_response(self, partition, future, response): """Callback for the response of the list offset call above. - @param partition The partition that was fetched - @param future the future to update based on response - @param response The OffsetResponse from the server + Arguments: + partition (TopicPartition): The partition that was fetched + future (Future): the future to update based on response + response (OffsetResponse): response from the server - @raises IllegalStateError if response does not match partition + Raises: + IllegalStateError: if response does not match partition """ topic, partition_info = response.topics[0] if len(response.topics) != 1 or len(partition_info) != 1: @@ -351,10 +403,13 @@ class Fetcher(object): future.failure(error_type(partition)) def _create_fetch_requests(self): - """ - Create fetch requests for all assigned partitions, grouped by node - Except where no leader, node has requests in flight, or we have - not returned all previously fetched records to consumer + """Create fetch requests for all assigned partitions, grouped by node. + + FetchRequests skipped if no leader, node has requests in flight, or we + have not returned all previously fetched records to consumer + + Returns: + dict: {node_id: [FetchRequest,...]} """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 00955f8..14485d2 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -114,6 +114,10 @@ class KafkaConsumer(object): periodically committed in the background. Default: True. 
auto_commit_interval_ms (int): milliseconds between automatic offset commits, if enable_auto_commit is True. Default: 5000. + default_offset_commit_callback (callable): called as + callback(offsets, response); response will be either an Exception + or an OffsetCommitResponse struct. This callback can be used to + trigger custom actions when a commit request completes. check_crcs (bool): Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may @@ -438,13 +442,17 @@ class KafkaConsumer(object): self._subscription.resume(partition) def seek(self, partition, offset): - """Manually specify the fetch offset for a TopicPartition + """Manually specify the fetch offset for a TopicPartition. Overrides the fetch offsets that the consumer will use on the next poll(). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets. + + Arguments: + partition (TopicPartition): partition for seek operation + offset (int): message offset in partition """ if offset < 0: raise Errors.IllegalStateError("seek offset must not be a negative number") diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 38d4571..fa36bc2 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -42,10 +42,10 @@ class SubscriptionState(object): def __init__(self, offset_reset_strategy='earliest'): """Initialize a SubscriptionState instance - offset_reset_strategy: 'earliest' or 'latest', otherwise - exception will be raised when fetching an offset - that is no longer available. - Defaults to earliest. 
+ Keyword Arguments: + offset_reset_strategy: 'earliest' or 'latest', otherwise + exception will be raised when fetching an offset that is no + longer available. Default: 'earliest' """ try: offset_reset_strategy = getattr(OffsetResetStrategy, @@ -67,14 +67,39 @@ class SubscriptionState(object): self.needs_fetch_committed_offsets = True def subscribe(self, topics=(), pattern=None, listener=None): - """Subscribe to a list of topics, or a topic regex pattern + """Subscribe to a list of topics, or a topic regex pattern. - Partitions will be assigned via a group coordinator - (incompatible with assign_from_user) + Partitions will be dynamically assigned via a group coordinator. + Topic subscriptions are not incremental: this list will replace the + current assignment (if there is one). - Optionally include listener callback, which must be a - ConsumerRebalanceListener and will be called before and - after each rebalance operation. + This method is incompatible with assign_from_user() + + Arguments: + topics (list): List of topics for subscription. + pattern (str): Pattern to match available topics. You must provide + either topics or pattern, but not both. + listener (ConsumerRebalanceListener): Optionally include listener + callback, which will be called before and after each rebalance + operation. + + As part of group management, the consumer will keep track of the + list of consumers that belong to a particular group and will + trigger a rebalance operation if one of the following events + trigger: + + * Number of partitions change for any of the subscribed topics + * Topic is created or deleted + * An existing member of the consumer group dies + * A new member is added to the consumer group + + When any of these events are triggered, the provided listener + will be invoked first to indicate that the consumer's assignment + has been revoked, and then again when the new assignment has + been received. 
Note that this listener will immediately override + any listener set in a previous call to subscribe. It is + guaranteed, however, that the partitions revoked/assigned + through this interface are from topics subscribed in this call. """ if self._user_assignment or (topics and pattern): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -93,6 +118,14 @@ class SubscriptionState(object): self.listener = listener def change_subscription(self, topics): + """Change the topic subscription. + + Arguments: + topics (list of str): topics for subscription + + Raises: + IllegalStateError: if assign_from_user has been used already + """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -117,7 +150,8 @@ class SubscriptionState(object): This is used by the group leader to ensure that it receives metadata updates for all topics that any member of the group is subscribed to. - @param topics list of topics to add to the group subscription + Arguments: + topics (list of str): topics to add to the group subscription """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -128,12 +162,22 @@ class SubscriptionState(object): self.needs_partition_assignment = True def assign_from_user(self, partitions): - """ - Change the assignment to the specified partitions provided by the user, - note this is different from assign_from_subscribed() - whose input partitions are provided from the subscribed topics. + """Manually assign a list of TopicPartitions to this consumer. + + This interface does not allow for incremental assignment and will + replace the previous assignment (if there was one). - @param partitions: list (or iterable) of TopicPartition() + Manual topic assignment through this method does not use the consumer's + group management functionality. As such, there will be no rebalance + operation triggered when group membership or cluster and topic metadata + change. 
Note that it is not possible to use both manual partition + assignment with assign() and group assignment with subscribe(). + + Arguments: + partitions (list of TopicPartition): assignment for this instance. + + Raises: + IllegalStateError: if consumer has already called subscribe() """ if self.subscription is not None: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -175,6 +219,7 @@ class SubscriptionState(object): log.info("Updated partition assignment: %s", assignments) def unsubscribe(self): + """Clear all topic subscriptions and partition assignments""" self.subscription = None self._user_assignment.clear() self.assignment.clear() @@ -191,17 +236,32 @@ class SubscriptionState(object): that would require rebalancing (the leader fetches metadata for all topics in the group so that it can do partition assignment). - @return set of topics + Returns: + set: topics """ return self._group_subscription def seek(self, partition, offset): + """Manually specify the fetch offset for a TopicPartition. + + Overrides the fetch offsets that the consumer will use on the next + poll(). If this API is invoked for the same partition more than once, + the latest offset will be used on the next poll(). Note that you may + lose data if this API is arbitrarily used in the middle of consumption, + to reset the fetch offsets. 
+ + Arguments: + partition (TopicPartition): partition for seek operation + offset (int): message offset in partition + """ self.assignment[partition].seek(offset) def assigned_partitions(self): + """Return set of TopicPartitions in current assignment.""" return set(self.assignment.keys()) def fetchable_partitions(self): + """Return set of TopicPartitions that should be Fetched.""" fetchable = set() for partition, state in six.iteritems(self.assignment): if state.is_fetchable(): @@ -209,6 +269,7 @@ class SubscriptionState(object): return fetchable def partitions_auto_assigned(self): + """Return True unless user supplied partitions manually.""" return self.subscription is not None def all_consumed_offsets(self): @@ -220,11 +281,18 @@ class SubscriptionState(object): return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): + """Mark partition for offset reset using specified or default strategy. + + Arguments: + partition (TopicPartition): partition to mark + offset_reset_strategy (OffsetResetStrategy, optional) + """ if offset_reset_strategy is None: offset_reset_strategy = self._default_offset_reset_strategy self.assignment[partition].await_reset(offset_reset_strategy) def has_default_offset_reset_policy(self): + """Return True if default offset reset policy is Earliest or Latest""" return self._default_offset_reset_strategy != OffsetResetStrategy.NONE def is_offset_reset_needed(self, partition): @@ -372,8 +440,9 @@ class ConsumerRebalanceListener(object): NOTE: This method is only called before rebalances. 
It is not called prior to KafkaConsumer.close() - @param partitions The list of partitions that were assigned to the - consumer on the last rebalance + Arguments: + revoked (list of TopicPartition): the partitions that were assigned + to the consumer on the last rebalance """ pass @@ -389,8 +458,8 @@ class ConsumerRebalanceListener(object): their on_partitions_revoked() callback before any instance executes its on_partitions_assigned() callback. - @param partitions The list of partitions that are now assigned to the - consumer (may include partitions previously assigned - to the consumer) + Arguments: + assigned (list of TopicPartition): the partitions assigned to the + consumer (may include partitions that were previously assigned) """ pass |