summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py13
1 files changed, 9 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index ed28573..fcd5ede 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import copy
import logging
+import socket
import time
import six
@@ -114,12 +115,15 @@ class KafkaConsumer(six.Iterator):
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: None (relies on
- system defaults). The java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). The java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
iterator). Default -1 (block forever).
@@ -209,8 +213,9 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
- 'send_buffer_bytes': None,
'receive_buffer_bytes': None,
+ 'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'consumer_timeout_ms': -1,
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',