summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py14
1 files changed, 7 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 87d616c..d71c9a4 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -131,10 +131,9 @@ class KafkaClient(object):
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
def _initiate_connect(self, node_id):
- """Initiate a connection to the given node"""
+ """Initiate a connection to the given node (must be in metadata)"""
broker = self.cluster.broker_metadata(node_id)
- if not broker:
- raise Errors.IllegalArgumentError('Broker %s not found in current cluster metadata', node_id)
+ assert broker, 'Broker id %s not in current metadata' % node_id
if node_id not in self._conns:
log.debug("Initiating connection to node %s at %s:%s",
@@ -144,8 +143,7 @@ class KafkaClient(object):
return self._finish_connect(node_id)
def _finish_connect(self, node_id):
- if node_id not in self._conns:
- raise Errors.IllegalArgumentError('Node %s not found in connections', node_id)
+ assert node_id in self._conns, '%s is not in current conns' % node_id
state = self._conns[node_id].connect()
if state is ConnectionStates.CONNECTING:
self._connecting.add(node_id)
@@ -242,13 +240,15 @@ class KafkaClient(object):
request (Struct): request object (not-encoded)
Raises:
- IllegalStateError: if node_id is not ready
+ NodeNotReadyError: if node_id is not ready
Returns:
Future: resolves to Response struct
"""
if not self._can_send_request(node_id):
- raise Errors.IllegalStateError("Attempt to send a request to node %s which is not ready." % node_id)
+ raise Errors.NodeNotReadyError("Attempt to send a request to node"
+ " which is not ready (node id %s)."
+ % node_id)
# Every request gets a response, except one special case:
expect_response = True