diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 3 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 241 | ||||
-rw-r--r-- | kafka/consumer/group.py | 109 | ||||
-rw-r--r-- | kafka/protocol/offset.py | 4 | ||||
-rw-r--r-- | kafka/structs.py | 3 |
5 files changed, 296 insertions, 64 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index ac8bb3d..61d63bf 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,6 +19,7 @@ from kafka.protocol.api import RequestHeader from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.fetch import FetchRequest from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -886,7 +887,7 @@ class BrokerConnection(object): def _infer_broker_version_from_api_versions(self, api_versions): # The logic here is to check the list of supported request versions - # in descending order. As soon as we find one that works, return it + # in reverse order. As soon as we find one that works, return it test_cases = [ # format (<broker version>, <needed struct>) ((0, 11, 0), MetadataRequest[4]), diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8db89a1..c0d6075 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -14,9 +14,11 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest from kafka.protocol.message import PartialMessage -from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy +from kafka.protocol.offset import ( + OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET +) from kafka.serializer import Deserializer -from kafka.structs import TopicPartition +from kafka.structs import TopicPartition, OffsetAndTimestamp log = logging.getLogger(__name__) @@ -48,6 +50,7 @@ class Fetcher(six.Iterator): 'iterator_refetch_records': 1, # undocumented -- interface may change 'metric_group_prefix': 'consumer', 'api_version': (0, 8, 0), + 'retry_backoff_ms': 100 } def __init__(self, client, subscriptions, metrics, **configs): @@ -180,6 +183,31 @@ class Fetcher(six.Iterator): " offset %s", tp, committed) self._subscriptions.seek(tp, committed) + def 
get_offsets_by_times(self, timestamps, timeout_ms): + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + if tp not in offsets: + offsets[tp] = None + else: + offset, timestamp = offsets[tp] + offsets[tp] = OffsetAndTimestamp(offset, timestamp) + return offsets + + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp][0] + return offsets + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -199,40 +227,64 @@ class Fetcher(six.Iterator): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) - offset = self._offset(partition, timestamp) + offsets = self._retrieve_offsets({partition: timestamp}) + if partition not in offsets: + raise NoOffsetForPartitionError(partition) + offset = offsets[partition][0] # we might lose the assignment while fetching the offset, # so check it is still active if self._subscriptions.is_assigned(partition): self._subscriptions.seek(partition, offset) - def _offset(self, partition, timestamp): - """Fetch a single offset before the given timestamp for the partition. + def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): + """Fetch offset for each partition passed in ``timestamps`` map. - Blocks until offset is obtained, or a non-retriable exception is raised + Blocks until offsets are obtained, a non-retriable exception is raised + or ``timeout_ms`` passed. Arguments: - partition The partition that needs fetching offset. 
- timestamp (int): timestamp for fetching offset. -1 for the latest - available, -2 for the earliest available. Otherwise timestamp - is treated as epoch seconds. + timestamps: {TopicPartition: int} dict with timestamps to fetch + offsets by. -1 for the latest available, -2 for the earliest + available. Otherwise timestamp is treated as epoch milliseconds. Returns: - int: message offset + {TopicPartition: (int, int)}: Mapping of partition to + retrieved offset and timestamp. If offset does not exist for + the provided timestamp, that partition will be missing from + this mapping. """ - while True: - future = self._send_offset_request(partition, timestamp) - self._client.poll(future=future) + if not timestamps: + return {} + + start_time = time.time() + remaining_ms = timeout_ms + while remaining_ms > 0: + future = self._send_offset_requests(timestamps) + self._client.poll(future=future, timeout_ms=remaining_ms) if future.succeeded(): return future.value - if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms + if remaining_ms < 0: + break + if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() - self._client.poll(future=refresh_future, sleep=True) + self._client.poll( + future=refresh_future, sleep=True, timeout_ms=remaining_ms) + else: + time.sleep(self.config['retry_backoff_ms'] / 1000.0) + + elapsed_ms = (time.time() - start_time) * 1000 + remaining_ms = timeout_ms - elapsed_ms + + raise Errors.KafkaTimeoutError( + "Failed to get offsets by timestamps in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -576,73 +628,140 @@ class Fetcher(six.Iterator): return f.deserialize(topic, bytes_) return f(bytes_) - def _send_offset_request(self, partition, timestamp): - """Fetch a single offset before the given timestamp for the partition. 
+ def _send_offset_requests(self, timestamps): + """Fetch offsets for each partition in timestamps dict. This may send + request to multiple nodes, based on who is Leader for partition. Arguments: - partition (TopicPartition): partition that needs fetching offset - timestamp (int): timestamp for fetching offset + timestamps (dict): {TopicPartition: int} mapping of fetching + timestamps. Returns: - Future: resolves to the corresponding offset + Future: resolves to a mapping of retrieved offsets """ - node_id = self._client.cluster.leader_for_partition(partition) - if node_id is None: - log.debug("Partition %s is unknown for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.StaleMetadata(partition)) - elif node_id == -1: - log.debug("Leader for partition %s unavailable for fetching offset," - " wait for metadata refresh", partition) - return Future().failure(Errors.LeaderNotAvailableError(partition)) - - request = OffsetRequest[0]( - -1, [(partition.topic, [(partition.partition, timestamp, 1)])] - ) + timestamps_by_node = collections.defaultdict(dict) + for partition, timestamp in six.iteritems(timestamps): + node_id = self._client.cluster.leader_for_partition(partition) + if node_id is None: + self._client.add_topic(partition.topic) + log.debug("Partition %s is unknown for fetching offset," + " wait for metadata refresh", partition) + return Future().failure(Errors.StaleMetadata(partition)) + elif node_id == -1: + log.debug("Leader for partition %s unavailable for fetching " + "offset, wait for metadata refresh", partition) + return Future().failure( + Errors.LeaderNotAvailableError(partition)) + else: + timestamps_by_node[node_id][partition] = timestamp + + # Aggregate results until we have all + list_offsets_future = Future() + responses = [] + node_count = len(timestamps_by_node) + + def on_success(value): + responses.append(value) + if len(responses) == node_count: + offsets = {} + for r in responses: + 
offsets.update(r) + list_offsets_future.success(offsets) + + def on_fail(err): + if not list_offsets_future.is_done: + list_offsets_future.failure(err) + + for node_id, timestamps in six.iteritems(timestamps_by_node): + _f = self._send_offset_request(node_id, timestamps) + _f.add_callback(on_success) + _f.add_errback(on_fail) + return list_offsets_future + + def _send_offset_request(self, node_id, timestamps): + by_topic = collections.defaultdict(list) + for tp, timestamp in six.iteritems(timestamps): + if self.config['api_version'] >= (0, 10, 1): + data = (tp.partition, timestamp) + else: + data = (tp.partition, timestamp, 1) + by_topic[tp.topic].append(data) + + if self.config['api_version'] >= (0, 10, 1): + request = OffsetRequest[1](-1, list(six.iteritems(by_topic))) + else: + request = OffsetRequest[0](-1, list(six.iteritems(by_topic))) + # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it # based on response error codes future = Future() + _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_response, partition, future) + _f.add_callback(self._handle_offset_response, future) _f.add_errback(lambda e: future.failure(e)) return future - def _handle_offset_response(self, partition, future, response): + def _handle_offset_response(self, future, response): """Callback for the response of the list offset call above. 
Arguments: - partition (TopicPartition): The partition that was fetched future (Future): the future to update based on response response (OffsetResponse): response from the server Raises: AssertionError: if response does not match partition """ - topic, partition_info = response.topics[0] - assert len(response.topics) == 1 and len(partition_info) == 1, ( - 'OffsetResponse should only be for a single topic-partition') - - part, error_code, offsets = partition_info[0] - assert topic == partition.topic and part == partition.partition, ( - 'OffsetResponse partition does not match OffsetRequest partition') - - error_type = Errors.for_code(error_code) - if error_type is Errors.NoError: - assert len(offsets) == 1, 'Expected OffsetResponse with one offset' - offset = offsets[0] - log.debug("Fetched offset %d for partition %s", offset, partition) - future.success(offset) - elif error_type in (Errors.NotLeaderForPartitionError, - Errors.UnknownTopicOrPartitionError): - log.debug("Attempt to fetch offsets for partition %s failed due" - " to obsolete leadership information, retrying.", - partition) - future.failure(error_type(partition)) - else: - log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type) - future.failure(error_type(partition)) + timestamp_offset_map = {} + for topic, part_data in response.topics: + for partition_info in part_data: + partition, error_code = partition_info[:2] + partition = TopicPartition(topic, partition) + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + if response.API_VERSION == 0: + offsets = partition_info[2] + assert len(offsets) <= 1, 'Expected OffsetResponse with one offset' + if not offsets: + offset = UNKNOWN_OFFSET + else: + offset = offsets[0] + log.debug("Handling v0 ListOffsetResponse response for %s. 
" + "Fetched offset %s", partition, offset) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = (offset, None) + else: + timestamp, offset = partition_info[2:] + log.debug("Handling ListOffsetResponse response for %s. " + "Fetched offset %s, timestamp %s", + partition, offset, timestamp) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = (offset, timestamp) + elif error_type is Errors.UnsupportedForMessageFormatError: + # The message format on the broker side is before 0.10.0, + # we simply put None in the response. + log.debug("Cannot search by timestamp for partition %s because the" + " message format version is before 0.10.0", partition) + elif error_type is Errors.NotLeaderForPartitionError: + log.debug("Attempt to fetch offsets for partition %s failed due" + " to obsolete leadership information, retrying.", + partition) + future.failure(error_type(partition)) + return + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warn("Received unknown topic or partition error in ListOffset " + "request for partition %s. 
The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) + future.failure(error_type(partition)) + return + else: + log.warning("Attempt to fetch offsets for partition %s failed due to:" + " %s", partition, error_type) + future.failure(error_type(partition)) + return + if not future.is_done: + future.success(timestamp_offset_map) def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6adb154..54a3711 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,7 +6,7 @@ import socket import sys import time -from kafka.errors import KafkaConfigurationError +from kafka.errors import KafkaConfigurationError, UnsupportedVersionError from kafka.vendor import six @@ -861,6 +861,113 @@ class KafkaConsumer(six.Iterator): metrics[k.group][k.name] = v.value() return metrics + def offsets_for_times(self, timestamps): + """Look up the offsets for the given partitions by timestamp. The + returned offset for each partition is the earliest offset whose + timestamp is greater than or equal to the given timestamp in the + corresponding partition. + + This is a blocking call. The consumer does not have to be assigned the + partitions. + + If the message format version in a partition is before 0.10.0, i.e. + the messages do not have timestamps, ``None`` will be returned for that + partition. ``None`` will also be returned for the partition if there + are no messages in it. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + timestamps (dict): ``{TopicPartition: int}`` mapping from partition + to the timestamp to look up. 
Unit should be milliseconds since + beginning of the epoch (midnight Jan 1, 1970 (UTC)) + + Returns: + ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition + to the timestamp and offset of the first message with timestamp + greater than or equal to the target timestamp. + + Raises: + ValueError: If the target timestamp is negative + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + for tp, ts in timestamps.items(): + timestamps[tp] = int(ts) + if ts < 0: + raise ValueError( + "The target time for partition {} is {}. The target time " + "cannot be negative.".format(tp, ts)) + return self._fetcher.get_offsets_by_times( + timestamps, self.config['request_timeout_ms']) + + def beginning_offsets(self, partitions): + """Get the first offset for the given partitions. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The earliest available offsets for the + given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms. + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.beginning_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + + def end_offsets(self, partitions): + """Get the last offset for the given partitions. 
The last offset of a + partition is the offset of the upcoming message, i.e. the offset of the + last available message + 1. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The end offsets for the given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.end_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 8353f8c..5179658 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -3,6 +3,8 @@ from __future__ import absolute_import from .api import Request, Response from .types import Array, Int8, Int16, Int32, Int64, Schema, String +UNKNOWN_OFFSET = -1 + class OffsetResetStrategy(object): LATEST = -1 @@ -91,7 +93,7 @@ class OffsetRequest_v2(Request): RESPONSE_TYPE = OffsetResponse_v2 SCHEMA = Schema( ('replica_id', Int32), - ('isolation_level', Int8), + ('isolation_level', Int8), # <- added isolation_level ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( diff --git a/kafka/structs.py b/kafka/structs.py index 48321e7..62f36dd 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -74,6 +74,9 @@ PartitionMetadata = namedtuple("PartitionMetadata", OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", 
"metadata"]) +OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", + ["offset", "timestamp"]) + # Deprecated structs OffsetAndMessage = namedtuple("OffsetAndMessage", |