author     Dana Powers <dana.powers@gmail.com>  2016-02-17 22:21:36 -0800
committer  Dana Powers <dana.powers@gmail.com>  2016-02-17 22:21:36 -0800
commit     9bf304ab438b30cc554e464f1ff275dd61a6444e (patch)
tree       d9a275f0040f6eb14d15042019065d9b4ea10eca /kafka
parent     d5c05c811e453c507ac6f7f85bceffc5a7ba1661 (diff)
download   kafka-python-socket_buffer_size_optional.tar.gz
Don't override system rcvbuf or sndbuf unless user configures explicitly (branch: socket_buffer_size_optional)
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client_async.py    10
-rw-r--r--  kafka/conn.py            14
-rw-r--r--  kafka/consumer/group.py  10
-rw-r--r--  kafka/producer/kafka.py  10
4 files changed, 26 insertions, 18 deletions
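With both options now defaulting to None, a new connection simply inherits whatever SO_RCVBUF and SO_SNDBUF sizes the operating system assigns. A quick way to see what those system defaults are on a given host (plain stdlib, not part of this patch):

    import socket

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # These are the sizes a BrokerConnection now inherits when
    # receive_buffer_bytes / send_buffer_bytes are left as None.
    print('SO_RCVBUF:', s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
    print('SO_SNDBUF:', s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))
    s.close()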
diff --git a/kafka/client_async.py b/kafka/client_async.py
index f048be9..cb8152a 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -41,8 +41,8 @@ class KafkaClient(object):
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
- 'receive_buffer_bytes': 32768,
- 'send_buffer_bytes': 131072,
+ 'receive_buffer_bytes': None,
+ 'send_buffer_bytes': None,
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
}
@@ -71,9 +71,11 @@ class KafkaClient(object):
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: 131072
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). Java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: 32768
+ (SO_RCVBUF) to use when reading data. Default: None (relies on
+ system defaults). Java client defaults to 32768.
metadata_max_age_ms (int): The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
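Callers that still want the old fixed sizes can pass them explicitly. A minimal sketch against KafkaClient, assuming the usual bootstrap_servers option (the broker address is a placeholder):

    from kafka.client_async import KafkaClient

    # Explicitly restore the previous hard-coded sizes
    # (these also match the Java client defaults).
    client = KafkaClient(
        bootstrap_servers='localhost:9092',
        receive_buffer_bytes=32768,
        send_buffer_bytes=131072,
    )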
diff --git a/kafka/conn.py b/kafka/conn.py
index 14c3b50..35d8d13 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -47,8 +47,8 @@ class BrokerConnection(object):
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
- 'receive_buffer_bytes': 32768,
- 'send_buffer_bytes': 131072,
+ 'receive_buffer_bytes': None,
+ 'send_buffer_bytes': None,
'api_version': (0, 8, 2), # default to most restrictive
}
@@ -77,10 +77,12 @@ class BrokerConnection(object):
if self.state is ConnectionStates.DISCONNECTED:
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
- self.config['receive_buffer_bytes'])
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
- self.config['send_buffer_bytes'])
+ if self.config['receive_buffer_bytes'] is not None:
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
+ self.config['receive_buffer_bytes'])
+ if self.config['send_buffer_bytes'] is not None:
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
+ self.config['send_buffer_bytes'])
self._sock.setblocking(False)
try:
ret = self._sock.connect_ex((self.host, self.port))
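When a buffer size is configured, its effect can be verified by reading the option back; note that Linux typically reports roughly double the requested value because the kernel reserves bookkeeping overhead. A small stdlib-only sketch, separate from the patch itself:

    import socket

    requested = 32768
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, requested)
    actual = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
    # On Linux the kernel usually reports ~2x the requested size.
    print('requested %d, kernel reports %d' % (requested, actual))
    sock.close()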
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 4174b07..30abe00 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -111,9 +111,11 @@ class KafkaConsumer(six.Iterator):
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group management facilities. Default: 30000
send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: 131072
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). The java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: 32768
+ (SO_RCVBUF) to use when reading data. Default: None (relies on
+ system defaults). The java client defaults to 32768.
consumer_timeout_ms (int): number of milliseconds to throw a timeout
exception to the consumer if no message is available for
consumption. Default: -1 (don't throw exception)
@@ -149,8 +151,8 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
- 'send_buffer_bytes': 128 * 1024,
- 'receive_buffer_bytes': 32 * 1024,
+ 'send_buffer_bytes': None,
+ 'receive_buffer_bytes': None,
'consumer_timeout_ms': -1,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
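A consumer that wants the pre-patch behavior opts back in by passing the options explicitly; a hedged sketch where the topic name and broker address are placeholders:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        # Match the previous hard-coded defaults (and the Java client defaults).
        receive_buffer_bytes=32 * 1024,
        send_buffer_bytes=128 * 1024,
    )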
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f319e4a..11eeddd 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -180,9 +180,11 @@ class KafkaProducer(object):
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
receive_buffer_bytes (int): The size of the TCP receive buffer
- (SO_RCVBUF) to use when reading data. Default: 32768
+ (SO_RCVBUF) to use when reading data. Default: None (relies on
+ system defaults). Java client defaults to 32768.
send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: 131072
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). Java client defaults to 131072.
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
@@ -215,8 +217,8 @@ class KafkaProducer(object):
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'request_timeout_ms': 30000,
- 'receive_buffer_bytes': 32768,
- 'send_buffer_bytes': 131072,
+ 'receive_buffer_bytes': None,
+ 'send_buffer_bytes': None,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'api_version': 'auto',
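The producer works the same way: left as None, the OS defaults apply; set explicitly, the values are applied via setsockopt as shown in conn.py above. An illustrative sketch (addresses, topic, and sizes are placeholders, not recommendations):

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        # Explicit opt-in; omit both options to rely on system defaults.
        send_buffer_bytes=131072,
        receive_buffer_bytes=32768,
    )
    producer.send('my-topic', b'hello')
    producer.flush()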