summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-29 19:08:35 -0800
committerDana Powers <dana.powers@rd.io>2015-12-29 19:08:35 -0800
commit3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9 (patch)
tree2c38fd2c577442cb90c99ee2d49b5b0f68300303 /kafka/conn.py
parente5c7d81e7c35e6b013cece347ef42d9f21d03aa6 (diff)
downloadkafka-python-3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9.tar.gz
Switch configs from attributes to dict to make passing / inspecting easier
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py56
1 files changed, 31 insertions, 25 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 7979ba7..8ce4a6f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,6 +17,7 @@ from kafka.common import ConnectionError
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.types import Int32
+from kafka.version import __version__
log = logging.getLogger(__name__)
@@ -36,25 +37,24 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
class BrokerConnection(object):
- _receive_buffer_bytes = 32768
- _send_buffer_bytes = 131072
- _client_id = 'kafka-python-0.10.0'
- _correlation_id = 0
- _request_timeout_ms = 40000
- _max_in_flight_requests_per_connection = 5
- _reconnect_backoff_ms = 50
-
- def __init__(self, host, port, **kwargs):
+ DEFAULT_CONFIG = {
+ 'client_id': 'kafka-python-' + __version__,
+ 'request_timeout_ms': 40000,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'receive_buffer_bytes': 32768,
+ 'send_buffer_bytes': 131072,
+ }
+
+ def __init__(self, host, port, **configs):
self.host = host
self.port = port
self.in_flight_requests = collections.deque()
- for config in ('receive_buffer_bytes', 'send_buffer_bytes',
- 'client_id', 'correlation_id', 'request_timeout_ms',
- 'max_in_flight_requests_per_connection',
- 'reconnect_backoff_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self.state = ConnectionStates.DISCONNECTED
self._sock = None
@@ -64,14 +64,17 @@ class BrokerConnection(object):
self.last_attempt = 0
self.last_failure = 0
self._processing = False
+ self._correlation_id = 0
def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._receive_buffer_bytes)
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
+ self.config['receive_buffer_bytes'])
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
+ self.config['send_buffer_bytes'])
self._sock.setblocking(False)
ret = self._sock.connect_ex((self.host, self.port))
self.last_attempt = time.time()
@@ -89,7 +92,8 @@ class BrokerConnection(object):
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
- if time.time() > (self._request_timeout_ms / 1000.0) + self.last_attempt:
+ request_timeout = self.config['request_timeout_ms'] / 1000.0
+ if time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
self.close() # error=TimeoutError ?
self.last_failure = time.time()
@@ -110,8 +114,8 @@ class BrokerConnection(object):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
- now = time.time()
- if now - self.last_attempt < self._reconnect_backoff_ms / 1000.0:
+ backoff = self.config['reconnect_backoff_ms'] / 1000.0
+ if time.time() < self.last_attempt + backoff:
return True
return False
@@ -146,7 +150,7 @@ class BrokerConnection(object):
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
- client_id=self._client_id)
+ client_id=self.config['client_id'])
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
try:
@@ -178,7 +182,8 @@ class BrokerConnection(object):
return future
def can_send_more(self):
- return len(self.in_flight_requests) < self._max_in_flight_requests_per_connection
+ max_ifrs = self.config['max_in_flight_requests_per_connection']
+ return len(self.in_flight_requests) < max_ifrs
def recv(self, timeout=0):
"""Non-blocking network receive
@@ -202,9 +207,10 @@ class BrokerConnection(object):
elif self._requests_timed_out():
log.warning('%s timed out after %s ms. Closing connection.',
- self, self._request_timeout_ms)
+ self, self.config['request_timeout_ms'])
self.close(error=Errors.RequestTimedOutError(
- 'Request timed out after %s ms' % self._request_timeout_ms))
+ 'Request timed out after %s ms' %
+ self.config['request_timeout_ms']))
return None
readable, _, _ = select([self._sock], [], [], timeout)
@@ -294,7 +300,7 @@ class BrokerConnection(object):
def _requests_timed_out(self):
if self.in_flight_requests:
oldest_at = self.in_flight_requests[0].timestamp
- timeout = self._request_timeout_ms / 1000.0
+ timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() >= oldest_at + timeout:
return True
return False