summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 16:16:13 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 16:16:13 -0800
commit422050f952344e4796725d88db55a983bae4e1ee (patch)
tree27900edea1b16218d0dc01c8b5c166d2ec43afc0 /kafka/coordinator/consumer.py
parent59c051314890a0a6713e6fdb28d74bc3dc053aa9 (diff)
downloadkafka-python-422050f952344e4796725d88db55a983bae4e1ee.tar.gz
Prefer assert or more-specific error to IllegalState / IllegalArgument
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py19
1 files changed, 5 insertions, 14 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index d5436c4..7bc10cd 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -99,8 +99,7 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription = subscription
self._partitions_per_topic = {}
self._auto_commit_task = None
- if not self.config['assignors']:
- raise Errors.IllegalStateError('Coordinator requires assignors')
+ assert self.config['assignors'], 'Coordinator require assignors'
self._cluster.request_update()
self._cluster.add_listener(self._handle_metadata_update)
@@ -168,10 +167,7 @@ class ConsumerCoordinator(AbstractCoordinator):
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
assignor = self._lookup_assignor(protocol)
- if not assignor:
- raise Errors.IllegalStateError("Coordinator selected invalid"
- " assignment protocol: %s"
- % protocol)
+ assert assignor, 'invalid assignment protocol: %s' % protocol
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
@@ -202,10 +198,7 @@ class ConsumerCoordinator(AbstractCoordinator):
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
- if not assignor:
- raise Errors.IllegalStateError("Coordinator selected invalid"
- " assignment protocol: %s"
- % assignment_strategy)
+ assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
@@ -581,10 +574,8 @@ class AutoCommitTask(object):
pass
def _reschedule(self, at):
- if self._enabled:
- self._client.schedule(self, at)
- else:
- raise Errors.IllegalStateError('AutoCommitTask not enabled')
+ assert self._enabled, 'AutoCommitTask not enabled'
+ self._client.schedule(self, at)
def __call__(self):
if not self._enabled: