diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:16:13 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:16:13 -0800 |
commit | 422050f952344e4796725d88db55a983bae4e1ee (patch) | |
tree | 27900edea1b16218d0dc01c8b5c166d2ec43afc0 /kafka/consumer | |
parent | 59c051314890a0a6713e6fdb28d74bc3dc053aa9 (diff) | |
download | kafka-python-422050f952344e4796725d88db55a983bae4e1ee.tar.gz |
Prefer assert or more-specific error to IllegalState / IllegalArgument
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 20 | ||||
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 9 |
3 files changed, 14 insertions, 25 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index a4be7ae..c133a31 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -371,23 +371,19 @@ class Fetcher(object): response (OffsetResponse): response from the server Raises: - IllegalStateError: if response does not match partition + AssertionError: if response does not match partition """ topic, partition_info = response.topics[0] - if len(response.topics) != 1 or len(partition_info) != 1: - raise Errors.IllegalStateError("OffsetResponse should only be for" - " a single topic-partition") + assert len(response.topics) == 1 and len(partition_info) == 1, ( + 'OffsetResponse should only be for a single topic-partition') part, error_code, offsets = partition_info[0] - if topic != partition.topic or part != partition.partition: - raise Errors.IllegalStateError("OffsetResponse partition does not" - " match OffsetRequest partition") + assert topic == partition.topic and part == partition.partition, ( + 'OffsetResponse partition does not match OffsetRequest partition') error_type = Errors.for_code(error_code) if error_type is Errors.NoError: - if len(offsets) != 1: - raise Errors.IllegalStateError("OffsetResponse should only" - " return a single offset") + assert len(offsets) == 1, 'Expected OffsetResponse with one offset' offset = offsets[0] log.debug("Fetched offset %d for partition %s", offset, partition) future.success(offset) @@ -519,9 +515,7 @@ class Fetcher(object): elif error_type is Errors.UnknownError: log.warn("Unknown error fetching data for topic-partition %s", tp) else: - raise Errors.IllegalStateError("Unexpected error code %s" - " while fetching data" - % error_code) + raise error_type('Unexpected error while fetching data') """TOOD - metrics self.sensors.bytesFetched.record(totalBytes) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 14485d2..90d9d37 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -345,8 +345,7 @@ class KafkaConsumer(object): dict: topic to deque of records since the last fetch for the subscribed list of topics and partitions """ - if timeout_ms < 0: - raise Errors.IllegalArgumentError("Timeout must not be negative") + assert timeout_ms >= 0, 'Timeout must not be negative' # poll for new data until the timeout expires start = time.time() @@ -408,8 +407,8 @@ class KafkaConsumer(object): Arguments: partition (TopicPartition): partition to check """ - if not self._subscription.is_assigned(partition): - raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.") + assert self._subscription.is_assigned(partition) + offset = self._subscription.assignment[partition].consumed if offset is None: self._update_fetch_positions(partition) @@ -454,8 +453,7 @@ class KafkaConsumer(object): partition (TopicPartition): partition for seek operation offset (int): message offset in partition """ - if offset < 0: - raise Errors.IllegalStateError("seek offset must not be a negative number") + assert offset >= 0 log.debug("Seeking to offset %s for partition %s", offset, partition) self._subscription.assignment[partition].seek(offset) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index fa36bc2..c60f192 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -103,8 +103,7 @@ class SubscriptionState(object): """ if self._user_assignment or (topics and pattern): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) - if not (topics or pattern): - raise IllegalStateError('Must provide topics or a pattern') + assert topics or pattern, 'Must provide topics or pattern' if pattern: log.info('Subscribing to pattern: /%s/', pattern) @@ -341,8 +340,7 @@ class TopicPartitionState(object): self._fetched = None # current fetch position def _set_fetched(self, offset): - if not self.has_valid_position: - raise IllegalStateError("Cannot update fetch position without valid consumed/fetched positions") + assert self.has_valid_position, 'Valid consumed/fetch position required' self._fetched = offset def _get_fetched(self): @@ -351,8 +349,7 @@ class TopicPartitionState(object): fetched = property(_get_fetched, _set_fetched, None, "current fetch position") def _set_consumed(self, offset): - if not self.has_valid_position: - raise IllegalStateError("Cannot update consumed position without valid consumed/fetched positions") + assert self.has_valid_position, 'Valid consumed/fetch position required' self._consumed = offset def _get_consumed(self): |