summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-14 23:40:40 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:55 -0800
commit26e18ce0c032e2801cbbe5d9f444107b8ab4919a (patch)
tree5d8913dd1fe6a9d2cfa2c155829a490bab3ad5b4
parent9dd7d7e07ca4dd2c47aed9371844c5006b56e4a9 (diff)
downloadkafka-python-26e18ce0c032e2801cbbe5d9f444107b8ab4919a.tar.gz
Add docstring to get_partition_offsets; use request_time_ms and max_num_offsets var names
-rw-r--r--kafka/consumer/new.py32
1 files changed, 25 insertions, 7 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index e7d38de..04696af 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -534,11 +534,11 @@ class KafkaConsumer(object):
LATEST = -1
EARLIEST = -2
- RequestTime = None
+ request_time_ms = None
if self._config['auto_offset_reset'] == 'largest':
- RequestTime = LATEST
+ request_time_ms = LATEST
elif self._config['auto_offset_reset'] == 'smallest':
- RequestTime = EARLIEST
+ request_time_ms = EARLIEST
else:
# Let's raise an reasonable exception type if user calls
@@ -553,12 +553,30 @@ class KafkaConsumer(object):
# the request that triggered it, and we do not want to drop that
raise
- (offset, ) = self.get_partition_offsets(topic, partition, RequestTime,
- num_offsets=1)
+ (offset, ) = self.get_partition_offsets(topic, partition,
+ request_time_ms, max_num_offsets=1)
return offset
- def get_partition_offsets(self, topic, partition, request_time, num_offsets):
- reqs = [OffsetRequest(topic, partition, request_time, num_offsets)]
+ def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
+ """
+ Request available fetch offsets for a single topic/partition
+
+ @param topic (str)
+ @param partition (int)
+ @param request_time_ms (int) -- Used to ask for all messages before a
+ certain time (ms). There are two special
+ values. Specify -1 to receive the latest
+ offset (i.e. the offset of the next coming
+ message) and -2 to receive the earliest
+ available offset. Note that because offsets
+ are pulled in descending order, asking for
+ the earliest offset will always return you
+ a single element.
+ @param max_num_offsets (int)
+
+ @return offsets (list)
+ """
+ reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)