-rw-r--r--  kafka/client_async.py           73
-rw-r--r--  kafka/cluster.py                20
-rw-r--r--  kafka/conn.py                   56
-rw-r--r--  kafka/consumer/fetcher.py       50
-rw-r--r--  kafka/consumer/group.py        105
-rw-r--r--  kafka/coordinator/abstract.py   34
-rw-r--r--  kafka/coordinator/consumer.py   58
-rw-r--r--  kafka/coordinator/heartbeat.py  26
8 files changed, 211 insertions, 211 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5c11fc5..6fb5fdd 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,3 +1,4 @@
+import copy
 import heapq
 import itertools
 import logging
@@ -15,6 +16,7 @@ from .conn import BrokerConnection, ConnectionStates, collect_hosts
 from .future import Future
 from .protocol.metadata import MetadataRequest
 from .protocol.produce import ProduceRequest
+from .version import __version__

 log = logging.getLogger(__name__)

@@ -27,26 +29,23 @@ class KafkaClient(object):

     This class is not thread-safe!
     """
-    _bootstrap_servers = 'localhost'
-    _client_id = 'kafka-python-0.10.0'
-    _reconnect_backoff_ms = 50
-    _retry_backoff_ms = 100
-    _send_buffer_bytes = 131072
-    _receive_buffer_bytes = 32768
-    _request_timeout_ms = 40000
-    _max_in_flight_requests_per_connection=5
-
-    def __init__(self, **kwargs):
-        for config in (
-                'client_id', 'max_in_flight_requests_per_connection',
-                'reconnect_backoff_ms', 'retry_backoff_ms',
-                'send_buffer_bytes', 'receive_buffer_bytes',
-                'request_timeout_ms', 'bootstrap_servers'
-        ):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
-
-        self.cluster = ClusterMetadata(**kwargs)
+    DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost',
+        'client_id': 'kafka-python-' + __version__,
+        'request_timeout_ms': 40000,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'receive_buffer_bytes': 32768,
+        'send_buffer_bytes': 131072,
+    }
+
+    def __init__(self, **configs):
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
+
+        self.cluster = ClusterMetadata(**self.config)
         self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
         self._conns = {}
@@ -54,11 +53,11 @@ class KafkaClient(object):
         self._delayed_tasks = DelayedTaskQueue()
         self._last_bootstrap = 0
         self._bootstrap_fails = 0
-        self._bootstrap(collect_hosts(self._bootstrap_servers))
+        self._bootstrap(collect_hosts(self.config['bootstrap_servers']))

     def _bootstrap(self, hosts):
         # Exponential backoff if bootstrap fails
-        backoff_ms = self._reconnect_backoff_ms * 2 ** self._bootstrap_fails
+        backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
         next_at = self._last_bootstrap + backoff_ms / 1000.0
         now = time.time()
         if next_at > now:
@@ -69,15 +68,7 @@ class KafkaClient(object):
         metadata_request = MetadataRequest([])
         for host, port in hosts:
             log.debug("Attempting to bootstrap via node at %s:%s", host, port)
-            bootstrap = BrokerConnection(
-                host, port,
-                client_id=self._client_id,
-                receive_buffer_bytes=self._receive_buffer_bytes,
-                send_buffer_bytes=self._send_buffer_bytes,
-                request_timeout_ms=self._request_timeout_ms,
-                max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection,
-                reconnect_backoff_ms=self._reconnect_backoff_ms
-            )
+            bootstrap = BrokerConnection(host, port, **self.config)
             bootstrap.connect()
             while bootstrap.state is ConnectionStates.CONNECTING:
                 bootstrap.connect()
@@ -121,15 +112,8 @@ class KafkaClient(object):
         if node_id not in self._conns:
             log.debug("Initiating connection to node %s at %s:%s",
                       node_id, broker.host, broker.port)
-            self._conns[node_id] = BrokerConnection(
-                broker.host, broker.port,
-                client_id=self._client_id,
-                receive_buffer_bytes=self._receive_buffer_bytes,
-                send_buffer_bytes=self._send_buffer_bytes,
-                request_timeout_ms=self._request_timeout_ms,
-                max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection,
-                reconnect_backoff_ms=self._reconnect_backoff_ms
-            )
+            self._conns[node_id] = BrokerConnection(broker.host, broker.port,
+                                                    **self.config)
         return self._finish_connect(node_id)

     def _finish_connect(self, node_id):
@@ -194,7 +178,7 @@ class KafkaClient(object):
         conn = self._conns[node_id]
         time_waited_ms = time.time() - (conn.last_attempt or 0)
         if conn.state is ConnectionStates.DISCONNECTED:
-            return max(self._reconnect_backoff_ms - time_waited_ms, 0)
+            return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
         else:
             return sys.maxint

@@ -262,7 +246,7 @@ class KafkaClient(object):
         @return The list of responses received.
         """
         if timeout_ms is None:
-            timeout_ms = self._request_timeout_ms
+            timeout_ms = self.config['request_timeout_ms']

         responses = []

@@ -283,7 +267,8 @@ class KafkaClient(object):
             except Exception as e:
                 log.error("Task %s failed: %s", task, e)

-        timeout = min(timeout_ms, metadata_timeout, self._request_timeout_ms)
+        timeout = min(timeout_ms, metadata_timeout,
+                      self.config['request_timeout_ms'])
         timeout /= 1000.0

         responses.extend(self._poll(timeout))
@@ -365,7 +350,7 @@ class KafkaClient(object):

         # Last option: try to bootstrap again
         log.error('No nodes found in metadata -- retrying bootstrap')
-        self._bootstrap(collect_hosts(self._bootstrap_servers))
+        self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
         return None

     def set_topics(self, topics):
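Every file in this commit applies the same idiom: a class-level DEFAULT_CONFIG dict is copied per instance, and only the keys that class declares are overridden from **configs. A minimal standalone sketch of the pattern (the class name here is illustrative, not part of kafka-python):

    import copy

    class Configurable(object):
        # Hypothetical class demonstrating the DEFAULT_CONFIG idiom.
        DEFAULT_CONFIG = {
            'request_timeout_ms': 40000,
            'reconnect_backoff_ms': 50,
        }

        def __init__(self, **configs):
            # Copy the defaults, then overwrite only the recognized keys.
            self.config = copy.copy(self.DEFAULT_CONFIG)
            for key in self.config:
                if key in configs:
                    self.config[key] = configs[key]

    c = Configurable(request_timeout_ms=10000, unrelated_key=True)
    assert c.config['request_timeout_ms'] == 10000
    assert 'unrelated_key' not in c.config

Because the loop iterates over self.config rather than over configs, unknown keys are silently ignored instead of raising TypeError; that is what lets KafkaClient pass its entire config dict straight into BrokerConnection above.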
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 5b5fd8e..84ad1d3 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -1,5 +1,6 @@
 from __future__ import absolute_import

+import copy
 import logging
 import random
 import time
@@ -12,10 +13,12 @@ log = logging.getLogger(__name__)

 class ClusterMetadata(object):
-    _retry_backoff_ms = 100
-    _metadata_max_age_ms = 300000
+    DEFAULT_CONFIG = {
+        'retry_backoff_ms': 100,
+        'metadata_max_age_ms': 300000,
+    }

-    def __init__(self, **kwargs):
+    def __init__(self, **configs):
         self._brokers = {}
         self._partitions = {}
         self._groups = {}
@@ -26,9 +29,10 @@ class ClusterMetadata(object):
         self._future = None
         self._listeners = set()

-        for config in ('retry_backoff_ms', 'metadata_max_age_ms'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]

     def brokers(self):
         return set(self._brokers.values())
@@ -55,8 +59,8 @@ class ClusterMetadata(object):
         if self._need_update:
             ttl = 0
         else:
-            ttl = self._last_successful_refresh_ms + self._metadata_max_age_ms - now
-        retry = self._last_refresh_ms + self._retry_backoff_ms - now
+            ttl = self._last_successful_refresh_ms + self.config['metadata_max_age_ms'] - now
+        retry = self._last_refresh_ms + self.config['retry_backoff_ms'] - now
         return max(ttl, retry, 0)

     def request_update(self):
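Note that copy.copy is a shallow copy: that is safe here because instances rebind config values rather than mutating them, and the only non-scalar default in this commit (partition_assignment_strategy) is an immutable tuple. A sketch of the pitfall a mutable default would create (illustrative dict, not from the source):

    import copy

    DEFAULT_CONFIG = {'assignors': []}  # hypothetical mutable default value

    a = copy.copy(DEFAULT_CONFIG)
    b = copy.copy(DEFAULT_CONFIG)
    a['assignors'].append('roundrobin')  # in-place mutation leaks into b
    assert b['assignors'] == ['roundrobin']

    a = copy.copy(DEFAULT_CONFIG)
    a['assignors'] = ['roundrobin']      # rebinding, as this commit does, is safe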
diff --git a/kafka/conn.py b/kafka/conn.py
index 7979ba7..8ce4a6f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,6 +17,7 @@ from kafka.common import ConnectionError
 from kafka.future import Future
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.types import Int32
+from kafka.version import __version__

 log = logging.getLogger(__name__)

@@ -36,25 +37,24 @@ InFlightRequest = collections.namedtuple('InFlightRequest',

 class BrokerConnection(object):
-    _receive_buffer_bytes = 32768
-    _send_buffer_bytes = 131072
-    _client_id = 'kafka-python-0.10.0'
-    _correlation_id = 0
-    _request_timeout_ms = 40000
-    _max_in_flight_requests_per_connection = 5
-    _reconnect_backoff_ms = 50
-
-    def __init__(self, host, port, **kwargs):
+    DEFAULT_CONFIG = {
+        'client_id': 'kafka-python-' + __version__,
+        'request_timeout_ms': 40000,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'receive_buffer_bytes': 32768,
+        'send_buffer_bytes': 131072,
+    }
+
+    def __init__(self, host, port, **configs):
         self.host = host
         self.port = port
         self.in_flight_requests = collections.deque()

-        for config in ('receive_buffer_bytes', 'send_buffer_bytes',
-                       'client_id', 'correlation_id', 'request_timeout_ms',
-                       'max_in_flight_requests_per_connection',
-                       'reconnect_backoff_ms'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]

         self.state = ConnectionStates.DISCONNECTED
         self._sock = None
@@ -64,14 +64,17 @@ class BrokerConnection(object):
         self.last_attempt = 0
         self.last_failure = 0
         self._processing = False
+        self._correlation_id = 0

     def connect(self):
         """Attempt to connect and return ConnectionState"""
         if self.state is ConnectionStates.DISCONNECTED:
             self.close()
             self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._receive_buffer_bytes)
-            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
+            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
+                                  self.config['receive_buffer_bytes'])
+            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
+                                  self.config['send_buffer_bytes'])
             self._sock.setblocking(False)
             ret = self._sock.connect_ex((self.host, self.port))
             self.last_attempt = time.time()
@@ -89,7 +92,8 @@ class BrokerConnection(object):
         if self.state is ConnectionStates.CONNECTING:
             # in non-blocking mode, use repeated calls to socket.connect_ex
             # to check connection status
-            if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt:
+            request_timeout = self.config['request_timeout_ms'] / 1000.0
+            if time.time() > request_timeout + self.last_attempt:
                 log.error('Connection attempt to %s timed out', self)
                 self.close()  # error=TimeoutError ?
                 self.last_failure = time.time()
@@ -110,8 +114,8 @@ class BrokerConnection(object):
         re-establish a connection yet
         """
         if self.state is ConnectionStates.DISCONNECTED:
-            now = time.time()
-            if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0:
+            backoff = self.config['reconnect_backoff_ms'] / 1000.0
+            if time.time() < self.last_attempt + backoff:
                 return True
         return False
@@ -146,7 +150,7 @@ class BrokerConnection(object):
         correlation_id = self._next_correlation_id()
         header = RequestHeader(request,
                                correlation_id=correlation_id,
-                               client_id=self._client_id)
+                               client_id=self.config['client_id'])
         message = b''.join([header.encode(), request.encode()])
         size = Int32.encode(len(message))
         try:
@@ -178,7 +182,8 @@ class BrokerConnection(object):
         return future

     def can_send_more(self):
-        return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection
+        max_ifrs = self.config['max_in_flight_requests_per_connection']
+        return len(self.in_flight_requests) < max_ifrs

     def recv(self, timeout=0):
         """Non-blocking network receive
@@ -202,9 +207,10 @@ class BrokerConnection(object):
         elif self._requests_timed_out():
             log.warning('%s timed out after %s ms. Closing connection.',
-                        self, self._request_timeout_ms)
+                        self, self.config['request_timeout_ms'])
             self.close(error=Errors.RequestTimedOutError(
-                'Request timed out after %s ms' % self._request_timeout_ms))
+                'Request timed out after %s ms' %
+                self.config['request_timeout_ms']))
             return None

         readable, _, _ = select([self._sock], [], [], timeout)
@@ -294,7 +300,7 @@ class BrokerConnection(object):
     def _requests_timed_out(self):
         if self.in_flight_requests:
             oldest_at = self.in_flight_requests[0].timestamp
-            timeout = self._request_timeout_ms / 1000.0
+            timeout = self.config['request_timeout_ms'] / 1000.0
             if time.time() >= oldest_at + timeout:
                 return True
         return False
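The blacked_out() and _requests_timed_out() checks above reduce to simple time arithmetic on config values. A hedged standalone rendering of the same checks, convenient for unit testing (the function names and signatures are illustrative, not the library's API):

    import time

    def blacked_out(last_attempt, reconnect_backoff_ms, now=None):
        # True if a reconnect attempt would come too soon after the last one.
        now = time.time() if now is None else now
        return now < last_attempt + reconnect_backoff_ms / 1000.0

    def requests_timed_out(oldest_sent_at, request_timeout_ms, now=None):
        # True once the oldest in-flight request has exceeded the timeout.
        now = time.time() if now is None else now
        return now >= oldest_sent_at + request_timeout_ms / 1000.0

    assert blacked_out(last_attempt=100.0, reconnect_backoff_ms=50, now=100.01)
    assert not blacked_out(last_attempt=100.0, reconnect_backoff_ms=50, now=100.10)
    assert requests_timed_out(oldest_sent_at=100.0, request_timeout_ms=40000, now=140.0)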
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index ea9c8b9..39e1244 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import

 import collections
+import copy
 import logging

 import six
@@ -28,27 +29,25 @@ class RecordTooLargeError(Errors.KafkaError):

 class Fetcher(object):
-    _key_deserializer = None
-    _value_deserializer = None
-    _fetch_min_bytes = 1024
-    _fetch_max_wait_ms = 500
-    _max_partition_fetch_bytes = 1048576
-    _check_crcs = True
-    _retry_backoff_ms = 100
-
-    def __init__(self, client, subscriptions, **kwargs):
+    DEFAULT_CONFIG = {
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_min_bytes': 1024,
+        'fetch_max_wait_ms': 500,
+        'max_partition_fetch_bytes': 1048576,
+        'check_crcs': True,
+    }
+
+    def __init__(self, client, subscriptions, **configs):
         #metrics=None,
         #metric_group_prefix='consumer',
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]

         self._client = client
         self._subscriptions = subscriptions
-        for config in ('key_deserializer', 'value_deserializer',
-                       'fetch_min_bytes', 'fetch_max_wait_ms',
-                       'max_partition_fetch_bytes', 'check_crcs',
-                       'retry_backoff_ms'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
-
         self._records = collections.deque()  # (offset, topic_partition, messages)
         self._unauthorized_topics = set()
         self._offset_out_of_range_partitions = dict()  # {topic_partition: offset}
@@ -204,7 +203,8 @@ class Fetcher(object):
                   " and hence cannot be ever returned."
                   " Increase the fetch size, or decrease the maximum message"
                   " size the broker will allow.",
-                  copied_record_too_large_partitions, self._max_partition_fetch_bytes)
+                  copied_record_too_large_partitions,
+                  self.config['max_partition_fetch_bytes'])

     def fetched_records(self):
         """Returns previously fetched records and updates consumed offsets
@@ -255,7 +255,7 @@ class Fetcher(object):
             for offset, size, msg in messages:
                 if msg.attributes:
                     raise Errors.KafkaError('Compressed messages not supported yet')
-                elif self._check_crcs and not msg.validate_crc():
+                elif self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)

                 key, value = self._deserialize(msg)
@@ -269,12 +269,12 @@ class Fetcher(object):
         return dict(drained)

     def _deserialize(self, msg):
-        if self._key_deserializer:
-            key = self._key_deserializer(msg.key)  # pylint: disable-msg=not-callable
+        if self.config['key_deserializer']:
+            key = self.config['key_deserializer'](msg.key)  # pylint: disable-msg=not-callable
         else:
             key = msg.key
-        if self._value_deserializer:
-            value = self._value_deserializer(msg.value)  # pylint: disable-msg=not-callable
+        if self.config['value_deserializer']:
+            value = self.config['value_deserializer'](msg.value)  # pylint: disable-msg=not-callable
         else:
             value = msg.value
         return key, value
@@ -376,7 +376,7 @@ class Fetcher(object):
                     partition_info = (
                         partition.partition,
                         fetched,
-                        self._max_partition_fetch_bytes
+                        self.config['max_partition_fetch_bytes']
                     )
                     fetchable[node_id][partition.topic].append(partition_info)
                 else:
@@ -388,8 +388,8 @@ class Fetcher(object):
         for node_id, partition_data in six.iteritems(fetchable):
             requests[node_id] = FetchRequest(
                 -1,  # replica_id
-                self._fetch_max_wait_ms,
-                self._fetch_min_bytes,
+                self.config['fetch_max_wait_ms'],
+                self.config['fetch_min_bytes'],
                 partition_data.items())
         return requests
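The key_deserializer and value_deserializer entries are plain callables applied per message. A sketch of the same dispatch as a standalone function (the helper name is illustrative):

    import json

    def deserialize(config, key_bytes, value_bytes):
        # Mirrors Fetcher._deserialize: apply the configured callables when
        # present, otherwise pass the raw bytes through unchanged.
        key = config['key_deserializer'](key_bytes) if config['key_deserializer'] else key_bytes
        value = config['value_deserializer'](value_bytes) if config['value_deserializer'] else value_bytes
        return key, value

    config = {'key_deserializer': None,
              'value_deserializer': lambda v: json.loads(v.decode('utf-8'))}
    key, value = deserialize(config, b'k1', b'{"n": 1}')
    assert key == b'k1' and value == {'n': 1}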
" Increase the fetch size, or decrease the maximum message" " size the broker will allow.", - copied_record_too_large_partitions, self._max_partition_fetch_bytes) + copied_record_too_large_partitions, + self.config['max_partition_fetch_bytes']) def fetched_records(self): """Returns previously fetched records and updates consumed offsets @@ -255,7 +255,7 @@ class Fetcher(object): for offset, size, msg in messages: if msg.attributes: raise Errors.KafkaError('Compressed messages not supported yet') - elif self._check_crcs and not msg.validate_crc(): + elif self.config['check_crcs'] and not msg.validate_crc(): raise Errors.InvalidMessageError(msg) key, value = self._deserialize(msg) @@ -269,12 +269,12 @@ class Fetcher(object): return dict(drained) def _deserialize(self, msg): - if self._key_deserializer: - key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable + if self.config['key_deserializer']: + key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable else: key = msg.key - if self._value_deserializer: - value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable + if self.config['value_deserializer']: + value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable else: value = msg.value return key, value @@ -376,7 +376,7 @@ class Fetcher(object): partition_info = ( partition.partition, fetched, - self._max_partition_fetch_bytes + self.config['max_partition_fetch_bytes'] ) fetchable[node_id][partition.topic].append(partition_info) else: @@ -388,8 +388,8 @@ class Fetcher(object): for node_id, partition_data in six.iteritems(fetchable): requests[node_id] = FetchRequest( -1, # replica_id - self._fetch_max_wait_ms, - self._fetch_min_bytes, + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], partition_data.items()) return requests diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 63a1b2e..b7093f3 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import copy import logging import time @@ -18,33 +19,36 @@ log = logging.getLogger(__name__) class KafkaConsumer(object): """Consumer for Kafka 0.9""" - _bootstrap_servers = 'localhost' - _client_id = 'kafka-python-' + __version__ - _group_id = 'kafka-python-default-group' - _key_deserializer = None - _value_deserializer = None - _fetch_max_wait_ms = 500 - _fetch_min_bytes = 1024 - _max_partition_fetch_bytes = 1 * 1024 * 1024 - _request_timeout_ms = 40 * 1000 - _retry_backoff_ms = 100 - _reconnect_backoff_ms = 50 - _auto_offset_reset = 'latest' - _enable_auto_commit = True - _auto_commit_interval_ms = 5000 - _check_crcs = True - _metadata_max_age_ms = 5 * 60 * 1000 - _partition_assignment_strategy = (RoundRobinPartitionAssignor,) - _heartbeat_interval_ms = 3000 - _session_timeout_ms = 30000 - _send_buffer_bytes = 128 * 1024 - _receive_buffer_bytes = 32 * 1024 - _connections_max_idle_ms = 9 * 60 * 1000 # not implemented yet - #_metric_reporters = None - #_metrics_num_samples = 2 - #_metrics_sample_window_ms = 30000 - - def __init__(self, *topics, **kwargs): + DEFAULT_CONFIG = { + 'bootstrap_servers': 'localhost', + 'client_id': 'kafka-python-' + __version__, + 'group_id': 'kafka-python-default-group', + 'key_deserializer': None, + 'value_deserializer': None, + 'fetch_max_wait_ms': 500, + 'fetch_min_bytes': 1024, + 'max_partition_fetch_bytes': 1 * 1024 * 1024, + 'request_timeout_ms': 40 * 1000, + 'retry_backoff_ms': 100, + 'reconnect_backoff_ms': 50, + 
'max_in_flight_requests_per_connection': 5, + 'auto_offset_reset': 'latest', + 'enable_auto_commit': True, + 'auto_commit_interval_ms': 5000, + 'check_crcs': True, + 'metadata_max_age_ms': 5 * 60 * 1000, + 'partition_assignment_strategy': (RoundRobinPartitionAssignor,), + 'heartbeat_interval_ms': 3000, + 'session_timeout_ms': 30000, + 'send_buffer_bytes': 128 * 1024, + 'receive_buffer_bytes': 32 * 1024, + 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet + #'metric_reporters': None, + #'metrics_num_samples': 2, + #'metrics_sample_window_ms': 30000, + } + + def __init__(self, *topics, **configs): """A Kafka client that consumes records from a Kafka cluster. The consumer will transparently handle the failure of servers in the @@ -79,8 +83,8 @@ class KafkaConsumer(object): raw message value and returns a deserialized value. fetch_min_bytes (int): Minimum amount of data the server should return for a fetch request, otherwise wait up to - fetch_wait_max_ms for more data to accumulate. Default: 1024. - fetch_wait_max_ms (int): The maximum amount of time in milliseconds + fetch_max_wait_ms for more data to accumulate. Default: 1024. + fetch_max_wait_ms (int): The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch_min_bytes. Default: 500. @@ -97,8 +101,11 @@ class KafkaConsumer(object): retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. reconnect_backoff_ms (int): The amount of time in milliseconds to - wait before attempting to reconnect to a given host. Defaults - to 50. + wait before attempting to reconnect to a given host. + Default: 50. + max_in_flight_requests_per_connection (int): Requests are pipelined + to kafka brokers up to this number of maximum requests per + broker connection. Default: 5. auto_offset_reset (str): A policy for resetting offsets on OffsetOutOfRange errors: 'earliest' will move to the oldest available message, 'latest' will move to the most recent. 
Any @@ -137,29 +144,19 @@ class KafkaConsumer(object): Configuration parameters are described in more detail at https://kafka.apache.org/090/configuration.html#newconsumerconfigs """ - for config in ('bootstrap_servers', 'client_id', 'group_id', - 'key_deserializer', 'value_deserializer', - 'fetch_max_wait_ms', 'fetch_min_bytes', - 'max_partition_fetch_bytes', 'request_timeout_ms', - 'retry_backoff_ms', 'reconnect_backoff_ms', - 'auto_offset_reset', 'enable_auto_commit', - 'auto_commit_interval_ms', 'check_crcs', - 'metadata_max_age_ms', 'partition_assignment_strategy', - 'heartbeat_interval_ms', 'session_timeout_ms', - 'send_buffer_bytes', 'receive_buffer_bytes'): - if config in kwargs: - setattr(self, '_' + config, kwargs[config]) - - self._client = KafkaClient(**kwargs) - self._subscription = SubscriptionState(self._auto_offset_reset) + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] + + self._client = KafkaClient(**self.config) + self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( - self._client, self._subscription, **kwargs) + self._client, self._subscription, **self.config) self._coordinator = ConsumerCoordinator( - self._client, self._group_id, self._subscription, - enable_auto_commit=self._enable_auto_commit, - auto_commit_interval_ms=self._auto_commit_interval_ms, - assignors=self._partition_assignment_strategy, - **kwargs) + self._client, self.config['group_id'], self._subscription, + assignors=self.config['partition_assignment_strategy'], + **self.config) self._closed = False #self.metrics = None @@ -213,11 +210,11 @@ class KafkaConsumer(object): #self.metrics.close() self._client.close() try: - self._key_deserializer.close() + self.config['key_deserializer'].close() except AttributeError: pass try: - self._value_deserializer.close() + self.config['value_deserializer'].close() except AttributeError: pass log.debug("The KafkaConsumer has closed.") diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 03302a3..ea5cb97 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -1,4 +1,5 @@ import abc +import copy import logging import time @@ -44,22 +45,24 @@ class AbstractCoordinator(object): _on_join_complete(). 
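With every component accepting the same flat dict, the consumer can forward its merged config wholesale. Roughly, usage looks like this (assuming this tree's import path and a reachable broker at localhost:9092):

    from kafka.consumer.group import KafkaConsumer

    # Each keyword lands in KafkaConsumer.DEFAULT_CONFIG and is then forwarded,
    # via **self.config, to KafkaClient, Fetcher, and ConsumerCoordinator; each
    # of those keeps only the keys its own DEFAULT_CONFIG declares.
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             fetch_min_bytes=1,
                             session_timeout_ms=10000)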
""" - _session_timeout_ms = 30000 - _heartbeat_interval_ms = 3000 - _retry_backoff_ms = 100 + DEFAULT_CONFIG = { + 'session_timeout_ms': 30000, + 'heartbeat_interval_ms': 3000, + 'retry_backoff_ms': 100, + } - def __init__(self, client, group_id, **kwargs): + def __init__(self, client, group_id, **configs): if not client: raise Errors.IllegalStateError('a client is required to use' ' Group Coordinator') if not group_id: raise Errors.IllegalStateError('a group_id is required to use' ' Group Coordinator') - for config in ('session_timeout_ms', - 'heartbeat_interval_ms', - 'retry_backoff_ms'): - if config in kwargs: - setattr(self, '_' + config, kwargs.pop(config)) + + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] self._client = client self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID @@ -68,9 +71,7 @@ class AbstractCoordinator(object): self.coordinator_id = None self.rejoin_needed = True self.needs_join_prepare = True - self.heartbeat = Heartbeat( - session_timeout_ms=self._session_timeout_ms, - heartbeat_interval_ms=self._heartbeat_interval_ms) + self.heartbeat = Heartbeat(**self.config) self.heartbeat_task = HeartbeatTask(self) #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) @@ -222,7 +223,7 @@ class AbstractCoordinator(object): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self._retry_backoff_ms / 1000.0) + time.sleep(self.config['retry_backoff_ms'] / 1000.0) def _perform_group_join(self): """Join the group and return the assignment for the next generation. @@ -242,7 +243,7 @@ class AbstractCoordinator(object): log.debug("(Re-)joining group %s", self.group_id) request = JoinGroupRequest( self.group_id, - self._session_timeout_ms, + self.config['session_timeout_ms'], self.member_id, self.protocol_type(), [(protocol, @@ -492,8 +493,7 @@ class AbstractCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" request = HeartbeatRequest(self.group_id, self.generation, self.member_id) - log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, - request.member_id) + log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future) @@ -594,7 +594,7 @@ class HeartbeatTask(object): def _handle_heartbeat_failure(self, e): log.debug("Heartbeat failed; retrying") self._request_in_flight = False - etd = time.time() + self._coordinator._retry_backoff_ms / 1000.0 + etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0 self._client.schedule(self, etd) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 211d1d0..dd3eea0 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -1,3 +1,4 @@ +import copy import collections import logging import time @@ -45,34 +46,36 @@ class ConsumerProtocol(object): class ConsumerCoordinator(AbstractCoordinator): """This class manages the coordination process with the consumer coordinator.""" - _enable_auto_commit = True - _auto_commit_interval_ms = 5000 - _default_offset_commit_callback = lambda offsets, error: True - _assignors = () - #_heartbeat_interval_ms = 3000 - #_session_timeout_ms = 30000 - #_retry_backoff_ms = 100 - - def __init__(self, client, group_id, subscription, **kwargs): + 
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 211d1d0..dd3eea0 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -1,3 +1,4 @@
+import copy
 import collections
 import logging
 import time
@@ -45,34 +46,36 @@ class ConsumerProtocol(object):

 class ConsumerCoordinator(AbstractCoordinator):
     """This class manages the coordination process with the consumer coordinator."""
-    _enable_auto_commit = True
-    _auto_commit_interval_ms = 5000
-    _default_offset_commit_callback = lambda offsets, error: True
-    _assignors = ()
-    #_heartbeat_interval_ms = 3000
-    #_session_timeout_ms = 30000
-    #_retry_backoff_ms = 100
-
-    def __init__(self, client, group_id, subscription, **kwargs):
+    DEFAULT_CONFIG = {
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 5000,
+        'default_offset_commit_callback': lambda offsets, error: True,
+        'assignors': (),
+        'session_timeout_ms': 30000,
+        'heartbeat_interval_ms': 3000,
+        'retry_backoff_ms': 100,
+    }
+
+    def __init__(self, client, group_id, subscription, **configs):
         """Initialize the coordination manager."""
-        super(ConsumerCoordinator, self).__init__(client, group_id, **kwargs)
-        for config in ('enable_auto_commit', 'auto_commit_interval_ms',
-                       'default_offset_commit_callback', 'assignors'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
+        super(ConsumerCoordinator, self).__init__(client, group_id, **configs)
+
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]

         self._cluster = client.cluster
         self._subscription = subscription
         self._partitions_per_topic = {}
         self._auto_commit_task = None
-        if not self._assignors:
+        if not self.config['assignors']:
             raise Errors.IllegalStateError('Coordinator requires assignors')

         self._cluster.request_update()
         self._cluster.add_listener(self._handle_metadata_update)

-        if self._enable_auto_commit:
-            interval = self._auto_commit_interval_ms / 1000.0
+        if self.config['enable_auto_commit']:
+            interval = self.config['auto_commit_interval_ms'] / 1000.0
             self._auto_commit_task = AutoCommitTask(self, interval)

         # metrics=None,
@@ -87,7 +90,7 @@ class ConsumerCoordinator(AbstractCoordinator):
         """Returns list of preferred (protocols, metadata)"""
         topics = self._subscription.subscription
         metadata_list = []
-        for assignor in self._assignors:
+        for assignor in self.config['assignors']:
             metadata = assignor.metadata(topics)
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
@@ -126,7 +129,7 @@ class ConsumerCoordinator(AbstractCoordinator):
         return False

     def _lookup_assignor(self, name):
-        for assignor in self._assignors:
+        for assignor in self.config['assignors']:
             if assignor.name == name:
                 return assignor
         return None
@@ -152,7 +155,7 @@ class ConsumerCoordinator(AbstractCoordinator):
             assignor.on_assignment(assignment)

         # restart the autocommit task if needed
-        if self._enable_auto_commit:
+        if self.config['enable_auto_commit']:
             self._auto_commit_task.enable()

         assigned = set(self._subscription.assigned_partitions())
@@ -258,7 +261,7 @@ class ConsumerCoordinator(AbstractCoordinator):
             if not future.retriable():
                 raise future.exception  # pylint: disable-msg=raising-bad-type

-            time.sleep(self._retry_backoff_ms / 1000.0)
+            time.sleep(self.config['retry_backoff_ms'] / 1000.0)

     def ensure_partition_assignment(self):
         """Ensure that we have a valid partition assignment from the coordinator."""
@@ -283,10 +286,11 @@ class ConsumerCoordinator(AbstractCoordinator):
         Returns:
             Future: indicating whether the commit was successful or not
         """
+        if callback is None:
+            callback = self.config['default_offset_commit_callback']
         self._subscription.needs_fetch_committed_offsets = True
         future = self._send_offset_commit_request(offsets)
-        cb = callback if callback else self._default_offset_commit_callback
-        future.add_both(cb, offsets)
+        future.add_both(callback, offsets)

     def commit_offsets_sync(self, offsets):
         """Commit specific offsets synchronously.
@@ -314,10 +318,10 @@ class ConsumerCoordinator(AbstractCoordinator):
             if not future.retriable():
                 raise future.exception  # pylint: disable-msg=raising-bad-type

-            time.sleep(self._retry_backoff_ms / 1000.0)
+            time.sleep(self.config['retry_backoff_ms'] / 1000.0)

     def _maybe_auto_commit_offsets_sync(self):
-        if self._enable_auto_commit:
+        if self.config['enable_auto_commit']:
             # disable periodic commits prior to committing synchronously. note that they will
             # be re-enabled after a rebalance completes
             self._auto_commit_task.disable()
@@ -558,8 +562,8 @@ class AutoCommitTask(object):
         if self._coordinator.coordinator_unknown():
             log.debug("Cannot auto-commit offsets because the coordinator is"
                       " unknown, will retry after backoff")
-            next_at = time.time() + self._coordinator._retry_backoff_ms / 1000.0
-            self._client.schedule(self, next_at)
+            backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
+            self._client.schedule(self, time.time() + backoff)
             return

         self._request_in_flight = True
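The reworked commit_offsets_async resolves the callback up front, falling back to the configured default_offset_commit_callback. A toy illustration of the same default-callback pattern (standalone; names and the callback(offsets, error) shape are illustrative):

    def make_committer(config):
        def commit_offsets_async(offsets, callback=None):
            if callback is None:
                callback = config['default_offset_commit_callback']
            # ... in the real code a request is sent and the callback is
            # attached to its future; here we invoke it directly.
            callback(offsets, None)
        return commit_offsets_async

    seen = []
    commit = make_committer({'default_offset_commit_callback':
                             lambda offsets, error: seen.append((offsets, error))})
    commit({('topic', 0): 42})
    assert seen == [({('topic', 0): 42}, None)]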
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index c153ddd..41ba025 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,23 +1,27 @@
+import copy
 import time

 import kafka.common as Errors


 class Heartbeat(object):
-    _heartbeat_interval_ms = 3000
-    _session_timeout_ms = 30000
-
-    def __init__(self, **kwargs):
-        for config in ('heartbeat_interval_ms', 'session_timeout_ms'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
-
-        if self._heartbeat_interval_ms > self._session_timeout_ms:
+    DEFAULT_CONFIG = {
+        'heartbeat_interval_ms': 3000,
+        'session_timeout_ms': 30000,
+    }
+
+    def __init__(self, **configs):
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
+
+        if self.config['heartbeat_interval_ms'] > self.config['session_timeout_ms']:
             raise Errors.IllegalArgumentError("Heartbeat interval must be set"
                                               " lower than the session timeout")

-        self.interval = self._heartbeat_interval_ms / 1000.0
-        self.timeout = self._session_timeout_ms / 1000.0
+        self.interval = self.config['heartbeat_interval_ms'] / 1000.0
+        self.timeout = self.config['session_timeout_ms'] / 1000.0
         self.last_send = 0
         self.last_receive = 0
         self.last_reset = time.time()
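The interval/timeout guard can be exercised directly; for example (using this tree's import paths):

    import kafka.common as Errors
    from kafka.coordinator.heartbeat import Heartbeat

    try:
        Heartbeat(heartbeat_interval_ms=60000, session_timeout_ms=30000)
    except Errors.IllegalArgumentError:
        pass  # an interval above the session timeout is rejected at construction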