diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 133 |
1 files changed, 94 insertions, 39 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 39e1244..a4be7ae 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -39,6 +39,33 @@ class Fetcher(object): } def __init__(self, client, subscriptions, **configs): + """Initialize a Kafka Message Fetcher. + + Keyword Arguments: + key_deserializer (callable): Any callable that takes a + raw message key and returns a deserialized key. + value_deserializer (callable, optional): Any callable that takes a + raw message value and returns a deserialized value. + fetch_min_bytes (int): Minimum amount of data the server should + return for a fetch request, otherwise wait up to + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds + the server will block before answering the fetch request if + there isn't sufficient data to immediately satisfy the + requirement given by fetch_min_bytes. Default: 500. + max_partition_fetch_bytes (int): The maximum amount of data + per-partition the server will return. The maximum total memory + used for a request = #partitions * max_partition_fetch_bytes. + This size must be at least as large as the maximum message size + the server allows or else it is possible for the producer to + send messages larger than the consumer can fetch. If that + happens, the consumer can get stuck trying to fetch a large + message on a certain partition. Default: 1048576. + check_crcs (bool): Automatically check the CRC32 of the records + consumed. This ensures no on-the-wire or on-disk corruption to + the messages occurred. This check adds some overhead, so it may + be disabled in cases seeking extreme performance. 
Default: True + """ #metrics=None, #metric_group_prefix='consumer', self.config = copy.copy(self.DEFAULT_CONFIG) @@ -56,7 +83,11 @@ class Fetcher(object): #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) def init_fetches(self): - """Send FetchRequests asynchronously for all assigned partitions""" + """Send FetchRequests asynchronously for all assigned partitions. + + Returns: + List of Futures: each future resolves to a FetchResponse + """ futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -70,8 +101,11 @@ class Fetcher(object): def update_fetch_positions(self, partitions): """Update the fetch positions for the provided partitions. - @param partitions: iterable of TopicPartitions - @raises NoOffsetForPartitionError If no offset is stored for a given + Arguments: + partitions (list of TopicPartitions): partitions to update + + Raises: + NoOffsetForPartitionError: if no offset is stored for a given partition and no reset policy is available """ # reset the fetch position to the committed position @@ -104,8 +138,11 @@ class Fetcher(object): def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. - @param partition The given partition that needs reset offset - @raises NoOffsetForPartitionError If no offset reset strategy is defined + Arguments: + partition (TopicPartition): the partition that needs reset offset + + Raises: + NoOffsetForPartitionError: if no offset reset strategy is defined """ timestamp = self._subscriptions.assignment[partition].reset_strategy if timestamp is OffsetResetStrategy.EARLIEST: @@ -129,11 +166,14 @@ class Fetcher(object): Blocks until offset is obtained, or a non-retriable exception is raised - @param partition The partition that needs fetching offset. - @param timestamp The timestamp for fetching offset. 
- @raises exceptions - @return The offset of the message that is published before the given - timestamp + Arguments: + partition (TopicPartition): the partition that needs fetching offset. + timestamp (int): timestamp for fetching offset. -1 for the latest + available, -2 for the earliest available. Otherwise timestamp + is treated as epoch seconds. + + Returns: + int: message offset """ while True: future = self._send_offset_request(partition, timestamp) @@ -150,10 +190,12 @@ class Fetcher(object): self._client.poll(future=refresh_future) def _raise_if_offset_out_of_range(self): - """ - If any partition from previous FetchResponse contains - OffsetOutOfRangeError and the default_reset_policy is None, - raise OffsetOutOfRangeError + """Check FetchResponses for offset out of range. + + Raises: + OffsetOutOfRangeError: if any partition from previous FetchResponse + contains OffsetOutOfRangeError and the default_reset_policy is + None """ current_out_of_range_partitions = {} @@ -174,11 +216,10 @@ class Fetcher(object): raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions) def _raise_if_unauthorized_topics(self): - """ - If any topic from previous FetchResponse contains an Authorization - error, raise an exception + """Check FetchResponses for topic authorization failures. - @raise TopicAuthorizationFailedError + Raises: + TopicAuthorizationFailedError """ if self._unauthorized_topics: topics = set(self._unauthorized_topics) @@ -186,12 +227,10 @@ class Fetcher(object): raise Errors.TopicAuthorizationFailedError(topics) def _raise_if_record_too_large(self): - """ - If any partition from previous FetchResponse gets a RecordTooLarge - error, raise RecordTooLargeError + """Check FetchResponses for messages larger than the max per partition. 
- @raise RecordTooLargeError If there is a message larger than fetch size - and hence cannot be ever returned + Raises: + RecordTooLargeError: if there is a message larger than fetch size """ copied_record_too_large_partitions = dict(self._record_too_large_partitions) self._record_too_large_partitions.clear() @@ -207,12 +246,21 @@ class Fetcher(object): self.config['max_partition_fetch_bytes']) def fetched_records(self): - """Returns previously fetched records and updates consumed offsets + """Returns previously fetched records and updates consumed offsets. NOTE: returning empty records guarantees the consumed positions are NOT updated. - @return {TopicPartition: deque([messages])} - @raises OffsetOutOfRangeError if no subscription offset_reset_strategy + Raises: + OffsetOutOfRangeError: if no subscription offset_reset_strategy + InvalidMessageError: if message crc validation fails (check_crcs + must be set to True) + RecordTooLargeError: if a message is larger than the currently + configured max_partition_fetch_bytes + TopicAuthorizationFailedError: if consumer is not authorized to fetch + messages from the topic + + Returns: + dict: {TopicPartition: deque([messages])} """ if self._subscriptions.needs_partition_assignment: return {} @@ -280,12 +328,14 @@ class Fetcher(object): return key, value def _send_offset_request(self, partition, timestamp): - """ - Fetch a single offset before the given timestamp for the partition. + """Fetch a single offset before the given timestamp for the partition. - @param partition The TopicPartition that needs fetching offset. - @param timestamp The timestamp for fetching offset. - @return A future which can be polled to obtain the corresponding offset. 
+ Arguments: + partition (TopicPartition): partition that needs fetching offset + timestamp (int): timestamp for fetching offset + + Returns: + Future: resolves to the corresponding offset """ node_id = self._client.cluster.leader_for_partition(partition) if node_id is None: @@ -315,11 +365,13 @@ class Fetcher(object): def _handle_offset_response(self, partition, future, response): """Callback for the response of the list offset call above. - @param partition The partition that was fetched - @param future the future to update based on response - @param response The OffsetResponse from the server + Arguments: + partition (TopicPartition): The partition that was fetched + future (Future): the future to update based on response + response (OffsetResponse): response from the server - @raises IllegalStateError if response does not match partition + Raises: + IllegalStateError: if response does not match partition """ topic, partition_info = response.topics[0] if len(response.topics) != 1 or len(partition_info) != 1: @@ -351,10 +403,13 @@ class Fetcher(object): future.failure(error_type(partition)) def _create_fetch_requests(self): - """ - Create fetch requests for all assigned partitions, grouped by node - Except where no leader, node has requests in flight, or we have - not returned all previously fetched records to consumer + """Create fetch requests for all assigned partitions, grouped by node. + + FetchRequests skipped if no leader, node has requests in flight, or we + have not returned all previously fetched records to consumer + + Returns: + dict: {node_id: [FetchRequest,...]} """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() |