diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 106 |
1 files changed, 103 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 03c445e..2e70165 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -15,6 +15,7 @@ from kafka.vendor import six import kafka.errors as Errors from kafka.future import Future from kafka.protocol.api import RequestHeader +from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -48,7 +49,7 @@ class ConnectionStates(object): CONNECTING = '<connecting>' HANDSHAKE = '<handshake>' CONNECTED = '<connected>' - + AUTHENTICATING = '<authenticating>' InFlightRequest = collections.namedtuple('InFlightRequest', ['request', 'response_type', 'correlation_id', 'future', 'timestamp']) @@ -73,6 +74,9 @@ class BrokerConnection(object): 'ssl_password': None, 'api_version': (0, 8, 2), # default to most restrictive 'state_change_callback': lambda conn: True, + 'sasl_mechanism': None, + 'sasl_plain_username': None, + 'sasl_plain_password': None } def __init__(self, host, port, afi, **configs): @@ -188,6 +192,8 @@ class BrokerConnection(object): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', str(self)) self.state = ConnectionStates.HANDSHAKE + elif self.config['security_protocol'] == 'SASL_PLAINTEXT': + self.state = ConnectionStates.AUTHENTICATING else: self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) @@ -211,6 +217,15 @@ class BrokerConnection(object): if self.state is ConnectionStates.HANDSHAKE: if self._try_handshake(): log.debug('%s: completed SSL handshake.', str(self)) + if self.config['security_protocol'] == 'SASL_SSL': + self.state = ConnectionStates.AUTHENTICATING + else: + self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) + + if self.state is ConnectionStates.AUTHENTICATING: + if self._try_authenticate(): + log.debug('%s: Authenticated as %s', str(self), self.config['sasl_plain_username']) self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) @@ -273,6 +288,90 @@ class BrokerConnection(object): return False + def _try_authenticate(self): + assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') + + if self.config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('%s: Sending username and password in the clear', str(self)) + + # Build a SaslHandShakeRequest message + correlation_id = self._next_correlation_id() + request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) + header = RequestHeader(request, + correlation_id=correlation_id, + client_id=self.config['client_id']) + + message = b''.join([header.encode(), request.encode()]) + size = Int32.encode(len(message)) + + # Attempt to send it over our socket + try: + self._sock.setblocking(True) + self._sock.sendall(size + message) + self._sock.setblocking(False) + except (AssertionError, ConnectionError) as e: + log.exception("Error sending %s to %s", request, self) + error = Errors.ConnectionError("%s: %s" % (str(self), e)) + self.close(error=error) + return False + + future = Future() + ifr = InFlightRequest(request=request, + correlation_id=correlation_id, + response_type=request.RESPONSE_TYPE, + future=future, + timestamp=time.time()) + self.in_flight_requests.append(ifr) + + # Listen for a reply and check that the server supports the PLAIN mechanism + response = None + while not response: + response = self.recv() + + if not response.error_code is 0: + raise Errors.for_code(response.error_code) + + if not self.config['sasl_mechanism'] in response.enabled_mechanisms: + raise Errors.AuthenticationMethodNotSupported(self.config['sasl_mechanism'] + " is not supported by broker") + + return self._try_authenticate_plain() + + def _try_authenticate_plain(self): + data = b'' + try: + self._sock.setblocking(True) + # Send our credentials + msg = bytes('\0'.join([self.config['sasl_plain_username'], + self.config['sasl_plain_username'], + self.config['sasl_plain_password']]).encode('utf-8')) + size = Int32.encode(len(msg)) + self._sock.sendall(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + received_bytes = 0 + while received_bytes < 4: + data = data + self._sock.recv(4 - received_bytes) + received_bytes = received_bytes + len(data) + if not data: + log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) + self.close(error=Errors.ConnectionError('Authentication failed')) + raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_plain_username'])) + self._sock.setblocking(False) + except (AssertionError, ConnectionError) as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (str(self), e)) + self.close(error=error) + return False + + with io.BytesIO() as buffer: + buffer.write(data) + buffer.seek(0) + if not Int32.decode(buffer) == 0: + raise Errors.KafkaError('Expected a zero sized reply after sending credentials') + + return True + def blacked_out(self): """ Return true if we are disconnected from the given node and can't @@ -292,7 +391,8 @@ class BrokerConnection(object): """Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).""" return self.state in (ConnectionStates.CONNECTING, - ConnectionStates.HANDSHAKE) + ConnectionStates.HANDSHAKE, + ConnectionStates.AUTHENTICATING) def disconnected(self): """Return True iff socket is closed""" @@ -385,7 +485,7 @@ class BrokerConnection(object): Return response if available """ assert not self._processing, 'Recursion not supported' - if not self.connected(): + if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) # If requests are pending, we should close the socket and # fail all the pending request futures |