diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
commit | 0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (patch) | |
tree | 54c8520e94af2d72ca715c4db9bb855fbfa5574d /kafka/conn.py | |
parent | cda2d59da4ff952adae1a75d906eaa3a99ac7f67 (diff) | |
parent | 097198cceaed97d5b804166d0c76a816c8dfead0 (diff) | |
download | kafka-python-0c94b83a2dff8113b5fd7c16df8a11ca03c4377b.tar.gz |
Merge pull request #621 from dpkp/ssl_support
Support SSL connections
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 94 |
1 files changed, 91 insertions, 3 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 28c09d9..f13ab64 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,6 +5,7 @@ import logging import io from random import shuffle import socket +import ssl import struct from threading import local import time @@ -29,11 +30,25 @@ log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 +# support older ssl libraries +try: + assert ssl.SSLWantReadError + assert ssl.SSLWantWriteError + assert ssl.SSLZeroReturnError +except: + log.warning('old ssl module detected.' + ' ssl error handling may not operate cleanly.' + ' Consider upgrading to python 3.5 or 2.7') + ssl.SSLWantReadError = ssl.SSLError + ssl.SSLWantWriteError = ssl.SSLError + ssl.SSLZeroReturnError = ssl.SSLError + class ConnectionStates(object): DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' + HANDSHAKE = '<handshake>' CONNECTED = '<connected>' @@ -49,6 +64,12 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': (0, 8, 2), # default to most restrictive 'state_change_callback': lambda conn: True, } @@ -66,6 +87,9 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self._sock = None + self._ssl_context = None + if self.config['ssl_context'] is not None: + self._ssl_context = self.config['ssl_context'] self._rbuffer = io.BytesIO() self._receiving = False self._next_payload_bytes = 0 @@ -87,6 +111,8 @@ class BrokerConnection(object): self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.config['send_buffer_bytes']) self._sock.setblocking(False) + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + self._wrap_ssl() self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() self.config['state_change_callback'](self) @@ -103,7 +129,11 @@ class BrokerConnection(object): # Connection succeeded if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) - self.state = ConnectionStates.CONNECTED + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + log.debug('%s: initiating SSL handshake', str(self)) + self.state = ConnectionStates.HANDSHAKE + else: + self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) # Connection failed @@ -122,8 +152,60 @@ class BrokerConnection(object): else: pass + if self.state is ConnectionStates.HANDSHAKE: + if self._try_handshake(): + log.debug('%s: completed SSL handshake.', str(self)) + self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) + return self.state + def _wrap_ssl(self): + assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') + if self._ssl_context is None: + log.debug('%s: configuring default SSL Context', str(self)) + self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member + self._ssl_context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member + self._ssl_context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member + self._ssl_context.verify_mode = ssl.CERT_OPTIONAL + if self.config['ssl_check_hostname']: + self._ssl_context.check_hostname = True + if self.config['ssl_cafile']: + log.info('%s: Loading SSL CA from %s', str(self), self.config['ssl_cafile']) + self._ssl_context.load_verify_locations(self.config['ssl_cafile']) + self._ssl_context.verify_mode = ssl.CERT_REQUIRED + if self.config['ssl_certfile'] and self.config['ssl_keyfile']: + log.info('%s: Loading SSL Cert from %s', str(self), self.config['ssl_certfile']) + log.info('%s: Loading SSL Key from %s', str(self), self.config['ssl_keyfile']) + self._ssl_context.load_cert_chain( + certfile=self.config['ssl_certfile'], + keyfile=self.config['ssl_keyfile']) + log.debug('%s: wrapping socket in ssl context', str(self)) + try: + self._sock = self._ssl_context.wrap_socket( + self._sock, + server_hostname=self.host, + do_handshake_on_connect=False) + except ssl.SSLError: + log.exception('%s: Failed to wrap socket in SSLContext!', str(self)) + self.close() + self.last_failure = time.time() + + def _try_handshake(self): + assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') + try: + self._sock.do_handshake() + return True + # old ssl in python2.6 will swallow all SSLErrors here... + except (ssl.SSLWantReadError, ssl.SSLWantWriteError): + pass + except ssl.SSLZeroReturnError: + log.warning('SSL connection closed by server during handshake.') + self.close() + # Other SSLErrors will be raised to user + + return False + def blacked_out(self): """ Return true if we are disconnected from the given node and can't @@ -140,8 +222,10 @@ class BrokerConnection(object): return self.state is ConnectionStates.CONNECTED def connecting(self): - """Return True iff socket is in intermediate connecting state.""" - return self.state is ConnectionStates.CONNECTING + """Returns True if still connecting (this may encompass several + different states, such as SSL handshake, authorization, etc).""" + return self.state in (ConnectionStates.CONNECTING, + ConnectionStates.HANDSHAKE) def disconnected(self): """Return True iff socket is closed""" @@ -260,6 +344,8 @@ class BrokerConnection(object): # An extremely small, but non-zero, probability that there are # more than 0 but not yet 4 bytes available to read self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell())) + except ssl.SSLWantReadError: + return None except ConnectionError as e: if six.PY2 and e.errno == errno.EWOULDBLOCK: return None @@ -286,6 +372,8 @@ class BrokerConnection(object): staged_bytes = self._rbuffer.tell() try: self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes)) + except ssl.SSLWantReadError: + return None except ConnectionError as e: # Extremely small chance that we have exactly 4 bytes for a # header, but nothing to read in the body yet |