author | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800 |
commit | 828377377da43749af0d27ee256ef31bf714cf17 (patch) | |
tree | fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/conn.py | |
parent | 71e7568fcb8132899f366b37c32645fd5a40dc4b (diff) | |
parent | 9a8af1499ca425366d934487469d9977fae7fe5f (diff) | |
download | kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz |
Merge branch '0.9'
Conflicts:
kafka/codec.py
kafka/version.py
test/test_producer.py
test/test_producer_integration.py
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 338 |
1 file changed, 329 insertions(+), 9 deletions(-)
diff --git a/kafka/conn.py b/kafka/conn.py
index 9514e48..6ee5f5f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,21 +1,347 @@
+import collections
 import copy
+import errno
 import logging
+import io
 from random import shuffle
+from select import select
 import socket
 import struct
 from threading import local
+import time
 
 import six
 
-from kafka.common import ConnectionError
+import kafka.common as Errors
+from kafka.future import Future
+from kafka.protocol.api import RequestHeader
+from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.types import Int32
+from kafka.version import __version__
+
+if six.PY2:
+    ConnectionError = socket.error
+    BlockingIOError = Exception
+
 
 log = logging.getLogger(__name__)
 
 DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
 DEFAULT_KAFKA_PORT = 9092
 
 
+class ConnectionStates(object):
+    DISCONNECTED = '<disconnected>'
+    CONNECTING = '<connecting>'
+    CONNECTED = '<connected>'
+
+
+InFlightRequest = collections.namedtuple('InFlightRequest',
+    ['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
+
+
+class BrokerConnection(object):
+    DEFAULT_CONFIG = {
+        'client_id': 'kafka-python-' + __version__,
+        'request_timeout_ms': 40000,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'receive_buffer_bytes': 32768,
+        'send_buffer_bytes': 131072,
+        'api_version': (0, 8, 2),  # default to most restrictive
+    }
+
+    def __init__(self, host, port, **configs):
+        self.host = host
+        self.port = port
+        self.in_flight_requests = collections.deque()
+
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
+
+        self.state = ConnectionStates.DISCONNECTED
+        self._sock = None
+        self._rbuffer = io.BytesIO()
+        self._receiving = False
+        self._next_payload_bytes = 0
+        self.last_attempt = 0
+        self.last_failure = 0
+        self._processing = False
+        self._correlation_id = 0
+
+    def connect(self):
+        """Attempt to connect and return ConnectionState"""
+        if self.state is ConnectionStates.DISCONNECTED:
+            self.close()
+            self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
+                                  self.config['receive_buffer_bytes'])
+            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
+                                  self.config['send_buffer_bytes'])
+            self._sock.setblocking(False)
+            ret = self._sock.connect_ex((self.host, self.port))
+            self.last_attempt = time.time()
+
+            if not ret or ret is errno.EISCONN:
+                self.state = ConnectionStates.CONNECTED
+            elif ret in (errno.EINPROGRESS, errno.EALREADY):
+                self.state = ConnectionStates.CONNECTING
+            else:
+                log.error('Connect attempt to %s returned error %s.'
+                          ' Disconnecting.', self, ret)
+                self.close()
+                self.last_failure = time.time()
+
+        if self.state is ConnectionStates.CONNECTING:
+            # in non-blocking mode, use repeated calls to socket.connect_ex
+            # to check connection status
+            request_timeout = self.config['request_timeout_ms'] / 1000.0
+            if time.time() > request_timeout + self.last_attempt:
+                log.error('Connection attempt to %s timed out', self)
+                self.close()  # error=TimeoutError ?
+                self.last_failure = time.time()
+
+            else:
+                ret = self._sock.connect_ex((self.host, self.port))
+                if not ret or ret is errno.EISCONN:
+                    self.state = ConnectionStates.CONNECTED
+                elif ret is not errno.EALREADY:
+                    log.error('Connect attempt to %s returned error %s.'
+                              ' Disconnecting.', self, ret)
+                    self.close()
+                    self.last_failure = time.time()
+        return self.state
+
+    def blacked_out(self):
+        """
+        Return true if we are disconnected from the given node and can't
+        re-establish a connection yet
+        """
+        if self.state is ConnectionStates.DISCONNECTED:
+            backoff = self.config['reconnect_backoff_ms'] / 1000.0
+            if time.time() < self.last_attempt + backoff:
+                return True
+        return False
+
+    def connected(self):
+        """Return True iff socket is connected."""
+        return self.state is ConnectionStates.CONNECTED
+
+    def close(self, error=None):
+        """Close socket and fail all in-flight-requests.
+
+        Arguments:
+            error (Exception, optional): pending in-flight-requests
+                will be failed with this exception.
+                Default: kafka.common.ConnectionError.
+        """
+        if self._sock:
+            self._sock.close()
+            self._sock = None
+        self.state = ConnectionStates.DISCONNECTED
+        self._receiving = False
+        self._next_payload_bytes = 0
+        self._rbuffer.seek(0)
+        self._rbuffer.truncate()
+        if error is None:
+            error = Errors.ConnectionError()
+        while self.in_flight_requests:
+            ifr = self.in_flight_requests.popleft()
+            ifr.future.failure(error)
+
+    def send(self, request, expect_response=True):
+        """send request, return Future()
+
+        Can block on network if request is larger than send_buffer_bytes
+        """
+        future = Future()
+        if not self.connected():
+            return future.failure(Errors.ConnectionError())
+        if not self.can_send_more():
+            return future.failure(Errors.TooManyInFlightRequests())
+        correlation_id = self._next_correlation_id()
+        header = RequestHeader(request,
+                               correlation_id=correlation_id,
+                               client_id=self.config['client_id'])
+        message = b''.join([header.encode(), request.encode()])
+        size = Int32.encode(len(message))
+        try:
+            # In the future we might manage an internal write buffer
+            # and send bytes asynchronously. For now, just block
+            # sending each request payload
+            self._sock.setblocking(True)
+            sent_bytes = self._sock.send(size)
+            assert sent_bytes == len(size)
+            sent_bytes = self._sock.send(message)
+            assert sent_bytes == len(message)
+            self._sock.setblocking(False)
+        except (AssertionError, ConnectionError) as e:
+            log.exception("Error sending %s to %s", request, self)
+            error = Errors.ConnectionError(e)
+            self.close(error=error)
+            return future.failure(error)
+        log.debug('%s Request %d: %s', self, correlation_id, request)
+
+        if expect_response:
+            ifr = InFlightRequest(request=request,
+                                  correlation_id=correlation_id,
+                                  response_type=request.RESPONSE_TYPE,
+                                  future=future,
+                                  timestamp=time.time())
+            self.in_flight_requests.append(ifr)
+        else:
+            future.success(None)
+
+        return future
+
+    def can_send_more(self):
+        """Return True unless there are max_in_flight_requests."""
+        max_ifrs = self.config['max_in_flight_requests_per_connection']
+        return len(self.in_flight_requests) < max_ifrs
+
+    def recv(self, timeout=0):
+        """Non-blocking network receive.
+
+        Return response if available
+        """
+        assert not self._processing, 'Recursion not supported'
+        if not self.connected():
+            log.warning('%s cannot recv: socket not connected', self)
+            # If requests are pending, we should close the socket and
+            # fail all the pending request futures
+            if self.in_flight_requests:
+                self.close()
+            return None
+
+        elif not self.in_flight_requests:
+            log.warning('%s: No in-flight-requests to recv', self)
+            return None
+
+        elif self._requests_timed_out():
+            log.warning('%s timed out after %s ms. Closing connection.',
+                        self, self.config['request_timeout_ms'])
+            self.close(error=Errors.RequestTimedOutError(
+                'Request timed out after %s ms' %
+                self.config['request_timeout_ms']))
+            return None
+
+        readable, _, _ = select([self._sock], [], [], timeout)
+        if not readable:
+            return None
+
+        # Not receiving is the state of reading the payload header
+        if not self._receiving:
+            try:
+                # An extremely small, but non-zero, probability that there are
+                # more than 0 but not yet 4 bytes available to read
+                self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
+            except ConnectionError as e:
+                if six.PY2 and e.errno == errno.EWOULDBLOCK:
+                    # This shouldn't happen after selecting above
+                    # but just in case
+                    return None
+                log.exception('%s: Error receiving 4-byte payload header -'
+                              ' closing socket', self)
+                self.close(error=Errors.ConnectionError(e))
+                return None
+            except BlockingIOError:
+                if six.PY3:
+                    return None
+                raise
+
+            if self._rbuffer.tell() == 4:
+                self._rbuffer.seek(0)
+                self._next_payload_bytes = Int32.decode(self._rbuffer)
+                # reset buffer and switch state to receiving payload bytes
+                self._rbuffer.seek(0)
+                self._rbuffer.truncate()
+                self._receiving = True
+            elif self._rbuffer.tell() > 4:
+                raise Errors.KafkaError('this should not happen - are you threading?')
+
+        if self._receiving:
+            staged_bytes = self._rbuffer.tell()
+            try:
+                self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
+            except ConnectionError as e:
+                # Extremely small chance that we have exactly 4 bytes for a
+                # header, but nothing to read in the body yet
+                if six.PY2 and e.errno == errno.EWOULDBLOCK:
+                    return None
+                log.exception('%s: Error in recv', self)
+                self.close(error=Errors.ConnectionError(e))
+                return None
+            except BlockingIOError:
+                if six.PY3:
+                    return None
+                raise
+
+            staged_bytes = self._rbuffer.tell()
+            if staged_bytes > self._next_payload_bytes:
+                self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
+
+            if staged_bytes != self._next_payload_bytes:
+                return None
+
+            self._receiving = False
+            self._next_payload_bytes = 0
+            self._rbuffer.seek(0)
+            response = self._process_response(self._rbuffer)
+            self._rbuffer.seek(0)
+            self._rbuffer.truncate()
+            return response
+
+    def _process_response(self, read_buffer):
+        assert not self._processing, 'Recursion not supported'
+        self._processing = True
+        ifr = self.in_flight_requests.popleft()
+
+        # verify send/recv correlation ids match
+        recv_correlation_id = Int32.decode(read_buffer)
+
+        # 0.8.2 quirk
+        if (self.config['api_version'] == (0, 8, 2) and
+            ifr.response_type is GroupCoordinatorResponse and
+            recv_correlation_id == 0):
+            raise Errors.KafkaError(
+                'Kafka 0.8.2 quirk -- try creating a topic first')
+
+        elif ifr.correlation_id != recv_correlation_id:
+            error = Errors.CorrelationIdError(
+                'Correlation ids do not match: sent %d, recv %d'
+                % (ifr.correlation_id, recv_correlation_id))
+            ifr.future.fail(error)
+            self.close()
+            self._processing = False
+            return None
+
+        # decode response
+        response = ifr.response_type.decode(read_buffer)
+        log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
+        ifr.future.success(response)
+        self._processing = False
+        return response
+
+    def _requests_timed_out(self):
+        if self.in_flight_requests:
+            oldest_at = self.in_flight_requests[0].timestamp
+            timeout = self.config['request_timeout_ms'] / 1000.0
+            if time.time() >= oldest_at + timeout:
+                return True
+        return False
+
+    def _next_correlation_id(self):
+        self._correlation_id = (self._correlation_id + 1) % 2**31
+        return self._correlation_id
+
+    def __repr__(self):
+        return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
+
+
 def collect_hosts(hosts, randomize=True):
     """
     Collects a comma-separated set of hosts (host:port) and optionally
@@ -40,13 +366,7 @@ def collect_hosts(hosts, randomize=True):
 
 
 class KafkaConnection(local):
-    """
-    A socket connection to a single Kafka broker
-
-    This class is _not_ thread safe. Each call to `send` must be followed
-    by a call to `recv` in order to get the correct response. Eventually,
-    we can do something in here to facilitate multiplexed requests/responses
-    since the Kafka API includes a correlation id.
+    """A socket connection to a single Kafka broker
 
     Arguments:
         host: the host name or IP address of a kafka broker
@@ -79,7 +399,7 @@ class KafkaConnection(local):
             self.close()
 
             # And then raise
-            raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
+            raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
 
     def _read_bytes(self, num_bytes):
         bytes_left = num_bytes
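
For readers unfamiliar with the new class, here is a minimal usage sketch of the BrokerConnection API introduced in this diff. It is not part of the commit: it assumes kafka.protocol.metadata.MetadataRequest accepts a list of topics and that kafka.future.Future exposes is_done, value and exception, as kafka-python did around this release; treat it as an illustration, not the library's documented API.

```python
# Hypothetical usage sketch (not part of this commit).
import time

from kafka.conn import BrokerConnection, ConnectionStates
from kafka.protocol.metadata import MetadataRequest  # assumed import path

conn = BrokerConnection('localhost', 9092, client_id='conn-example')

# connect() is non-blocking: poll it until the state machine leaves CONNECTING
while conn.connect() is ConnectionStates.CONNECTING:
    time.sleep(0.05)

if conn.connected():
    # empty topic list == "all topics"; send() returns a Future
    future = conn.send(MetadataRequest([]))
    while not future.is_done:          # assumed Future attribute
        conn.recv(timeout=0.1)         # recv() resolves the pending future
    if future.exception:
        print('metadata request failed: %s' % future.exception)
    else:
        print(future.value)
conn.close()
```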
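On the wire, send() frames every request as a 4-byte big-endian size prefix followed by the standard Kafka request header (api_key, api_version, correlation_id, client_id) and the request body; that is what RequestHeader.encode() plus Int32.encode(len(message)) produce above. A standalone sketch of that framing using only the standard library, with illustrative values:

```python
# Wire-framing sketch; field layout follows the Kafka request header.
import struct

def frame_request(api_key, api_version, correlation_id, client_id, body):
    client = client_id.encode('utf-8')
    # header: int16 api_key, int16 api_version, int32 correlation_id,
    # then client_id as an int16-length-prefixed string
    header = struct.pack('>hhih', api_key, api_version, correlation_id, len(client)) + client
    message = header + body
    # the 4-byte size prefix counts header + body, not the prefix itself
    return struct.pack('>i', len(message)) + message

# api_key 3 == Metadata; an empty topic array encodes as a zero int32
framed = frame_request(3, 0, 1, 'example-client', b'\x00\x00\x00\x00')
print(len(framed), repr(framed[:4]))
```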
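On the read side, recv() runs a two-phase state machine: first accumulate the 4-byte size header, then accumulate exactly that many payload bytes, and finally let _process_response() match the response's leading correlation id against the oldest in-flight request. The following is a simplified blocking mirror of that flow with hypothetical helper names and plain sockets, shown only to make the framing explicit:

```python
# Blocking mirror of the recv()/_process_response() flow, for illustration only.
import struct

def _read_exact(sock, n):
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise IOError('socket closed while reading size-prefixed payload')
        buf += chunk
    return buf

def read_response(sock, expected_correlation_id):
    size = struct.unpack('>i', _read_exact(sock, 4))[0]    # payload length
    payload = _read_exact(sock, size)
    correlation_id = struct.unpack('>i', payload[:4])[0]   # first field of every response
    if correlation_id != expected_correlation_id:
        raise ValueError('Correlation ids do not match: sent %d, recv %d'
                         % (expected_correlation_id, correlation_id))
    return payload[4:]  # remaining bytes are the api-specific response struct
```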