summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py299
1 files changed, 218 insertions, 81 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d45b824..c2b8fb0 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,15 +1,20 @@
-from collections import deque
+import collections
import copy
+import errno
import logging
+import io
from random import shuffle
from select import select
import socket
import struct
from threading import local
+import time
import six
+import kafka.common as Errors
from kafka.common import ConnectionError
+from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.types import Int32
@@ -20,106 +25,244 @@ DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
-class BrokerConnection(local):
- def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
- client_id='kafka-python-0.10.0', correlation_id=0):
- super(BrokerConnection, self).__init__()
+class ConnectionStates(object):
+ DISCONNECTED = 1
+ CONNECTING = 2
+ CONNECTED = 3
+
+
+InFlightRequest = collections.namedtuple('InFlightRequest',
+ ['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
+
+
+class BrokerConnection(object):
+ _receive_buffer_bytes = 32768
+ _send_buffer_bytes = 32768
+ _client_id = 'kafka-python-0.10.0'
+ _correlation_id = 0
+ _request_timeout_ms = 40000
+
+ def __init__(self, host, port, **kwargs):
self.host = host
self.port = port
- self.timeout = timeout
- self._write_fd = None
- self._read_fd = None
- self.correlation_id = correlation_id
- self.client_id = client_id
- self.in_flight_requests = deque()
+ self.in_flight_requests = collections.deque()
+
+ for config in ('receive_buffer_bytes', 'send_buffer_bytes',
+ 'client_id', 'correlation_id', 'request_timeout_ms'):
+ if config in kwargs:
+ setattr(self, '_' + config, kwargs.pop(config))
+
+ self.state = ConnectionStates.DISCONNECTED
+ self._sock = None
+ self._rbuffer = io.BytesIO()
+ self._receiving = False
+ self._next_payload_bytes = 0
+ self._last_connection_attempt = None
+ self._last_connection_failure = None
def connect(self):
- if self.connected():
+ """Attempt to connect and return ConnectionState"""
+ if self.state is ConnectionStates.DISCONNECTED:
self.close()
- try:
- sock = socket.create_connection((self.host, self.port), self.timeout)
- self._write_fd = sock.makefile('wb')
- self._read_fd = sock.makefile('rb')
- except socket.error:
- log.exception("Error in BrokerConnection.connect()")
- return None
- self.in_flight_requests.clear()
- return True
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._receive_buffer_bytes)
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer_bytes)
+ self._sock.setblocking(False)
+ ret = self._sock.connect_ex((self.host, self.port))
+ self._last_connection_attempt = time.time()
+
+ if not ret or ret is errno.EISCONN:
+ self.state = ConnectionStates.CONNECTED
+ elif ret in (errno.EINPROGRESS, errno.EALREADY):
+ self.state = ConnectionStates.CONNECTING
+ else:
+ log.error('Connect attempt returned error %s. Disconnecting.', ret)
+ self.close()
+ self._last_connection_failure = time.time()
+
+ if self.state is ConnectionStates.CONNECTING:
+ # in non-blocking mode, use repeated calls to socket.connect_ex
+ # to check connection status
+ if time.time() > (self._request_timeout_ms / 1000.0) + self._last_connection_attempt:
+ log.error('Connection attempt timed out')
+ self.close() # error=TimeoutError ?
+ self._last_connection_failure = time.time()
+
+ ret = self._sock.connect_ex((self.host, self.port))
+ if not ret or ret is errno.EISCONN:
+ self.state = ConnectionStates.CONNECTED
+ elif ret is not errno.EALREADY:
+ log.error('Connect attempt returned error %s. Disconnecting.', ret)
+ self.close()
+ self._last_connection_failure = time.time()
+ return self.state
def connected(self):
- return (self._read_fd is not None and self._write_fd is not None)
+ return self.state is ConnectionStates.CONNECTED
- def close(self):
- if self.connected():
- try:
- self._read_fd.close()
- self._write_fd.close()
- except socket.error:
- log.exception("Error in BrokerConnection.close()")
- pass
- self._read_fd = None
- self._write_fd = None
+ def close(self, error=None):
+ if self._sock:
+ self._sock.close()
+ self._sock = None
+ self.state = ConnectionStates.DISCONNECTED
+
+ if error is None:
+ error = Errors.DisconnectError()
+ while self.in_flight_requests:
+ ifr = self.in_flight_requests.popleft()
+ ifr.future.failure(error)
self.in_flight_requests.clear()
+ self._receiving = False
+ self._next_payload_bytes = 0
+ self._rbuffer.seek(0)
+ self._rbuffer.truncate()
def send(self, request, expect_response=True):
- if not self.connected() and not self.connect():
- return None
- self.correlation_id += 1
+ """send request, return Future()
+
+ Can block on network if request is larger than send_buffer_bytes
+ """
+ future = Future()
+ if not self.connected():
+ return future.failure(Errors.DisconnectError())
+ self._correlation_id += 1
header = RequestHeader(request,
- correlation_id=self.correlation_id,
- client_id=self.client_id)
+ correlation_id=self._correlation_id,
+ client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
try:
- self._write_fd.write(size)
- self._write_fd.write(message)
- self._write_fd.flush()
- except socket.error:
- log.exception("Error in BrokerConnection.send(): %s", request)
- self.close()
- return None
+ # In the future we might manage an internal write buffer
+ # and send bytes asynchronously. For now, just block
+ # sending each request payload
+ self._sock.setblocking(True)
+ sent_bytes = self._sock.send(size)
+ assert sent_bytes == len(size)
+ sent_bytes = self._sock.send(message)
+ assert sent_bytes == len(message)
+ self._sock.setblocking(False)
+ except (AssertionError, socket.error) as e:
+ log.debug("Error in BrokerConnection.send(): %s", request)
+ self.close(error=e)
+ return future.failure(e)
+ log.debug('Request %d: %s', self._correlation_id, request)
+
if expect_response:
- self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE))
- log.debug('Request %d: %s', self.correlation_id, request)
- return self.correlation_id
+ ifr = InFlightRequest(request=request,
+ correlation_id=self._correlation_id,
+ response_type=request.RESPONSE_TYPE,
+ future=future,
+ timestamp=time.time())
+ self.in_flight_requests.append(ifr)
+ else:
+ future.success(None)
+
+ return future
+
+ def recv(self, timeout=0):
+ """Non-blocking network receive
- def recv(self, timeout=None):
+ Return response if available
+ """
if not self.connected():
+ log.warning('Cannot recv: socket not connected')
+ # If requests are pending, we should close the socket and
+ # fail all the pending request futures
+ if self.in_flight_requests:
+ self.close()
return None
- readable, _, _ = select([self._read_fd], [], [], timeout)
- if not readable:
- return None
+
if not self.in_flight_requests:
log.warning('No in-flight-requests to recv')
return None
- correlation_id, response_type = self.in_flight_requests.popleft()
- # Current implementation does not use size
- # instead we read directly from the socket fd buffer
- # alternatively, we could read size bytes into a separate buffer
- # and decode from that buffer (and verify buffer is empty afterwards)
- try:
- size = Int32.decode(self._read_fd)
- recv_correlation_id = Int32.decode(self._read_fd)
- if correlation_id != recv_correlation_id:
- raise RuntimeError('Correlation ids do not match!')
- response = response_type.decode(self._read_fd)
- except (RuntimeError, socket.error, struct.error):
- log.exception("Error in BrokerConnection.recv() for request %d", correlation_id)
- self.close()
+
+ self._fail_timed_out_requests()
+
+ readable, _, _ = select([self._sock], [], [], timeout)
+ if not readable:
return None
- log.debug('Response %d: %s', correlation_id, response)
- return response
- def next_correlation_id_recv(self):
- if len(self.in_flight_requests) == 0:
+ # Not receiving is the state of reading the payload header
+ if not self._receiving:
+ try:
+ # An extremely small, but non-zero, probability that there are
+ # more than 0 but not yet 4 bytes available to read
+ self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
+ except socket.error as e:
+ if e.errno == errno.EWOULDBLOCK:
+ # This shouldn't happen after selecting above
+ # but just in case
+ return None
+ log.exception("Error receiving 4-byte payload header - closing socket")
+ self.close(error=e)
+ return None
+
+ if self._rbuffer.tell() == 4:
+ self._rbuffer.seek(0)
+ self._next_payload_bytes = Int32.decode(self._rbuffer)
+ # reset buffer and switch state to receiving payload bytes
+ self._rbuffer.seek(0)
+ self._rbuffer.truncate()
+ self._receiving = True
+ elif self._rbuffer.tell() > 4:
+ raise Errors.KafkaError('this should not happen - are you threading?')
+
+ if self._receiving:
+ staged_bytes = self._rbuffer.tell()
+ try:
+ self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
+ except socket.error as e:
+ # Extremely small chance that we have exactly 4 bytes for a
+ # header, but nothing to read in the body yet
+ if e.errno == errno.EWOULDBLOCK:
+ return None
+ log.exception()
+ self.close(error=e)
+ return None
+
+ staged_bytes = self._rbuffer.tell()
+ if staged_bytes > self._next_payload_bytes:
+ self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
+
+ if staged_bytes != self._next_payload_bytes:
+ return None
+
+ self._receiving = False
+ self._next_payload_bytes = 0
+ self._rbuffer.seek(0)
+ response = self._process_response(self._rbuffer)
+ self._rbuffer.seek(0)
+ self._rbuffer.truncate()
+ return response
+
+ def _process_response(self, read_buffer):
+ ifr = self.in_flight_requests.popleft()
+
+ # verify send/recv correlation ids match
+ recv_correlation_id = Int32.decode(read_buffer)
+ if ifr.correlation_id != recv_correlation_id:
+ error = Errors.CorrelationIdError(
+ 'Correlation ids do not match: sent %d, recv %d'
+ % (ifr.correlation_id, recv_correlation_id))
+ ifr.future.fail(error)
+ self.close()
return None
- return self.in_flight_requests[0][0]
- def next_correlation_id_send(self):
- return self.correlation_id + 1
+ # decode response
+ response = ifr.response_type.decode(read_buffer)
+ ifr.future.success(response)
+ log.debug('Response %d: %s', ifr.correlation_id, response)
+ return response
- def __getnewargs__(self):
- return (self.host, self.port, self.timeout)
+ def _fail_timed_out_requests(self):
+ now = time.time()
+ while self.in_flight_requests:
+ next_timeout = self.in_flight_requests[0].timestamp + (self._request_timeout_ms / 1000.0)
+ if now < next_timeout:
+ break
+ timed_out = self.in_flight_requests.popleft()
+ error = Errors.RequestTimedOutError('Request timed out after %s ms' % self._request_timeout_ms)
+ timed_out.future.failure(error)
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
@@ -149,13 +292,7 @@ def collect_hosts(hosts, randomize=True):
class KafkaConnection(local):
- """
- A socket connection to a single Kafka broker
-
- This class is _not_ thread safe. Each call to `send` must be followed
- by a call to `recv` in order to get the correct response. Eventually,
- we can do something in here to facilitate multiplexed requests/responses
- since the Kafka API includes a correlation id.
+ """A socket connection to a single Kafka broker
Arguments:
host: the host name or IP address of a kafka broker