author    | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-07-30 15:42:27 +0000
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-07 09:34:08 +0000
commit    | 39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch)
tree      | 2b94ed93bec5ae4f072360c5072cc22b0685f8a1 /kafka/consumer/fetcher.py
parent    | da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
download  | kafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz
Added basic support for the offsets_for_times API. Still needs to group requests by node and send them in parallel.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 94
1 file changed, 76 insertions, 18 deletions
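
The change adds a timestamp-to-offset lookup entry point, `Fetcher.get_offsets_by_times()`. The sketch below is a hypothetical illustration of driving that helper, assuming `fetcher` is a `Fetcher` already wired to a client and subscription state the way `KafkaConsumer` builds it internally; the consumer-facing `offsets_for_times()` wrapper is not part of this diff, and the topic name and timestamp are invented.

```python
# Hypothetical usage sketch -- `fetcher` is assumed to be an already
# constructed kafka.consumer.fetcher.Fetcher; it is not built here.
from kafka.structs import TopicPartition

tp = TopicPartition('my-topic', 0)

# One lookup timestamp per partition (the v1 ListOffsets API on 0.10.1+
# brokers interprets these as milliseconds since the epoch).
timestamps = {tp: 1500000000000}

# Returns {TopicPartition: OffsetAndTimestamp(offset, timestamp)}.
# Raises KafkaTimeoutError if the lookup cannot finish within timeout_ms;
# offset/timestamp come back as None when the broker cannot answer
# (e.g. pre-0.10.0 message format, or no message at/after the timestamp).
offsets = fetcher.get_offsets_by_times(timestamps, timeout_ms=5000)
print(offsets[tp].offset, offsets[tp].timestamp)
```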
```diff
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8db89a1..cb80a6f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -14,9 +14,11 @@ from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+    OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
 from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp
 
 log = logging.getLogger(__name__)
 
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
         'iterator_refetch_records': 1,  # undocumented -- interface may change
         'metric_group_prefix': 'consumer',
         'api_version': (0, 8, 0),
+        'retry_backoff_ms': 100
     }
 
     def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,14 @@ class Fetcher(six.Iterator):
                           " offset %s", tp, committed)
                 self._subscriptions.seek(tp, committed)
 
+    def get_offsets_by_times(self, timestamps, timeout_ms):
+        response = {}
+        for tp, timestamp in timestamps.items():
+            timestamp = int(timestamp)
+            offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms)
+            response[tp] = OffsetAndTimestamp(offset, tmst)
+        return response
+
     def _reset_offset(self, partition):
         """Reset offsets for the given partition using the offset reset strategy.
 
@@ -199,14 +210,14 @@
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offset = self._offset(partition, timestamp)
+        offset, _ = self._offset(partition, timestamp)
 
         # we might lose the assignment while fetching the offset,
         # so check it is still active
         if self._subscriptions.is_assigned(partition):
             self._subscriptions.seek(partition, offset)
 
-    def _offset(self, partition, timestamp):
+    def _offset(self, partition, timestamp, timeout_ms=None):
         """Fetch a single offset before the given timestamp for the partition.
 
         Blocks until offset is obtained, or a non-retriable exception is raised
@@ -218,21 +229,37 @@
             is treated as epoch seconds.
 
         Returns:
-            int: message offset
+            (int, int): message offset and timestamp. None if not available
         """
+        start_time = time.time()
+        remaining_ms = timeout_ms
         while True:
             future = self._send_offset_request(partition, timestamp)
-            self._client.poll(future=future)
+            self._client.poll(future=future, timeout_ms=remaining_ms)
 
             if future.succeeded():
                 return future.value
-
             if not future.retriable():
                 raise future.exception  # pylint: disable-msg=raising-bad-type
 
+            if timeout_ms is not None:
+                remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+                if remaining_ms < 0:
+                    break
+
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
-                self._client.poll(future=refresh_future, sleep=True)
+                self._client.poll(
+                    future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+            else:
+                time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+            if timeout_ms is not None:
+                remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+
+        # Will only happen when timeout_ms != None
+        raise Errors.KafkaTimeoutError(
+            "Failed to get offsets by times in %s ms" % timeout_ms)
 
     def _raise_if_offset_out_of_range(self):
         """Check FetchResponses for offset out of range.
```
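
The `_offset()` rework above turns an unbounded retry loop into one with a shrinking time budget: each retriable failure re-computes `remaining_ms`, and once the budget is spent the loop exits and raises `KafkaTimeoutError`. Below is a minimal, self-contained sketch of that bookkeeping, with an invented `attempt()` callable standing in for the send-request/poll-future machinery; none of these names come from the diff.

```python
import time


def fetch_with_deadline(attempt, timeout_ms=None, retry_backoff_ms=100):
    """Retry attempt() until it yields a result or the time budget runs out.

    attempt() returns a value on success, or None to ask for a retry --
    a stand-in for a retriable future failure in the real _offset() loop.
    """
    start_time = time.time()
    remaining_ms = timeout_ms
    while True:
        value = attempt()
        if value is not None:
            return value

        # Re-compute the remaining budget after each failed attempt and
        # stop retrying once it is exhausted (only when a timeout was given).
        if timeout_ms is not None:
            remaining_ms = timeout_ms - (time.time() - start_time) * 1000
            if remaining_ms < 0:
                break

        # Back off before the next attempt, mirroring retry_backoff_ms.
        time.sleep(retry_backoff_ms / 1000.0)

    # Reached only when timeout_ms was set; the real code raises
    # Errors.KafkaTimeoutError here.
    raise TimeoutError("Failed to get offsets by times in %s ms" % timeout_ms)
```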
```diff
@@ -596,9 +623,15 @@ class Fetcher(six.Iterator):
                       " wait for metadata refresh", partition)
             return Future().failure(Errors.LeaderNotAvailableError(partition))
 
-        request = OffsetRequest[0](
-            -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
-        )
+        if self.config['api_version'] >= (0, 10, 1):
+            request = OffsetRequest[1](
+                -1, [(partition.topic, [(partition.partition, timestamp)])]
+            )
+        else:
+            request = OffsetRequest[0](
+                -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+            )
+
         # Client returns a future that only fails on network issues
         # so create a separate future and attach a callback to update it
         # based on response error codes
@@ -623,22 +656,47 @@
         assert len(response.topics) == 1 and len(partition_info) == 1, (
             'OffsetResponse should only be for a single topic-partition')
 
-        part, error_code, offsets = partition_info[0]
+        partition_info = partition_info[0]
+        part, error_code = partition_info[:2]
+
         assert topic == partition.topic and part == partition.partition, (
             'OffsetResponse partition does not match OffsetRequest partition')
 
         error_type = Errors.for_code(error_code)
         if error_type is Errors.NoError:
-            assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
-            offset = offsets[0]
-            log.debug("Fetched offset %d for partition %s", offset, partition)
-            future.success(offset)
-        elif error_type in (Errors.NotLeaderForPartitionError,
-                            Errors.UnknownTopicOrPartitionError):
+            if response.API_VERSION == 0:
+                offsets = partition_info[2]
+                assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
+                offset = offsets[0]
+                log.debug("Handling v0 ListOffsetResponse response for %s. "
+                          "Fetched offset %s", partition, offset)
+                future.success((offset, None))
+            else:
+                timestamp, offset = partition_info[2:]
+                log.debug("Handling ListOffsetResponse response for %s. "
+                          "Fetched offset %s, timestamp %s",
+                          partition, offset, timestamp)
+                if offset != UNKNOWN_OFFSET:
+                    future.success((offset, timestamp))
+                else:
+                    future.success((None, None))
+        elif error_type is Errors.UnsupportedForMessageFormatError:
+            # The message format on the broker side is before 0.10.0, we simply
+            # put None in the response.
+            log.debug("Cannot search by timestamp for partition %s because the"
+                      " message format version is before 0.10.0", partition)
+            future.success((None, None))
+        elif error_type is Errors.NotLeaderForPartitionError:
             log.debug("Attempt to fetch offsets for partition %s failed due"
                       " to obsolete leadership information, retrying.",
                       partition)
             future.failure(error_type(partition))
+        elif error_type is Errors.UnknownTopicOrPartitionError:
+            log.warn("Received unknown topic or partition error in ListOffset "
+                     "request for partition %s. The topic/partition " +
+                     "may not exist or the user may not have Describe access "
+                     "to it.", partition)
+            future.failure(error_type(partition))
         else:
             log.warning("Attempt to fetch offsets for partition %s failed due to:"
                         " %s", partition, error_type)
```
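
The response-handling hunk above has to cope with two ListOffsets reply shapes: v0 carries a list with a single offset and no timestamp, while v1 echoes the timestamp back next to the matching offset, or the `UNKNOWN_OFFSET` sentinel when nothing matches. The sketch below mirrors only the success branch, with hand-built tuples standing in for the decoded protocol objects; `unpack_partition_info` is an invented name, `UNKNOWN_OFFSET` is redefined locally with the value the diff imports from `kafka.protocol.offset`, and the error branches are omitted.

```python
UNKNOWN_OFFSET = -1  # sentinel meaning "no offset matched the timestamp"


def unpack_partition_info(api_version, partition_info):
    """Return (offset, timestamp), or (None, None) when nothing matched."""
    if api_version == 0:
        # v0 tuple: (partition, error_code, [offset]) -- one offset, no timestamp
        offsets = partition_info[2]
        return (offsets[0], None)

    # v1 tuple: (partition, error_code, timestamp, offset)
    timestamp, offset = partition_info[2:]
    if offset != UNKNOWN_OFFSET:
        return (offset, timestamp)
    return (None, None)


print(unpack_partition_info(0, (0, 0, [42])))                # (42, None)
print(unpack_partition_info(1, (0, 0, 1500000000000, 42)))   # (42, 1500000000000)
print(unpack_partition_info(1, (0, 0, -1, UNKNOWN_OFFSET)))  # (None, None)
```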