diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:16:13 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:16:13 -0800 |
commit | 422050f952344e4796725d88db55a983bae4e1ee (patch) | |
tree | 27900edea1b16218d0dc01c8b5c166d2ec43afc0 /kafka/coordinator/consumer.py | |
parent | 59c051314890a0a6713e6fdb28d74bc3dc053aa9 (diff) | |
download | kafka-python-422050f952344e4796725d88db55a983bae4e1ee.tar.gz |
Prefer assert or more-specific error to IllegalState / IllegalArgument
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 19 |
1 files changed, 5 insertions, 14 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index d5436c4..7bc10cd 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -99,8 +99,7 @@ class ConsumerCoordinator(AbstractCoordinator): self._subscription = subscription self._partitions_per_topic = {} self._auto_commit_task = None - if not self.config['assignors']: - raise Errors.IllegalStateError('Coordinator requires assignors') + assert self.config['assignors'], 'Coordinator require assignors' self._cluster.request_update() self._cluster.add_listener(self._handle_metadata_update) @@ -168,10 +167,7 @@ class ConsumerCoordinator(AbstractCoordinator): def _on_join_complete(self, generation, member_id, protocol, member_assignment_bytes): assignor = self._lookup_assignor(protocol) - if not assignor: - raise Errors.IllegalStateError("Coordinator selected invalid" - " assignment protocol: %s" - % protocol) + assert assignor, 'invalid assignment protocol: %s' % protocol assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -202,10 +198,7 @@ class ConsumerCoordinator(AbstractCoordinator): def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - if not assignor: - raise Errors.IllegalStateError("Coordinator selected invalid" - " assignment protocol: %s" - % assignment_strategy) + assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy member_metadata = {} all_subscribed_topics = set() for member_id, metadata_bytes in members: @@ -581,10 +574,8 @@ class AutoCommitTask(object): pass def _reschedule(self, at): - if self._enabled: - self._client.schedule(self, at) - else: - raise Errors.IllegalStateError('AutoCommitTask not enabled') + assert self._enabled, 'AutoCommitTask not enabled' + self._client.schedule(self, at) def __call__(self): if not self._enabled: |