summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/group.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 65bb670..10f2b3b 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -461,9 +461,11 @@ class KafkaConsumer(six.Iterator):
Arguments:
partition (TopicPartition): partition to check
- """
- assert self._subscription.is_assigned(partition)
+ Returns:
+ int: offset
+ """
+ assert self._subscription.is_assigned(partition), 'Partition is not assigned'
offset = self._subscription.assignment[partition].position
if offset is None:
self._update_fetch_positions(partition)
@@ -529,8 +531,13 @@ class KafkaConsumer(six.Iterator):
Arguments:
partition (TopicPartition): partition for seek operation
offset (int): message offset in partition
+
+ Raises:
+ AssertionError: if offset is not an int >= 0; or if partition is not
+ currently assigned.
"""
- assert offset >= 0
+ assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
+ assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)
@@ -540,9 +547,18 @@ class KafkaConsumer(six.Iterator):
Arguments:
*partitions: optionally provide specific TopicPartitions, otherwise
default to all assigned partitions
+
+ Raises:
+ AssertionError: if any partition is not currently assigned, or if
+ no partitions are assigned
"""
if not partitions:
partitions = self._subscription.assigned_partitions()
+ assert partitions, 'No partitions are currently assigned'
+ else:
+ for p in partitions:
+ assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
+
for tp in partitions:
log.debug("Seeking to beginning of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
@@ -553,9 +569,18 @@ class KafkaConsumer(six.Iterator):
Arguments:
*partitions: optionally provide specific TopicPartitions, otherwise
default to all assigned partitions
+
+ Raises:
+ AssertionError: if any partition is not currently assigned, or if
+ no partitions are assigned
"""
if not partitions:
partitions = self._subscription.assigned_partitions()
+ assert partitions, 'No partitions are currently assigned'
+ else:
+ for p in partitions:
+ assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
+
for tp in partitions:
log.debug("Seeking to end of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)