summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py20
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 5489d1f..da98028 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -60,6 +60,7 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
@@ -84,6 +85,15 @@ class BrokerConnection(object):
if key in configs:
self.config[key] = configs[key]
+ if self.config['receive_buffer_bytes'] is not None:
+ self.config['socket_options'].append(
+ (socket.SOL_SOCKET, socket.SO_RCVBUF,
+ self.config['receive_buffer_bytes']))
+ if self.config['send_buffer_bytes'] is not None:
+ self.config['socket_options'].append(
+ (socket.SOL_SOCKET, socket.SO_SNDBUF,
+ self.config['send_buffer_bytes']))
+
self.state = ConnectionStates.DISCONNECTED
self._sock = None
self._ssl_context = None
@@ -144,12 +154,10 @@ class BrokerConnection(object):
self._sock = socket.socket(afi, socket.SOCK_STREAM)
else:
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
- if self.config['receive_buffer_bytes'] is not None:
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
- self.config['receive_buffer_bytes'])
- if self.config['send_buffer_bytes'] is not None:
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
- self.config['send_buffer_bytes'])
+
+ for option in self.config['socket_options']:
+ self._sock.setsockopt(*option)
+
self._sock.setblocking(False)
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()