diff options
-rw-r--r-- | kafka/consumer/group.py | 31 |
1 files changed, 28 insertions, 3 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 65bb670..10f2b3b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -461,9 +461,11 @@ class KafkaConsumer(six.Iterator): Arguments: partition (TopicPartition): partition to check - """ - assert self._subscription.is_assigned(partition) + Returns: + int: offset + """ + assert self._subscription.is_assigned(partition), 'Partition is not assigned' offset = self._subscription.assignment[partition].position if offset is None: self._update_fetch_positions(partition) @@ -529,8 +531,13 @@ class KafkaConsumer(six.Iterator): Arguments: partition (TopicPartition): partition for seek operation offset (int): message offset in partition + + Raises: + AssertionError: if offset is not an int >= 0; or if partition is not + currently assigned. """ - assert offset >= 0 + assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0' + assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' log.debug("Seeking to offset %s for partition %s", offset, partition) self._subscription.assignment[partition].seek(offset) @@ -540,9 +547,18 @@ class KafkaConsumer(six.Iterator): Arguments: *partitions: optionally provide specific TopicPartitions, otherwise default to all assigned partitions + + Raises: + AssertionError: if any partition is not currently assigned, or if + no partitions are assigned """ if not partitions: partitions = self._subscription.assigned_partitions() + assert partitions, 'No partitions are currently assigned' + else: + for p in partitions: + assert p in self._subscription.assigned_partitions(), 'Unassigned partition' + for tp in partitions: log.debug("Seeking to beginning of partition %s", tp) self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST) @@ -553,9 +569,18 @@ class KafkaConsumer(six.Iterator): Arguments: *partitions: optionally provide specific TopicPartitions, otherwise default to all assigned partitions + + Raises: + AssertionError: if any partition is not currently assigned, or if + no partitions are assigned """ if not partitions: partitions = self._subscription.assigned_partitions() + assert partitions, 'No partitions are currently assigned' + else: + for p in partitions: + assert p in self._subscription.assigned_partitions(), 'Unassigned partition' + for tp in partitions: log.debug("Seeking to end of partition %s", tp) self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST) |