author     Taras Voinarovskiy <voyn1991@gmail.com>    2017-07-31 12:41:53 +0000
committer  Taras Voinarovskiy <voyn1991@gmail.com>    2017-08-07 09:34:08 +0000
commit     63992f907aaabc4055d02de60f789443fcb4b54f (patch)
tree       78208f5abef771e624ba9099f0dc274bc171f357
parent     f244e527a9674fa22b0bf9771585598cb758c8b1 (diff)
download   kafka-python-63992f907aaabc4055d02de60f789443fcb4b54f.tar.gz
Changed retrieve_offsets to allow fetching multiple offsets at once
-rw-r--r--  kafka/consumer/fetcher.py          | 225
-rw-r--r--  kafka/consumer/group.py            |   4
-rw-r--r--  test/test_consumer_integration.py  |  45
3 files changed, 174 insertions, 100 deletions
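After this change, KafkaConsumer.offsets_for_times() resolves offsets for several partitions in a single call, grouping the underlying ListOffsets requests by partition leader. A minimal usage sketch follows; it is not part of the commit, and the broker address, topic name, and timestamp are hypothetical:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp0 = TopicPartition('my-topic', 0)
    tp1 = TopicPartition('my-topic', 1)
    target_ms = 1501500000000  # timestamps are epoch milliseconds

    # One call now looks up offsets for several partitions at once.
    # Partitions with no message at or after the timestamp map to None.
    offsets = consumer.offsets_for_times({tp0: target_ms, tp1: target_ms})
    for tp, ot in offsets.items():
        print(tp, ot)  # ot is OffsetAndTimestamp(offset, timestamp) or None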
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index cb80a6f..19982b1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -184,12 +184,14 @@ class Fetcher(six.Iterator):
             self._subscriptions.seek(tp, committed)
 
     def get_offsets_by_times(self, timestamps, timeout_ms):
-        response = {}
-        for tp, timestamp in timestamps.items():
-            timestamp = int(timestamp)
-            offset, tmst = self._offset(tp, timestamp, timeout_ms=timeout_ms)
-            response[tp] = OffsetAndTimestamp(offset, tmst)
-        return response
+        offsets = self._retrieve_offsets(timestamps, timeout_ms)
+        for tp in timestamps:
+            if tp not in offsets:
+                offsets[tp] = None
+            else:
+                offset, timestamp = offsets[tp]
+                offsets[tp] = OffsetAndTimestamp(offset, timestamp)
+        return offsets
 
     def _reset_offset(self, partition):
         """Reset offsets for the given partition using the offset reset strategy.
@@ -210,31 +212,39 @@ class Fetcher(six.Iterator):
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offset, _ = self._offset(partition, timestamp)
+        offsets = self._retrieve_offsets({partition: timestamp})
+        assert partition in offsets
+        offset = offsets[partition][0]
 
         # we might lose the assignment while fetching the offset,
         # so check it is still active
         if self._subscriptions.is_assigned(partition):
             self._subscriptions.seek(partition, offset)
 
-    def _offset(self, partition, timestamp, timeout_ms=None):
-        """Fetch a single offset before the given timestamp for the partition.
+    def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+        """ Fetch offset for each partition passed in ``timestamps`` map.
 
-        Blocks until offset is obtained, or a non-retriable exception is raised
+        Blocks until offsets are obtained, a non-retriable exception is raised
+        or ``timeout_ms`` passed (if it's not ``None``).
 
         Arguments:
-            partition The partition that needs fetching offset.
-            timestamp (int): timestamp for fetching offset. -1 for the latest
-                available, -2 for the earliest available. Otherwise timestamp
-                is treated as epoch seconds.
+            timestamps: {TopicPartition: int} dict with timestamps to fetch
+                offsets by. -1 for the latest available, -2 for the earliest
+                available. Otherwise timestamp is treated as epoch miliseconds.
 
         Returns:
-            (int, int): message offset and timestamp. None if not available
+            {TopicPartition: (int, int)}: Mapping of partition to
+                retrieved offset and timestamp. If offset does not exist for
+                the provided timestamp, that partition will be missing from
+                this mapping.
         """
+        if not timestamps:
+            return {}
+
         start_time = time.time()
         remaining_ms = timeout_ms
-        while True:
-            future = self._send_offset_request(partition, timestamp)
+        while remaining_ms > 0:
+            future = self._send_offset_requests(timestamps)
             self._client.poll(future=future, timeout_ms=remaining_ms)
 
             if future.succeeded():
@@ -242,10 +252,10 @@ class Fetcher(six.Iterator):
             if not future.retriable():
                 raise future.exception # pylint: disable-msg=raising-bad-type
 
-            if timeout_ms is not None:
-                remaining_ms = timeout_ms - (time.time() - start_time) * 1000
-                if remaining_ms < 0:
-                    break
+            elapsed_ms = (time.time() - start_time) * 1000
+            remaining_ms = timeout_ms - elapsed_ms
+            if remaining_ms < 0:
+                break
 
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
@@ -254,10 +264,9 @@ class Fetcher(six.Iterator):
             else:
                 time.sleep(self.config['retry_backoff_ms'] / 1000.0)
 
-                if timeout_ms is not None:
-                    remaining_ms = timeout_ms - (time.time() - start_time) * 1000
+                elapsed_ms = (time.time() - start_time) * 1000
+                remaining_ms = timeout_ms - elapsed_ms
 
-        # Will only happen when timeout_ms != None
         raise Errors.KafkaTimeoutError(
             "Failed to get offsets by times in %s ms" % timeout_ms)
 
@@ -603,104 +612,130 @@ class Fetcher(six.Iterator):
             return f.deserialize(topic, bytes_)
         return f(bytes_)
 
-    def _send_offset_request(self, partition, timestamp):
-        """Fetch a single offset before the given timestamp for the partition.
+    def _send_offset_requests(self, timestamps):
+        """ Fetch offsets for each partition in timestamps dict. This may send
+        request to multiple nodes, based on who is Leader for partition.
 
         Arguments:
-            partition (TopicPartition): partition that needs fetching offset
-            timestamp (int): timestamp for fetching offset
+            timestamps (dict): {TopicPartition: int} mapping of fetching
+                timestamps.
 
         Returns:
-            Future: resolves to the corresponding offset
+            Future: resolves to a mapping of retrieved offsets
        """
-        node_id = self._client.cluster.leader_for_partition(partition)
-        if node_id is None:
-            log.debug("Partition %s is unknown for fetching offset,"
-                      " wait for metadata refresh", partition)
-            return Future().failure(Errors.StaleMetadata(partition))
-        elif node_id == -1:
-            log.debug("Leader for partition %s unavailable for fetching offset,"
-                      " wait for metadata refresh", partition)
-            return Future().failure(Errors.LeaderNotAvailableError(partition))
+        timestamps_by_node = collections.defaultdict(dict)
+        for partition, timestamp in six.iteritems(timestamps):
+            node_id = self._client.cluster.leader_for_partition(partition)
+            if node_id is None:
+                self._client.add_topic(partition.topic)
+                log.debug("Partition %s is unknown for fetching offset,"
+                          " wait for metadata refresh", partition)
+                return Future().failure(Errors.StaleMetadata(partition))
+            elif node_id == -1:
+                log.debug("Leader for partition %s unavailable for fetching "
+                          "offset, wait for metadata refresh", partition)
+                return Future().failure(
+                    Errors.LeaderNotAvailableError(partition))
+            else:
+                timestamps_by_node[node_id][partition] = timestamp
+
+        # Aggregate results until we have all
+        list_offsets_future = Future()
+        responses = []
+        node_count = len(timestamps_by_node)
+
+        def on_success(value):
+            responses.append(value)
+            if len(responses) == node_count:
+                offsets = {}
+                for r in responses:
+                    offsets.update(r)
+                list_offsets_future.success(offsets)
+
+        for node_id, timestamps in six.iteritems(timestamps_by_node):
+            _f = self._send_offset_request(node_id, timestamps)
+            _f.add_callback(on_success)
+            _f.add_errback(lambda e: list_offsets_future.failure(e))
+        return list_offsets_future
+
+    def _send_offset_request(self, node_id, timestamps):
+        by_topic = collections.defaultdict(list)
+        for tp, timestamp in six.iteritems(timestamps):
+            if self.config['api_version'] >= (0, 10, 1):
+                data = (tp.partition, timestamp)
+            else:
+                data = (tp.partition, timestamp, 1)
+            by_topic[tp.topic].append(data)
 
         if self.config['api_version'] >= (0, 10, 1):
-            request = OffsetRequest[1](
-                -1, [(partition.topic, [(partition.partition, timestamp)])]
-            )
+            request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
         else:
-            request = OffsetRequest[0](
-                -1, [(partition.topic, [(partition.partition, timestamp, 1)])]
-            )
+            request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))
 
         # Client returns a future that only fails on network issues
         # so create a separate future and attach a callback to update it
         # based on response error codes
         future = Future()
+
         _f = self._client.send(node_id, request)
-        _f.add_callback(self._handle_offset_response, partition, future)
+        _f.add_callback(self._handle_offset_response, future)
         _f.add_errback(lambda e: future.failure(e))
         return future
 
-    def _handle_offset_response(self, partition, future, response):
+    def _handle_offset_response(self, future, response):
         """Callback for the response of the list offset call above.
 
         Arguments:
-            partition (TopicPartition): The partition that was fetched
             future (Future): the future to update based on response
             response (OffsetResponse): response from the server
 
         Raises:
             AssertionError: if response does not match partition
         """
-        topic, partition_info = response.topics[0]
-        assert len(response.topics) == 1 and len(partition_info) == 1, (
-            'OffsetResponse should only be for a single topic-partition')
-
-        partition_info = partition_info[0]
-        part, error_code = partition_info[:2]
-
-        assert topic == partition.topic and part == partition.partition, (
-            'OffsetResponse partition does not match OffsetRequest partition')
-
-        error_type = Errors.for_code(error_code)
-        if error_type is Errors.NoError:
-            if response.API_VERSION == 0:
-                offsets = partition_info[2]
-                assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
-                offset = offsets[0]
-                log.debug("Handling v0 ListOffsetResponse response for %s. "
-                          "Fetched offset %s", partition, offset)
-                future.success((offset, None))
-            else:
-                timestamp, offset = partition_info[2:]
-                log.debug("Handling ListOffsetResponse response for %s. "
-                          "Fetched offset %s, timestamp %s",
-                          partition, offset, timestamp)
-                if offset != UNKNOWN_OFFSET:
-                    future.success((offset, timestamp))
+        timestamp_offset_map = {}
+        for topic, part_data in response.topics:
+            for partition_info in part_data:
+                partition, error_code = partition_info[:2]
+                partition = TopicPartition(topic, partition)
+                error_type = Errors.for_code(error_code)
+                if error_type is Errors.NoError:
+                    if response.API_VERSION == 0:
+                        offsets = partition_info[2]
+                        assert len(offsets) > 1, 'Expected OffsetResponse with one offset'
+                        if offsets:
+                            offset = offsets[0]
+                            log.debug("Handling v0 ListOffsetResponse response for %s. "
+                                      "Fetched offset %s", partition, offset)
+                            timestamp_offset_map[partition] = (offset, None)
+                    else:
+                        timestamp, offset = partition_info[2:]
+                        log.debug("Handling ListOffsetResponse response for %s. "
+                                  "Fetched offset %s, timestamp %s",
+                                  partition, offset, timestamp)
+                        if offset != UNKNOWN_OFFSET:
+                            timestamp_offset_map[partition] = (offset, timestamp)
+                elif error_type is Errors.UnsupportedForMessageFormatError:
+                    # The message format on the broker side is before 0.10.0,
+                    # we simply put None in the response.
+                    log.debug("Cannot search by timestamp for partition %s because the"
+                              " message format version is before 0.10.0", partition)
+                elif error_type is Errors.NotLeaderForPartitionError:
+                    log.debug("Attempt to fetch offsets for partition %s failed due"
+                              " to obsolete leadership information, retrying.",
+                              partition)
+                    future.failure(error_type(partition))
+                elif error_type is Errors.UnknownTopicOrPartitionError:
+                    log.warn("Received unknown topic or partition error in ListOffset "
+                             "request for partition %s. The topic/partition " +
+                             "may not exist or the user may not have Describe access "
+                             "to it.", partition)
+                    future.failure(error_type(partition))
                 else:
-                    future.success((None, None))
-        elif error_type is Errors.UnsupportedForMessageFormatError:
-            # The message format on the broker side is before 0.10.0, we simply
-            # put None in the response.
-            log.debug("Cannot search by timestamp for partition %s because the"
-                      " message format version is before 0.10.0", partition)
-            future.success((None, None))
-        elif error_type is Errors.NotLeaderForPartitionError:
-            log.debug("Attempt to fetch offsets for partition %s failed due"
-                      " to obsolete leadership information, retrying.",
-                      partition)
-            future.failure(error_type(partition))
-        elif error_type is Errors.UnknownTopicOrPartitionError:
-            log.warn("Received unknown topic or partition error in ListOffset "
-                     "request for partition %s. The topic/partition " +
-                     "may not exist or the user may not have Describe access "
-                     "to it.", partition)
-            future.failure(error_type(partition))
-        else:
-            log.warning("Attempt to fetch offsets for partition %s failed due to:"
-                        " %s", partition, error_type)
-            future.failure(error_type(partition))
+                    log.warning("Attempt to fetch offsets for partition %s failed due to:"
+                                " %s", partition, error_type)
+                    future.failure(error_type(partition))
+        if not future.is_done:
+            future.success(timestamp_offset_map)
 
     def _fetchable_partitions(self):
         fetchable = self._subscriptions.fetchable_partitions()
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f9b8f16..48a88b2 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -881,7 +881,8 @@ class KafkaConsumer(six.Iterator):
 
         Arguments:
             timestamps (dict): ``{TopicPartition: int}`` mapping from partition
-                to the timestamp to look up.
+                to the timestamp to look up. Unit should be milliseconds since
+                beginning of the epoch (midnight Jan 1, 1970 (UTC))
 
         Raises:
             ValueError: if the target timestamp is negative
@@ -894,6 +895,7 @@ class KafkaConsumer(six.Iterator):
                 "offsets_for_times API not supported for cluster version {}"
                 .format(self.config['api_version']))
         for tp, ts in timestamps.items():
+            timestamps[tp] = int(ts)
             if ts < 0:
                 raise ValueError(
                     "The target time for partition {} is {}. The target time "
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2169145..eab93be 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -14,7 +14,9 @@ from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import (
     ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError
 )
-from kafka.structs import ProduceRequestPayload, TopicPartition
+from kafka.structs import (
+    ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+)
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
@@ -637,9 +639,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
 
     @kafka_versions('>=0.10.1')
     def test_kafka_consumer_offsets_for_time(self):
-        late_time = int(time.time())
-        middle_time = late_time - 1
-        early_time = late_time - 2
+        late_time = int(time.time()) * 1000
+        middle_time = late_time - 1000
+        early_time = late_time - 2000
         tp = TopicPartition(self.topic, 0)
 
         kafka_producer = self.kafka_producer()
@@ -652,6 +654,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
 
         consumer = self.kafka_consumer()
         offsets = consumer.offsets_for_times({tp: early_time})
+        self.assertEqual(len(offsets), 1)
         self.assertEqual(offsets[tp].offset, early_msg.offset)
         self.assertEqual(offsets[tp].timestamp, early_time)
 
@@ -663,6 +666,40 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(offsets[tp].offset, late_msg.offset)
         self.assertEqual(offsets[tp].timestamp, late_time)
 
+        # Out of bound timestamps check
+
+        offsets = consumer.offsets_for_times({tp: 0})
+        self.assertEqual(offsets[tp].offset, early_msg.offset)
+        self.assertEqual(offsets[tp].timestamp, early_time)
+
+        offsets = consumer.offsets_for_times({tp: 9999999999999})
+        self.assertEqual(offsets[tp], None)
+
+    @kafka_versions('>=0.10.1')
+    def test_kafka_consumer_offsets_search_many_partitions(self):
+        tp0 = TopicPartition(self.topic, 0)
+        tp1 = TopicPartition(self.topic, 1)
+
+        kafka_producer = self.kafka_producer()
+        send_time = int(time.time() * 1000)
+        p0msg = kafka_producer.send(
+            self.topic, partition=0, value=b"XXX",
+            timestamp_ms=send_time).get()
+        p1msg = kafka_producer.send(
+            self.topic, partition=1, value=b"XXX",
+            timestamp_ms=send_time).get()
+
+        consumer = self.kafka_consumer()
+        offsets = consumer.offsets_for_times({
+            tp0: send_time,
+            tp1: send_time
+        })
+
+        self.assertEqual(offsets, {
+            tp0: OffsetAndTimestamp(p0msg.offset, send_time),
+            tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+        })
+
     @kafka_versions('<0.10.1')
     def test_kafka_consumer_offsets_for_time_old(self):
         consumer = self.kafka_consumer()