summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-08-07 13:34:50 +0300
committerGitHub <noreply@github.com>2017-08-07 13:34:50 +0300
commit8cf44847bc91a986c98494c4a23be31c368ef4dd (patch)
treefaad0970119f52542b7a09b0db8abbc61166e694 /kafka/consumer/fetcher.py
parentda25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
parent55ded554f9f5b470eeb53500e455ecd87f4d8f87 (diff)
downloadkafka-python-8cf44847bc91a986c98494c4a23be31c368ef4dd.tar.gz
Merge pull request #1161 from dpkp/issue1036_offset_by_time
Added support for offsets_for_times, beginning_offsets and end_offsets APIs.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py241
1 file changed, 180 insertions, 61 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8db89a1..c0d6075 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -14,9 +14,11 @@ from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+ OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
+ 'retry_backoff_ms': 100
}
def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,31 @@ class Fetcher(six.Iterator):
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)
+ def get_offsets_by_times(self, timestamps, timeout_ms):
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ if tp not in offsets:
+ offsets[tp] = None
+ else:
+ offset, timestamp = offsets[tp]
+ offsets[tp] = OffsetAndTimestamp(offset, timestamp)
+ return offsets
+
+ def beginning_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.EARLIEST, timeout_ms)
+
+ def end_offsets(self, partitions, timeout_ms):
+ return self.beginning_or_end_offset(
+ partitions, OffsetResetStrategy.LATEST, timeout_ms)
+
+ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
+ timestamps = dict([(tp, timestamp) for tp in partitions])
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ offsets[tp] = offsets[tp][0]
+ return offsets
+
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -199,40 +227,64 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
- offset = self._offset(partition, timestamp)
+ offsets = self._retrieve_offsets({partition: timestamp})
+ if partition not in offsets:
+ raise NoOffsetForPartitionError(partition)
+ offset = offsets[partition][0]
# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
self._subscriptions.seek(partition, offset)
- def _offset(self, partition, timestamp):
- """Fetch a single offset before the given timestamp for the partition.
+ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+ """Fetch offset for each partition passed in ``timestamps`` map.
- Blocks until offset is obtained, or a non-retriable exception is raised
+ Blocks until offsets are obtained, a non-retriable exception is raised
+ or ``timeout_ms`` passed.
Arguments:
- partition The partition that needs fetching offset.
- timestamp (int): timestamp for fetching offset. -1 for the latest
- available, -2 for the earliest available. Otherwise timestamp
- is treated as epoch seconds.
+ timestamps: {TopicPartition: int} dict with timestamps to fetch
+ offsets by. -1 for the latest available, -2 for the earliest
+ available. Otherwise timestamp is treated as epoch milliseconds.
Returns:
- int: message offset
+ {TopicPartition: (int, int)}: Mapping of partition to
+ retrieved offset and timestamp. If offset does not exist for
+ the provided timestamp, that partition will be missing from
+ this mapping.
"""
- while True:
- future = self._send_offset_request(partition, timestamp)
- self._client.poll(future=future)
+ if not timestamps:
+ return {}
+
+ start_time = time.time()
+ remaining_ms = timeout_ms
+ while remaining_ms > 0:
+ future = self._send_offset_requests(timestamps)
+ self._client.poll(future=future, timeout_ms=remaining_ms)
if future.succeeded():
return future.value
-
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
+ elapsed_ms = (time.time() - start_time) * 1000
+ remaining_ms = timeout_ms - elapsed_ms
+ if remaining_ms < 0:
+ break
+
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
- self._client.poll(future=refresh_future, sleep=True)
+ self._client.poll(
+ future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+ elapsed_ms = (time.time() - start_time) * 1000
+ remaining_ms = timeout_ms - elapsed_ms
+
+ raise Errors.KafkaTimeoutError(
+ "Failed to get offsets by timestamps in %s ms" % timeout_ms)
def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
@@ -576,73 +628,140 @@ class Fetcher(six.Iterator):
return f.deserialize(topic, bytes_)
return f(bytes_)
- def _send_offset_request(self, partition, timestamp):
- """Fetch a single offset before the given timestamp for the partition.
+ def _send_offset_requests(self, timestamps):
+ """Fetch offsets for each partition in timestamps dict. This may send
+ request to multiple nodes, based on who is Leader for partition.
Arguments:
- partition (TopicPartition): partition that needs fetching offset
- timestamp (int): timestamp for fetching offset
+ timestamps (dict): {TopicPartition: int} mapping of fetching
+ timestamps.
Returns:
- Future: resolves to the corresponding offset
+ Future: resolves to a mapping of retrieved offsets
"""
- node_id = self._client.cluster.leader_for_partition(partition)
- if node_id is None:
- log.debug("Partition %s is unknown for fetching offset,"
- " wait for metadata refresh", partition)
- return Future().failure(Errors.StaleMetadata(partition))
- elif node_id == -1:
- log.debug("Leader for partition %s unavailable for fetching offset,"
- " wait for metadata refresh", partition)
- return Future().failure(Errors.LeaderNotAvailableError(partition))
-
- request = OffsetRequest[0](
- -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
- )
+ timestamps_by_node = collections.defaultdict(dict)
+ for partition, timestamp in six.iteritems(timestamps):
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None:
+ self._client.add_topic(partition.topic)
+ log.debug("Partition %s is unknown for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.StaleMetadata(partition))
+ elif node_id == -1:
+ log.debug("Leader for partition %s unavailable for fetching "
+ "offset, wait for metadata refresh", partition)
+ return Future().failure(
+ Errors.LeaderNotAvailableError(partition))
+ else:
+ timestamps_by_node[node_id][partition] = timestamp
+
+ # Aggregate results until we have all
+ list_offsets_future = Future()
+ responses = []
+ node_count = len(timestamps_by_node)
+
+ def on_success(value):
+ responses.append(value)
+ if len(responses) == node_count:
+ offsets = {}
+ for r in responses:
+ offsets.update(r)
+ list_offsets_future.success(offsets)
+
+ def on_fail(err):
+ if not list_offsets_future.is_done:
+ list_offsets_future.failure(err)
+
+ for node_id, timestamps in six.iteritems(timestamps_by_node):
+ _f = self._send_offset_request(node_id, timestamps)
+ _f.add_callback(on_success)
+ _f.add_errback(on_fail)
+ return list_offsets_future
+
+ def _send_offset_request(self, node_id, timestamps):
+ by_topic = collections.defaultdict(list)
+ for tp, timestamp in six.iteritems(timestamps):
+ if self.config['api_version'] >= (0, 10, 1):
+ data = (tp.partition, timestamp)
+ else:
+ data = (tp.partition, timestamp, 1)
+ by_topic[tp.topic].append(data)
+
+ if self.config['api_version'] >= (0, 10, 1):
+ request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
+ else:
+ request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
+
# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
future = Future()
+
_f = self._client.send(node_id, request)
- _f.add_callback(self._handle_offset_response, partition, future)
+ _f.add_callback(self._handle_offset_response, future)
_f.add_errback(lambda e: future.failure(e))
return future
- def _handle_offset_response(self, partition, future, response):
+ def _handle_offset_response(self, future, response):
"""Callback for the response of the list offset call above.
Arguments:
- partition (TopicPartition): The partition that was fetched
future (Future): the future to update based on response
response (OffsetResponse): response from the server
Raises:
AssertionError: if response does not match partition
"""
- topic, partition_info = response.topics[0]
- assert len(response.topics) == 1 and len(partition_info) == 1, (
- 'OffsetResponse should only be for a single topic-partition')
-
- part, error_code, offsets = partition_info[0]
- assert topic == partition.topic and part == partition.partition, (
- 'OffsetResponse partition does not match OffsetRequest partition')
-
- error_type = Errors.for_code(error_code)
- if error_type is Errors.NoError:
- assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
- offset = offsets[0]
- log.debug("Fetched offset %d for partition %s", offset, partition)
- future.success(offset)
- elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
- log.debug("Attempt to fetch offsets for partition %s failed due"
- " to obsolete leadership information, retrying.",
- partition)
- future.failure(error_type(partition))
- else:
- log.warning("Attempt to fetch offsets for partition %s failed due to:"
- " %s", partition, error_type)
- future.failure(error_type(partition))
+ timestamp_offset_map = {}
+ for topic, part_data in response.topics:
+ for partition_info in part_data:
+ partition, error_code = partition_info[:2]
+ partition = TopicPartition(topic, partition)
+ error_type = Errors.for_code(error_code)
+ if error_type is Errors.NoError:
+ if response.API_VERSION == 0:
+ offsets = partition_info[2]
+ assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
+ if not offsets:
+ offset = UNKNOWN_OFFSET
+ else:
+ offset = offsets[0]
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ if offset != UNKNOWN_OFFSET:
+ timestamp_offset_map[partition] = (offset, None)
+ else:
+ timestamp, offset = partition_info[2:]
+ log.debug("Handling ListOffsetResponse response for %s. "
+ "Fetched offset %s, timestamp %s",
+ partition, offset, timestamp)
+ if offset != UNKNOWN_OFFSET:
+ timestamp_offset_map[partition] = (offset, timestamp)
+ elif error_type is Errors.UnsupportedForMessageFormatError:
+ # The message format on the broker side is before 0.10.0,
+ # we simply put None in the response.
+ log.debug("Cannot search by timestamp for partition %s because the"
+ " message format version is before 0.10.0", partition)
+ elif error_type is Errors.NotLeaderForPartitionError:
+ log.debug("Attempt to fetch offsets for partition %s failed due"
+ " to obsolete leadership information, retrying.",
+ partition)
+ future.failure(error_type(partition))
+ return
+ elif error_type is Errors.UnknownTopicOrPartitionError:
+ log.warn("Received unknown topic or partition error in ListOffset "
+ "request for partition %s. The topic/partition " +
+ "may not exist or the user may not have Describe access "
+ "to it.", partition)
+ future.failure(error_type(partition))
+ return
+ else:
+ log.warning("Attempt to fetch offsets for partition %s failed due to:"
+ " %s", partition, error_type)
+ future.failure(error_type(partition))
+ return
+ if not future.is_done:
+ future.success(timestamp_offset_map)
def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()