summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-12-27 09:45:32 -0800
committerDana Powers <dana.powers@gmail.com>2017-12-27 09:45:32 -0800
commit4116026fab0d514f880a1dfbd55791d7091a4b14 (patch)
tree10a05b4838440846945f73f5ff4639163d07d767
parent75e3ca9a4d48e405a8b11e6c33d126ad2f4d9f7c (diff)
downloadkafka-python-4116026fab0d514f880a1dfbd55791d7091a4b14.tar.gz
Add experimental support for configuring socket chunking parameters
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py8
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--kafka/producer/kafka.py2
4 files changed, 10 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 24162ad..d9c2714 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -152,6 +152,8 @@ class KafkaClient(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
diff --git a/kafka/conn.py b/kafka/conn.py
index f4fd8bf..9dfe031 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -180,6 +180,8 @@ class BrokerConnection(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
@@ -744,11 +746,9 @@ class BrokerConnection(object):
def _recv(self):
"""Take all available bytes from socket, return list of any responses from parser"""
recvd = []
- SOCK_CHUNK_BYTES = 4096
- BUFFERED_CHUNKS = 1000
- while len(recvd) < BUFFERED_CHUNKS:
+ while len(recvd) < self.config['sock_chunk_buffer_count']:
try:
- data = self._sock.recv(SOCK_CHUNK_BYTES)
+ data = self._sock.recv(self.config['sock_chunk_bytes'])
# We expect socket.recv to raise an exception if there are no
# bytes available to read from the socket in non-blocking mode.
# but if the socket is disconnected, we will get empty data
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7c345e7..6e86979 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -270,6 +270,8 @@ class KafkaConsumer(six.Iterator):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e0c8a41..f2205fa 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -292,6 +292,8 @@ class KafkaProducer(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
+ 'sock_chunk_bytes': 4096, # undocumented experimental option
+ 'sock_chunk_buffer_count': 1000, # undocumented experimental option
'reconnect_backoff_ms': 50,
'reconnect_backoff_max': 1000,
'max_in_flight_requests_per_connection': 5,