author    Taras Voinarovskiy <voyn1991@gmail.com>  2017-07-31 12:41:53 +0000
committer Taras Voinarovskiy <voyn1991@gmail.com>  2017-08-07 09:34:08 +0000
commit    63992f907aaabc4055d02de60f789443fcb4b54f
tree      78208f5abef771e624ba9099f0dc274bc171f357
parent    f244e527a9674fa22b0bf9771585598cb758c8b1
Changed retrieve_offsets to allow fetching multiple offsets at once
-rw-r--r--  kafka/consumer/fetcher.py          | 225
-rw-r--r--  kafka/consumer/group.py            |   4
-rw-r--r--  test/test_consumer_integration.py  |  45
3 files changed, 174 insertions(+), 100 deletions(-)
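
For context, a usage sketch of the batched lookup this commit enables. This is a minimal illustration, not part of the commit; the broker address, topic name, and timestamp values are hypothetical.

    # One offsets_for_times() call now resolves several partitions at once,
    # instead of issuing one blocking request per partition.
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             api_version=(0, 10, 1))
    tp0 = TopicPartition('my-topic', 0)   # hypothetical topic
    tp1 = TopicPartition('my-topic', 1)

    # Timestamps are epoch milliseconds.
    lookup = {tp0: 1501500000000, tp1: 1501500000000}
    for tp, ot in consumer.offsets_for_times(lookup).items():
        if ot is None:   # no message at or after that timestamp
            print('%s: no offset found' % (tp,))
        else:
            print('%s: offset=%d timestamp=%d' % (tp, ot.offset, ot.timestamp))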
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index cb80a6f..19982b1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -184,12 +184,14 @@ class Fetcher(six.Iterator):
self._subscriptions.seek(tp, committed)
def get_offsets_by_times(self, timestamps, timeout_ms):
- response = {}
- for tp, timestamp in timestamps.items():
- timestamp = int(timestamp)
- offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms)
- response[tp] = OffsetAndTimestamp(offset, tmst)
- return response
+ offsets = self._retrieve_offsets(timestamps, timeout_ms)
+ for tp in timestamps:
+ if tp not in offsets:
+ offsets[tp] = None
+ else:
+ offset, timestamp = offsets[tp]
+ offsets[tp] = OffsetAndTimestamp(offset, timestamp)
+ return offsets
def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.
@@ -210,31 +212,39 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
- offset, _ = self._offset(partition, timestamp)
+ offsets = self._retrieve_offsets({partition: timestamp})
+ assert partition in offsets
+ offset = offsets[partition][0]
# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
self._subscriptions.seek(partition, offset)
- def _offset(self, partition, timestamp, timeout_ms=None):
- """Fetch a single offset before the given timestamp for the partition.
+ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+ """ Fetch offset for each partition passed in ``timestamps`` map.
- Blocks until offset is obtained, or a non-retriable exception is raised
+ Blocks until offsets are obtained, a non-retriable exception is raised
+ or ``timeout_ms`` passes.
Arguments:
- partition The partition that needs fetching offset.
- timestamp (int): timestamp for fetching offset. -1 for the latest
- available, -2 for the earliest available. Otherwise timestamp
- is treated as epoch seconds.
+ timestamps: {TopicPartition: int} dict with timestamps to fetch
+ offsets by. -1 for the latest available, -2 for the earliest
+ available. Otherwise timestamp is treated as epoch milliseconds.
Returns:
- (int, int): message offset and timestamp. None if not available
+ {TopicPartition: (int, int)}: Mapping of partition to
+ retrieved offset and timestamp. If offset does not exist for
+ the provided timestamp, that partition will be missing from
+ this mapping.
"""
+ if not timestamps:
+ return {}
+
start_time = time.time()
remaining_ms = timeout_ms
- while True:
- future = self._send_offset_request(partition, timestamp)
+ while remaining_ms > 0:
+ future = self._send_offset_requests(timestamps)
self._client.poll(future=future, timeout_ms=remaining_ms)
if future.succeeded():
@@ -242,10 +252,10 @@ class Fetcher(six.Iterator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- if timeout_ms is not None:
- remaining_ms = timeout_ms - (time.time() - start_time) * 1000
- if remaining_ms < 0:
- break
+ elapsed_ms = (time.time() - start_time) * 1000
+ remaining_ms = timeout_ms - elapsed_ms
+ if remaining_ms < 0:
+ break
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
@@ -254,10 +264,9 @@ class Fetcher(six.Iterator):
else:
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
- if timeout_ms is not None:
- remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+ elapsed_ms = (time.time() - start_time) * 1000
+ remaining_ms = timeout_ms - elapsed_ms
- # Will only happen when timeout_ms != None
raise Errors.KafkaTimeoutError(
"Failed to get offsets by times in %s ms" % timeout_ms)
@@ -603,104 +612,130 @@ class Fetcher(six.Iterator):
return f.deserialize(topic, bytes_)
return f(bytes_)
- def _send_offset_request(self, partition, timestamp):
- """Fetch a single offset before the given timestamp for the partition.
+ def _send_offset_requests(self, timestamps):
+ """ Fetch offsets for each partition in timestamps dict. This may send
+ request to multiple nodes, based on who is Leader for partition.
Arguments:
- partition (TopicPartition): partition that needs fetching offset
- timestamp (int): timestamp for fetching offset
+ timestamps (dict): {TopicPartition: int} mapping of partitions to
+ target timestamps.
Returns:
- Future: resolves to the corresponding offset
+ Future: resolves to a mapping of retrieved offsets
"""
- node_id = self._client.cluster.leader_for_partition(partition)
- if node_id is None:
- log.debug("Partition %s is unknown for fetching offset,"
- " wait for metadata refresh", partition)
- return Future().failure(Errors.StaleMetadata(partition))
- elif node_id == -1:
- log.debug("Leader for partition %s unavailable for fetching offset,"
- " wait for metadata refresh", partition)
- return Future().failure(Errors.LeaderNotAvailableError(partition))
+ timestamps_by_node = collections.defaultdict(dict)
+ for partition, timestamp in six.iteritems(timestamps):
+ node_id = self._client.cluster.leader_for_partition(partition)
+ if node_id is None:
+ self._client.add_topic(partition.topic)
+ log.debug("Partition %s is unknown for fetching offset,"
+ " wait for metadata refresh", partition)
+ return Future().failure(Errors.StaleMetadata(partition))
+ elif node_id == -1:
+ log.debug("Leader for partition %s unavailable for fetching "
+ "offset, wait for metadata refresh", partition)
+ return Future().failure(
+ Errors.LeaderNotAvailableError(partition))
+ else:
+ timestamps_by_node[node_id][partition] = timestamp
+
+ # Aggregate results until we have all responses
+ list_offsets_future = Future()
+ responses = []
+ node_count = len(timestamps_by_node)
+
+ def on_success(value):
+ responses.append(value)
+ if len(responses) == node_count:
+ offsets = {}
+ for r in responses:
+ offsets.update(r)
+ list_offsets_future.success(offsets)
+
+ for node_id, timestamps in six.iteritems(timestamps_by_node):
+ _f = self._send_offset_request(node_id, timestamps)
+ _f.add_callback(on_success)
+ _f.add_errback(lambda e: list_offsets_future.failure(e))
+ return list_offsets_future
+
+ def _send_offset_request(self, node_id, timestamps):
+ by_topic = collections.defaultdict(list)
+ for tp, timestamp in six.iteritems(timestamps):
+ if self.config['api_version'] >= (0, 10, 1):
+ data = (tp.partition, timestamp)
+ else:
+ data = (tp.partition, timestamp, 1)
+ by_topic[tp.topic].append(data)
if self.config['api_version'] >= (0, 10, 1):
- request = OffsetRequest[1](
- -1, [(partition.topic, [(partition.partition, timestamp)])]
- )
+ request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
else:
- request = OffsetRequest[0](
- -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
- )
+ request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
future = Future()
+
_f = self._client.send(node_id, request)
- _f.add_callback(self._handle_offset_response, partition, future)
+ _f.add_callback(self._handle_offset_response, future)
_f.add_errback(lambda e: future.failure(e))
return future
- def _handle_offset_response(self, partition, future, response):
+ def _handle_offset_response(self, future, response):
"""Callback for the response of the list offset call above.
Arguments:
- partition (TopicPartition): The partition that was fetched
future (Future): the future to update based on response
response (OffsetResponse): response from the server
Raises:
AssertionError: if response does not match partition
"""
- topic, partition_info = response.topics[0]
- assert len(response.topics) == 1 and len(partition_info) == 1, (
- 'OffsetResponse should only be for a single topic-partition')
-
- partition_info = partition_info[0]
- part, error_code = partition_info[:2]
-
- assert topic == partition.topic and part == partition.partition, (
- 'OffsetResponse partition does not match OffsetRequest partition')
-
- error_type = Errors.for_code(error_code)
- if error_type is Errors.NoError:
- if response.API_VERSION == 0:
- offsets = partition_info[2]
- assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
- offset = offsets[0]
- log.debug("Handling v0 ListOffsetResponse response for %s. "
- "Fetched offset %s", partition, offset)
- future.success((offset, None))
- else:
- timestamp, offset = partition_info[2:]
- log.debug("Handling ListOffsetResponse response for %s. "
- "Fetched offset %s, timestamp %s",
- partition, offset, timestamp)
- if offset != UNKNOWN_OFFSET:
- future.success((offset, timestamp))
+ timestamp_offset_map = {}
+ for topic, part_data in response.topics:
+ for partition_info in part_data:
+ partition, error_code = partition_info[:2]
+ partition = TopicPartition(topic, partition)
+ error_type = Errors.for_code(error_code)
+ if error_type is Errors.NoError:
+ if response.API_VERSION == 0:
+ offsets = partition_info[2]
+ assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
+ if offsets:
+ offset = offsets[0]
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ timestamp_offset_map[partition] = (offset, None)
+ else:
+ timestamp, offset = partition_info[2:]
+ log.debug("Handling ListOffsetResponse response for %s. "
+ "Fetched offset %s, timestamp %s",
+ partition, offset, timestamp)
+ if offset != UNKNOWN_OFFSET:
+ timestamp_offset_map[partition] = (offset, timestamp)
+ elif error_type is Errors.UnsupportedForMessageFormatError:
+ # The message format on the broker side is before 0.10.0,
+ # we simply put None in the response.
+ log.debug("Cannot search by timestamp for partition %s because the"
+ " message format version is before 0.10.0", partition)
+ elif error_type is Errors.NotLeaderForPartitionError:
+ log.debug("Attempt to fetch offsets for partition %s failed due"
+ " to obsolete leadership information, retrying.",
+ partition)
+ future.failure(error_type(partition))
+ elif error_type is Errors.UnknownTopicOrPartitionError:
+ log.warn("Received unknown topic or partition error in ListOffset "
+ "request for partition %s. The topic/partition " +
+ "may not exist or the user may not have Describe access "
+ "to it.", partition)
+ future.failure(error_type(partition))
else:
- future.success((None, None))
- elif error_type is Errors.UnsupportedForMessageFormatError:
- # The message format on the broker side is before 0.10.0, we simply
- # put None in the response.
- log.debug("Cannot search by timestamp for partition %s because the"
- " message format version is before 0.10.0", partition)
- future.success((None, None))
- elif error_type is Errors.NotLeaderForPartitionError:
- log.debug("Attempt to fetch offsets for partition %s failed due"
- " to obsolete leadership information, retrying.",
- partition)
- future.failure(error_type(partition))
- elif error_type is Errors.UnknownTopicOrPartitionError:
- log.warn("Received unknown topic or partition error in ListOffset "
- "request for partition %s. The topic/partition " +
- "may not exist or the user may not have Describe access "
- "to it.", partition)
- future.failure(error_type(partition))
- else:
- log.warning("Attempt to fetch offsets for partition %s failed due to:"
- " %s", partition, error_type)
- future.failure(error_type(partition))
+ log.warning("Attempt to fetch offsets for partition %s failed due to:"
+ " %s", partition, error_type)
+ future.failure(error_type(partition))
+ if not future.is_done:
+ future.success(timestamp_offset_map)
def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()
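
The new _send_offset_requests fans out one request per leader node and folds the per-node responses into a single result. A standalone sketch of that aggregation pattern, using kafka-python's Future; the gather() helper is illustrative, not library API:

    from kafka.future import Future

    def gather(per_node_futures):
        # Resolve `combined` once every per-node future has succeeded, or
        # fail it on the first error. Future.is_done guards against
        # resolving the same future twice.
        combined = Future()
        responses = []
        node_count = len(per_node_futures)

        def on_success(value):
            responses.append(value)
            if len(responses) == node_count and not combined.is_done:
                merged = {}
                for r in responses:
                    merged.update(r)    # per-node maps are disjoint
                combined.success(merged)

        def on_failure(err):
            if not combined.is_done:    # only the first failure propagates
                combined.failure(err)

        for f in per_node_futures:
            f.add_callback(on_success)
            f.add_errback(on_failure)
        return combined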
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f9b8f16..48a88b2 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -881,7 +881,8 @@ class KafkaConsumer(six.Iterator):
Arguments:
timestamps (dict): ``{TopicPartition: int}`` mapping from partition
- to the timestamp to look up.
+ to the timestamp to look up. Unit should be milliseconds since
+ the beginning of the epoch (midnight Jan 1, 1970 UTC).
Raises:
ValueError: if the target timestamp is negative
@@ -894,6 +895,7 @@ class KafkaConsumer(six.Iterator):
"offsets_for_times API not supported for cluster version {}"
.format(self.config['api_version']))
for tp, ts in timestamps.items():
+ timestamps[tp] = int(ts)
if ts < 0:
raise ValueError(
"The target time for partition {} is {}. The target time "
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2169145..eab93be 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -14,7 +14,9 @@ from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError
)
-from kafka.structs import ProduceRequestPayload, TopicPartition
+from kafka.structs import (
+ ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+)
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -637,9 +639,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_time(self):
- late_time = int(time.time())
- middle_time = late_time - 1
- early_time = late_time - 2
+ late_time = int(time.time()) * 1000
+ middle_time = late_time - 1000
+ early_time = late_time - 2000
tp = TopicPartition(self.topic, 0)
kafka_producer = self.kafka_producer()
@@ -652,6 +654,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.kafka_consumer()
offsets = consumer.offsets_for_times({tp: early_time})
+ self.assertEqual(len(offsets), 1)
self.assertEqual(offsets[tp].offset, early_msg.offset)
self.assertEqual(offsets[tp].timestamp, early_time)
@@ -663,6 +666,40 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(offsets[tp].offset, late_msg.offset)
self.assertEqual(offsets[tp].timestamp, late_time)
+ # Out-of-bounds timestamp checks
+
+ offsets = consumer.offsets_for_times({tp: 0})
+ self.assertEqual(offsets[tp].offset, early_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, early_time)
+
+ offsets = consumer.offsets_for_times({tp: 9999999999999})
+ self.assertEqual(offsets[tp], None)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_search_many_partitions(self):
+ tp0 = TopicPartition(self.topic, 0)
+ tp1 = TopicPartition(self.topic, 1)
+
+ kafka_producer = self.kafka_producer()
+ send_time = int(time.time() * 1000)
+ p0msg = kafka_producer.send(
+ self.topic, partition=0, value=b"XXX",
+ timestamp_ms=send_time).get()
+ p1msg = kafka_producer.send(
+ self.topic, partition=1, value=b"XXX",
+ timestamp_ms=send_time).get()
+
+ consumer = self.kafka_consumer()
+ offsets = consumer.offsets_for_times({
+ tp0: send_time,
+ tp1: send_time
+ })
+
+ self.assertEqual(offsets, {
+ tp0: OffsetAndTimestamp(p0msg.offset, send_time),
+ tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+ })
+
@kafka_versions('<0.10.1')
def test_kafka_consumer_offsets_for_time_old(self):
consumer = self.kafka_consumer()