diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 10 |
1 files changed, 4 insertions, 6 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 14485d2..90d9d37 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -345,8 +345,7 @@ class KafkaConsumer(object): dict: topic to deque of records since the last fetch for the subscribed list of topics and partitions """ - if timeout_ms < 0: - raise Errors.IllegalArgumentError("Timeout must not be negative") + assert timeout_ms >= 0, 'Timeout must not be negative' # poll for new data until the timeout expires start = time.time() @@ -408,8 +407,8 @@ class KafkaConsumer(object): Arguments: partition (TopicPartition): partition to check """ - if not self._subscription.is_assigned(partition): - raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.") + assert self._subscription.is_assigned(partition) + offset = self._subscription.assignment[partition].consumed if offset is None: self._update_fetch_positions(partition) @@ -454,8 +453,7 @@ class KafkaConsumer(object): partition (TopicPartition): partition for seek operation offset (int): message offset in partition """ - if offset < 0: - raise Errors.IllegalStateError("seek offset must not be a negative number") + assert offset >= 0 log.debug("Seeking to offset %s for partition %s", offset, partition) self._subscription.assignment[partition].seek(offset) |