summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-09 10:29:08 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-09 10:29:08 -0700
commit0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (patch)
tree54c8520e94af2d72ca715c4db9bb855fbfa5574d /kafka/client_async.py
parentcda2d59da4ff952adae1a75d906eaa3a99ac7f67 (diff)
parent097198cceaed97d5b804166d0c76a816c8dfead0 (diff)
downloadkafka-python-0c94b83a2dff8113b5fd7c16df8a11ca03c4377b.tar.gz
Merge pull request #621 from dpkp/ssl_support
Support SSL connections
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py39
1 files changed, 37 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 36e808c..2eb86cf 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -53,6 +53,12 @@ class KafkaClient(object):
'send_buffer_bytes': None,
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
}
def __init__(self, **configs):
@@ -90,6 +96,21 @@ class KafkaClient(object):
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): flag to configure whether ssl handshake
+ should verify that the certificate matches the brokers hostname.
+ default: true.
+ ssl_cafile (str): optional filename of ca file to use in certificate
+ veriication. default: none.
+ ssl_certfile (str): optional filename of file in pem format containing
+ the client certificate, as well as any ca certificates needed to
+ establish the certificate's authenticity. default: none.
+ ssl_keyfile (str): optional filename containing the client private key.
+ default: none.
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -168,8 +189,10 @@ class KafkaClient(object):
def _conn_state_change(self, node_id, conn):
if conn.connecting():
- self._connecting.add(node_id)
- self._selector.register(conn._sock, selectors.EVENT_WRITE)
+ # SSL connections can enter this state 2x (second during Handshake)
+ if node_id not in self._connecting:
+ self._connecting.add(node_id)
+ self._selector.register(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
@@ -412,7 +435,9 @@ class KafkaClient(object):
def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
assert self.in_flight_request_count() > 0 or self._connecting or sleep
+
responses = []
+ processed = set()
for key, events in self._selector.select(timeout):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
@@ -420,6 +445,7 @@ class KafkaClient(object):
elif not (events & selectors.EVENT_READ):
continue
conn = key.data
+ processed.add(conn)
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -428,6 +454,15 @@ class KafkaClient(object):
if not response:
break
responses.append(response)
+
+ # Check for additional pending SSL bytes
+ if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
+ # TODO: optimize
+ for conn in self._conns.values():
+ if conn not in processed and conn.connected() and conn._sock.pending():
+ response = conn.recv()
+ if response:
+ responses.append(response)
return responses
def in_flight_request_count(self, node_id=None):