diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-12-27 09:45:32 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-12-27 09:45:32 -0800 |
commit | 4116026fab0d514f880a1dfbd55791d7091a4b14 (patch) | |
tree | 10a05b4838440846945f73f5ff4639163d07d767 | |
parent | 75e3ca9a4d48e405a8b11e6c33d126ad2f4d9f7c (diff) | |
download | kafka-python-4116026fab0d514f880a1dfbd55791d7091a4b14.tar.gz |
Add experimental support for configuring socket chunking parameters
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 8 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 2 |
4 files changed, 10 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 24162ad..d9c2714 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -152,6 +152,8 @@ class KafkaClient(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], + 'sock_chunk_bytes': 4096, # undocumented experimental option + 'sock_chunk_buffer_count': 1000, # undocumented experimental option 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, 'security_protocol': 'PLAINTEXT', diff --git a/kafka/conn.py b/kafka/conn.py index f4fd8bf..9dfe031 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -180,6 +180,8 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], + 'sock_chunk_bytes': 4096, # undocumented experimental option + 'sock_chunk_buffer_count': 1000, # undocumented experimental option 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, @@ -744,11 +746,9 @@ class BrokerConnection(object): def _recv(self): """Take all available bytes from socket, return list of any responses from parser""" recvd = [] - SOCK_CHUNK_BYTES = 4096 - BUFFERED_CHUNKS = 1000 - while len(recvd) < BUFFERED_CHUNKS: + while len(recvd) < self.config['sock_chunk_buffer_count']: try: - data = self._sock.recv(SOCK_CHUNK_BYTES) + data = self._sock.recv(self.config['sock_chunk_bytes']) # We expect socket.recv to raise an exception if there are no # bytes available to read from the socket in non-blocking mode. # but if the socket is disconnected, we will get empty data diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7c345e7..6e86979 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -270,6 +270,8 @@ class KafkaConsumer(six.Iterator): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], + 'sock_chunk_bytes': 4096, # undocumented experimental option + 'sock_chunk_buffer_count': 1000, # undocumented experimental option 'consumer_timeout_ms': float('inf'), 'skip_double_compressed_messages': False, 'security_protocol': 'PLAINTEXT', diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e0c8a41..f2205fa 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -292,6 +292,8 @@ class KafkaProducer(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], + 'sock_chunk_bytes': 4096, # undocumented experimental option + 'sock_chunk_buffer_count': 1000, # undocumented experimental option 'reconnect_backoff_ms': 50, 'reconnect_backoff_max': 1000, 'max_in_flight_requests_per_connection': 5, |