summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py10
1 files changed, 4 insertions, 6 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 14485d2..90d9d37 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -345,8 +345,7 @@ class KafkaConsumer(object):
dict: topic to deque of records since the last fetch for the
subscribed list of topics and partitions
"""
- if timeout_ms < 0:
- raise Errors.IllegalArgumentError("Timeout must not be negative")
+ assert timeout_ms >= 0, 'Timeout must not be negative'
# poll for new data until the timeout expires
start = time.time()
@@ -408,8 +407,8 @@ class KafkaConsumer(object):
Arguments:
partition (TopicPartition): partition to check
"""
- if not self._subscription.is_assigned(partition):
- raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
+ assert self._subscription.is_assigned(partition)
+
offset = self._subscription.assignment[partition].consumed
if offset is None:
self._update_fetch_positions(partition)
@@ -454,8 +453,7 @@ class KafkaConsumer(object):
partition (TopicPartition): partition for seek operation
offset (int): message offset in partition
"""
- if offset < 0:
- raise Errors.IllegalStateError("seek offset must not be a negative number")
+ assert offset >= 0
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)