diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-08-07 13:34:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-07 13:34:50 +0300 |
commit | 8cf44847bc91a986c98494c4a23be31c368ef4dd (patch) | |
tree | faad0970119f52542b7a09b0db8abbc61166e694 /kafka/consumer/fetcher.py | |
parent | da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff) | |
parent | 55ded554f9f5b470eeb53500e455ecd87f4d8f87 (diff) | |
download | kafka-python-8cf44847bc91a986c98494c4a23be31c368ef4dd.tar.gz |
Merge pull request #1161 from dpkp/issue1036_offset_by_time
Added support for offsets_for_times, beginning_offsets and end_offsets APIs.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 241 |
1 files changed, 180 insertions, 61 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8db89a1..c0d6075 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -14,9 +14,11 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest from kafka.protocol.message import PartialMessage -from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy +from kafka.protocol.offset import ( + OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET +) from kafka.serializer import Deserializer -from kafka.structs import TopicPartition +from kafka.structs import TopicPartition, OffsetAndTimestamp log = logging.getLogger(__name__) @@ -48,6 +50,7 @@ class Fetcher(six.Iterator): 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), + 'retry_backoff_ms': 100 } def __init__(self, client, subscriptions, metrics, **configs): @@ -180,6 +183,31 @@ class Fetcher(six.Iterator): " offset %s", tp, committed) self._subscriptions.seek(tp, committed) + def get_offsets_by_times(self, timestamps, timeout_ms): + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + if tp not in offsets: + offsets[tp] = None + else: + offset, timestamp = offsets[tp] + offsets[tp] = OffsetAndTimestamp(offset, timestamp) + return offsets + + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp][0] + return offsets + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -199,40 +227,64 @@ class Fetcher(six.Iterator): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) - offset = self._offset(partition, timestamp) + offsets = self._retrieve_offsets({partition: timestamp}) + if partition not in offsets: + raise NoOffsetForPartitionError(partition) + offset = offsets[partition][0] # we might lose the assignment while fetching the offset, # so check it is still active if self._subscriptions.is_assigned(partition): self._subscriptions.seek(partition, offset) - def _offset(self, partition, timestamp): - """Fetch a single offset before the given timestamp for the partition. + def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): + """Fetch offset for each partition passed in ``timestamps`` map. - Blocks until offset is obtained, or a non-retriable exception is raised + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. Arguments: - partition The partition that needs fetching offset. - timestamp (int): timestamp for fetching offset. -1 for the latest - available, -2 for the earliest available. Otherwise timestamp - is treated as epoch seconds. + timestamps: {TopicPartition: int} dict with timestamps to fetch + offsets by. -1 for the latest available, -2 for the earliest + available. Otherwise timestamp is treated as epoch miliseconds. Returns: - int: message offset + {TopicPartition: (int, int)}: Mapping of partition to + retrieved offset and timestamp. If offset does not exist for + the provided timestamp, that partition will be missing from + this mapping. """ - while True: - future = self._send_offset_request(partition, timestamp) - self._client.poll(future=future) + if not timestamps: + return {} + + start_time = time.time() + remaining_ms = timeout_ms + while remaining_ms > 0: + future = self._send_offset_requests(timestamps) + self._client.poll(future=future, timeout_ms=remaining_ms) if future.succeeded(): return future.value - if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms + if remaining_ms < 0: + break + if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() - self._client.poll(future=refresh_future, sleep=True) + self._client.poll( + future=refresh_future, sleep=True, timeout_ms=remaining_ms) + else: + time.sleep(self.config['retry_backoff_ms'] / 1000.0) + + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms + + raise Errors.KafkaTimeoutError( + "Failed to get offsets by timestamps in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -576,73 +628,140 @@ class Fetcher(six.Iterator): return f.deserialize(topic, bytes_) return f(bytes_) - def _send_offset_request(self, partition, timestamp): - """Fetch a single offset before the given timestamp for the partition. + def _send_offset_requests(self, timestamps): + """Fetch offsets for each partition in timestamps dict. This may send + request to multiple nodes, based on who is Leader for partition. Arguments: - partition (TopicPartition): partition that needs fetching offset - timestamp (int): timestamp for fetching offset + timestamps (dict): {TopicPartition: int} mapping of fetching + timestamps. Returns: - Future: resolves to the corresponding offset + Future: resolves to a mapping of retrieved offsets """ - node_id = self._client.cluster.leader_for_partition(partition) - if node_id is None: - log.debug("Partition %s is unknown for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.StaleMetadata(partition)) - elif node_id == -1: - log.debug("Leader for partition %s unavailable for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.LeaderNotAvailableError(partition)) - - request = OffsetRequest[0]( - -1, [(partition.topic, [(partition.partition, timestamp, 1)])] - ) + timestamps_by_node = collections.defaultdict(dict) + for partition, timestamp in six.iteritems(timestamps): + node_id = self._client.cluster.leader_for_partition(partition) + if node_id is None: + self._client.add_topic(partition.topic) + log.debug("Partition %s is unknown for fetching offset," + " wait for metadata refresh", partition) + return Future().failure(Errors.StaleMetadata(partition)) + elif node_id == -1: + log.debug("Leader for partition %s unavailable for fetching " + "offset, wait for metadata refresh", partition) + return Future().failure( + Errors.LeaderNotAvailableError(partition)) + else: + timestamps_by_node[node_id][partition] = timestamp + + # Aggregate results until we have all + list_offsets_future = Future() + responses = [] + node_count = len(timestamps_by_node) + + def on_success(value): + responses.append(value) + if len(responses) == node_count: + offsets = {} + for r in responses: + offsets.update(r) + list_offsets_future.success(offsets) + + def on_fail(err): + if not list_offsets_future.is_done: + list_offsets_future.failure(err) + + for node_id, timestamps in six.iteritems(timestamps_by_node): + _f = self._send_offset_request(node_id, timestamps) + _f.add_callback(on_success) + _f.add_errback(on_fail) + return list_offsets_future + + def _send_offset_request(self, node_id, timestamps): + by_topic = collections.defaultdict(list) + for tp, timestamp in six.iteritems(timestamps): + if self.config['api_version'] >= (0, 10, 1): + data = (tp.partition, timestamp) + else: + data = (tp.partition, timestamp, 1) + by_topic[tp.topic].append(data) + + if self.config['api_version'] >= (0, 10, 1): + request = OffsetRequest[1](-1, list(six.iteritems(by_topic))) + else: + request = OffsetRequest[0](-1, list(six.iteritems(by_topic))) + # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it # based on response error codes future = Future() + _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_response, partition, future) + _f.add_callback(self._handle_offset_response, future) _f.add_errback(lambda e: future.failure(e)) return future - def _handle_offset_response(self, partition, future, response): + def _handle_offset_response(self, future, response): """Callback for the response of the list offset call above. Arguments: - partition (TopicPartition): The partition that was fetched future (Future): the future to update based on response response (OffsetResponse): response from the server Raises: AssertionError: if response does not match partition """ - topic, partition_info = response.topics[0] - assert len(response.topics) == 1 and len(partition_info) == 1, ( - 'OffsetResponse should only be for a single topic-partition') - - part, error_code, offsets = partition_info[0] - assert topic == partition.topic and part == partition.partition, ( - 'OffsetResponse partition does not match OffsetRequest partition') - - error_type = Errors.for_code(error_code) - if error_type is Errors.NoError: - assert len(offsets) == 1, 'Expected OffsetResponse with one offset' - offset = offsets[0] - log.debug("Fetched offset %d for partition %s", offset, partition) - future.success(offset) - elif error_type in (Errors.NotLeaderForPartitionError, - Errors.UnknownTopicOrPartitionError): - log.debug("Attempt to fetch offsets for partition %s failed due" - " to obsolete leadership information, retrying.", - partition) - future.failure(error_type(partition)) - else: - log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type) - future.failure(error_type(partition)) + timestamp_offset_map = {} + for topic, part_data in response.topics: + for partition_info in part_data: + partition, error_code = partition_info[:2] + partition = TopicPartition(topic, partition) + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + if response.API_VERSION == 0: + offsets = partition_info[2] + assert len(offsets) <= 1, 'Expected OffsetResponse with one offset' + if not offsets: + offset = UNKNOWN_OFFSET + else: + offset = offsets[0] + log.debug("Handling v0 ListOffsetResponse response for %s. " + "Fetched offset %s", partition, offset) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = (offset, None) + else: + timestamp, offset = partition_info[2:] + log.debug("Handling ListOffsetResponse response for %s. " + "Fetched offset %s, timestamp %s", + partition, offset, timestamp) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = (offset, timestamp) + elif error_type is Errors.UnsupportedForMessageFormatError: + # The message format on the broker side is before 0.10.0, + # we simply put None in the response. + log.debug("Cannot search by timestamp for partition %s because the" + " message format version is before 0.10.0", partition) + elif error_type is Errors.NotLeaderForPartitionError: + log.debug("Attempt to fetch offsets for partition %s failed due" + " to obsolete leadership information, retrying.", + partition) + future.failure(error_type(partition)) + return + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warn("Received unknown topic or partition error in ListOffset " + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) + future.failure(error_type(partition)) + return + else: + log.warning("Attempt to fetch offsets for partition %s failed due to:" + " %s", partition, error_type) + future.failure(error_type(partition)) + return + if not future.is_done: + future.success(timestamp_offset_map) def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() |