diff options
author | Harald <harald.berghoff@gmx.net> | 2017-07-20 19:09:26 +0200 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-07-20 10:09:26 -0700 |
commit | 0c78f704520a42d0935cb87298dd69f8e4af5894 (patch) | |
tree | 15b92d7d80c599664473bf40470bf27489346a09 | |
parent | c8237fc53bf93c72a5530a53654dd3133a96de08 (diff) | |
download | kafka-python-0c78f704520a42d0935cb87298dd69f8e4af5894.tar.gz |
added gssapi support (Kerberos) for SASL (#1152)
-rw-r--r-- | kafka/conn.py | 77 |
1 files changed, 75 insertions, 2 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 782783c..16eaf62 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -54,6 +54,15 @@ except ImportError: class SSLWantWriteError(Exception): pass +# needed for SASL_GSSAPI authentication: +try: + import gssapi + from gssapi.raw.misc import GSSError +except ImportError: + #no gssapi available, will disable gssapi mechanism + gssapi = None + GSSError = None + class ConnectionStates(object): DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' @@ -167,9 +176,13 @@ class BrokerConnection(object): 'metric_group_prefix': '', 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': None, - 'sasl_plain_password': None + 'sasl_plain_password': None, + 'sasl_kerberos_service_name':'kafka' } - SASL_MECHANISMS = ('PLAIN',) + if gssapi is None: + SASL_MECHANISMS = ('PLAIN',) + else: + SASL_MECHANISMS = ('PLAIN', 'GSSAPI') def __init__(self, host, port, afi, **configs): self.hostname = host @@ -206,6 +219,9 @@ class BrokerConnection(object): if self.config['sasl_mechanism'] == 'PLAIN': assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' + if self.config['sasl_mechanism'] == 'GSSAPI': + assert gssapi is not None, 'GSSAPI lib not available' + assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl' self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() @@ -437,6 +453,8 @@ class BrokerConnection(object): if self.config['sasl_mechanism'] == 'PLAIN': return self._try_authenticate_plain(future) + elif self.config['sasl_mechanism'] == 'GSSAPI': + return self._try_authenticate_gssapi(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -481,6 +499,61 @@ class BrokerConnection(object): return future.success(True) + def _try_authenticate_gssapi(self, future): + + data = b'' + gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname + ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service) + ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) + log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) + ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') + #Exchange tokens until authentication either suceeded or failed: + received_token = None + try: + while not ctx_Context.complete: + #calculate the output token + try: + output_token = ctx_Context.step(received_token) + except GSSError as e: + log.exception("%s: Error invalid token received from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + + if not output_token: + if ctx_Context.complete: + log.debug("%s: Security Context complete ", self) + log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) + break + try: + self._sock.setblocking(True) + # Send output token + msg = output_token + size = Int32.encode(len(msg)) + self._sock.sendall(size + msg) + + # The server will send a token back. processing of this token either + # establishes a security context, or needs further token exchange + # the gssapi will be able to identify the needed next step + # The connection is closed on failure + response = self._sock.recv(2000) + self._sock.setblocking(False) + + except (AssertionError, ConnectionError) as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + future.failure(error) + self.close(error=error) + + #pass the received token back to gssapi, strip the first 4 bytes + received_token = response[4:] + + except Exception as e: + log.exception("%s: GSSAPI handshake error", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + future.failure(error) + self.close(error=error) + + return future.success(True) + def blacked_out(self): """ Return true if we are disconnected from the given node and can't |