diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-29 19:08:35 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-29 19:08:35 -0800 |
commit | 3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9 (patch) | |
tree | 2c38fd2c577442cb90c99ee2d49b5b0f68300303 /kafka/conn.py | |
parent | e5c7d81e7c35e6b013cece347ef42d9f21d03aa6 (diff) | |
download | kafka-python-3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9.tar.gz |
Switch configs from attributes to dict to make passing / inspecting easier
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 56 |
1 files changed, 31 insertions, 25 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 7979ba7..8ce4a6f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -17,6 +17,7 @@ from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader from kafka.protocol.types import Int32 +from kafka.version import __version__ log = logging.getLogger(__name__) @@ -36,25 +37,24 @@ InFlightRequest = collections.namedtuple('InFlightRequest', class BrokerConnection(object): - _receive_buffer_bytes = 32768 - _send_buffer_bytes = 131072 - _client_id = 'kafka-python-0.10.0' - _correlation_id = 0 - _request_timeout_ms = 40000 - _max_in_flight_requests_per_connection = 5 - _reconnect_backoff_ms = 50 - - def __init__(self, host, port, **kwargs): + DEFAULT_CONFIG = { + 'client_id': 'kafka-python-' + __version__, + 'request_timeout_ms': 40000, + 'reconnect_backoff_ms': 50, + 'max_in_flight_requests_per_connection': 5, + 'receive_buffer_bytes': 32768, + 'send_buffer_bytes': 131072, + } + + def __init__(self, host, port, **configs): self.host = host self.port = port self.in_flight_requests = collections.deque() - for config in ('receive_buffer_bytes', 'send_buffer_bytes', - 'client_id', 'correlation_id', 'request_timeout_ms', - 'max_in_flight_requests_per_connection', - 'reconnect_backoff_ms'): - if config in kwargs: - setattr(self, '_' + config, kwargs.pop(config)) + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] self.state = ConnectionStates.DISCONNECTED self._sock = None @@ -64,14 +64,17 @@ class BrokerConnection(object): self.last_attempt = 0 self.last_failure = 0 self._processing = False + self._correlation_id = 0 def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED: self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._receive_buffer_bytes) - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes) + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + self.config['receive_buffer_bytes']) + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, + self.config['send_buffer_bytes']) self._sock.setblocking(False) ret = self._sock.connect_ex((self.host, self.port)) self.last_attempt = time.time() @@ -89,7 +92,8 @@ class BrokerConnection(object): if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex # to check connection status - if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt: + request_timeout = self.config['request_timeout_ms'] / 1000.0 + if time.time() > request_timeout + self.last_attempt: log.error('Connection attempt to %s timed out', self) self.close() # error=TimeoutError ? self.last_failure = time.time() @@ -110,8 +114,8 @@ class BrokerConnection(object): re-establish a connection yet """ if self.state is ConnectionStates.DISCONNECTED: - now = time.time() - if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0: + backoff = self.config['reconnect_backoff_ms'] / 1000.0 + if time.time() < self.last_attempt + backoff: return True return False @@ -146,7 +150,7 @@ class BrokerConnection(object): correlation_id = self._next_correlation_id() header = RequestHeader(request, correlation_id=correlation_id, - client_id=self._client_id) + client_id=self.config['client_id']) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) try: @@ -178,7 +182,8 @@ class BrokerConnection(object): return future def can_send_more(self): - return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection + max_ifrs = self.config['max_in_flight_requests_per_connection'] + return len(self.in_flight_requests) < max_ifrs def recv(self, timeout=0): """Non-blocking network receive @@ -202,9 +207,10 @@ class BrokerConnection(object): elif self._requests_timed_out(): log.warning('%s timed out after %s ms. Closing connection.', - self, self._request_timeout_ms) + self, self.config['request_timeout_ms']) self.close(error=Errors.RequestTimedOutError( - 'Request timed out after %s ms' % self._request_timeout_ms)) + 'Request timed out after %s ms' % + self.config['request_timeout_ms'])) return None readable, _, _ = select([self._sock], [], [], timeout) @@ -294,7 +300,7 @@ class BrokerConnection(object): def _requests_timed_out(self): if self.in_flight_requests: oldest_at = self.in_flight_requests[0].timestamp - timeout = self._request_timeout_ms / 1000.0 + timeout = self.config['request_timeout_ms'] / 1000.0 if time.time() >= oldest_at + timeout: return True return False |