summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py14
-rw-r--r--kafka/conn.py11
-rw-r--r--kafka/consumer/fetcher.py20
-rw-r--r--kafka/consumer/group.py10
-rw-r--r--kafka/consumer/subscription_state.py9
-rw-r--r--kafka/coordinator/abstract.py4
-rw-r--r--kafka/coordinator/consumer.py19
-rw-r--r--kafka/coordinator/heartbeat.py6
-rw-r--r--kafka/future.py12
9 files changed, 36 insertions, 69 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 87d616c..d71c9a4 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -131,10 +131,9 @@ class KafkaClient(object):
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
def _initiate_connect(self, node_id):
- """Initiate a connection to the given node"""
+ """Initiate a connection to the given node (must be in metadata)"""
broker = self.cluster.broker_metadata(node_id)
- if not broker:
- raise Errors.IllegalArgumentError('Broker %s not found in current cluster metadata', node_id)
+ assert broker, 'Broker id %s not in current metadata' % node_id
if node_id not in self._conns:
log.debug("Initiating connection to node %s at %s:%s",
@@ -144,8 +143,7 @@ class KafkaClient(object):
return self._finish_connect(node_id)
def _finish_connect(self, node_id):
- if node_id not in self._conns:
- raise Errors.IllegalArgumentError('Node %s not found in connections', node_id)
+ assert node_id in self._conns, '%s is not in current conns' % node_id
state = self._conns[node_id].connect()
if state is ConnectionStates.CONNECTING:
self._connecting.add(node_id)
@@ -242,13 +240,15 @@ class KafkaClient(object):
request (Struct): request object (not-encoded)
Raises:
- IllegalStateError: if node_id is not ready
+ NodeNotReadyError: if node_id is not ready
Returns:
Future: resolves to Response struct
"""
if not self._can_send_request(node_id):
- raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id)
+ raise Errors.NodeNotReadyError("Attempt to send a request to node"
+ " which is not ready (node id %s)."
+ % node_id)
# Every request gets a response, except one special case:
expect_response = True
diff --git a/kafka/conn.py b/kafka/conn.py
index 3e49841..a1767ef 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -190,9 +190,7 @@ class BrokerConnection(object):
Return response if available
"""
- if self._processing:
- raise Errors.IllegalStateError('Recursive connection processing'
- ' not supported')
+ assert not self._processing, 'Recursion not supported'
if not self.connected():
log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and
@@ -272,11 +270,8 @@ class BrokerConnection(object):
return response
def _process_response(self, read_buffer):
- if self._processing:
- raise Errors.IllegalStateError('Recursive connection processing'
- ' not supported')
- else:
- self._processing = True
+ assert not self._processing, 'Recursion not supported'
+ self._processing = True
ifr = self.in_flight_requests.popleft()
# verify send/recv correlation ids match
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index a4be7ae..c133a31 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -371,23 +371,19 @@ class Fetcher(object):
response (OffsetResponse): response from the server
Raises:
- IllegalStateError: if response does not match partition
+ AssertionError: if response does not match partition
"""
topic, partition_info = response.topics[0]
- if len(response.topics) != 1 or len(partition_info) != 1:
- raise Errors.IllegalStateError("OffsetResponse should only be for"
- " a single topic-partition")
+ assert len(response.topics) == 1 and len(partition_info) == 1, (
+ 'OffsetResponse should only be for a single topic-partition')
part, error_code, offsets = partition_info[0]
- if topic != partition.topic or part != partition.partition:
- raise Errors.IllegalStateError("OffsetResponse partition does not"
- " match OffsetRequest partition")
+ assert topic == partition.topic and part == partition.partition, (
+ 'OffsetResponse partition does not match OffsetRequest partition')
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- if len(offsets) != 1:
- raise Errors.IllegalStateError("OffsetResponse should only"
- " return a single offset")
+ assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
offset = offsets[0]
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
@@ -519,9 +515,7 @@ class Fetcher(object):
elif error_type is Errors.UnknownError:
log.warn("Unknown error fetching data for topic-partition %s", tp)
else:
- raise Errors.IllegalStateError("Unexpected error code %s"
- " while fetching data"
- % error_code)
+ raise error_type('Unexpected error while fetching data')
"""TOOD - metrics
self.sensors.bytesFetched.record(totalBytes)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 14485d2..90d9d37 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -345,8 +345,7 @@ class KafkaConsumer(object):
dict: topic to deque of records since the last fetch for the
subscribed list of topics and partitions
"""
- if timeout_ms < 0:
- raise Errors.IllegalArgumentError("Timeout must not be negative")
+ assert timeout_ms >= 0, 'Timeout must not be negative'
# poll for new data until the timeout expires
start = time.time()
@@ -408,8 +407,8 @@ class KafkaConsumer(object):
Arguments:
partition (TopicPartition): partition to check
"""
- if not self._subscription.is_assigned(partition):
- raise Errors.IllegalStateError("You can only check the position for partitions assigned to this consumer.")
+ assert self._subscription.is_assigned(partition)
+
offset = self._subscription.assignment[partition].consumed
if offset is None:
self._update_fetch_positions(partition)
@@ -454,8 +453,7 @@ class KafkaConsumer(object):
partition (TopicPartition): partition for seek operation
offset (int): message offset in partition
"""
- if offset < 0:
- raise Errors.IllegalStateError("seek offset must not be a negative number")
+ assert offset >= 0
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index fa36bc2..c60f192 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -103,8 +103,7 @@ class SubscriptionState(object):
"""
if self._user_assignment or (topics and pattern):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
- if not (topics or pattern):
- raise IllegalStateError('Must provide topics or a pattern')
+ assert topics or pattern, 'Must provide topics or pattern'
if pattern:
log.info('Subscribing to pattern: /%s/', pattern)
@@ -341,8 +340,7 @@ class TopicPartitionState(object):
self._fetched = None # current fetch position
def _set_fetched(self, offset):
- if not self.has_valid_position:
- raise IllegalStateError("Cannot update fetch position without valid consumed/fetched positions")
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
self._fetched = offset
def _get_fetched(self):
@@ -351,8 +349,7 @@ class TopicPartitionState(object):
fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
def _set_consumed(self, offset):
- if not self.has_valid_position:
- raise IllegalStateError("Cannot update consumed position without valid consumed/fetched positions")
+ assert self.has_valid_position, 'Valid consumed/fetch position required'
self._consumed = offset
def _get_consumed(self):
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index ca5d38d..032ae31 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -72,10 +72,6 @@ class AbstractCoordinator(object):
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
- if not client:
- raise Errors.IllegalStateError('a client is required to use'
- ' Group Coordinator')
-
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index d5436c4..7bc10cd 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -99,8 +99,7 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription = subscription
self._partitions_per_topic = {}
self._auto_commit_task = None
- if not self.config['assignors']:
- raise Errors.IllegalStateError('Coordinator requires assignors')
+ assert self.config['assignors'], 'Coordinator require assignors'
self._cluster.request_update()
self._cluster.add_listener(self._handle_metadata_update)
@@ -168,10 +167,7 @@ class ConsumerCoordinator(AbstractCoordinator):
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
assignor = self._lookup_assignor(protocol)
- if not assignor:
- raise Errors.IllegalStateError("Coordinator selected invalid"
- " assignment protocol: %s"
- % protocol)
+ assert assignor, 'invalid assignment protocol: %s' % protocol
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
@@ -202,10 +198,7 @@ class ConsumerCoordinator(AbstractCoordinator):
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
- if not assignor:
- raise Errors.IllegalStateError("Coordinator selected invalid"
- " assignment protocol: %s"
- % assignment_strategy)
+ assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
@@ -581,10 +574,8 @@ class AutoCommitTask(object):
pass
def _reschedule(self, at):
- if self._enabled:
- self._client.schedule(self, at)
- else:
- raise Errors.IllegalStateError('AutoCommitTask not enabled')
+ assert self._enabled, 'AutoCommitTask not enabled'
+ self._client.schedule(self, at)
def __call__(self):
if not self._enabled:
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 41ba025..9a28f5e 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -16,9 +16,9 @@ class Heartbeat(object):
if key in configs:
self.config[key] = configs[key]
- if self.config['heartbeat_interval_ms'] > self.config['session_timeout_ms']:
- raise Errors.IllegalArgumentError("Heartbeat interval must be set"
- " lower than the session timeout")
+ assert (self.config['heartbeat_interval_ms']
+ <= self.config['session_timeout_ms'],
+ 'Heartbeat interval must be lower than the session timeout')
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
self.timeout = self.config['session_timeout_ms'] / 1000.0
diff --git a/kafka/future.py b/kafka/future.py
index 1f22cb7..958e85f 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -27,10 +27,7 @@ class Future(object):
return False
def success(self, value):
- if self.is_done:
- raise Errors.IllegalStateError('Invalid attempt to complete a'
- ' request future which is already'
- ' complete')
+ assert not self.is_done, 'Future is already complete'
self.value = value
self.is_done = True
for f in self._callbacks:
@@ -41,11 +38,10 @@ class Future(object):
return self
def failure(self, e):
- if self.is_done:
- raise Errors.IllegalStateError('Invalid attempt to complete a'
- ' request future which is already'
- ' complete')
+ assert not self.is_done, 'Future is already complete'
self.exception = e if type(e) is not type else e()
+ assert isinstance(self.exception, BaseException), (
+ 'future failed without an exception')
self.is_done = True
for f in self._errbacks:
try: