summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py45
1 files changed, 28 insertions, 17 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index bb9df69..c7a077c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -7,7 +7,6 @@ import logging
import io
from random import shuffle
import socket
-import ssl
import time
import traceback
@@ -31,19 +30,28 @@ log = logging.getLogger(__name__)
DEFAULT_KAFKA_PORT = 9092
-# support older ssl libraries
try:
- ssl.SSLWantReadError
- ssl.SSLWantWriteError
- ssl.SSLZeroReturnError
-except:
- log.warning('Old SSL module detected.'
- ' SSL error handling may not operate cleanly.'
- ' Consider upgrading to Python 3.3 or 2.7.9')
- ssl.SSLWantReadError = ssl.SSLError
- ssl.SSLWantWriteError = ssl.SSLError
- ssl.SSLZeroReturnError = ssl.SSLError
-
+ import ssl
+ ssl_available = True
+ try:
+ SSLWantReadError = ssl.SSLWantReadError
+ SSLWantWriteError = ssl.SSLWantWriteError
+ SSLZeroReturnError = ssl.SSLZeroReturnError
+ except:
+ # support older ssl libraries
+ log.warning('Old SSL module detected.'
+ ' SSL error handling may not operate cleanly.'
+ ' Consider upgrading to Python 3.3 or 2.7.9')
+ SSLWantReadError = ssl.SSLError
+ SSLWantWriteError = ssl.SSLError
+ SSLZeroReturnError = ssl.SSLError
+except ImportError:
+ # support Python without ssl libraries
+ ssl_available = False
+ class SSLWantReadError(Exception):
+ pass
+ class SSLWantWriteError(Exception):
+ pass
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
@@ -177,6 +185,9 @@ class BrokerConnection(object):
(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes']))
+ if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
+ assert ssl_available, "Python wasn't built with SSL support"
+
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
@@ -368,9 +379,9 @@ class BrokerConnection(object):
self._sock.do_handshake()
return True
# old ssl in python2.6 will swallow all SSLErrors here...
- except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
+ except (SSLWantReadError, SSLWantWriteError):
pass
- except ssl.SSLZeroReturnError:
+ except SSLZeroReturnError:
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
@@ -608,7 +619,7 @@ class BrokerConnection(object):
self.close(error=Errors.ConnectionError('socket disconnected'))
return None
self._rbuffer.write(data)
- except ssl.SSLWantReadError:
+ except SSLWantReadError:
return None
except ConnectionError as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
@@ -646,7 +657,7 @@ class BrokerConnection(object):
self.close(error=Errors.ConnectionError('socket disconnected'))
return None
self._rbuffer.write(data)
- except ssl.SSLWantReadError:
+ except SSLWantReadError:
return None
except ConnectionError as e:
# Extremely small chance that we have exactly 4 bytes for a