diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-14 23:40:40 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:55 -0800 |
commit | 26e18ce0c032e2801cbbe5d9f444107b8ab4919a (patch) | |
tree | 5d8913dd1fe6a9d2cfa2c155829a490bab3ad5b4 | |
parent | 9dd7d7e07ca4dd2c47aed9371844c5006b56e4a9 (diff) | |
download | kafka-python-26e18ce0c032e2801cbbe5d9f444107b8ab4919a.tar.gz |
Add docstring to get_partition_offsets; use request_time_ms and max_num_offsets var names
-rw-r--r-- | kafka/consumer/new.py | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index e7d38de..04696af 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -534,11 +534,11 @@ class KafkaConsumer(object): LATEST = -1 EARLIEST = -2 - RequestTime = None + request_time_ms = None if self._config['auto_offset_reset'] == 'largest': - RequestTime = LATEST + request_time_ms = LATEST elif self._config['auto_offset_reset'] == 'smallest': - RequestTime = EARLIEST + request_time_ms = EARLIEST else: # Let's raise an reasonable exception type if user calls @@ -553,12 +553,30 @@ class KafkaConsumer(object): # the request that triggered it, and we do not want to drop that raise - (offset, ) = self.get_partition_offsets(topic, partition, RequestTime, - num_offsets=1) + (offset, ) = self.get_partition_offsets(topic, partition, + request_time_ms, max_num_offsets=1) return offset - def get_partition_offsets(self, topic, partition, request_time, num_offsets): - reqs = [OffsetRequest(topic, partition, request_time, num_offsets)] + def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets): + """ + Request available fetch offsets for a single topic/partition + + @param topic (str) + @param partition (int) + @param request_time_ms (int) -- Used to ask for all messages before a + certain time (ms). There are two special + values. Specify -1 to receive the latest + offset (i.e. the offset of the next coming + message) and -2 to receive the earliest + available offset. Note that because offsets + are pulled in descending order, asking for + the earliest offset will always return you + a single element. + @param max_num_offsets (int) + + @return offsets (list) + """ + reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) |