author      Taras Voinarovskiy <voyn1991@gmail.com>  2017-07-30 15:42:27 +0000
committer   Taras Voinarovskiy <voyn1991@gmail.com>  2017-08-07 09:34:08 +0000
commit      39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch)
tree        2b94ed93bec5ae4f072360c5072cc22b0685f8a1 /kafka/consumer/fetcher.py
parent      da25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
download    kafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz
Added basic support for the offsets_for_times API. Still needs to group requests by node and send them in parallel.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--  kafka/consumer/fetcher.py  94
1 file changed, 76 insertions, 18 deletions
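
For orientation, a minimal usage sketch of the new code path. Only get_offsets_by_times, TopicPartition and OffsetAndTimestamp come from this diff; the fetcher instance, topic name and timestamp are illustrative assumptions, not part of the commit.

    from kafka.structs import TopicPartition

    # Ask, per partition, for the first offset whose message timestamp is >= the
    # given epoch-millisecond value. `fetcher` is assumed to be an already-built
    # Fetcher (client, subscriptions and metrics wiring is outside this diff).
    tp = TopicPartition('my-topic', 0)
    offsets = fetcher.get_offsets_by_times({tp: 1501426947000}, timeout_ms=5000)

    # The result maps each partition to OffsetAndTimestamp(offset, timestamp);
    # per the response handler in this diff, both fields are None when the
    # timestamp cannot be resolved (no such message, or the broker message
    # format predates 0.10.0).
    offset, ts = offsets[tp]
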
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8db89a1..cb80a6f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -14,9 +14,11 @@ from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
-from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.protocol.offset import (
+ OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
+)
from kafka.serializer import Deserializer
-from kafka.structs import TopicPartition
+from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
+ 'retry_backoff_ms': 100
}
def __init__(self, client, subscriptions, metrics, **configs):
@@ -180,6 +183,14 @@ class Fetcher(six.Iterator):
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)
+ def get_offsets_by_times(self, timestamps, timeout_ms):
+ response = {}
+ for tp, timestamp in timestamps.items():
+ timestamp = int(timestamp)
+ offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms)
+ response[tp] = OffsetAndTimestamp(offset, tmst)
+ return response
+
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -199,14 +210,14 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
- offset = self._offset(partition, timestamp)
+ offset, _ = self._offset(partition, timestamp)
# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
self._subscriptions.seek(partition, offset)
- def _offset(self, partition, timestamp):
+ def _offset(self, partition, timestamp, timeout_ms=None):
"""Fetch a single offset before the given timestamp for the partition.
Blocks until offset is obtained, or a non-retriable exception is raised
@@ -218,21 +229,37 @@ class Fetcher(six.Iterator):
is treated as epoch seconds.
Returns:
- int: message offset
+ (int, int): message offset and timestamp. None if not available
"""
+ start_time = time.time()
+ remaining_ms = timeout_ms
while True:
future = self._send_offset_request(partition, timestamp)
- self._client.poll(future=future)
+ self._client.poll(future=future, timeout_ms=remaining_ms)
if future.succeeded():
return future.value
-
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
+ if timeout_ms is not None:
+ remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+ if remaining_ms < 0:
+ break
+
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
- self._client.poll(future=refresh_future, sleep=True)
+ self._client.poll(
+ future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+ else:
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+
+ if timeout_ms is not None:
+ remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+
+ # Will only happen when timeout_ms != None
+ raise Errors.KafkaTimeoutError(
+ "Failed to get offsets by times in %s ms" % timeout_ms)
def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
@@ -596,9 +623,15 @@ class Fetcher(six.Iterator):
" wait for metadata refresh", partition)
return Future().failure(Errors.LeaderNotAvailableError(partition))
- request = OffsetRequest[0](
- -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
- )
+ if self.config['api_version'] >= (0, 10, 1):
+ request = OffsetRequest[1](
+ -1, [(partition.topic, [(partition.partition, timestamp)])]
+ )
+ else:
+ request = OffsetRequest[0](
+ -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
+ )
+
# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
@@ -623,22 +656,47 @@ class Fetcher(six.Iterator):
assert len(response.topics) == 1 and len(partition_info) == 1, (
'OffsetResponse should only be for a single topic-partition')
- part, error_code, offsets = partition_info[0]
+ partition_info = partition_info[0]
+ part, error_code = partition_info[:2]
+
assert topic == partition.topic and part == partition.partition, (
'OffsetResponse partition does not match OffsetRequest partition')
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
- offset = offsets[0]
- log.debug("Fetched offset %d for partition %s", offset, partition)
- future.success(offset)
- elif error_type in (Errors.NotLeaderForPartitionError,
- Errors.UnknownTopicOrPartitionError):
+ if response.API_VERSION == 0:
+ offsets = partition_info[2]
+ assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
+ offset = offsets[0]
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ future.success((offset, None))
+ else:
+ timestamp, offset = partition_info[2:]
+ log.debug("Handling ListOffsetResponse response for %s. "
+ "Fetched offset %s, timestamp %s",
+ partition, offset, timestamp)
+ if offset != UNKNOWN_OFFSET:
+ future.success((offset, timestamp))
+ else:
+ future.success((None, None))
+ elif error_type is Errors.UnsupportedForMessageFormatError:
+ # The message format on the broker side is before 0.10.0, we simply
+ # put None in the response.
+ log.debug("Cannot search by timestamp for partition %s because the"
+ " message format version is before 0.10.0", partition)
+ future.success((None, None))
+ elif error_type is Errors.NotLeaderForPartitionError:
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
+ elif error_type is Errors.UnknownTopicOrPartitionError:
+ log.warn("Received unknown topic or partition error in ListOffset "
+ "request for partition %s. The topic/partition " +
+ "may not exist or the user may not have Describe access "
+ "to it.", partition)
+ future.failure(error_type(partition))
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)