summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py133
1 files changed, 94 insertions, 39 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 39e1244..a4be7ae 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -39,6 +39,33 @@ class Fetcher(object):
}
def __init__(self, client, subscriptions, **configs):
+ """Initialize a Kafka Message Fetcher.
+
+ Keyword Arguments:
+ key_deserializer (callable): Any callable that takes a
+ raw message key and returns a deserialized key.
+ value_deserializer (callable, optional): Any callable that takes a
+ raw message value and returns a deserialized value.
+ fetch_min_bytes (int): Minimum amount of data the server should
+ return for a fetch request, otherwise wait up to
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
+ the server will block before answering the fetch request if
+ there isn't sufficient data to immediately satisfy the
+ requirement given by fetch_min_bytes. Default: 500.
+ max_partition_fetch_bytes (int): The maximum amount of data
+ per-partition the server will return. The maximum total memory
+ used for a request = #partitions * max_partition_fetch_bytes.
+ This size must be at least as large as the maximum message size
+ the server allows or else it is possible for the producer to
+ send messages larger than the consumer can fetch. If that
+ happens, the consumer can get stuck trying to fetch a large
+ message on a certain partition. Default: 1048576.
+ check_crcs (bool): Automatically check the CRC32 of the records
+ consumed. This ensures no on-the-wire or on-disk corruption to
+ the messages occurred. This check adds some overhead, so it may
+ be disabled in cases seeking extreme performance. Default: True
+ """
#metrics=None,
#metric_group_prefix='consumer',
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -56,7 +83,11 @@ class Fetcher(object):
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
def init_fetches(self):
- """Send FetchRequests asynchronously for all assigned partitions"""
+ """Send FetchRequests asynchronously for all assigned partitions.
+
+ Returns:
+ List of Futures: each future resolves to a FetchResponse
+ """
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -70,8 +101,11 @@ class Fetcher(object):
def update_fetch_positions(self, partitions):
"""Update the fetch positions for the provided partitions.
- @param partitions: iterable of TopicPartitions
- @raises NoOffsetForPartitionError If no offset is stored for a given
+ Arguments:
+ partitions (list of TopicPartitions): partitions to update
+
+ Raises:
+ NoOffsetForPartitionError: if no offset is stored for a given
partition and no reset policy is available
"""
# reset the fetch position to the committed position
@@ -104,8 +138,11 @@ class Fetcher(object):
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
- @param partition The given partition that needs reset offset
- @raises NoOffsetForPartitionError If no offset reset strategy is defined
+ Arguments:
+ partition (TopicPartition): the partition that needs reset offset
+
+ Raises:
+ NoOffsetForPartitionError: if no offset reset strategy is defined
"""
timestamp = self._subscriptions.assignment[partition].reset_strategy
if timestamp is OffsetResetStrategy.EARLIEST:
@@ -129,11 +166,14 @@ class Fetcher(object):
Blocks until offset is obtained, or a non-retriable exception is raised
- @param partition The partition that needs fetching offset.
- @param timestamp The timestamp for fetching offset.
- @raises exceptions
- @return The offset of the message that is published before the given
- timestamp
+ Arguments:
+ partition The partition that needs fetching offset.
+ timestamp (int): timestamp for fetching offset. -1 for the latest
+ available, -2 for the earliest available. Otherwise timestamp
+ is treated as epoch seconds.
+
+ Returns:
+ int: message offset
"""
while True:
future = self._send_offset_request(partition, timestamp)
@@ -150,10 +190,12 @@ class Fetcher(object):
self._client.poll(future=refresh_future)
def _raise_if_offset_out_of_range(self):
- """
- If any partition from previous FetchResponse contains
- OffsetOutOfRangeError and the default_reset_policy is None,
- raise OffsetOutOfRangeError
+ """Check FetchResponses for offset out of range.
+
+ Raises:
+ OffsetOutOfRangeError: if any partition from previous FetchResponse
+ contains OffsetOutOfRangeError and the default_reset_policy is
+ None
"""
current_out_of_range_partitions = {}
@@ -174,11 +216,10 @@ class Fetcher(object):
raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)
def _raise_if_unauthorized_topics(self):
- """
- If any topic from previous FetchResponse contains an Authorization
- error, raise an exception
+ """Check FetchResponses for topic authorization failures.
- @raise TopicAuthorizationFailedError
+ Raises:
+ TopicAuthorizationFailedError
"""
if self._unauthorized_topics:
topics = set(self._unauthorized_topics)
@@ -186,12 +227,10 @@ class Fetcher(object):
raise Errors.TopicAuthorizationFailedError(topics)
def _raise_if_record_too_large(self):
- """
- If any partition from previous FetchResponse gets a RecordTooLarge
- error, raise RecordTooLargeError
+ """Check FetchResponses for messages larger than the max per partition.
- @raise RecordTooLargeError If there is a message larger than fetch size
- and hence cannot be ever returned
+ Raises:
+ RecordTooLargeError: if there is a message larger than fetch size
"""
copied_record_too_large_partitions = dict(self._record_too_large_partitions)
self._record_too_large_partitions.clear()
@@ -207,12 +246,21 @@ class Fetcher(object):
self.config['max_partition_fetch_bytes'])
def fetched_records(self):
- """Returns previously fetched records and updates consumed offsets
+ """Returns previously fetched records and updates consumed offsets.
NOTE: returning empty records guarantees the consumed position are NOT updated.
- @return {TopicPartition: deque([messages])}
- @raises OffsetOutOfRangeError if no subscription offset_reset_strategy
+ Raises:
+ OffsetOutOfRangeError: if no subscription offset_reset_strategy
+ InvalidMessageError: if message crc validation fails (check_crcs
+ must be set to True)
+ RecordTooLargeError: if a message is larger than the currently
+ configured max_partition_fetch_bytes
+ TopicAuthorizationError: if consumer is not authorized to fetch
+ messages from the topic
+
+ Returns:
+ dict: {TopicPartition: deque([messages])}
"""
if self._subscriptions.needs_partition_assignment:
return {}
@@ -280,12 +328,14 @@ class Fetcher(object):
return key, value
def _send_offset_request(self, partition, timestamp):
- """
- Fetch a single offset before the given timestamp for the partition.
+ """Fetch a single offset before the given timestamp for the partition.
- @param partition The TopicPartition that needs fetching offset.
- @param timestamp The timestamp for fetching offset.
- @return A future which can be polled to obtain the corresponding offset.
+ Arguments:
+ partition (TopicPartition): partition that needs fetching offset
+ timestamp (int): timestamp for fetching offset
+
+ Returns:
+ Future: resolves to the corresponding offset
"""
node_id = self._client.cluster.leader_for_partition(partition)
if node_id is None:
@@ -315,11 +365,13 @@ class Fetcher(object):
def _handle_offset_response(self, partition, future, response):
"""Callback for the response of the list offset call above.
- @param partition The partition that was fetched
- @param future the future to update based on response
- @param response The OffsetResponse from the server
+ Arguments:
+ partition (TopicPartition): The partition that was fetched
+ future (Future): the future to update based on response
+ response (OffsetResponse): response from the server
- @raises IllegalStateError if response does not match partition
+ Raises:
+ IllegalStateError: if response does not match partition
"""
topic, partition_info = response.topics[0]
if len(response.topics) != 1 or len(partition_info) != 1:
@@ -351,10 +403,13 @@ class Fetcher(object):
future.failure(error_type(partition))
def _create_fetch_requests(self):
- """
- Create fetch requests for all assigned partitions, grouped by node
- Except where no leader, node has requests in flight, or we have
- not returned all previously fetched records to consumer
+ """Create fetch requests for all assigned partitions, grouped by node.
+
+ FetchRequests skipped if no leader, node has requests in flight, or we
+ have not returned all previously fetched records to consumer
+
+ Returns:
+ dict: {node_id: [FetchRequest,...]}
"""
# create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items()