summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-08-07 13:34:50 +0300
committerGitHub <noreply@github.com>2017-08-07 13:34:50 +0300
commit8cf44847bc91a986c98494c4a23be31c368ef4dd (patch)
treefaad0970119f52542b7a09b0db8abbc61166e694 /kafka/consumer/group.py
parentda25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
parent55ded554f9f5b470eeb53500e455ecd87f4d8f87 (diff)
downloadkafka-python-8cf44847bc91a986c98494c4a23be31c368ef4dd.tar.gz
Merge pull request #1161 from dpkp/issue1036_offset_by_time
Added support for offsets_for_times, beginning_offsets and end_offsets APIs.
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py109
1 files changed, 108 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6adb154..54a3711 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -6,7 +6,7 @@ import socket
import sys
import time
-from kafka.errors import KafkaConfigurationError
+from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
from kafka.vendor import six
@@ -861,6 +861,113 @@ class KafkaConsumer(six.Iterator):
metrics[k.group][k.name] = v.value()
return metrics
+ def offsets_for_times(self, timestamps):
+ """Look up the offsets for the given partitions by timestamp. The
+ returned offset for each partition is the earliest offset whose
+ timestamp is greater than or equal to the given timestamp in the
+ corresponding partition.
+
+ This is a blocking call. The consumer does not have to be assigned the
+ partitions.
+
+ If the message format version in a partition is before 0.10.0, i.e.
+ the messages do not have timestamps, ``None`` will be returned for that
+ partition. ``None`` will also be returned for the partition if there
+ are no messages in it.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ timestamps (dict): ``{TopicPartition: int}`` mapping from partition
+ to the timestamp to look up. Unit should be milliseconds since
+ beginning of the epoch (midnight Jan 1, 1970 (UTC))
+
+ Returns:
+ ``{TopicPartition: OffsetAndTimestamp}``: mapping from partition
+ to the timestamp and offset of the first message with timestamp
+ greater than or equal to the target timestamp.
+
+ Raises:
+ ValueError: If the target timestamp is negative
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ for tp, ts in timestamps.items():
+ timestamps[tp] = int(ts)
+ if ts < 0:
+ raise ValueError(
+ "The target time for partition {} is {}. The target time "
+ "cannot be negative.".format(tp, ts))
+ return self._fetcher.get_offsets_by_times(
+ timestamps, self.config['request_timeout_ms'])
+
+ def beginning_offsets(self, partitions):
+ """Get the first offset for the given partitions.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The earliest available offsets for the
+ given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms.
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.beginning_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
+ def end_offsets(self, partitions):
+ """Get the last offset for the given partitions. The last offset of a
+ partition is the offset of the upcoming message, i.e. the offset of the
+ last available message + 1.
+
+ This method does not change the current consumer position of the
+ partitions.
+
+ Note:
+ This method may block indefinitely if the partition does not exist.
+
+ Arguments:
+ partitions (list): List of TopicPartition instances to fetch
+ offsets for.
+
+ Returns:
+ ``{TopicPartition: int}``: The end offsets for the given partitions.
+
+ Raises:
+ UnsupportedVersionError: If the broker does not support looking
+ up the offsets by timestamp.
+ KafkaTimeoutError: If fetch failed in request_timeout_ms
+ """
+ if self.config['api_version'] <= (0, 10, 0):
+ raise UnsupportedVersionError(
+ "offsets_for_times API not supported for cluster version {}"
+ .format(self.config['api_version']))
+ offsets = self._fetcher.end_offsets(
+ partitions, self.config['request_timeout_ms'])
+ return offsets
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):