summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py20
1 files changed, 7 insertions, 13 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a4be7ae..c133a31 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -371,23 +371,19 @@ class Fetcher(object):
response (OffsetResponse): response from the server
Raises:
- IllegalStateError: if response does not match partition
+ AssertionError: if response does not match partition
"""
topic, partition_info = response.topics[0]
- if len(response.topics) != 1 or len(partition_info) != 1:
- raise Errors.IllegalStateError("OffsetResponse should only be for"
- " a single topic-partition")
+ assert len(response.topics) == 1 and len(partition_info) == 1, (
+ 'OffsetResponse should only be for a single topic-partition')
part, error_code, offsets = partition_info[0]
- if topic != partition.topic or part != partition.partition:
- raise Errors.IllegalStateError("OffsetResponse partition does not"
- " match OffsetRequest partition")
+ assert topic == partition.topic and part == partition.partition, (
+ 'OffsetResponse partition does not match OffsetRequest partition')
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- if len(offsets) != 1:
- raise Errors.IllegalStateError("OffsetResponse should only"
- " return a single offset")
+ assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
offset = offsets[0]
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
@@ -519,9 +515,7 @@ class Fetcher(object):
elif error_type is Errors.UnknownError:
log.warn("Unknown error fetching data for topic-partition %s", tp)
else:
- raise Errors.IllegalStateError("Unexpected error code %s"
- " while fetching data"
- % error_code)
+ raise error_type('Unexpected error while fetching data')
"""TOOD - metrics
self.sensors.bytesFetched.record(totalBytes)