-rw-r--r--  kafka/client_async.py          |  73
-rw-r--r--  kafka/cluster.py               |  20
-rw-r--r--  kafka/conn.py                  |  56
-rw-r--r--  kafka/consumer/fetcher.py      |  50
-rw-r--r--  kafka/consumer/group.py        | 105
-rw-r--r--  kafka/coordinator/abstract.py  |  34
-rw-r--r--  kafka/coordinator/consumer.py  |  58
-rw-r--r--  kafka/coordinator/heartbeat.py |  26
8 files changed, 211 insertions(+), 211 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5c11fc5..6fb5fdd 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,3 +1,4 @@
+import copy
import heapq
import itertools
import logging
@@ -15,6 +16,7 @@ from .conn import BrokerConnection, ConnectionStates, collect_hosts
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
+from .version import __version__
log = logging.getLogger(__name__)
@@ -27,26 +29,23 @@ class KafkaClient(object):
This class is not thread-safe!
"""
- _bootstrap_servers = 'localhost'
- _client_id = 'kafka-python-0.10.0'
- _reconnect_backoff_ms = 50
- _retry_backoff_ms = 100
- _send_buffer_bytes = 131072
- _receive_buffer_bytes = 32768
- _request_timeout_ms = 40000
- _max_in_flight_requests_per_connection=5
-
- def __init__(self, **kwargs):
- for config in (
- 'client_id', 'max_in_flight_requests_per_connection',
- 'reconnect_backoff_ms', 'retry_backoff_ms',
- 'send_buffer_bytes', 'receive_buffer_bytes',
- 'request_timeout_ms', 'bootstrap_servers'
- ):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
-
- self.cluster = ClusterMetadata(**kwargs)
+ DEFAULT_CONFIG = {
+ 'bootstrap_servers': 'localhost',
+ 'client_id': 'kafka-python-' + __version__,
+ 'request_timeout_ms': 40000,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'receive_buffer_bytes': 32768,
+ 'send_buffer_bytes': 131072,
+ }
+
+ def __init__(self, **configs):
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
+
+ self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._conns = {}
@@ -54,11 +53,11 @@ class KafkaClient(object):
self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
- self._bootstrap(collect_hosts(self._bootstrap_servers))
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
- backoff_ms = self._reconnect_backoff_ms * 2 ** self._bootstrap_fails
+ backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
next_at = self._last_bootstrap + backoff_ms / 1000.0
now = time.time()
if next_at > now:
@@ -69,15 +68,7 @@ class KafkaClient(object):
metadata_request = MetadataRequest([])
for host, port in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- bootstrap = BrokerConnection(
- host, port,
- client_id=self._client_id,
- receive_buffer_bytes=self._receive_buffer_bytes,
- send_buffer_bytes=self._send_buffer_bytes,
- request_timeout_ms=self._request_timeout_ms,
- max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection,
- reconnect_backoff_ms=self._reconnect_backoff_ms
- )
+ bootstrap = BrokerConnection(host, port, **self.config)
bootstrap.connect()
while bootstrap.state is ConnectionStates.CONNECTING:
bootstrap.connect()
@@ -121,15 +112,8 @@ class KafkaClient(object):
if node_id not in self._conns:
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
- self._conns[node_id] = BrokerConnection(
- broker.host, broker.port,
- client_id=self._client_id,
- receive_buffer_bytes=self._receive_buffer_bytes,
- send_buffer_bytes=self._send_buffer_bytes,
- request_timeout_ms=self._request_timeout_ms,
- max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection,
- reconnect_backoff_ms=self._reconnect_backoff_ms
- )
+ self._conns[node_id] = BrokerConnection(broker.host, broker.port,
+ **self.config)
return self._finish_connect(node_id)
def _finish_connect(self, node_id):
@@ -194,7 +178,7 @@ class KafkaClient(object):
conn = self._conns[node_id]
time_waited_ms = time.time() - (conn.last_attempt or 0)
if conn.state is ConnectionStates.DISCONNECTED:
- return max(self._reconnect_backoff_ms - time_waited_ms, 0)
+ return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
else:
return sys.maxint
@@ -262,7 +246,7 @@ class KafkaClient(object):
@return The list of responses received.
"""
if timeout_ms is None:
- timeout_ms = self._request_timeout_ms
+ timeout_ms = self.config['request_timeout_ms']
responses = []
@@ -283,7 +267,8 @@ class KafkaClient(object):
except Exception as e:
log.error("Task %s failed: %s", task, e)
- timeout = min(timeout_ms, metadata_timeout, self._request_timeout_ms)
+ timeout = min(timeout_ms, metadata_timeout,
+ self.config['request_timeout_ms'])
timeout /= 1000.0
responses.extend(self._poll(timeout))
@@ -365,7 +350,7 @@ class KafkaClient(object):
# Last option: try to bootstrap again
log.error('No nodes found in metadata -- retrying bootstrap')
- self._bootstrap(collect_hosts(self._bootstrap_servers))
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None
def set_topics(self, topics):
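
The hunks above replace per-attribute kwargs handling in KafkaClient with a class-level DEFAULT_CONFIG dict that is copied per instance. A minimal standalone sketch of that pattern follows; the class name and keys here are illustrative, not part of the diff:

    import copy

    class ConfiguredClient(object):
        DEFAULT_CONFIG = {
            'bootstrap_servers': 'localhost',
            'request_timeout_ms': 40000,
        }

        def __init__(self, **configs):
            # Copy the class defaults, then overlay any recognized keys.
            # Unknown keys are ignored, which is what allows the whole
            # config dict to be forwarded to sub-components via **self.config.
            self.config = copy.copy(self.DEFAULT_CONFIG)
            for key in self.config:
                if key in configs:
                    self.config[key] = configs[key]

    client = ConfiguredClient(request_timeout_ms=10000, unknown_option=True)
    assert client.config['request_timeout_ms'] == 10000
    assert client.config['bootstrap_servers'] == 'localhost'
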
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 5b5fd8e..84ad1d3 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import copy
import logging
import random
import time
@@ -12,10 +13,12 @@ log = logging.getLogger(__name__)
class ClusterMetadata(object):
- _retry_backoff_ms = 100
- _metadata_max_age_ms = 300000
+ DEFAULT_CONFIG = {
+ 'retry_backoff_ms': 100,
+ 'metadata_max_age_ms': 300000,
+ }
- def __init__(self, **kwargs):
+ def __init__(self, **configs):
self._brokers = {}
self._partitions = {}
self._groups = {}
@@ -26,9 +29,10 @@ class ClusterMetadata(object):
self._future = None
self._listeners = set()
- for config in ('retry_backoff_ms', 'metadata_max_age_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
def brokers(self):
return set(self._brokers.values())
@@ -55,8 +59,8 @@ class ClusterMetadata(object):
if self._need_update:
ttl = 0
else:
- ttl = self._last_successful_refresh_ms + self._metadata_max_age_ms - now
- retry = self._last_refresh_ms + self._retry_backoff_ms - now
+ ttl = self._last_successful_refresh_ms + self.config['metadata_max_age_ms'] - now
+ retry = self._last_refresh_ms + self.config['retry_backoff_ms'] - now
return max(ttl, retry, 0)
def request_update(self):
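
The refresh logic above combines two clocks: cached metadata stays valid for metadata_max_age_ms after the last successful refresh, and failed refreshes are throttled by retry_backoff_ms. A rough sketch, assuming timestamps are kept in milliseconds; the function and argument names are illustrative:

    def ttl_ms(last_successful_refresh_ms, last_refresh_ms, now_ms, config, need_update=False):
        # Time until the cached metadata expires (0 if an update was requested)
        if need_update:
            ttl = 0
        else:
            ttl = last_successful_refresh_ms + config['metadata_max_age_ms'] - now_ms
        # Time until a retry is allowed after the most recent attempt
        retry = last_refresh_ms + config['retry_backoff_ms'] - now_ms
        return max(ttl, retry, 0)

    print(ttl_ms(0, 0, 1000, {'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100}))  # 299000
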
diff --git a/kafka/conn.py b/kafka/conn.py
index 7979ba7..8ce4a6f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,6 +17,7 @@ from kafka.common import ConnectionError
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.types import Int32
+from kafka.version import __version__
log = logging.getLogger(__name__)
@@ -36,25 +37,24 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
class BrokerConnection(object):
- _receive_buffer_bytes = 32768
- _send_buffer_bytes = 131072
- _client_id = 'kafka-python-0.10.0'
- _correlation_id = 0
- _request_timeout_ms = 40000
- _max_in_flight_requests_per_connection = 5
- _reconnect_backoff_ms = 50
-
- def __init__(self, host, port, **kwargs):
+ DEFAULT_CONFIG = {
+ 'client_id': 'kafka-python-' + __version__,
+ 'request_timeout_ms': 40000,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'receive_buffer_bytes': 32768,
+ 'send_buffer_bytes': 131072,
+ }
+
+ def __init__(self, host, port, **configs):
self.host = host
self.port = port
self.in_flight_requests = collections.deque()
- for config in ('receive_buffer_bytes', 'send_buffer_bytes',
- 'client_id', 'correlation_id', 'request_timeout_ms',
- 'max_in_flight_requests_per_connection',
- 'reconnect_backoff_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self.state = ConnectionStates.DISCONNECTED
self._sock = None
@@ -64,14 +64,17 @@ class BrokerConnection(object):
self.last_attempt = 0
self.last_failure = 0
self._processing = False
+ self._correlation_id = 0
def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._receive_buffer_bytes)
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
+ self.config['receive_buffer_bytes'])
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
+ self.config['send_buffer_bytes'])
self._sock.setblocking(False)
ret = self._sock.connect_ex((self.host, self.port))
self.last_attempt = time.time()
@@ -89,7 +92,8 @@ class BrokerConnection(object):
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
- if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt:
+ request_timeout = self.config['request_timeout_ms'] / 1000.0
+ if time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
self.close() # error=TimeoutError ?
self.last_failure = time.time()
@@ -110,8 +114,8 @@ class BrokerConnection(object):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
- now = time.time()
- if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0:
+ backoff = self.config['reconnect_backoff_ms'] / 1000.0
+ if time.time() < self.last_attempt + backoff:
return True
return False
@@ -146,7 +150,7 @@ class BrokerConnection(object):
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
- client_id=self._client_id)
+ client_id=self.config['client_id'])
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
try:
@@ -178,7 +182,8 @@ class BrokerConnection(object):
return future
def can_send_more(self):
- return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection
+ max_ifrs = self.config['max_in_flight_requests_per_connection']
+ return len(self.in_flight_requests) < max_ifrs
def recv(self, timeout=0):
"""Non-blocking network receive
@@ -202,9 +207,10 @@ class BrokerConnection(object):
elif self._requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
- self, self._request_timeout_ms)
+ self, self.config['request_timeout_ms'])
self.close(error=Errors.RequestTimedOutError(
- 'Request timed out after %s ms' % self._request_timeout_ms))
+ 'Request timed out after %s ms' %
+ self.config['request_timeout_ms']))
return None
readable, _, _ = select([self._sock], [], [], timeout)
@@ -294,7 +300,7 @@ class BrokerConnection(object):
def _requests_timed_out(self):
if self.in_flight_requests:
oldest_at = self.in_flight_requests[0].timestamp
- timeout = self._request_timeout_ms / 1000.0
+ timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() >= oldest_at + timeout:
return True
return False
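
Two timing checks in conn.py now read from self.config: the reconnect blackout (a DISCONNECTED connection is not retried until reconnect_backoff_ms has passed) and the in-flight request timeout. A small sketch of the blackout check, with a free function standing in for the method:

    import time

    def blacked_out(last_attempt, config):
        # True while a recently failed connection should not be retried yet
        backoff = config['reconnect_backoff_ms'] / 1000.0
        return time.time() < last_attempt + backoff

    config = {'reconnect_backoff_ms': 50}
    print(blacked_out(time.time(), config))        # True: attempt just failed
    print(blacked_out(time.time() - 1.0, config))  # False: backoff has elapsed
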
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index ea9c8b9..39e1244 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import collections
+import copy
import logging
import six
@@ -28,27 +29,25 @@ class RecordTooLargeError(Errors.KafkaError):
class Fetcher(object):
- _key_deserializer = None
- _value_deserializer = None
- _fetch_min_bytes = 1024
- _fetch_max_wait_ms = 500
- _max_partition_fetch_bytes = 1048576
- _check_crcs = True
- _retry_backoff_ms = 100
-
- def __init__(self, client, subscriptions, **kwargs):
+ DEFAULT_CONFIG = {
+ 'key_deserializer': None,
+ 'value_deserializer': None,
+ 'fetch_min_bytes': 1024,
+ 'fetch_max_wait_ms': 500,
+ 'max_partition_fetch_bytes': 1048576,
+ 'check_crcs': True,
+ }
+
+ def __init__(self, client, subscriptions, **configs):
#metrics=None,
#metric_group_prefix='consumer',
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self._client = client
self._subscriptions = subscriptions
- for config in ('key_deserializer', 'value_deserializer',
- 'fetch_min_bytes', 'fetch_max_wait_ms',
- 'max_partition_fetch_bytes', 'check_crcs',
- 'retry_backoff_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
-
self._records = collections.deque() # (offset, topic_partition, messages)
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
@@ -204,7 +203,8 @@ class Fetcher(object):
" and hence cannot be ever returned."
" Increase the fetch size, or decrease the maximum message"
" size the broker will allow.",
- copied_record_too_large_partitions, self._max_partition_fetch_bytes)
+ copied_record_too_large_partitions,
+ self.config['max_partition_fetch_bytes'])
def fetched_records(self):
"""Returns previously fetched records and updates consumed offsets
@@ -255,7 +255,7 @@ class Fetcher(object):
for offset, size, msg in messages:
if msg.attributes:
raise Errors.KafkaError('Compressed messages not supported yet')
- elif self._check_crcs and not msg.validate_crc():
+ elif self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
key, value = self._deserialize(msg)
@@ -269,12 +269,12 @@ class Fetcher(object):
return dict(drained)
def _deserialize(self, msg):
- if self._key_deserializer:
- key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable
+ if self.config['key_deserializer']:
+ key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
else:
key = msg.key
- if self._value_deserializer:
- value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable
+ if self.config['value_deserializer']:
+ value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
else:
value = msg.value
return key, value
@@ -376,7 +376,7 @@ class Fetcher(object):
partition_info = (
partition.partition,
fetched,
- self._max_partition_fetch_bytes
+ self.config['max_partition_fetch_bytes']
)
fetchable[node_id][partition.topic].append(partition_info)
else:
@@ -388,8 +388,8 @@ class Fetcher(object):
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest(
-1, # replica_id
- self._fetch_max_wait_ms,
- self._fetch_min_bytes,
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
partition_data.items())
return requests
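
The Fetcher now reads the optional key/value deserializers from self.config; when unset, the raw bytes pass through unchanged. An isolated sketch of that branch, where Message is a stand-in namedtuple rather than the real message class:

    import collections

    Message = collections.namedtuple('Message', ['key', 'value'])

    def deserialize(msg, key_deserializer=None, value_deserializer=None):
        key = key_deserializer(msg.key) if key_deserializer else msg.key
        value = value_deserializer(msg.value) if value_deserializer else msg.value
        return key, value

    raw = Message(key=b'id-1', value=b'{"n": 1}')
    print(deserialize(raw, value_deserializer=lambda v: v.decode('utf-8')))
    # (b'id-1', '{"n": 1}')
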
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 63a1b2e..b7093f3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import copy
import logging
import time
@@ -18,33 +19,36 @@ log = logging.getLogger(__name__)
class KafkaConsumer(object):
"""Consumer for Kafka 0.9"""
- _bootstrap_servers = 'localhost'
- _client_id = 'kafka-python-' + __version__
- _group_id = 'kafka-python-default-group'
- _key_deserializer = None
- _value_deserializer = None
- _fetch_max_wait_ms = 500
- _fetch_min_bytes = 1024
- _max_partition_fetch_bytes = 1 * 1024 * 1024
- _request_timeout_ms = 40 * 1000
- _retry_backoff_ms = 100
- _reconnect_backoff_ms = 50
- _auto_offset_reset = 'latest'
- _enable_auto_commit = True
- _auto_commit_interval_ms = 5000
- _check_crcs = True
- _metadata_max_age_ms = 5 * 60 * 1000
- _partition_assignment_strategy = (RoundRobinPartitionAssignor,)
- _heartbeat_interval_ms = 3000
- _session_timeout_ms = 30000
- _send_buffer_bytes = 128 * 1024
- _receive_buffer_bytes = 32 * 1024
- _connections_max_idle_ms = 9 * 60 * 1000 # not implemented yet
- #_metric_reporters = None
- #_metrics_num_samples = 2
- #_metrics_sample_window_ms = 30000
-
- def __init__(self, *topics, **kwargs):
+ DEFAULT_CONFIG = {
+ 'bootstrap_servers': 'localhost',
+ 'client_id': 'kafka-python-' + __version__,
+ 'group_id': 'kafka-python-default-group',
+ 'key_deserializer': None,
+ 'value_deserializer': None,
+ 'fetch_max_wait_ms': 500,
+ 'fetch_min_bytes': 1024,
+ 'max_partition_fetch_bytes': 1 * 1024 * 1024,
+ 'request_timeout_ms': 40 * 1000,
+ 'retry_backoff_ms': 100,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'auto_offset_reset': 'latest',
+ 'enable_auto_commit': True,
+ 'auto_commit_interval_ms': 5000,
+ 'check_crcs': True,
+ 'metadata_max_age_ms': 5 * 60 * 1000,
+ 'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+ 'heartbeat_interval_ms': 3000,
+ 'session_timeout_ms': 30000,
+ 'send_buffer_bytes': 128 * 1024,
+ 'receive_buffer_bytes': 32 * 1024,
+ 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+ #'metric_reporters': None,
+ #'metrics_num_samples': 2,
+ #'metrics_sample_window_ms': 30000,
+ }
+
+ def __init__(self, *topics, **configs):
"""A Kafka client that consumes records from a Kafka cluster.
The consumer will transparently handle the failure of servers in the
@@ -79,8 +83,8 @@ class KafkaConsumer(object):
raw message value and returns a deserialized value.
fetch_min_bytes (int): Minimum amount of data the server should
return for a fetch request, otherwise wait up to
- fetch_wait_max_ms for more data to accumulate. Default: 1024.
- fetch_wait_max_ms (int): The maximum amount of time in milliseconds
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
@@ -97,8 +101,11 @@ class KafkaConsumer(object):
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
reconnect_backoff_ms (int): The amount of time in milliseconds to
- wait before attempting to reconnect to a given host. Defaults
- to 50.
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
auto_offset_reset (str): A policy for resetting offsets on
OffsetOutOfRange errors: 'earliest' will move to the oldest
available message, 'latest' will move to the most recent. Any
@@ -137,29 +144,19 @@ class KafkaConsumer(object):
Configuration parameters are described in more detail at
https://kafka.apache.org/090/configuration.html#newconsumerconfigs
"""
- for config in ('bootstrap_servers', 'client_id', 'group_id',
- 'key_deserializer', 'value_deserializer',
- 'fetch_max_wait_ms', 'fetch_min_bytes',
- 'max_partition_fetch_bytes', 'request_timeout_ms',
- 'retry_backoff_ms', 'reconnect_backoff_ms',
- 'auto_offset_reset', 'enable_auto_commit',
- 'auto_commit_interval_ms', 'check_crcs',
- 'metadata_max_age_ms', 'partition_assignment_strategy',
- 'heartbeat_interval_ms', 'session_timeout_ms',
- 'send_buffer_bytes', 'receive_buffer_bytes'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs[config])
-
- self._client = KafkaClient(**kwargs)
- self._subscription = SubscriptionState(self._auto_offset_reset)
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
+
+ self._client = KafkaClient(**self.config)
+ self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, **kwargs)
+ self._client, self._subscription, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._group_id, self._subscription,
- enable_auto_commit=self._enable_auto_commit,
- auto_commit_interval_ms=self._auto_commit_interval_ms,
- assignors=self._partition_assignment_strategy,
- **kwargs)
+ self._client, self.config['group_id'], self._subscription,
+ assignors=self.config['partition_assignment_strategy'],
+ **self.config)
self._closed = False
#self.metrics = None
@@ -213,11 +210,11 @@ class KafkaConsumer(object):
#self.metrics.close()
self._client.close()
try:
- self._key_deserializer.close()
+ self.config['key_deserializer'].close()
except AttributeError:
pass
try:
- self._value_deserializer.close()
+ self.config['value_deserializer'].close()
except AttributeError:
pass
log.debug("The KafkaConsumer has closed.")
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 03302a3..ea5cb97 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -1,4 +1,5 @@
import abc
+import copy
import logging
import time
@@ -44,22 +45,24 @@ class AbstractCoordinator(object):
_on_join_complete().
"""
- _session_timeout_ms = 30000
- _heartbeat_interval_ms = 3000
- _retry_backoff_ms = 100
+ DEFAULT_CONFIG = {
+ 'session_timeout_ms': 30000,
+ 'heartbeat_interval_ms': 3000,
+ 'retry_backoff_ms': 100,
+ }
- def __init__(self, client, group_id, **kwargs):
+ def __init__(self, client, group_id, **configs):
if not client:
raise Errors.IllegalStateError('a client is required to use'
' Group Coordinator')
if not group_id:
raise Errors.IllegalStateError('a group_id is required to use'
' Group Coordinator')
- for config in ('session_timeout_ms',
- 'heartbeat_interval_ms',
- 'retry_backoff_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self._client = client
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
@@ -68,9 +71,7 @@ class AbstractCoordinator(object):
self.coordinator_id = None
self.rejoin_needed = True
self.needs_join_prepare = True
- self.heartbeat = Heartbeat(
- session_timeout_ms=self._session_timeout_ms,
- heartbeat_interval_ms=self._heartbeat_interval_ms)
+ self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(self)
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
@@ -222,7 +223,7 @@ class AbstractCoordinator(object):
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self._retry_backoff_ms / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _perform_group_join(self):
"""Join the group and return the assignment for the next generation.
@@ -242,7 +243,7 @@ class AbstractCoordinator(object):
log.debug("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest(
self.group_id,
- self._session_timeout_ms,
+ self.config['session_timeout_ms'],
self.member_id,
self.protocol_type(),
[(protocol,
@@ -492,8 +493,7 @@ class AbstractCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
- log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id,
- request.member_id)
+ log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future)
@@ -594,7 +594,7 @@ class HeartbeatTask(object):
def _handle_heartbeat_failure(self, e):
log.debug("Heartbeat failed; retrying")
self._request_in_flight = False
- etd = time.time() + self._coordinator._retry_backoff_ms / 1000.0
+ etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, etd)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 211d1d0..dd3eea0 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,3 +1,4 @@
+import copy
import collections
import logging
import time
@@ -45,34 +46,36 @@ class ConsumerProtocol(object):
class ConsumerCoordinator(AbstractCoordinator):
"""This class manages the coordination process with the consumer coordinator."""
- _enable_auto_commit = True
- _auto_commit_interval_ms = 5000
- _default_offset_commit_callback = lambda offsets, error: True
- _assignors = ()
- #_heartbeat_interval_ms = 3000
- #_session_timeout_ms = 30000
- #_retry_backoff_ms = 100
-
- def __init__(self, client, group_id, subscription, **kwargs):
+ DEFAULT_CONFIG = {
+ 'enable_auto_commit': True,
+ 'auto_commit_interval_ms': 5000,
+ 'default_offset_commit_callback': lambda offsets, error: True,
+ 'assignors': (),
+ 'session_timeout_ms': 30000,
+ 'heartbeat_interval_ms': 3000,
+ 'retry_backoff_ms': 100,
+ }
+
+ def __init__(self, client, group_id, subscription, **configs):
"""Initialize the coordination manager."""
- super(ConsumerCoordinator, self).__init__(client, group_id, **kwargs)
- for config in ('enable_auto_commit', 'auto_commit_interval_ms',
- 'default_offset_commit_callback', 'assignors'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+ super(ConsumerCoordinator, self).__init__(client, group_id, **configs)
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self._cluster = client.cluster
self._subscription = subscription
self._partitions_per_topic = {}
self._auto_commit_task = None
- if not self._assignors:
+ if not self.config['assignors']:
raise Errors.IllegalStateError('Coordinator requires assignors')
self._cluster.request_update()
self._cluster.add_listener(self._handle_metadata_update)
- if self._enable_auto_commit:
- interval = self._auto_commit_interval_ms / 1000.0
+ if self.config['enable_auto_commit']:
+ interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(self, interval)
# metrics=None,
@@ -87,7 +90,7 @@ class ConsumerCoordinator(AbstractCoordinator):
"""Returns list of preferred (protocols, metadata)"""
topics = self._subscription.subscription
metadata_list = []
- for assignor in self._assignors:
+ for assignor in self.config['assignors']:
metadata = assignor.metadata(topics)
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
@@ -126,7 +129,7 @@ class ConsumerCoordinator(AbstractCoordinator):
return False
def _lookup_assignor(self, name):
- for assignor in self._assignors:
+ for assignor in self.config['assignors']:
if assignor.name == name:
return assignor
return None
@@ -152,7 +155,7 @@ class ConsumerCoordinator(AbstractCoordinator):
assignor.on_assignment(assignment)
# restart the autocommit task if needed
- if self._enable_auto_commit:
+ if self.config['enable_auto_commit']:
self._auto_commit_task.enable()
assigned = set(self._subscription.assigned_partitions())
@@ -258,7 +261,7 @@ class ConsumerCoordinator(AbstractCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self._retry_backoff_ms / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def ensure_partition_assignment(self):
"""Ensure that we have a valid partition assignment from the coordinator."""
@@ -283,10 +286,11 @@ class ConsumerCoordinator(AbstractCoordinator):
Returns:
Future: indicating whether the commit was successful or not
"""
+ if callback is None:
+ callback = self.config['default_offset_commit_callback']
self._subscription.needs_fetch_committed_offsets = True
future = self._send_offset_commit_request(offsets)
- cb = callback if callback else self._default_offset_commit_callback
- future.add_both(cb, offsets)
+ future.add_both(callback, offsets)
def commit_offsets_sync(self, offsets):
"""Commit specific offsets synchronously.
@@ -314,10 +318,10 @@ class ConsumerCoordinator(AbstractCoordinator):
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
- time.sleep(self._retry_backoff_ms / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000.0)
def _maybe_auto_commit_offsets_sync(self):
- if self._enable_auto_commit:
+ if self.config['enable_auto_commit']:
# disable periodic commits prior to committing synchronously. note that they will
# be re-enabled after a rebalance completes
self._auto_commit_task.disable()
@@ -558,8 +562,8 @@ class AutoCommitTask(object):
if self._coordinator.coordinator_unknown():
log.debug("Cannot auto-commit offsets because the coordinator is"
" unknown, will retry after backoff")
- next_at = time.time() + self._coordinator._retry_backoff_ms / 1000.0
- self._client.schedule(self, next_at)
+ backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
+ self._client.schedule(self, time.time() + backoff)
return
self._request_in_flight = True
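
commit_offsets_async above now falls back to the 'default_offset_commit_callback' config entry when the caller supplies no callback; the default is lambda offsets, error: True, so any replacement just needs the same (offsets, error) signature. An illustrative callback and the fallback logic in isolation (the offsets dict below is made up):

    def log_commit_result(offsets, error):
        if error is not None:
            print('offset commit failed: %s' % (error,))
        else:
            print('committed offsets: %s' % (offsets,))

    config = {'default_offset_commit_callback': log_commit_result}
    callback = None  # nothing supplied by the caller
    if callback is None:
        callback = config['default_offset_commit_callback']
    callback({'my-topic-0': 42}, None)  # prints "committed offsets: ..."
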
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index c153ddd..41ba025 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,23 +1,27 @@
+import copy
import time
import kafka.common as Errors
class Heartbeat(object):
- _heartbeat_interval_ms = 3000
- _session_timeout_ms = 30000
-
- def __init__(self, **kwargs):
- for config in ('heartbeat_interval_ms', 'session_timeout_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
-
- if self._heartbeat_interval_ms > self._session_timeout_ms:
+ DEFAULT_CONFIG = {
+ 'heartbeat_interval_ms': 3000,
+ 'session_timeout_ms': 30000,
+ }
+
+ def __init__(self, **configs):
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
+
+ if self.config['heartbeat_interval_ms'] > self.config['session_timeout_ms']:
raise Errors.IllegalArgumentError("Heartbeat interval must be set"
" lower than the session timeout")
- self.interval = self._heartbeat_interval_ms / 1000.0
- self.timeout = self._session_timeout_ms / 1000.0
+ self.interval = self.config['heartbeat_interval_ms'] / 1000.0
+ self.timeout = self.config['session_timeout_ms'] / 1000.0
self.last_send = 0
self.last_receive = 0
self.last_reset = time.time()
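
Heartbeat keeps the same invariant as before the refactor: the heartbeat interval must not exceed the session timeout, and both values are converted to seconds for scheduling. The constraint in isolation, using the defaults from DEFAULT_CONFIG:

    heartbeat_interval_ms = 3000
    session_timeout_ms = 30000
    assert heartbeat_interval_ms <= session_timeout_ms, (
        'Heartbeat interval must be set lower than the session timeout')
    interval = heartbeat_interval_ms / 1000.0   # seconds between heartbeats
    timeout = session_timeout_ms / 1000.0       # seconds before the session times out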