-rw-r--r--  kafka/consumer/fetcher.py  523
1 files changed, 523 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
new file mode 100644
index 0000000..ea9c8b9
--- /dev/null
+++ b/kafka/consumer/fetcher.py
@@ -0,0 +1,523 @@
+from __future__ import absolute_import
+
+import collections
+import logging
+
+import six
+
+import kafka.common as Errors
+from kafka.common import TopicPartition
+from kafka.future import Future
+from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.message import PartialMessage
+from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+
+log = logging.getLogger(__name__)
+
+
+ConsumerRecord = collections.namedtuple("ConsumerRecord",
+ ["topic", "partition", "offset", "key", "value"])
+
+
+class NoOffsetForPartitionError(Errors.KafkaError):
+ pass
+
+
+class RecordTooLargeError(Errors.KafkaError):
+ pass
+
+
+class Fetcher(object):
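+    """Manage fetching of records from Kafka brokers for assigned partitions.
+
+    Sends FetchRequests to partition leaders, parses the responses into
+    ConsumerRecords, tracks fetch and consumed positions via the subscription
+    state, and resets offsets according to the configured OffsetResetStrategy
+    when needed.
+    """
+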
+ _key_deserializer = None
+ _value_deserializer = None
+ _fetch_min_bytes = 1024
+ _fetch_max_wait_ms = 500
+ _max_partition_fetch_bytes = 1048576
+ _check_crcs = True
+ _retry_backoff_ms = 100
+
+ def __init__(self, client, subscriptions, **kwargs):
+ #metrics=None,
+ #metric_group_prefix='consumer',
+
+ self._client = client
+ self._subscriptions = subscriptions
+ for config in ('key_deserializer', 'value_deserializer',
+ 'fetch_min_bytes', 'fetch_max_wait_ms',
+ 'max_partition_fetch_bytes', 'check_crcs',
+ 'retry_backoff_ms'):
+ if config in kwargs:
+ setattr(self, '_' + config, kwargs.pop(config))
+
+ self._records = collections.deque() # (offset, topic_partition, messages)
+ self._unauthorized_topics = set()
+ self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
+ self._record_too_large_partitions = dict() # {topic_partition: offset}
+
+ #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+
+ def init_fetches(self):
+ """Send FetchRequests asynchronously for all assigned partitions"""
+ futures = []
+ for node_id, request in six.iteritems(self._create_fetch_requests()):
+ if self._client.ready(node_id):
+ log.debug("Sending FetchRequest to node %s", node_id)
+ future = self._client.send(node_id, request)
+ future.add_callback(self._handle_fetch_response, request)
+ future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
+ futures.append(future)
+ return futures
+
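+    # A minimal sketch of how a consumer loop might drive this class
+    # (illustrative only; `client` is assumed to be the async KafkaClient
+    # used elsewhere in this module):
+    #
+    #     fetcher.update_fetch_positions(partitions)
+    #     fetcher.init_fetches()               # send FetchRequests asynchronously
+    #     client.poll()                        # drive network I/O
+    #     records = fetcher.fetched_records()  # {TopicPartition: deque([ConsumerRecord])}
+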
+ def update_fetch_positions(self, partitions):
+ """Update the fetch positions for the provided partitions.
+
+ @param partitions: iterable of TopicPartitions
+ @raises NoOffsetForPartitionError If no offset is stored for a given
+ partition and no reset policy is available
+ """
+ # reset the fetch position to the committed position
+ for tp in partitions:
+ if not self._subscriptions.is_assigned(tp):
+ log.warning("partition %s is not assigned - skipping offset"
+ " update", tp)
+ continue
+ elif self._subscriptions.is_fetchable(tp):
+ log.warning("partition %s is still fetchable -- skipping offset"
+ " update", tp)
+ continue
+
+            # TODO: If there are several offsets to reset, we could submit
+            # offset requests in parallel; for now, each call to
+            # _reset_offset will block
+ if self._subscriptions.is_offset_reset_needed(tp):
+ self._reset_offset(tp)
+ elif self._subscriptions.assignment[tp].committed is None:
+ # there's no committed position, so we need to reset with the
+ # default strategy
+ self._subscriptions.need_offset_reset(tp)
+ self._reset_offset(tp)
+ else:
+ committed = self._subscriptions.assignment[tp].committed
+ log.debug("Resetting offset for partition %s to the committed"
+ " offset %s", tp, committed)
+ self._subscriptions.seek(tp, committed)
+
+ def _reset_offset(self, partition):
+ """Reset offsets for the given partition using the offset reset strategy.
+
+        @param partition The partition whose offset should be reset
+ @raises NoOffsetForPartitionError If no offset reset strategy is defined
+ """
+ timestamp = self._subscriptions.assignment[partition].reset_strategy
+ if timestamp is OffsetResetStrategy.EARLIEST:
+ strategy = 'earliest'
+ elif timestamp is OffsetResetStrategy.LATEST:
+ strategy = 'latest'
+ else:
+ raise NoOffsetForPartitionError(partition)
+
+ log.debug("Resetting offset for partition %s to %s offset.",
+ partition, strategy)
+ offset = self._offset(partition, timestamp)
+
+ # we might lose the assignment while fetching the offset,
+ # so check it is still active
+ if self._subscriptions.is_assigned(partition):
+ self._subscriptions.seek(partition, offset)
+
+ def _offset(self, partition, timestamp):
+ """Fetch a single offset before the given timestamp for the partition.
+
+ Blocks until offset is obtained, or a non-retriable exception is raised
+
+        @param partition The partition to fetch an offset for.
+        @param timestamp The timestamp to fetch an offset before.
+        @raises KafkaError if the offset request fails with a
+            non-retriable error
+ @return The offset of the message that is published before the given
+ timestamp
+ """
+ while True:
+ future = self._send_offset_request(partition, timestamp)
+ self._client.poll(future=future)
+
+ if future.succeeded():
+ return future.value
+
+ if not future.retriable():
+ raise future.exception # pylint: disable-msg=raising-bad-type
+
+ if future.exception.invalid_metadata:
+ refresh_future = self._client.cluster.request_update()
+ self._client.poll(future=refresh_future)
+
+ def _raise_if_offset_out_of_range(self):
+ """
+        If any partition from the previous FetchResponse contains
+ OffsetOutOfRangeError and the default_reset_policy is None,
+ raise OffsetOutOfRangeError
+ """
+ current_out_of_range_partitions = {}
+
+ # filter only the fetchable partitions
+        for partition, offset in six.iteritems(self._offset_out_of_range_partitions):
+ if not self._subscriptions.is_fetchable(partition):
+ log.debug("Ignoring fetched records for %s since it is no"
+ " longer fetchable", partition)
+ continue
+ consumed = self._subscriptions.assignment[partition].consumed
+ # ignore partition if its consumed offset != offset in FetchResponse
+ # e.g. after seek()
+ if consumed is not None and offset == consumed:
+ current_out_of_range_partitions[partition] = offset
+
+ self._offset_out_of_range_partitions.clear()
+ if current_out_of_range_partitions:
+ raise Errors.OffsetOutOfRangeError(current_out_of_range_partitions)
+
+ def _raise_if_unauthorized_topics(self):
+ """
+        If any topic from the previous FetchResponse contains an
+        authorization error, raise an exception
+
+ @raise TopicAuthorizationFailedError
+ """
+ if self._unauthorized_topics:
+ topics = set(self._unauthorized_topics)
+ self._unauthorized_topics.clear()
+ raise Errors.TopicAuthorizationFailedError(topics)
+
+ def _raise_if_record_too_large(self):
+ """
+        If any partition from the previous FetchResponse got a RecordTooLarge
+        error, raise RecordTooLargeError
+
+        @raise RecordTooLargeError If there is a message larger than the
+            fetch size that therefore can never be returned
+ """
+ copied_record_too_large_partitions = dict(self._record_too_large_partitions)
+ self._record_too_large_partitions.clear()
+
+ if copied_record_too_large_partitions:
+ raise RecordTooLargeError(
+ "There are some messages at [Partition=Offset]: %s "
+ " whose size is larger than the fetch size %s"
+ " and hence cannot be ever returned."
+ " Increase the fetch size, or decrease the maximum message"
+ " size the broker will allow.",
+ copied_record_too_large_partitions, self._max_partition_fetch_bytes)
+
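+    # Note: per the error message above, the remedy for RecordTooLargeError is
+    # either a larger fetch size or a smaller broker-side message size limit.
+    # An illustrative (hypothetical) configuration of the former:
+    #
+    #     Fetcher(client, subscriptions, max_partition_fetch_bytes=10 * 1048576)
+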
+ def fetched_records(self):
+ """Returns previously fetched records and updates consumed offsets
+
+        NOTE: returning an empty record set guarantees that consumed
+        positions are NOT updated.
+
+ @return {TopicPartition: deque([messages])}
+        @raises OffsetOutOfRangeError if a fetched offset is out of range and
+            the subscription has no offset_reset_strategy
+ """
+ if self._subscriptions.needs_partition_assignment:
+ return {}
+
+ drained = collections.defaultdict(collections.deque)
+ self._raise_if_offset_out_of_range()
+ self._raise_if_unauthorized_topics()
+ self._raise_if_record_too_large()
+
+ # Loop over the records deque
+ while self._records:
+ (fetch_offset, tp, messages) = self._records.popleft()
+
+ if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance occurred before the fetched
+                # records were returned to the consumer's poll call
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ continue
+
+ # note that the consumed position should always be available
+ # as long as the partition is still assigned
+ consumed = self._subscriptions.assignment[tp].consumed
+ if not self._subscriptions.is_fetchable(tp):
+                # this can happen when partition consumption is paused before
+                # fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
+
+ # we also need to reset the fetch positions to pretend we did
+ # not fetch this partition in the previous request at all
+ self._subscriptions.assignment[tp].fetched = consumed
+ elif fetch_offset == consumed:
+ next_offset = messages[-1][0] + 1
+ log.debug("Returning fetched records for assigned partition %s"
+ " and update consumed position to %s", tp, next_offset)
+ self._subscriptions.assignment[tp].consumed = next_offset
+
+ # TODO: handle compressed messages
+ for offset, size, msg in messages:
+ if msg.attributes:
+ raise Errors.KafkaError('Compressed messages not supported yet')
+ elif self._check_crcs and not msg.validate_crc():
+ raise Errors.InvalidMessageError(msg)
+
+ key, value = self._deserialize(msg)
+ record = ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ drained[tp].append(record)
+ else:
+                # these records aren't next in line based on the last consumed
+                # position; ignore them, as they must be from an obsolete request
+ log.debug("Ignoring fetched records for %s at offset %s",
+ tp, fetch_offset)
+ return dict(drained)
+
+ def _deserialize(self, msg):
+ if self._key_deserializer:
+ key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable
+ else:
+ key = msg.key
+ if self._value_deserializer:
+ value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable
+ else:
+ value = msg.value
+ return key, value
+
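+    # Illustrative only: JSON deserializers could be supplied via the
+    # constructor kwargs handled in __init__, e.g.
+    #
+    #     import json
+    #     Fetcher(client, subscriptions,
+    #             key_deserializer=lambda k: k if k is None else k.decode('utf-8'),
+    #             value_deserializer=lambda v: json.loads(v.decode('utf-8')))
+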
+ def _send_offset_request(self, partition, timestamp):
+ """
+ Fetch a single offset before the given timestamp for the partition.
+
+        @param partition The TopicPartition to fetch an offset for.
+        @param timestamp The timestamp to fetch an offset before.
+ @return A future which can be polled to obtain the corresponding offset.
+ """
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None:
+ log.debug("Partition %s is unknown for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.StaleMetadata(partition))
+ elif node_id == -1:
+ log.debug("Leader for partition %s unavailable for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.LeaderNotAvailableError(partition))
+
+ request = OffsetRequest(
+ -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+ )
+        # the client returns a future that only fails on network issues,
+        # so create a separate future and attach a callback to update it
+        # based on response error codes
+ future = Future()
+ if not self._client.ready(node_id):
+ return future.failure(Errors.NodeNotReadyError(node_id))
+
+ _f = self._client.send(node_id, request)
+ _f.add_callback(self._handle_offset_response, partition, future)
+ _f.add_errback(lambda e: future.failure(e))
+ return future
+
+ def _handle_offset_response(self, partition, future, response):
+ """Callback for the response of the list offset call above.
+
+ @param partition The partition that was fetched
+ @param future the future to update based on response
+ @param response The OffsetResponse from the server
+
+ @raises IllegalStateError if response does not match partition
+ """
+        if len(response.topics) != 1 or len(response.topics[0][1]) != 1:
+            raise Errors.IllegalStateError("OffsetResponse should only be for"
+                                           " a single topic-partition")
+
+        topic, partition_info = response.topics[0]
+
+ part, error_code, offsets = partition_info[0]
+ if topic != partition.topic or part != partition.partition:
+ raise Errors.IllegalStateError("OffsetResponse partition does not"
+ " match OffsetRequest partition")
+
+ error_type = Errors.for_code(error_code)
+ if error_type is Errors.NoError:
+ if len(offsets) != 1:
+ raise Errors.IllegalStateError("OffsetResponse should only"
+ " return a single offset")
+ offset = offsets[0]
+ log.debug("Fetched offset %d for partition %s", offset, partition)
+ future.success(offset)
+ elif error_type in (Errors.NotLeaderForPartitionError,
+ Errors.UnknownTopicOrPartitionError):
+ log.warning("Attempt to fetch offsets for partition %s failed due"
+ " to obsolete leadership information, retrying.",
+ partition)
+ future.failure(error_type(partition))
+ else:
+ log.error("Attempt to fetch offsets for partition %s failed due to:"
+ " %s", partition, error_type)
+ future.failure(error_type(partition))
+
+ def _create_fetch_requests(self):
+ """
+        Create fetch requests for all assigned partitions, grouped by node.
+        Partitions with no known leader, nodes that already have requests in
+        flight, and partitions whose previously fetched records have not been
+        fully consumed are skipped.
+ """
+ # create the fetch info as a dict of lists of partition info tuples
+ # which can be passed to FetchRequest() via .items()
+ fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
+
+ for partition in self._subscriptions.fetchable_partitions():
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None or node_id == -1:
+ log.debug("No leader found for partition %s."
+ " Requesting metadata update", partition)
+ self._client.cluster.request_update()
+ elif self._client.in_flight_request_count(node_id) == 0:
+ # if there is a leader and no in-flight requests,
+ # issue a new fetch but only fetch data for partitions whose
+ # previously fetched data has been consumed
+ fetched = self._subscriptions.assignment[partition].fetched
+ consumed = self._subscriptions.assignment[partition].consumed
+ if consumed == fetched:
+ partition_info = (
+ partition.partition,
+ fetched,
+ self._max_partition_fetch_bytes
+ )
+ fetchable[node_id][partition.topic].append(partition_info)
+ else:
+ log.debug("Skipping FetchRequest to %s because previously"
+ " fetched offsets (%s) have not been fully"
+ " consumed yet (%s)", node_id, fetched, consumed)
+
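+        # group the collected partition data into one FetchRequest per node;
+        # each request carries [(topic, [(partition, offset, max_bytes), ...]), ...]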
+ requests = {}
+ for node_id, partition_data in six.iteritems(fetchable):
+ requests[node_id] = FetchRequest(
+ -1, # replica_id
+ self._fetch_max_wait_ms,
+ self._fetch_min_bytes,
+ partition_data.items())
+ return requests
+
+ def _handle_fetch_response(self, request, response):
+ """The callback for fetch completion"""
+ #total_bytes = 0
+ #total_count = 0
+
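+        # map each requested TopicPartition back to the offset we asked for,
+        # so the response can be checked against the current consumed position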
+ fetch_offsets = {}
+ for topic, partitions in request.topics:
+ for partition, offset, _ in partitions:
+ fetch_offsets[TopicPartition(topic, partition)] = offset
+
+ for topic, partitions in response.topics:
+ for partition, error_code, highwater, messages in partitions:
+ tp = TopicPartition(topic, partition)
+ error_type = Errors.for_code(error_code)
+ if not self._subscriptions.is_fetchable(tp):
+                    # this can happen when a rebalance occurred or partition
+                    # consumption was paused while the fetch was still in flight
+ log.debug("Ignoring fetched records for partition %s"
+ " since it is no longer fetchable", tp)
+ elif error_type is Errors.NoError:
+ fetch_offset = fetch_offsets[tp]
+
+ # we are interested in this fetch only if the beginning
+ # offset matches the current consumed position
+ consumed = self._subscriptions.assignment[tp].consumed
+ if consumed is None:
+ continue
+ elif consumed != fetch_offset:
+ # the fetched position has gotten out of sync with the
+ # consumed position (which might happen when a
+ # rebalance occurs with a fetch in-flight), so we need
+ # to reset the fetch position so the next fetch is right
+ self._subscriptions.assignment[tp].fetched = consumed
+ continue
+
+ partial = None
+ if messages and isinstance(messages[-1][-1], PartialMessage):
+ partial = messages.pop()
+
+ if messages:
+ last_offset, _, _ = messages[-1]
+ self._subscriptions.assignment[tp].fetched = last_offset + 1
+ self._records.append((fetch_offset, tp, messages))
+ #self.sensors.records_fetch_lag.record(highwater - last_offset)
+ elif partial:
+                        # we did not read a single message from a non-empty
+                        # buffer because that message's size is larger than
+                        # the fetch size; record the error in this case
+ self._record_too_large_partitions[tp] = fetch_offset
+
+ # TODO: bytes metrics
+ #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
+ #totalBytes += num_bytes;
+ #totalCount += parsed.size();
+ elif error_type in (Errors.NotLeaderForPartitionError,
+ Errors.UnknownTopicOrPartitionError):
+ self._client.cluster.request_update()
+ elif error_type is Errors.OffsetOutOfRangeError:
+ fetch_offset = fetch_offsets[tp]
+ if self._subscriptions.has_default_offset_reset_policy():
+ self._subscriptions.need_offset_reset(tp)
+ else:
+ self._offset_out_of_range_partitions[tp] = fetch_offset
+ log.info("Fetch offset %s is out of range, resetting offset",
+ self._subscriptions.assignment[tp].fetched)
+ elif error_type is Errors.TopicAuthorizationFailedError:
+                    log.warning("Not authorized to read from topic %s.", tp.topic)
+ self._unauthorized_topics.add(tp.topic)
+ elif error_type is Errors.UnknownError:
+                    log.warning("Unknown error fetching data for topic-partition %s", tp)
+ else:
+ raise Errors.IllegalStateError("Unexpected error code %s"
+ " while fetching data"
+ % error_code)
+
+ """TOOD - metrics
+ self.sensors.bytesFetched.record(totalBytes)
+ self.sensors.recordsFetched.record(totalCount)
+ self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
+ self.sensors.fetchLatency.record(resp.requestLatencyMs())
+
+
+class FetchManagerMetrics(object):
+ def __init__(self, metrics, prefix):
+ self.metrics = metrics
+ self.group_name = prefix + "-fetch-manager-metrics"
+
+ self.bytes_fetched = metrics.sensor("bytes-fetched")
+ self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
+ "The average number of bytes fetched per request"), metrics.Avg())
+ self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
+ "The maximum number of bytes fetched per request"), metrics.Max())
+ self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
+ "The average number of bytes consumed per second"), metrics.Rate())
+
+ self.records_fetched = self.metrics.sensor("records-fetched")
+ self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
+ "The average number of records in each request"), metrics.Avg())
+ self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
+ "The average number of records consumed per second"), metrics.Rate())
+
+ self.fetch_latency = metrics.sensor("fetch-latency")
+ self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
+ "The average time taken for a fetch request."), metrics.Avg())
+ self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
+ "The max time taken for any fetch request."), metrics.Max())
+ self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
+ "The number of fetch requests per second."), metrics.Rate(metrics.Count()))
+
+ self.records_fetch_lag = metrics.sensor("records-lag")
+ self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
+ "The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
+
+ self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
+ self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
+ "The average throttle time in ms"), metrics.Avg())
+ self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
+ "The maximum throttle time in ms"), metrics.Max())
+
+    def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
+ # record bytes fetched
+ name = '.'.join(["topic", topic, "bytes-fetched"])
+        self.metrics[name].record(num_bytes)
+
+ # record records fetched
+ name = '.'.join(["topic", topic, "records-fetched"])
+ self.metrics[name].record(num_records)
+ """