diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-01 00:20:22 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-01 00:20:22 -0800 |
commit | 2a783d047aa97cef80ba964cdd2d8dcaaebb4f66 (patch) | |
tree | b02da0cf4b42d6957a262f3e4f080a81250a27dc | |
parent | 894c9aac50ee9a0b0034ea396a7a13e3b5150114 (diff) | |
download | kafka-python-2a783d047aa97cef80ba964cdd2d8dcaaebb4f66.tar.gz |
Add more assertions in KafkaConsumer (primarily to seek* methods)
-rw-r--r-- | kafka/consumer/group.py | 31 |
1 files changed, 28 insertions, 3 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 65bb670..10f2b3b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -461,9 +461,11 @@ class KafkaConsumer(six.Iterator): Arguments: partition (TopicPartition): partition to check - """ - assert self._subscription.is_assigned(partition) + Returns: + int: offset + """ + assert self._subscription.is_assigned(partition), 'Partition is not assigned' offset = self._subscription.assignment[partition].position if offset is None: self._update_fetch_positions(partition) @@ -529,8 +531,13 @@ class KafkaConsumer(six.Iterator): Arguments: partition (TopicPartition): partition for seek operation offset (int): message offset in partition + + Raises: + AssertionError: if offset is not an int >= 0; or if partition is not + currently assigned. """ - assert offset >= 0 + assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0' + assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' log.debug("Seeking to offset %s for partition %s", offset, partition) self._subscription.assignment[partition].seek(offset) @@ -540,9 +547,18 @@ class KafkaConsumer(six.Iterator): Arguments: *partitions: optionally provide specific TopicPartitions, otherwise default to all assigned partitions + + Raises: + AssertionError: if any partition is not currently assigned, or if + no partitions are assigned """ if not partitions: partitions = self._subscription.assigned_partitions() + assert partitions, 'No partitions are currently assigned' + else: + for p in partitions: + assert p in self._subscription.assigned_partitions(), 'Unassigned partition' + for tp in partitions: log.debug("Seeking to beginning of partition %s", tp) self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST) @@ -553,9 +569,18 @@ class KafkaConsumer(six.Iterator): Arguments: *partitions: optionally provide specific TopicPartitions, otherwise default to all assigned partitions + + Raises: + AssertionError: if any partition is not currently assigned, or if + no partitions are assigned """ if not partitions: partitions = self._subscription.assigned_partitions() + assert partitions, 'No partitions are currently assigned' + else: + for p in partitions: + assert p in self._subscription.assigned_partitions(), 'Unassigned partition' + for tp in partitions: log.debug("Seeking to end of partition %s", tp) self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST) |