summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 16:16:13 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 16:16:13 -0800
commit422050f952344e4796725d88db55a983bae4e1ee (patch)
tree27900edea1b16218d0dc01c8b5c166d2ec43afc0 /kafka/consumer
parent59c051314890a0a6713e6fdb28d74bc3dc053aa9 (diff)
downloadkafka-python-422050f952344e4796725d88db55a983bae4e1ee.tar.gz
Prefer assert or more-specific error to IllegalState / IllegalArgument
Diffstat (limited to 'kafka/consumer')
-rw-r--r--kafka/consumer/fetcher.py20
-rw-r--r--kafka/consumer/group.py10
-rw-r--r--kafka/consumer/subscription_state.py9
3 files changed, 14 insertions, 25 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a4be7ae..c133a31 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -371,23 +371,19 @@ class Fetcher(object):
response (OffsetResponse): response from the server
Raises:
- IllegalStateError: if response does not match partition
+ AssertionError: if response does not match partition
"""
topic, partition_info = response.topics[0]
- if len(response.topics) != 1 or len(partition_info) != 1:
- raise Errors.IllegalStateError("OffsetResponse should only be for"
- " a single topic-partition")
+ assert len(response.topics) == 1 and len(partition_info) == 1, (
+ 'OffsetResponse should only be for a single topic-partition')
part, error_code, offsets = partition_info[0]
- if topic != partition.topic or part != partition.partition:
- raise Errors.IllegalStateError("OffsetResponse partition does not"
- " match OffsetRequest partition")
+ assert topic == partition.topic and part == partition.partition, (
+ 'OffsetResponse partition does not match OffsetRequest partition')
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- if len(offsets) != 1:
- raise Errors.IllegalStateError("OffsetResponse should only"
- " return a single offset")
+ assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
offset = offsets[0]
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
@@ -519,9 +515,7 @@ class Fetcher(object):
elif error_type is Errors.UnknownError:
log.warn("Unknown error fetching data for topic-partition %s", tp)
else:
- raise Errors.IllegalStateError("Unexpected error code %s"
- " while fetching data"
- % error_code)
+ raise error_type('Unexpected error while fetching data')
"""TOOD - metrics
self.sensors.bytesFetched.record(totalBytes)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 14485d2..90d9d37 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -345,8 +345,7 @@ class KafkaConsumer(object):
dict: topic to deque of records since the last fetch for the
subscribed list of topics and partitions
"""
- if timeout_ms < 0:
- raise Errors.IllegalArgumentError("Timeout must not be negative")
+ assert timeout_ms >= 0, 'Timeout must not be negative'
# poll for new data until the timeout expires
start = time.time()
@@ -408,8 +407,8 @@ class KafkaConsumer(object):
Arguments:
partition (TopicPartition): partition to check
"""
- if not self._subscription.is_assigned(partition):
- raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
+ assert self._subscription.is_assigned(partition)
+
offset = self._subscription.assignment[partition].consumed
if offset is None:
self._update_fetch_positions(partition)
@@ -454,8 +453,7 @@ class KafkaConsumer(object):
partition (TopicPartition): partition for seek operation
offset (int): message offset in partition
"""
- if offset < 0:
- raise Errors.IllegalStateError("seek offset must not be a negative number")
+ assert offset >= 0
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index fa36bc2..c60f192 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -103,8 +103,7 @@ class SubscriptionState(object):
"""
if self._user_assignment or (topics and pattern):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
- if not (topics or pattern):
- raise IllegalStateError('Must provide topics or a pattern')
+ assert topics or pattern, 'Must provide topics or pattern'
if pattern:
log.info('Subscribing to pattern: /%s/', pattern)
@@ -341,8 +340,7 @@ class TopicPartitionState(object):
self._fetched = None # current fetch position
def _set_fetched(self, offset):
- if not self.has_valid_position:
- raise IllegalStateError("Cannot update fetch position without valid consumed/fetched positions")
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
self._fetched = offset
def _get_fetched(self):
@@ -351,8 +349,7 @@ class TopicPartitionState(object):
fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
def _set_consumed(self, offset):
- if not self.has_valid_position:
- raise IllegalStateError("Cannot update consumed position without valid consumed/fetched positions")
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
self._consumed = offset
def _get_consumed(self):