summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py77
1 files changed, 75 insertions, 2 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 782783c..16eaf62 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -54,6 +54,15 @@ except ImportError:
class SSLWantWriteError(Exception):
pass
+# needed for SASL_GSSAPI authentication:
+try:
+ import gssapi
+ from gssapi.raw.misc import GSSError
+except ImportError:
+ #no gssapi available, will disable gssapi mechanism
+ gssapi = None
+ GSSError = None
+
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
@@ -167,9 +176,13 @@ class BrokerConnection(object):
'metric_group_prefix': '',
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
- 'sasl_plain_password': None
+ 'sasl_plain_password': None,
+ 'sasl_kerberos_service_name':'kafka'
}
- SASL_MECHANISMS = ('PLAIN',)
+ if gssapi is None:
+ SASL_MECHANISMS = ('PLAIN',)
+ else:
+ SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
def __init__(self, host, port, afi, **configs):
self.hostname = host
@@ -206,6 +219,9 @@ class BrokerConnection(object):
if self.config['sasl_mechanism'] == 'PLAIN':
assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
+ if self.config['sasl_mechanism'] == 'GSSAPI':
+ assert gssapi is not None, 'GSSAPI lib not available'
+ assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'
self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
@@ -437,6 +453,8 @@ class BrokerConnection(object):
if self.config['sasl_mechanism'] == 'PLAIN':
return self._try_authenticate_plain(future)
+ elif self.config['sasl_mechanism'] == 'GSSAPI':
+ return self._try_authenticate_gssapi(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
@@ -481,6 +499,61 @@ class BrokerConnection(object):
return future.success(True)
+ def _try_authenticate_gssapi(self, future):
+
+ data = b''
+ gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
+ ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
+ ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
+ log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
+ ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
+ #Exchange tokens until authentication either suceeded or failed:
+ received_token = None
+ try:
+ while not ctx_Context.complete:
+ #calculate the output token
+ try:
+ output_token = ctx_Context.step(received_token)
+ except GSSError as e:
+ log.exception("%s: Error invalid token received from server", self)
+ error = Errors.ConnectionError("%s: %s" % (self, e))
+
+ if not output_token:
+ if ctx_Context.complete:
+ log.debug("%s: Security Context complete ", self)
+ log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name)
+ break
+ try:
+ self._sock.setblocking(True)
+ # Send output token
+ msg = output_token
+ size = Int32.encode(len(msg))
+ self._sock.sendall(size + msg)
+
+ # The server will send a token back. processing of this token either
+ # establishes a security context, or needs further token exchange
+ # the gssapi will be able to identify the needed next step
+ # The connection is closed on failure
+ response = self._sock.recv(2000)
+ self._sock.setblocking(False)
+
+ except (AssertionError, ConnectionError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.ConnectionError("%s: %s" % (self, e))
+ future.failure(error)
+ self.close(error=error)
+
+ #pass the received token back to gssapi, strip the first 4 bytes
+ received_token = response[4:]
+
+ except Exception as e:
+ log.exception("%s: GSSAPI handshake error", self)
+ error = Errors.ConnectionError("%s: %s" % (self, e))
+ future.failure(error)
+ self.close(error=error)
+
+ return future.success(True)
+
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't