diff options
| -rw-r--r-- | kafka/client_async.py | 10 | ||||
| -rw-r--r-- | kafka/conn.py | 106 | ||||
| -rw-r--r-- | kafka/consumer/group.py | 10 | ||||
| -rw-r--r-- | kafka/errors.py | 20 | ||||
| -rw-r--r-- | kafka/producer/kafka.py | 13 | ||||
| -rw-r--r-- | kafka/protocol/admin.py | 21 | 
6 files changed, 176 insertions, 4 deletions
| diff --git a/kafka/client_async.py b/kafka/client_async.py index dd4df82..6e07ab0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -70,6 +70,9 @@ class KafkaClient(object):          'selector': selectors.DefaultSelector,          'metrics': None,          'metric_group_prefix': '', +        'sasl_mechanism': None, +        'sasl_plain_username': None, +        'sasl_plain_password': None,      }      API_VERSIONS = [          (0, 10), @@ -150,6 +153,13 @@ class KafkaClient(object):              metrics (kafka.metrics.Metrics): Optionally provide a metrics                  instance for capturing network IO stats. Default: None.              metric_group_prefix (str): Prefix for metric names. Default: '' +            sasl_mechanism (str): string picking sasl mechanism when security_protocol +                is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. +                Default: None +            sasl_plain_username (str): username for sasl PLAIN authentication. +                Default: None +            sasl_plain_password (str): passowrd for sasl PLAIN authentication. +                Defualt: None          """          self.config = copy.copy(self.DEFAULT_CONFIG)          for key in self.config: diff --git a/kafka/conn.py b/kafka/conn.py index 03c445e..2e70165 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -15,6 +15,7 @@ from kafka.vendor import six  import kafka.errors as Errors  from kafka.future import Future  from kafka.protocol.api import RequestHeader +from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse  from kafka.protocol.commit import GroupCoordinatorResponse  from kafka.protocol.types import Int32  from kafka.version import __version__ @@ -48,7 +49,7 @@ class ConnectionStates(object):      CONNECTING = '<connecting>'      HANDSHAKE = '<handshake>'      CONNECTED = '<connected>' - +    AUTHENTICATING = '<authenticating>'  InFlightRequest = collections.namedtuple('InFlightRequest',      ['request', 'response_type', 'correlation_id', 'future', 'timestamp']) @@ -73,6 +74,9 @@ class BrokerConnection(object):          'ssl_password': None,          'api_version': (0, 8, 2),  # default to most restrictive          'state_change_callback': lambda conn: True, +        'sasl_mechanism': None, +        'sasl_plain_username': None, +        'sasl_plain_password': None      }      def __init__(self, host, port, afi, **configs): @@ -188,6 +192,8 @@ class BrokerConnection(object):                  if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):                      log.debug('%s: initiating SSL handshake', str(self))                      self.state = ConnectionStates.HANDSHAKE +                elif self.config['security_protocol'] == 'SASL_PLAINTEXT': +                    self.state = ConnectionStates.AUTHENTICATING                  else:                      self.state = ConnectionStates.CONNECTED                  self.config['state_change_callback'](self) @@ -211,6 +217,15 @@ class BrokerConnection(object):          if self.state is ConnectionStates.HANDSHAKE:              if self._try_handshake():                  log.debug('%s: completed SSL handshake.', str(self)) +                if self.config['security_protocol'] == 'SASL_SSL': +                    self.state = ConnectionStates.AUTHENTICATING +                else: +                    self.state = ConnectionStates.CONNECTED +                self.config['state_change_callback'](self) + +        if self.state is ConnectionStates.AUTHENTICATING: +            if self._try_authenticate(): +                log.debug('%s: Authenticated as %s', str(self), self.config['sasl_plain_username'])                  self.state = ConnectionStates.CONNECTED                  self.config['state_change_callback'](self) @@ -273,6 +288,90 @@ class BrokerConnection(object):          return False +    def _try_authenticate(self): +        assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') + +        if self.config['security_protocol'] == 'SASL_PLAINTEXT': +            log.warning('%s: Sending username and password in the clear', str(self)) + +        # Build a SaslHandShakeRequest message +        correlation_id = self._next_correlation_id() +        request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) +        header = RequestHeader(request, +                               correlation_id=correlation_id, +                               client_id=self.config['client_id']) + +        message = b''.join([header.encode(), request.encode()]) +        size = Int32.encode(len(message)) + +        # Attempt to send it over our socket +        try: +            self._sock.setblocking(True) +            self._sock.sendall(size + message) +            self._sock.setblocking(False) +        except (AssertionError, ConnectionError) as e: +            log.exception("Error sending %s to %s", request, self) +            error = Errors.ConnectionError("%s: %s" % (str(self), e)) +            self.close(error=error) +            return False + +        future = Future() +        ifr = InFlightRequest(request=request, +                              correlation_id=correlation_id, +                              response_type=request.RESPONSE_TYPE, +                              future=future, +                              timestamp=time.time()) +        self.in_flight_requests.append(ifr) + +        # Listen for a reply and check that the server supports the PLAIN mechanism +        response = None +        while not response: +            response = self.recv() + +        if not response.error_code is 0: +            raise Errors.for_code(response.error_code) + +        if not self.config['sasl_mechanism'] in response.enabled_mechanisms: +            raise Errors.AuthenticationMethodNotSupported(self.config['sasl_mechanism'] + " is not supported by broker") + +        return self._try_authenticate_plain() + +    def _try_authenticate_plain(self): +        data = b'' +        try: +            self._sock.setblocking(True) +            # Send our credentials +            msg = bytes('\0'.join([self.config['sasl_plain_username'], +                                   self.config['sasl_plain_username'], +                                   self.config['sasl_plain_password']]).encode('utf-8')) +            size = Int32.encode(len(msg)) +            self._sock.sendall(size + msg) + +            # The server will send a zero sized message (that is Int32(0)) on success. +            # The connection is closed on failure +            received_bytes = 0 +            while received_bytes < 4: +                data = data + self._sock.recv(4 - received_bytes) +                received_bytes = received_bytes + len(data) +                if not data: +                    log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) +                    self.close(error=Errors.ConnectionError('Authentication failed')) +                    raise Errors.AuthenticationFailedError('Authentication failed for user {}'.format(self.config['sasl_plain_username'])) +            self._sock.setblocking(False) +        except (AssertionError, ConnectionError) as e: +            log.exception("%s: Error receiving reply from server",  self) +            error = Errors.ConnectionError("%s: %s" % (str(self), e)) +            self.close(error=error) +            return False + +        with io.BytesIO() as buffer: +            buffer.write(data) +            buffer.seek(0) +            if not Int32.decode(buffer) == 0: +                raise Errors.KafkaError('Expected a zero sized reply after sending credentials') + +        return True +      def blacked_out(self):          """          Return true if we are disconnected from the given node and can't @@ -292,7 +391,8 @@ class BrokerConnection(object):          """Returns True if still connecting (this may encompass several          different states, such as SSL handshake, authorization, etc)."""          return self.state in (ConnectionStates.CONNECTING, -                              ConnectionStates.HANDSHAKE) +                              ConnectionStates.HANDSHAKE, +                              ConnectionStates.AUTHENTICATING)      def disconnected(self):          """Return True iff socket is closed""" @@ -385,7 +485,7 @@ class BrokerConnection(object):          Return response if available          """          assert not self._processing, 'Recursion not supported' -        if not self.connected(): +        if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:              log.warning('%s cannot recv: socket not connected', self)              # If requests are pending, we should close the socket and              # fail all the pending request futures diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ed12ec0..489d96d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -186,6 +186,13 @@ class KafkaConsumer(six.Iterator):              (such as offsets) should be exposed to the consumer. If set to True              the only way to receive records from an internal topic is              subscribing to it. Requires 0.10+ Default: True +        sasl_mechanism (str): string picking sasl mechanism when security_protocol +            is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. +            Default: None +        sasl_plain_username (str): username for sasl PLAIN authentication. +            Default: None +        sasl_plain_password (str): passowrd for sasl PLAIN authentication. +            Defualt: None      Note:          Configuration parameters are described in more detail at @@ -234,6 +241,9 @@ class KafkaConsumer(six.Iterator):          'metrics_sample_window_ms': 30000,          'selector': selectors.DefaultSelector,          'exclude_internal_topics': True, +        'sasl_mechanism': None, +        'sasl_plain_username': None, +        'sasl_plain_password': None,      }      def __init__(self, *topics, **configs): diff --git a/kafka/errors.py b/kafka/errors.py index c005bf8..069c9e4 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -58,6 +58,14 @@ class CommitFailedError(KafkaError):      pass +class AuthenticationMethodNotSupported(KafkaError): +    pass + + +class AuthenticationFailedError(KafkaError): +    retriable = False + +  class BrokerResponseError(KafkaError):      errno = None      message = None @@ -328,6 +336,18 @@ class InvalidTimestampError(BrokerResponseError):      description = ('The timestamp of the message is out of acceptable range.') +class UnsupportedSaslMechanismError(BrokerResponseError): +    errno = 33 +    message = 'UNSUPPORTED_SASL_MECHANISM' +    description = ('The broker does not support the requested SASL mechanism.') + + +class IllegalSaslStateError(BrokerResponseError): +    errno = 34 +    message = 'ILLEGAL_SASL_STATE' +    description = ('Request is not valid given the current SASL state.') + +  class KafkaUnavailableError(KafkaError):      pass diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 381ad74..aef50d0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -199,7 +199,8 @@ class KafkaProducer(object):              to kafka brokers up to this number of maximum requests per              broker connection. Default: 5.          security_protocol (str): Protocol used to communicate with brokers. -            Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. +            Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. +            Default: PLAINTEXT.          ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping              socket connections. If provided, all other ssl_* configurations              will be ignored. Default: None. @@ -235,6 +236,13 @@ class KafkaProducer(object):          selector (selectors.BaseSelector): Provide a specific selector              implementation to use for I/O multiplexing.              Default: selectors.DefaultSelector +        sasl_mechanism (str): string picking sasl mechanism when security_protocol +            is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. +            Default: None +        sasl_plain_username (str): username for sasl PLAIN authentication. +            Default: None +        sasl_plain_password (str): passowrd for sasl PLAIN authentication. +            Defualt: None      Note:          Configuration parameters are described in more detail at @@ -276,6 +284,9 @@ class KafkaProducer(object):          'metrics_num_samples': 2,          'metrics_sample_window_ms': 30000,          'selector': selectors.DefaultSelector, +        'sasl_mechanism': None, +        'sasl_plain_username': None, +        'sasl_plain_password': None,      }      def __init__(self, **configs): diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 12181d7..747684f 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -78,3 +78,24 @@ class DescribeGroupsRequest_v0(Struct):  DescribeGroupsRequest = [DescribeGroupsRequest_v0]  DescribeGroupsResponse = [DescribeGroupsResponse_v0] + + +class SaslHandShakeResponse_v0(Struct): +    API_KEY = 17 +    API_VERSION = 0 +    SCHEMA = Schema( +        ('error_code', Int16), +        ('enabled_mechanisms', Array(String('utf-8'))) +    ) + + +class SaslHandShakeRequest_v0(Struct): +    API_KEY = 17 +    API_VERSION = 0 +    RESPONSE_TYPE = SaslHandShakeResponse_v0 +    SCHEMA = Schema( +        ('mechanism', String('utf-8')) +    ) + +SaslHandShakeRequest = [SaslHandShakeRequest_v0] +SaslHandShakeResponse = [SaslHandShakeResponse_v0] | 
