summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/fetcher.py21
1 files changed, 16 insertions, 5 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 86bcd08..a22b18e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -227,7 +227,8 @@ class Fetcher(six.Iterator):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp})
- assert partition in offsets
+ if partition not in offsets:
+ raise NoOffsetForPartitionError(partition)
offset = offsets[partition][0]
# we might lose the assignment while fetching the offset,
@@ -660,10 +661,14 @@ class Fetcher(six.Iterator):
offsets.update(r)
list_offsets_future.success(offsets)
+ def on_fail(err):
+ if not list_offsets_future.is_done:
+ list_offsets_future.failure(err)
+
for node_id, timestamps in six.iteritems(timestamps_by_node):
_f = self._send_offset_request(node_id, timestamps)
_f.add_callback(on_success)
- _f.add_errback(lambda e: list_offsets_future.failure(e))
+ _f.add_errback(on_fail)
return list_offsets_future
def _send_offset_request(self, node_id, timestamps):
@@ -710,10 +715,13 @@ class Fetcher(six.Iterator):
if response.API_VERSION == 0:
offsets = partition_info[2]
assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
- if offsets:
+ if not offsets:
+ offset = UNKNOWN_OFFSET
+ else:
offset = offsets[0]
- log.debug("Handling v0 ListOffsetResponse response for %s. "
- "Fetched offset %s", partition, offset)
+ log.debug("Handling v0 ListOffsetResponse response for %s. "
+ "Fetched offset %s", partition, offset)
+ if offset != UNKNOWN_OFFSET:
timestamp_offset_map[partition] = (offset, None)
else:
timestamp, offset = partition_info[2:]
@@ -732,16 +740,19 @@ class Fetcher(six.Iterator):
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
+ return
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warn("Received unknown topic or partition error in ListOffset "
"request for partition %s. The topic/partition " +
"may not exist or the user may not have Describe access "
"to it.", partition)
future.failure(error_type(partition))
+ return
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
future.failure(error_type(partition))
+ return
if not future.is_done:
future.success(timestamp_offset_map)