diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-05 17:19:54 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-07 09:34:09 +0000 |
commit | 1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5 (patch) | |
tree | b7521c2a10dedc958bdc506a3fc5d5ce420e4ba5 /kafka | |
parent | efc03d083d323e35a2d32bcbdbccc053f737836e (diff) | |
download | kafka-python-1f69f8f5b875d1b263663bdf6aa2fc17faa4a3e5.tar.gz |
Added `beginning_offsets` and `end_offsets` API's and fixed @jeffwidman review issues
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 2 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 23 | ||||
-rw-r--r-- | kafka/consumer/group.py | 87 |
3 files changed, 96 insertions, 16 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index d042300..61d63bf 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -887,7 +887,7 @@ class BrokerConnection(object): def _infer_broker_version_from_api_versions(self, api_versions): # The logic here is to check the list of supported request versions - # in descending order. As soon as we find one that works, return it + # in reverse order. As soon as we find one that works, return it test_cases = [ # format (<broker verion>, <needed struct>) ((0, 11, 0), MetadataRequest[4]), diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1a3dfd5..6a7b794 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -193,6 +193,21 @@ class Fetcher(six.Iterator): offsets[tp] = OffsetAndTimestamp(offset, timestamp) return offsets + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._retrieve_offsets(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp][0] + return offsets + def _reset_offset(self, partition): """Reset offsets for the given partition using the offset reset strategy. @@ -222,10 +237,10 @@ class Fetcher(six.Iterator): self._subscriptions.seek(partition, offset) def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): - """ Fetch offset for each partition passed in ``timestamps`` map. + """Fetch offset for each partition passed in ``timestamps`` map. Blocks until offsets are obtained, a non-retriable exception is raised - or ``timeout_ms`` passed (if it's not ``None``). + or ``timeout_ms`` passed. Arguments: timestamps: {TopicPartition: int} dict with timestamps to fetch @@ -268,7 +283,7 @@ class Fetcher(six.Iterator): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by times in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % timeout_ms) def _raise_if_offset_out_of_range(self): """Check FetchResponses for offset out of range. @@ -613,7 +628,7 @@ class Fetcher(six.Iterator): return f(bytes_) def _send_offset_requests(self, timestamps): - """ Fetch offsets for each partition in timestamps dict. This may send + """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. Arguments: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 48a88b2..54a3711 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -862,33 +862,37 @@ class KafkaConsumer(six.Iterator): return metrics def offsets_for_times(self, timestamps): - """ - Look up the offsets for the given partitions by timestamp. The returned - offset for each partition is the earliest offset whose timestamp is - greater than or equal to the given timestamp in the corresponding - partition. + """Look up the offsets for the given partitions by timestamp. The + returned offset for each partition is the earliest offset whose + timestamp is greater than or equal to the given timestamp in the + corresponding partition. This is a blocking call. The consumer does not have to be assigned the partitions. If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, ``None`` will be returned for that - partition. + partition. ``None`` will also be returned for the partition if there + are no messages in it. Note: - Notice that this method may block indefinitely if the partition - does not exist. + This method may block indefinitely if the partition does not exist. Arguments: timestamps (dict): ``{TopicPartition: int}`` mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)) + Returns: + ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition + to the timestamp and offset of the first message with timestamp + greater than or equal to the target timestamp. + Raises: - ValueError: if the target timestamp is negative - UnsupportedVersionError: if the broker does not support looking + ValueError: If the target timestamp is negative + UnsupportedVersionError: If the broker does not support looking up the offsets by timestamp. - KafkaTimeoutError: if fetch failed in request_timeout_ms + KafkaTimeoutError: If fetch failed in request_timeout_ms """ if self.config['api_version'] <= (0, 10, 0): raise UnsupportedVersionError( @@ -903,6 +907,67 @@ class KafkaConsumer(six.Iterator): return self._fetcher.get_offsets_by_times( timestamps, self.config['request_timeout_ms']) + def beginning_offsets(self, partitions): + """Get the first offset for the given partitions. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The earliest available offsets for the + given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms. + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.beginning_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + + def end_offsets(self, partitions): + """Get the last offset for the given partitions. The last offset of a + partition is the offset of the upcoming message, i.e. the offset of the + last available message + 1. + + This method does not change the current consumer position of the + partitions. + + Note: + This method may block indefinitely if the partition does not exist. + + Arguments: + partitions (list): List of TopicPartition instances to fetch + offsets for. + + Returns: + ``{TopicPartition: int}``: The end offsets for the given partitions. + + Raises: + UnsupportedVersionError: If the broker does not support looking + up the offsets by timestamp. + KafkaTimeoutError: If fetch failed in request_timeout_ms + """ + if self.config['api_version'] <= (0, 10, 0): + raise UnsupportedVersionError( + "offsets_for_times API not supported for cluster version {}" + .format(self.config['api_version'])) + offsets = self._fetcher.end_offsets( + partitions, self.config['request_timeout_ms']) + return offsets + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): |