diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 225 |
1 files changed, 130 insertions, 95 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index cb80a6f..19982b1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -184,12 +184,14 @@ class Fetcher(six.Iterator): self._subscriptions.seek(tp, committed) def get_offsets_by_times(self, timestamps, timeout_ms): - response = {} - for tp, timestamp in timestamps.items(): - timestamp = int(timestamp) - offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms) - response[tp] = OffsetAndTimestamp(offset, tmst) - return response + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + if tp not in offsets: + offsets[tp] = None + else: + offset, timestamp = offsets[tp] + offsets[tp] = OffsetAndTimestamp(offset, timestamp) + return offsets def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -210,31 +212,39 @@ class Fetcher(six.Iterator): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) - offset, _ = self._offset(partition, timestamp) + offsets = self._retrieve_offsets({partition: timestamp}) + assert partition in offsets + offset = offsets[partition][0] # we might lose the assignment while fetching the offset, # so check it is still active if self._subscriptions.is_assigned(partition): self._subscriptions.seek(partition, offset) - def _offset(self, partition, timestamp, timeout_ms=None): - """Fetch a single offset before the given timestamp for the partition. + def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): + """ Fetch offset for each partition passed in ``timestamps`` map. - Blocks until offset is obtained, or a non-retriable exception is raised + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed (if it's not ``None``). Arguments: - partition The partition that needs fetching offset. - timestamp (int): timestamp for fetching offset. -1 for the latest - available, -2 for the earliest available. Otherwise timestamp - is treated as epoch seconds. + timestamps: {TopicPartition: int} dict with timestamps to fetch + offsets by. -1 for the latest available, -2 for the earliest + available. Otherwise timestamp is treated as epoch miliseconds. Returns: - (int, int): message offset and timestamp. None if not available + {TopicPartition: (int, int)}: Mapping of partition to + retrieved offset and timestamp. If offset does not exist for + the provided timestamp, that partition will be missing from + this mapping. """ + if not timestamps: + return {} + start_time = time.time() remaining_ms = timeout_ms - while True: - future = self._send_offset_request(partition, timestamp) + while remaining_ms > 0: + future = self._send_offset_requests(timestamps) self._client.poll(future=future, timeout_ms=remaining_ms) if future.succeeded(): @@ -242,10 +252,10 @@ class Fetcher(six.Iterator): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - if timeout_ms is not None: - remaining_ms = timeout_ms - (time.time() - start_time) * 1000 - if remaining_ms < 0: - break + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms + if remaining_ms < 0: + break if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() @@ -254,10 +264,9 @@ class Fetcher(six.Iterator): else: time.sleep(self.config['retry_backoff_ms'] / 1000.0) - if timeout_ms is not None: - remaining_ms = timeout_ms - (time.time() - start_time) * 1000 + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms - # Will only happen when timeout_ms != None raise Errors.KafkaTimeoutError( "Failed to get offsets by times in %s ms" % timeout_ms) @@ -603,104 +612,130 @@ class Fetcher(six.Iterator): return f.deserialize(topic, bytes_) return f(bytes_) - def _send_offset_request(self, partition, timestamp): - """Fetch a single offset before the given timestamp for the partition. + def _send_offset_requests(self, timestamps): + """ Fetch offsets for each partition in timestamps dict. This may send + request to multiple nodes, based on who is Leader for partition. Arguments: - partition (TopicPartition): partition that needs fetching offset - timestamp (int): timestamp for fetching offset + timestamps (dict): {TopicPartition: int} mapping of fetching + timestamps. Returns: - Future: resolves to the corresponding offset + Future: resolves to a mapping of retrieved offsets """ - node_id = self._client.cluster.leader_for_partition(partition) - if node_id is None: - log.debug("Partition %s is unknown for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.StaleMetadata(partition)) - elif node_id == -1: - log.debug("Leader for partition %s unavailable for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.LeaderNotAvailableError(partition)) + timestamps_by_node = collections.defaultdict(dict) + for partition, timestamp in six.iteritems(timestamps): + node_id = self._client.cluster.leader_for_partition(partition) + if node_id is None: + self._client.add_topic(partition.topic) + log.debug("Partition %s is unknown for fetching offset," + " wait for metadata refresh", partition) + return Future().failure(Errors.StaleMetadata(partition)) + elif node_id == -1: + log.debug("Leader for partition %s unavailable for fetching " + "offset, wait for metadata refresh", partition) + return Future().failure( + Errors.LeaderNotAvailableError(partition)) + else: + timestamps_by_node[node_id][partition] = timestamp + + # Aggregate results until we have all + list_offsets_future = Future() + responses = [] + node_count = len(timestamps_by_node) + + def on_success(value): + responses.append(value) + if len(responses) == node_count: + offsets = {} + for r in responses: + offsets.update(r) + list_offsets_future.success(offsets) + + for node_id, timestamps in six.iteritems(timestamps_by_node): + _f = self._send_offset_request(node_id, timestamps) + _f.add_callback(on_success) + _f.add_errback(lambda e: list_offsets_future.failure(e)) + return list_offsets_future + + def _send_offset_request(self, node_id, timestamps): + by_topic = collections.defaultdict(list) + for tp, timestamp in six.iteritems(timestamps): + if self.config['api_version'] >= (0, 10, 1): + data = (tp.partition, timestamp) + else: + data = (tp.partition, timestamp, 1) + by_topic[tp.topic].append(data) if self.config['api_version'] >= (0, 10, 1): - request = OffsetRequest[1]( - -1, [(partition.topic, [(partition.partition, timestamp)])] - ) + request = OffsetRequest[1](-1, list(six.iteritems(by_topic))) else: - request = OffsetRequest[0]( - -1, [(partition.topic, [(partition.partition, timestamp, 1)])] - ) + request = OffsetRequest[0](-1, list(six.iteritems(by_topic))) # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it # based on response error codes future = Future() + _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_response, partition, future) + _f.add_callback(self._handle_offset_response, future) _f.add_errback(lambda e: future.failure(e)) return future - def _handle_offset_response(self, partition, future, response): + def _handle_offset_response(self, future, response): """Callback for the response of the list offset call above. Arguments: - partition (TopicPartition): The partition that was fetched future (Future): the future to update based on response response (OffsetResponse): response from the server Raises: AssertionError: if response does not match partition """ - topic, partition_info = response.topics[0] - assert len(response.topics) == 1 and len(partition_info) == 1, ( - 'OffsetResponse should only be for a single topic-partition') - - partition_info = partition_info[0] - part, error_code = partition_info[:2] - - assert topic == partition.topic and part == partition.partition, ( - 'OffsetResponse partition does not match OffsetRequest partition') - - error_type = Errors.for_code(error_code) - if error_type is Errors.NoError: - if response.API_VERSION == 0: - offsets = partition_info[2] - assert len(offsets) == 1, 'Expected OffsetResponse with one offset' - offset = offsets[0] - log.debug("Handling v0 ListOffsetResponse response for %s. " - "Fetched offset %s", partition, offset) - future.success((offset, None)) - else: - timestamp, offset = partition_info[2:] - log.debug("Handling ListOffsetResponse response for %s. " - "Fetched offset %s, timestamp %s", - partition, offset, timestamp) - if offset != UNKNOWN_OFFSET: - future.success((offset, timestamp)) + timestamp_offset_map = {} + for topic, part_data in response.topics: + for partition_info in part_data: + partition, error_code = partition_info[:2] + partition = TopicPartition(topic, partition) + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + if response.API_VERSION == 0: + offsets = partition_info[2] + assert len(offsets) > 1, 'Expected OffsetResponse with one offset' + if offsets: + offset = offsets[0] + log.debug("Handling v0 ListOffsetResponse response for %s. " + "Fetched offset %s", partition, offset) + timestamp_offset_map[partition] = (offset, None) + else: + timestamp, offset = partition_info[2:] + log.debug("Handling ListOffsetResponse response for %s. " + "Fetched offset %s, timestamp %s", + partition, offset, timestamp) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = (offset, timestamp) + elif error_type is Errors.UnsupportedForMessageFormatError: + # The message format on the broker side is before 0.10.0, + # we simply put None in the response. + log.debug("Cannot search by timestamp for partition %s because the" + " message format version is before 0.10.0", partition) + elif error_type is Errors.NotLeaderForPartitionError: + log.debug("Attempt to fetch offsets for partition %s failed due" + " to obsolete leadership information, retrying.", + partition) + future.failure(error_type(partition)) + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warn("Received unknown topic or partition error in ListOffset " + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) + future.failure(error_type(partition)) else: - future.success((None, None)) - elif error_type is Errors.UnsupportedForMessageFormatError: - # The message format on the broker side is before 0.10.0, we simply - # put None in the response. - log.debug("Cannot search by timestamp for partition %s because the" - " message format version is before 0.10.0", partition) - future.success((None, None)) - elif error_type is Errors.NotLeaderForPartitionError: - log.debug("Attempt to fetch offsets for partition %s failed due" - " to obsolete leadership information, retrying.", - partition) - future.failure(error_type(partition)) - elif error_type is Errors.UnknownTopicOrPartitionError: - log.warn("Received unknown topic or partition error in ListOffset " - "request for partition %s. The topic/partition " + - "may not exist or the user may not have Describe access " - "to it.", partition) - future.failure(error_type(partition)) - else: - log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type) - future.failure(error_type(partition)) + log.warning("Attempt to fetch offsets for partition %s failed due to:" + " %s", partition, error_type) + future.failure(error_type(partition)) + if not future.is_done: + future.success(timestamp_offset_map) def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() |