diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-17 22:21:36 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-17 22:21:36 -0800 |
commit | 9bf304ab438b30cc554e464f1ff275dd61a6444e (patch) | |
tree | d9a275f0040f6eb14d15042019065d9b4ea10eca | |
parent | d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff) | |
download | kafka-python-socket_buffer_size_optional.tar.gz |
Dont override system rcvbuf or sndbuf unless user configures explicitlysocket_buffer_size_optional
-rw-r--r-- | kafka/client_async.py | 10 | ||||
-rw-r--r-- | kafka/conn.py | 14 | ||||
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 10 |
4 files changed, 26 insertions, 18 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index f048be9..cb8152a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -41,8 +41,8 @@ class KafkaClient(object): 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, - 'receive_buffer_bytes': 32768, - 'send_buffer_bytes': 131072, + 'receive_buffer_bytes': None, + 'send_buffer_bytes': None, 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, } @@ -71,9 +71,11 @@ class KafkaClient(object): to kafka brokers up to this number of maximum requests per broker connection. Default: 5. send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: 131072 + (SO_SNDBUF) to use when sending data. Default: None (relies on + system defaults). Java client defaults to 131072. receive_buffer_bytes (int): The size of the TCP receive buffer - (SO_RCVBUF) to use when reading data. Default: 32768 + (SO_RCVBUF) to use when reading data. Default: None (relies on + system defaults). Java client defaults to 32768. metadata_max_age_ms (int): The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new diff --git a/kafka/conn.py b/kafka/conn.py index 14c3b50..35d8d13 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -47,8 +47,8 @@ class BrokerConnection(object): 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, - 'receive_buffer_bytes': 32768, - 'send_buffer_bytes': 131072, + 'receive_buffer_bytes': None, + 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive } @@ -77,10 +77,12 @@ class BrokerConnection(object): if self.state is ConnectionStates.DISCONNECTED: self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, - self.config['receive_buffer_bytes']) - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, - self.config['send_buffer_bytes']) + if self.config['receive_buffer_bytes'] is not None: + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + self.config['receive_buffer_bytes']) + if self.config['send_buffer_bytes'] is not None: + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, + self.config['send_buffer_bytes']) self._sock.setblocking(False) try: ret = self._sock.connect_ex((self.host, self.port)) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 4174b07..30abe00 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -111,9 +111,11 @@ class KafkaConsumer(six.Iterator): session_timeout_ms (int): The timeout used to detect failures when using Kafka's group managementment facilities. Default: 30000 send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: 131072 + (SO_SNDBUF) to use when sending data. Default: None (relies on + system defaults). The java client defaults to 131072. receive_buffer_bytes (int): The size of the TCP receive buffer - (SO_RCVBUF) to use when reading data. Default: 32768 + (SO_RCVBUF) to use when reading data. Default: None (relies on + system defaults). The java client defaults to 32768. consumer_timeout_ms (int): number of millisecond to throw a timeout exception to the consumer if no message is available for consumption. Default: -1 (dont throw exception) @@ -149,8 +151,8 @@ class KafkaConsumer(six.Iterator): 'partition_assignment_strategy': (RoundRobinPartitionAssignor,), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, - 'send_buffer_bytes': 128 * 1024, - 'receive_buffer_bytes': 32 * 1024, + 'send_buffer_bytes': None, + 'receive_buffer_bytes': None, 'consumer_timeout_ms': -1, 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f319e4a..11eeddd 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -180,9 +180,11 @@ class KafkaProducer(object): request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. receive_buffer_bytes (int): The size of the TCP receive buffer - (SO_RCVBUF) to use when reading data. Default: 32768 + (SO_RCVBUF) to use when reading data. Default: None (relies on + system defaults). Java client defaults to 32768. send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: 131072 + (SO_SNDBUF) to use when sending data. Default: None (relies on + system defaults). Java client defaults to 131072. reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. @@ -215,8 +217,8 @@ class KafkaProducer(object): 'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100, 'request_timeout_ms': 30000, - 'receive_buffer_bytes': 32768, - 'send_buffer_bytes': 131072, + 'receive_buffer_bytes': None, + 'send_buffer_bytes': None, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'api_version': 'auto', |