diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/fetcher.py | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 86bcd08..a22b18e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -227,7 +227,8 @@ class Fetcher(six.Iterator): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) offsets = self._retrieve_offsets({partition: timestamp}) - assert partition in offsets + if partition not in offsets: + raise NoOffsetForPartitionError(partition) offset = offsets[partition][0] # we might lose the assignment while fetching the offset, @@ -660,10 +661,14 @@ class Fetcher(six.Iterator): offsets.update(r) list_offsets_future.success(offsets) + def on_fail(err): + if not list_offsets_future.is_done: + list_offsets_future.failure(err) + for node_id, timestamps in six.iteritems(timestamps_by_node): _f = self._send_offset_request(node_id, timestamps) _f.add_callback(on_success) - _f.add_errback(lambda e: list_offsets_future.failure(e)) + _f.add_errback(on_fail) return list_offsets_future def _send_offset_request(self, node_id, timestamps): @@ -710,10 +715,13 @@ class Fetcher(six.Iterator): if response.API_VERSION == 0: offsets = partition_info[2] assert len(offsets) <= 1, 'Expected OffsetResponse with one offset' - if offsets: + if not offsets: + offset = UNKNOWN_OFFSET + else: offset = offsets[0] - log.debug("Handling v0 ListOffsetResponse response for %s. " - "Fetched offset %s", partition, offset) + log.debug("Handling v0 ListOffsetResponse response for %s. " + "Fetched offset %s", partition, offset) + if offset != UNKNOWN_OFFSET: timestamp_offset_map[partition] = (offset, None) else: timestamp, offset = partition_info[2:] @@ -732,16 +740,19 @@ class Fetcher(six.Iterator): " to obsolete leadership information, retrying.", partition) future.failure(error_type(partition)) + return elif error_type is Errors.UnknownTopicOrPartitionError: log.warn("Received unknown topic or partition error in ListOffset " "request for partition %s. The topic/partition " + "may not exist or the user may not have Describe access " "to it.", partition) future.failure(error_type(partition)) + return else: log.warning("Attempt to fetch offsets for partition %s failed due to:" " %s", partition, error_type) future.failure(error_type(partition)) + return if not future.is_done: future.success(timestamp_offset_map) |