diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
commit | 0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (patch) | |
tree | 54c8520e94af2d72ca715c4db9bb855fbfa5574d /kafka/client_async.py | |
parent | cda2d59da4ff952adae1a75d906eaa3a99ac7f67 (diff) | |
parent | 097198cceaed97d5b804166d0c76a816c8dfead0 (diff) | |
download | kafka-python-0c94b83a2dff8113b5fd7c16df8a11ca03c4377b.tar.gz |
Merge pull request #621 from dpkp/ssl_support
Support SSL connections
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 39 |
1 files changed, 37 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 36e808c..2eb86cf 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -53,6 +53,12 @@ class KafkaClient(object): 'send_buffer_bytes': None, 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, } def __init__(self, **configs): @@ -90,6 +96,21 @@ class KafkaClient(object): brokers or partitions. Default: 300000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -168,8 +189,10 @@ class KafkaClient(object): def _conn_state_change(self, node_id, conn): if conn.connecting(): - self._connecting.add(node_id) - self._selector.register(conn._sock, selectors.EVENT_WRITE) + # SSL connections can enter this state 2x (second during Handshake) + if node_id not in self._connecting: + self._connecting.add(node_id) + self._selector.register(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) @@ -412,7 +435,9 @@ class KafkaClient(object): def _poll(self, timeout, sleep=True): # select on reads across all connected sockets, blocking up to timeout assert self.in_flight_request_count() > 0 or self._connecting or sleep + responses = [] + processed = set() for key, events in self._selector.select(timeout): if key.fileobj is self._wake_r: self._clear_wake_fd() @@ -420,6 +445,7 @@ class KafkaClient(object): elif not (events & selectors.EVENT_READ): continue conn = key.data + processed.add(conn) while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -428,6 +454,15 @@ class KafkaClient(object): if not response: break responses.append(response) + + # Check for additional pending SSL bytes + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + # TODO: optimize + for conn in self._conns.values(): + if conn not in processed and conn.connected() and conn._sock.pending(): + response = conn.recv() + if response: + responses.append(response) return responses def in_flight_request_count(self, node_id=None): |