summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py73
1 files changed, 29 insertions, 44 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5c11fc5..6fb5fdd 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,3 +1,4 @@
+import copy
import heapq
import itertools
import logging
@@ -15,6 +16,7 @@ from .conn import BrokerConnection, ConnectionStates, collect_hosts
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
+from .version import __version__
log = logging.getLogger(__name__)
@@ -27,26 +29,23 @@ class KafkaClient(object):
This class is not thread-safe!
"""
- _bootstrap_servers = 'localhost'
- _client_id = 'kafka-python-0.10.0'
- _reconnect_backoff_ms = 50
- _retry_backoff_ms = 100
- _send_buffer_bytes = 131072
- _receive_buffer_bytes = 32768
- _request_timeout_ms = 40000
- _max_in_flight_requests_per_connection=5
-
- def __init__(self, **kwargs):
- for config in (
- 'client_id', 'max_in_flight_requests_per_connection',
- 'reconnect_backoff_ms', 'retry_backoff_ms',
- 'send_buffer_bytes', 'receive_buffer_bytes',
- 'request_timeout_ms', 'bootstrap_servers'
- ):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
-
- self.cluster = ClusterMetadata(**kwargs)
+ DEFAULT_CONFIG = {
+ 'bootstrap_servers': 'localhost',
+ 'client_id': 'kafka-python-' + __version__,
+ 'request_timeout_ms': 40000,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'receive_buffer_bytes': 32768,
+ 'send_buffer_bytes': 131072,
+ }
+
+ def __init__(self, **configs):
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
+
+ self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._conns = {}
@@ -54,11 +53,11 @@ class KafkaClient(object):
self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
- self._bootstrap(collect_hosts(self._bootstrap_servers))
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
- backoff_ms = self._reconnect_backoff_ms * 2 ** self._bootstrap_fails
+ backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
next_at = self._last_bootstrap + backoff_ms / 1000.0
now = time.time()
if next_at > now:
@@ -69,15 +68,7 @@ class KafkaClient(object):
metadata_request = MetadataRequest([])
for host, port in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- bootstrap = BrokerConnection(
- host, port,
- client_id=self._client_id,
- receive_buffer_bytes=self._receive_buffer_bytes,
- send_buffer_bytes=self._send_buffer_bytes,
- request_timeout_ms=self._request_timeout_ms,
- max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection,
- reconnect_backoff_ms=self._reconnect_backoff_ms
- )
+ bootstrap = BrokerConnection(host, port, **self.config)
bootstrap.connect()
while bootstrap.state is ConnectionStates.CONNECTING:
bootstrap.connect()
@@ -121,15 +112,8 @@ class KafkaClient(object):
if node_id not in self._conns:
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
- self._conns[node_id] = BrokerConnection(
- broker.host, broker.port,
- client_id=self._client_id,
- receive_buffer_bytes=self._receive_buffer_bytes,
- send_buffer_bytes=self._send_buffer_bytes,
- request_timeout_ms=self._request_timeout_ms,
- max_in_flight_requests_per_connection=self._max_in_flight_requests_per_connection,
- reconnect_backoff_ms=self._reconnect_backoff_ms
- )
+ self._conns[node_id] = BrokerConnection(broker.host, broker.port,
+ **self.config)
return self._finish_connect(node_id)
def _finish_connect(self, node_id):
@@ -194,7 +178,7 @@ class KafkaClient(object):
conn = self._conns[node_id]
time_waited_ms = time.time() - (conn.last_attempt or 0)
if conn.state is ConnectionStates.DISCONNECTED:
- return max(self._reconnect_backoff_ms - time_waited_ms, 0)
+ return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
else:
return sys.maxint
@@ -262,7 +246,7 @@ class KafkaClient(object):
@return The list of responses received.
"""
if timeout_ms is None:
- timeout_ms = self._request_timeout_ms
+ timeout_ms = self.config['request_timeout_ms']
responses = []
@@ -283,7 +267,8 @@ class KafkaClient(object):
except Exception as e:
log.error("Task %s failed: %s", task, e)
- timeout = min(timeout_ms, metadata_timeout, self._request_timeout_ms)
+ timeout = min(timeout_ms, metadata_timeout,
+ self.config['request_timeout_ms'])
timeout /= 1000.0
responses.extend(self._poll(timeout))
@@ -365,7 +350,7 @@ class KafkaClient(object):
# Last option: try to bootstrap again
log.error('No nodes found in metadata -- retrying bootstrap')
- self._bootstrap(collect_hosts(self._bootstrap_servers))
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None
def set_topics(self, topics):