author | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:16:13 -0800
committer | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:16:13 -0800
commit | 422050f952344e4796725d88db55a983bae4e1ee (patch)
tree | 27900edea1b16218d0dc01c8b5c166d2ec43afc0
parent | 59c051314890a0a6713e6fdb28d74bc3dc053aa9 (diff)
download | kafka-python-422050f952344e4796725d88db55a983bae4e1ee.tar.gz
Prefer assert or more-specific error to IllegalState / IllegalArgument
-rw-r--r-- | kafka/client_async.py | 14
-rw-r--r-- | kafka/conn.py | 11
-rw-r--r-- | kafka/consumer/fetcher.py | 20
-rw-r--r-- | kafka/consumer/group.py | 10
-rw-r--r-- | kafka/consumer/subscription_state.py | 9
-rw-r--r-- | kafka/coordinator/abstract.py | 4
-rw-r--r-- | kafka/coordinator/consumer.py | 19
-rw-r--r-- | kafka/coordinator/heartbeat.py | 6
-rw-r--r-- | kafka/future.py | 12
9 files changed, 36 insertions, 69 deletions
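The change applies one rule throughout: checks that can only fail because of a programming error (an impossible internal state or a bad internal argument) become `assert` statements, while conditions a caller can reasonably hit and handle are raised as more specific exceptions such as `NodeNotReadyError`. A minimal, self-contained sketch of that split — the class, method, and error names below are stand-ins for illustration, not code from the commit:

```python
class NodeNotReadyError(Exception):
    """Stand-in for the more specific, catchable error type used in the diff."""


class ExampleClient(object):
    """Illustrative only: mirrors the assert-vs-raise split seen in KafkaClient.send."""

    def __init__(self, ready_nodes):
        self._ready = set(ready_nodes)

    def _can_send_request(self, node_id):
        return node_id in self._ready

    def send(self, node_id, request):
        # Caller bug / internal invariant violated -> assert
        # (replaces the old generic IllegalArgumentError).
        assert request is not None, 'request must not be None'
        # Recoverable runtime condition -> specific, catchable exception
        # (replaces the old generic IllegalStateError).
        if not self._can_send_request(node_id):
            raise NodeNotReadyError('node %s is not ready' % node_id)
        return 'sent %r to node %s' % (request, node_id)


client = ExampleClient(ready_nodes=[0])
print(client.send(0, b'metadata'))   # ok
# client.send(1, b'metadata')        # raises NodeNotReadyError
```

With this split, a caller can catch `NodeNotReadyError` and retry, while an `AssertionError` surfaces bugs loudly during development instead of being hidden behind a generic IllegalState/IllegalArgument error.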
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 87d616c..d71c9a4 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -131,10 +131,9 @@ class KafkaClient(object):
         return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()

     def _initiate_connect(self, node_id):
-        """Initiate a connection to the given node"""
+        """Initiate a connection to the given node (must be in metadata)"""
         broker = self.cluster.broker_metadata(node_id)
-        if not broker:
-            raise Errors.IllegalArgumentError('Broker %s not found in current cluster metadata', node_id)
+        assert broker, 'Broker id %s not in current metadata' % node_id

         if node_id not in self._conns:
             log.debug("Initiating connection to node %s at %s:%s",
@@ -144,8 +143,7 @@ class KafkaClient(object):
         return self._finish_connect(node_id)

     def _finish_connect(self, node_id):
-        if node_id not in self._conns:
-            raise Errors.IllegalArgumentError('Node %s not found in connections', node_id)
+        assert node_id in self._conns, '%s is not in current conns' % node_id
         state = self._conns[node_id].connect()
         if state is ConnectionStates.CONNECTING:
             self._connecting.add(node_id)
@@ -242,13 +240,15 @@ class KafkaClient(object):
             request (Struct): request object (not-encoded)

         Raises:
-            IllegalStateError: if node_id is not ready
+            NodeNotReadyError: if node_id is not ready

         Returns:
             Future: resolves to Response struct
         """
         if not self._can_send_request(node_id):
-            raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id)
+            raise Errors.NodeNotReadyError("Attempt to send a request to node"
+                                           " which is not ready (node id %s)."
+                                           % node_id)

         # Every request gets a response, except one special case:
         expect_response = True
diff --git a/kafka/conn.py b/kafka/conn.py
index 3e49841..a1767ef 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -190,9 +190,7 @@ class BrokerConnection(object):

         Return response if available
         """
-        if self._processing:
-            raise Errors.IllegalStateError('Recursive connection processing'
-                                           ' not supported')
+        assert not self._processing, 'Recursion not supported'
         if not self.connected():
             log.warning('%s cannot recv: socket not connected', self)
             # If requests are pending, we should close the socket and
@@ -272,11 +270,8 @@ class BrokerConnection(object):
         return response

     def _process_response(self, read_buffer):
-        if self._processing:
-            raise Errors.IllegalStateError('Recursive connection processing'
-                                           ' not supported')
-        else:
-            self._processing = True
+        assert not self._processing, 'Recursion not supported'
+        self._processing = True
         ifr = self.in_flight_requests.popleft()

         # verify send/recv correlation ids match
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a4be7ae..c133a31 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -371,23 +371,19 @@ class Fetcher(object):
             response (OffsetResponse): response from the server

         Raises:
-            IllegalStateError: if response does not match partition
+            AssertionError: if response does not match partition
         """
         topic, partition_info = response.topics[0]
-        if len(response.topics) != 1 or len(partition_info) != 1:
-            raise Errors.IllegalStateError("OffsetResponse should only be for"
-                                           " a single topic-partition")
+        assert len(response.topics) == 1 and len(partition_info) == 1, (
+            'OffsetResponse should only be for a single topic-partition')

         part, error_code, offsets = partition_info[0]
-        if topic != partition.topic or part != partition.partition:
-            raise Errors.IllegalStateError("OffsetResponse partition does not"
-                                           " match OffsetRequest partition")
+        assert topic == partition.topic and part == partition.partition, (
+            'OffsetResponse partition does not match OffsetRequest partition')

         error_type = Errors.for_code(error_code)
         if error_type is Errors.NoError:
-            if len(offsets) != 1:
-                raise Errors.IllegalStateError("OffsetResponse should only"
-                                               " return a single offset")
+            assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
             offset = offsets[0]
             log.debug("Fetched offset %d for partition %s", offset, partition)
             future.success(offset)
@@ -519,9 +515,7 @@ class Fetcher(object):
             elif error_type is Errors.UnknownError:
                 log.warn("Unknown error fetching data for topic-partition %s", tp)
             else:
-                raise Errors.IllegalStateError("Unexpected error code %s"
-                                               " while fetching data"
-                                               % error_code)
+                raise error_type('Unexpected error while fetching data')

         """TOOD - metrics
         self.sensors.bytesFetched.record(totalBytes)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 14485d2..90d9d37 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -345,8 +345,7 @@ class KafkaConsumer(object):
             dict: topic to deque of records since the last fetch for the
                 subscribed list of topics and partitions
         """
-        if timeout_ms < 0:
-            raise Errors.IllegalArgumentError("Timeout must not be negative")
+        assert timeout_ms >= 0, 'Timeout must not be negative'

         # poll for new data until the timeout expires
         start = time.time()
@@ -408,8 +407,8 @@ class KafkaConsumer(object):
         Arguments:
             partition (TopicPartition): partition to check
         """
-        if not self._subscription.is_assigned(partition):
-            raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
+        assert self._subscription.is_assigned(partition)
+
         offset = self._subscription.assignment[partition].consumed
         if offset is None:
             self._update_fetch_positions(partition)
@@ -454,8 +453,7 @@ class KafkaConsumer(object):
             partition (TopicPartition): partition for seek operation
             offset (int): message offset in partition
         """
-        if offset < 0:
-            raise Errors.IllegalStateError("seek offset must not be a negative number")
+        assert offset >= 0
         log.debug("Seeking to offset %s for partition %s", offset, partition)
         self._subscription.assignment[partition].seek(offset)

diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index fa36bc2..c60f192 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -103,8 +103,7 @@ class SubscriptionState(object):
         """
         if self._user_assignment or (topics and pattern):
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
-        if not (topics or pattern):
-            raise IllegalStateError('Must provide topics or a pattern')
+        assert topics or pattern, 'Must provide topics or pattern'

         if pattern:
             log.info('Subscribing to pattern: /%s/', pattern)
@@ -341,8 +340,7 @@ class TopicPartitionState(object):
         self._fetched = None # current fetch position

     def _set_fetched(self, offset):
-        if not self.has_valid_position:
-            raise IllegalStateError("Cannot update fetch position without valid consumed/fetched positions")
+        assert self.has_valid_position, 'Valid consumed/fetch position required'
         self._fetched = offset

     def _get_fetched(self):
@@ -351,8 +349,7 @@ class TopicPartitionState(object):
     fetched = property(_get_fetched, _set_fetched, None, "current fetch position")

     def _set_consumed(self, offset):
-        if not self.has_valid_position:
-            raise IllegalStateError("Cannot update consumed position without valid consumed/fetched positions")
+        assert self.has_valid_position, 'Valid consumed/fetch position required'
         self._consumed = offset

     def _get_consumed(self):
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index ca5d38d..032ae31 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -72,10 +72,6 @@ class AbstractCoordinator(object):
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
         """
-        if not client:
-            raise Errors.IllegalStateError('a client is required to use'
-                                           ' Group Coordinator')
-
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index d5436c4..7bc10cd 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -99,8 +99,7 @@ class ConsumerCoordinator(AbstractCoordinator):
         self._subscription = subscription
         self._partitions_per_topic = {}
         self._auto_commit_task = None
-        if not self.config['assignors']:
-            raise Errors.IllegalStateError('Coordinator requires assignors')
+        assert self.config['assignors'], 'Coordinator require assignors'

         self._cluster.request_update()
         self._cluster.add_listener(self._handle_metadata_update)
@@ -168,10 +167,7 @@ class ConsumerCoordinator(AbstractCoordinator):
     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
         assignor = self._lookup_assignor(protocol)
-        if not assignor:
-            raise Errors.IllegalStateError("Coordinator selected invalid"
-                                           " assignment protocol: %s"
-                                           % protocol)
+        assert assignor, 'invalid assignment protocol: %s' % protocol

         assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)

@@ -202,10 +198,7 @@
     def _perform_assignment(self, leader_id, assignment_strategy, members):
         assignor = self._lookup_assignor(assignment_strategy)
-        if not assignor:
-            raise Errors.IllegalStateError("Coordinator selected invalid"
-                                           " assignment protocol: %s"
-                                           % assignment_strategy)
+        assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy

         member_metadata = {}
         all_subscribed_topics = set()
         for member_id, metadata_bytes in members:
@@ -581,10 +574,8 @@ class AutoCommitTask(object):
         pass

     def _reschedule(self, at):
-        if self._enabled:
-            self._client.schedule(self, at)
-        else:
-            raise Errors.IllegalStateError('AutoCommitTask not enabled')
+        assert self._enabled, 'AutoCommitTask not enabled'
+        self._client.schedule(self, at)

     def __call__(self):
         if not self._enabled:
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 41ba025..9a28f5e 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -16,9 +16,9 @@ class Heartbeat(object):
             if key in configs:
                 self.config[key] = configs[key]

-        if self.config['heartbeat_interval_ms'] > self.config['session_timeout_ms']:
-            raise Errors.IllegalArgumentError("Heartbeat interval must be set"
-                                              " lower than the session timeout")
+        assert (self.config['heartbeat_interval_ms']
+                <= self.config['session_timeout_ms'],
+                'Heartbeat interval must be lower than the session timeout')

         self.interval = self.config['heartbeat_interval_ms'] / 1000.0
         self.timeout = self.config['session_timeout_ms'] / 1000.0
diff --git a/kafka/future.py b/kafka/future.py
index 1f22cb7..958e85f 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -27,10 +27,7 @@ class Future(object):
         return False

     def success(self, value):
-        if self.is_done:
-            raise Errors.IllegalStateError('Invalid attempt to complete a'
-                                           ' request future which is already'
-                                           ' complete')
+        assert not self.is_done, 'Future is already complete'
         self.value = value
         self.is_done = True
         for f in self._callbacks:
@@ -41,11 +38,10 @@ class Future(object):
         return self

     def failure(self, e):
-        if self.is_done:
-            raise Errors.IllegalStateError('Invalid attempt to complete a'
-                                           ' request future which is already'
-                                           ' complete')
+        assert not self.is_done, 'Future is already complete'
         self.exception = e if type(e) is not type else e()
+        assert isinstance(self.exception, BaseException), (
+            'future failed without an exception')
         self.is_done = True
         for f in self._errbacks:
             try:
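One hunk worth a second look is kafka/coordinator/heartbeat.py: because the condition and the message both end up inside a single pair of parentheses, that `assert` is applied to a two-element tuple, and a non-empty tuple is always truthy, so the check can never fail (newer CPython releases emit a SyntaxWarning for exactly this pattern). A sketch of a form that keeps the multi-line layout but still enforces the check — the config values here are made up for illustration:

```python
config = {'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000}

# Parenthesize only the condition and keep the assert message outside those
# parentheses, so the statement evaluates the comparison rather than a tuple.
assert (config['heartbeat_interval_ms']
        <= config['session_timeout_ms']), (
    'Heartbeat interval must be lower than the session timeout')
```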