diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 73 |
1 files changed, 29 insertions, 44 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 5c11fc5..6fb5fdd 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,3 +1,4 @@ +import copy import heapq import itertools import logging @@ -15,6 +16,7 @@ from .conn import BrokerConnection, ConnectionStates, collect_hosts from .future import Future from .protocol.metadata import MetadataRequest from .protocol.produce import ProduceRequest +from .version import __version__ log = logging.getLogger(__name__) @@ -27,26 +29,23 @@ class KafkaClient(object): This class is not thread-safe! """ - _bootstrap_servers = 'localhost' - _client_id = 'kafka-python-0.10.0' - _reconnect_backoff_ms = 50 - _retry_backoff_ms = 100 - _send_buffer_bytes = 131072 - _receive_buffer_bytes = 32768 - _request_timeout_ms = 40000 - _max_in_flight_requests_per_connection=5 - - def __init__(self, **kwargs): - for config in ( - 'client_id', 'max_in_flight_requests_per_connection', - 'reconnect_backoff_ms', 'retry_backoff_ms', - 'send_buffer_bytes', 'receive_buffer_bytes', - 'request_timeout_ms', 'bootstrap_servers' - ): - if config in kwargs: - setattr(self, '_' + config, kwargs.pop(config)) - - self.cluster = ClusterMetadata(**kwargs) + DEFAULT_CONFIG = { + 'bootstrap_servers': 'localhost', + 'client_id': 'kafka-python-' + __version__, + 'request_timeout_ms': 40000, + 'reconnect_backoff_ms': 50, + 'max_in_flight_requests_per_connection': 5, + 'receive_buffer_bytes': 32768, + 'send_buffer_bytes': 131072, + } + + def __init__(self, **configs): + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] + + self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False self._conns = {} @@ -54,11 +53,11 @@ class KafkaClient(object): self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._bootstrap(collect_hosts(self._bootstrap_servers)) + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails - backoff_ms = self._reconnect_backoff_ms * 2 ** self._bootstrap_fails + backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails next_at = self._last_bootstrap + backoff_ms / 1000.0 now = time.time() if next_at > now: @@ -69,15 +68,7 @@ class KafkaClient(object): metadata_request = MetadataRequest([]) for host, port in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - bootstrap = BrokerConnection( - host, port, - client_id=self._client_id, - receive_buffer_bytes=self._receive_buffer_bytes, - send_buffer_bytes=self._send_buffer_bytes, - request_timeout_ms=self._request_timeout_ms, - max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection, - reconnect_backoff_ms=self._reconnect_backoff_ms - ) + bootstrap = BrokerConnection(host, port, **self.config) bootstrap.connect() while bootstrap.state is ConnectionStates.CONNECTING: bootstrap.connect() @@ -121,15 +112,8 @@ class KafkaClient(object): if node_id not in self._conns: log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) - self._conns[node_id] = BrokerConnection( - broker.host, broker.port, - client_id=self._client_id, - receive_buffer_bytes=self._receive_buffer_bytes, - send_buffer_bytes=self._send_buffer_bytes, - request_timeout_ms=self._request_timeout_ms, - max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection, - reconnect_backoff_ms=self._reconnect_backoff_ms - ) + self._conns[node_id] = BrokerConnection(broker.host, broker.port, + **self.config) return self._finish_connect(node_id) def _finish_connect(self, node_id): @@ -194,7 +178,7 @@ class KafkaClient(object): conn = self._conns[node_id] time_waited_ms = time.time() - (conn.last_attempt or 0) if conn.state is ConnectionStates.DISCONNECTED: - return max(self._reconnect_backoff_ms - time_waited_ms, 0) + return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0) else: return sys.maxint @@ -262,7 +246,7 @@ class KafkaClient(object): @return The list of responses received. """ if timeout_ms is None: - timeout_ms = self._request_timeout_ms + timeout_ms = self.config['request_timeout_ms'] responses = [] @@ -283,7 +267,8 @@ class KafkaClient(object): except Exception as e: log.error("Task %s failed: %s", task, e) - timeout = min(timeout_ms, metadata_timeout, self._request_timeout_ms) + timeout = min(timeout_ms, metadata_timeout, + self.config['request_timeout_ms']) timeout /= 1000.0 responses.extend(self._poll(timeout)) @@ -365,7 +350,7 @@ class KafkaClient(object): # Last option: try to bootstrap again log.error('No nodes found in metadata -- retrying bootstrap') - self._bootstrap(collect_hosts(self._bootstrap_servers)) + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) return None def set_topics(self, topics): |