diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ed28573..fcd5ede 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import copy import logging +import socket import time import six @@ -114,12 +115,15 @@ class KafkaConsumer(six.Iterator): rebalances. Default: 3000 session_timeout_ms (int): The timeout used to detect failures when using Kafka's group managementment facilities. Default: 30000 - send_buffer_bytes (int): The size of the TCP send buffer - (SO_SNDBUF) to use when sending data. Default: None (relies on - system defaults). The java client defaults to 131072. receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. + send_buffer_bytes (int): The size of the TCP send buffer + (SO_SNDBUF) to use when sending data. Default: None (relies on + system defaults). The java client defaults to 131072. + socket_options (list): List of tuple-arguments to socket.setsockopt + to apply to broker connection sockets. Default: + [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] consumer_timeout_ms (int): number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default -1 (block forever). @@ -209,8 +213,9 @@ class KafkaConsumer(six.Iterator): 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, - 'send_buffer_bytes': None, 'receive_buffer_bytes': None, + 'send_buffer_bytes': None, + 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], 'consumer_timeout_ms': -1, 'skip_double_compressed_messages': False, 'security_protocol': 'PLAINTEXT', |