diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-30 17:06:47 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-30 17:06:47 -0700 |
commit | cf3b370da9610d72f5945c99ac04248f28b2a948 (patch) | |
tree | 23981a905d92c19dc81574906d0beb443f9f152b | |
parent | 64d3607b8796f6ef1cf71fbecfc6887b3b15c700 (diff) | |
download | kafka-python-socket_options.tar.gz |
Use socket_options configuration to setsockopts(). Default TCP_NODELAYsocket_options
-rw-r--r-- | kafka/client_async.py | 18 | ||||
-rw-r--r-- | kafka/conn.py | 20 | ||||
-rw-r--r-- | kafka/consumer/group.py | 13 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 5 |
4 files changed, 39 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index dee4a12..6bffa9e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -54,6 +54,7 @@ class KafkaClient(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, 'security_protocol': 'PLAINTEXT', @@ -93,26 +94,29 @@ class KafkaClient(object): server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: 'kafka-python-{version}' - request_timeout_ms (int): Client request timeout in milliseconds. - Default: 40000. reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + request_timeout_ms (int): Client request timeout in milliseconds. + Default: 40000. + retry_backoff_ms (int): Milliseconds to backoff when retrying on + errors. Default: 100. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. - send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: None (relies on - system defaults). Java client defaults to 131072. receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768. + send_buffer_bytes (int): The size of the TCP send buffer + (SO_SNDBUF) to use when sending data. Default: None (relies on + system defaults). Java client defaults to 131072. + socket_options (list): List of tuple-arguments to socket.setsockopt + to apply to broker connection sockets. Default: + [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] metadata_max_age_ms (int): The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000 - retry_backoff_ms (int): Milliseconds to backoff when retrying on - errors. Default: 100. security_protocol (str): Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping diff --git a/kafka/conn.py b/kafka/conn.py index 5489d1f..da98028 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -60,6 +60,7 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, @@ -84,6 +85,15 @@ class BrokerConnection(object): if key in configs: self.config[key] = configs[key] + if self.config['receive_buffer_bytes'] is not None: + self.config['socket_options'].append( + (socket.SOL_SOCKET, socket.SO_RCVBUF, + self.config['receive_buffer_bytes'])) + if self.config['send_buffer_bytes'] is not None: + self.config['socket_options'].append( + (socket.SOL_SOCKET, socket.SO_SNDBUF, + self.config['send_buffer_bytes'])) + self.state = ConnectionStates.DISCONNECTED self._sock = None self._ssl_context = None @@ -144,12 +154,10 @@ class BrokerConnection(object): self._sock = socket.socket(afi, socket.SOCK_STREAM) else: self._sock = socket.socket(self.afi, socket.SOCK_STREAM) - if self.config['receive_buffer_bytes'] is not None: - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, - self.config['receive_buffer_bytes']) - if self.config['send_buffer_bytes'] is not None: - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, - self.config['send_buffer_bytes']) + + for option in self.config['socket_options']: + self._sock.setsockopt(*option) + self._sock.setblocking(False) if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ed28573..fcd5ede 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import copy import logging +import socket import time import six @@ -114,12 +115,15 @@ class KafkaConsumer(six.Iterator): rebalances. Default: 3000 session_timeout_ms (int): The timeout used to detect failures when using Kafka's group managementment facilities. Default: 30000 - send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: None (relies on - system defaults). The java client defaults to 131072. receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. + send_buffer_bytes (int): The size of the TCP send buffer + (SO_SNDBUF) to use when sending data. Default: None (relies on + system defaults). The java client defaults to 131072. + socket_options (list): List of tuple-arguments to socket.setsockopt + to apply to broker connection sockets. Default: + [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] consumer_timeout_ms (int): number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default -1 (block forever). @@ -209,8 +213,9 @@ class KafkaConsumer(six.Iterator): 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, - 'send_buffer_bytes': None, 'receive_buffer_bytes': None, + 'send_buffer_bytes': None, + 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'consumer_timeout_ms': -1, 'skip_double_compressed_messages': False, 'security_protocol': 'PLAINTEXT', diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 02e4621..b91ba24 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import atexit import copy import logging +import socket import threading import time import weakref @@ -188,6 +189,9 @@ class KafkaProducer(object): send_buffer_bytes (int): The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072. + socket_options (list): List of tuple-arguments to socket.setsockopt + to apply to broker connection sockets. Default: + [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. @@ -256,6 +260,7 @@ class KafkaProducer(object): 'request_timeout_ms': 30000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', |