diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-30 17:06:47 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-30 17:06:47 -0700 |
commit | cf3b370da9610d72f5945c99ac04248f28b2a948 (patch) | |
tree | 23981a905d92c19dc81574906d0beb443f9f152b /kafka/conn.py | |
parent | 64d3607b8796f6ef1cf71fbecfc6887b3b15c700 (diff) | |
download | kafka-python-socket_options.tar.gz |
Use socket_options configuration to setsockopts(). Default TCP_NODELAYsocket_options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 5489d1f..da98028 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -60,6 +60,7 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, @@ -84,6 +85,15 @@ class BrokerConnection(object): if key in configs: self.config[key] = configs[key] + if self.config['receive_buffer_bytes'] is not None: + self.config['socket_options'].append( + (socket.SOL_SOCKET, socket.SO_RCVBUF, + self.config['receive_buffer_bytes'])) + if self.config['send_buffer_bytes'] is not None: + self.config['socket_options'].append( + (socket.SOL_SOCKET, socket.SO_SNDBUF, + self.config['send_buffer_bytes'])) + self.state = ConnectionStates.DISCONNECTED self._sock = None self._ssl_context = None @@ -144,12 +154,10 @@ class BrokerConnection(object): self._sock = socket.socket(afi, socket.SOCK_STREAM) else: self._sock = socket.socket(self.afi, socket.SOCK_STREAM) - if self.config['receive_buffer_bytes'] is not None: - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, - self.config['receive_buffer_bytes']) - if self.config['send_buffer_bytes'] is not None: - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, - self.config['send_buffer_bytes']) + + for option in self.config['socket_options']: + self._sock.setsockopt(*option) + self._sock.setblocking(False) if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() |