summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py106
1 files changed, 103 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 03c445e..2e70165 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -15,6 +15,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.api import RequestHeader
+from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -48,7 +49,7 @@ class ConnectionStates(object):
CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>'
CONNECTED = '<connected>'
-
+ AUTHENTICATING = '<authenticating>'
InFlightRequest = collections.namedtuple('InFlightRequest',
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
@@ -73,6 +74,9 @@ class BrokerConnection(object):
'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
+ 'sasl_mechanism': None,
+ 'sasl_plain_username': None,
+ 'sasl_plain_password': None
}
def __init__(self, host, port, afi, **configs):
@@ -188,6 +192,8 @@ class BrokerConnection(object):
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', str(self))
self.state = ConnectionStates.HANDSHAKE
+ elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
+ self.state = ConnectionStates.AUTHENTICATING
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
@@ -211,6 +217,15 @@ class BrokerConnection(object):
if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake():
log.debug('%s: completed SSL handshake.', str(self))
+ if self.config['security_protocol'] == 'SASL_SSL':
+ self.state = ConnectionStates.AUTHENTICATING
+ else:
+ self.state = ConnectionStates.CONNECTED
+ self.config['state_change_callback'](self)
+
+ if self.state is ConnectionStates.AUTHENTICATING:
+ if self._try_authenticate():
+ log.debug('%s: Authenticated as %s', str(self), self.config['sasl_plain_username'])
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
@@ -273,6 +288,90 @@ class BrokerConnection(object):
return False
+ def _try_authenticate(self):
+ assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
+
+ if self.config['security_protocol'] == 'SASL_PLAINTEXT':
+ log.warning('%s: Sending username and password in the clear', str(self))
+
+ # Build a SaslHandShakeRequest message
+ correlation_id = self._next_correlation_id()
+ request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
+ header = RequestHeader(request,
+ correlation_id=correlation_id,
+ client_id=self.config['client_id'])
+
+ message = b''.join([header.encode(), request.encode()])
+ size = Int32.encode(len(message))
+
+ # Attempt to send it over our socket
+ try:
+ self._sock.setblocking(True)
+ self._sock.sendall(size + message)
+ self._sock.setblocking(False)
+ except (AssertionError, ConnectionError) as e:
+ log.exception("Error sending %s to %s", request, self)
+ error = Errors.ConnectionError("%s: %s" % (str(self), e))
+ self.close(error=error)
+ return False
+
+ future = Future()
+ ifr = InFlightRequest(request=request,
+ correlation_id=correlation_id,
+ response_type=request.RESPONSE_TYPE,
+ future=future,
+ timestamp=time.time())
+ self.in_flight_requests.append(ifr)
+
+ # Listen for a reply and check that the server supports the PLAIN mechanism
+ response = None
+ while not response:
+ response = self.recv()
+
+ if not response.error_code is 0:
+ raise Errors.for_code(response.error_code)
+
+ if not self.config['sasl_mechanism'] in response.enabled_mechanisms:
+ raise Errors.AuthenticationMethodNotSupported(self.config['sasl_mechanism'] + " is not supported by broker")
+
+ return self._try_authenticate_plain()
+
+ def _try_authenticate_plain(self):
+ data = b''
+ try:
+ self._sock.setblocking(True)
+ # Send our credentials
+ msg = bytes('\0'.join([self.config['sasl_plain_username'],
+ self.config['sasl_plain_username'],
+ self.config['sasl_plain_password']]).encode('utf-8'))
+ size = Int32.encode(len(msg))
+ self._sock.sendall(size + msg)
+
+ # The server will send a zero sized message (that is Int32(0)) on success.
+ # The connection is closed on failure
+ received_bytes = 0
+ while received_bytes < 4:
+ data = data + self._sock.recv(4 - received_bytes)
+ received_bytes = received_bytes + len(data)
+ if not data:
+ log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
+ self.close(error=Errors.ConnectionError('Authentication failed'))
+ raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_plain_username']))
+ self._sock.setblocking(False)
+ except (AssertionError, ConnectionError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.ConnectionError("%s: %s" % (str(self), e))
+ self.close(error=error)
+ return False
+
+ with io.BytesIO() as buffer:
+ buffer.write(data)
+ buffer.seek(0)
+ if not Int32.decode(buffer) == 0:
+ raise Errors.KafkaError('Expected a zero sized reply after sending credentials')
+
+ return True
+
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
@@ -292,7 +391,8 @@ class BrokerConnection(object):
"""Returns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc)."""
return self.state in (ConnectionStates.CONNECTING,
- ConnectionStates.HANDSHAKE)
+ ConnectionStates.HANDSHAKE,
+ ConnectionStates.AUTHENTICATING)
def disconnected(self):
"""Return True iff socket is closed"""
@@ -385,7 +485,7 @@ class BrokerConnection(object):
Return response if available
"""
assert not self._processing, 'Recursion not supported'
- if not self.connected():
+ if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and
# fail all the pending request futures