Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 189 |
1 files changed, 44 insertions, 145 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index af01efa..7bbd979 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -14,11 +14,10 @@ from kafka.vendor import six
 
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.protocol.api import RequestHeader
 from kafka.protocol.admin import SaslHandShakeRequest
-from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
-from kafka.protocol.frame import KafkaBytes
+from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.parser import KafkaProtocol
 from kafka.protocol.types import Int32
 from kafka.version import __version__
@@ -73,9 +72,6 @@ class ConnectionStates(object):
     CONNECTED = '<connected>'
     AUTHENTICATING = '<authenticating>'
 
-InFlightRequest = collections.namedtuple('InFlightRequest',
-    ['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
-
 
 class BrokerConnection(object):
     """Initialize a Kafka broker connection
@@ -226,6 +222,9 @@ class BrokerConnection(object):
                 assert gssapi is not None, 'GSSAPI lib not available'
                 assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'
 
+        self._protocol = KafkaProtocol(
+            client_id=self.config['client_id'],
+            api_version=self.config['api_version'])
         self.state = ConnectionStates.DISCONNECTED
         self._reset_reconnect_backoff()
         self._sock = None
@@ -233,12 +232,7 @@ class BrokerConnection(object):
         if self.config['ssl_context'] is not None:
             self._ssl_context = self.config['ssl_context']
         self._sasl_auth_future = None
-        self._header = KafkaBytes(4)
-        self._rbuffer = None
-        self._receiving = False
         self.last_attempt = 0
-        self._processing = False
-        self._correlation_id = 0
         self._gai = None
         self._gai_index = 0
         self._sensors = None
@@ -628,19 +622,16 @@ class BrokerConnection(object):
         self.state = ConnectionStates.DISCONNECTED
         self.last_attempt = time.time()
         self._sasl_auth_future = None
-        self._reset_buffer()
+        self._protocol = KafkaProtocol(
+            client_id=self.config['client_id'],
+            api_version=self.config['api_version'])
         if error is None:
             error = Errors.Cancelled(str(self))
         while self.in_flight_requests:
-            ifr = self.in_flight_requests.popleft()
-            ifr.future.failure(error)
+            (_, future, _) = self.in_flight_requests.popleft()
+            future.failure(error)
         self.config['state_change_callback'](self)
 
-    def _reset_buffer(self):
-        self._receiving = False
-        self._header.seek(0)
-        self._rbuffer = None
-
     def send(self, request):
         """send request, return Future()
 
@@ -658,13 +649,8 @@ class BrokerConnection(object):
     def _send(self, request):
         assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
         future = Future()
-        correlation_id = self._next_correlation_id()
-        header = RequestHeader(request,
-                               correlation_id=correlation_id,
-                               client_id=self.config['client_id'])
-        message = b''.join([header.encode(), request.encode()])
-        size = Int32.encode(len(message))
-        data = size + message
+        correlation_id = self._protocol.send_request(request)
+        data = self._protocol.send_bytes()
         try:
             # In the future we might manage an internal write buffer
             # and send bytes asynchronously. For now, just block
@@ -686,11 +672,7 @@ class BrokerConnection(object):
         log.debug('%s Request %d: %s', self, correlation_id, request)
 
         if request.expect_response():
-            ifr = InFlightRequest(request=request,
-                                  correlation_id=correlation_id,
-                                  response_type=request.RESPONSE_TYPE,
-                                  future=future,
-                                  timestamp=time.time())
+            ifr = (correlation_id, future, time.time())
             self.in_flight_requests.append(ifr)
         else:
             future.success(None)
@@ -707,7 +689,6 @@ class BrokerConnection(object):
 
         Return response if available
        """
-        assert not self._processing, 'Recursion not supported'
         if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
             log.warning('%s cannot recv: socket not connected', self)
             # If requests are pending, we should close the socket and
@@ -720,15 +701,28 @@ class BrokerConnection(object):
             log.warning('%s: No in-flight-requests to recv', self)
             return ()
 
-        response = self._recv()
-        if not response and self.requests_timed_out():
+        responses = self._recv()
+        if not responses and self.requests_timed_out():
             log.warning('%s timed out after %s ms. Closing connection.',
                         self, self.config['request_timeout_ms'])
             self.close(error=Errors.RequestTimedOutError(
                 'Request timed out after %s ms' %
                 self.config['request_timeout_ms']))
             return ()
-        return response
+
+        for response in responses:
+            (correlation_id, future, timestamp) = self.in_flight_requests.popleft()
+            if isinstance(response, Errors.KafkaError):
+                self.close(response)
+                break
+
+            if self._sensors:
+                self._sensors.request_time.record((time.time() - timestamp) * 1000)
+
+            log.debug('%s Response %d: %s', self, correlation_id, response)
+            future.success(response)
+
+        return responses
 
     def _recv(self):
         responses = []
@@ -744,10 +738,7 @@
                     log.error('%s: socket disconnected', self)
                     self.close(error=Errors.ConnectionError('socket disconnected'))
                     break
-                else:
-                    responses.extend(self.receive_bytes(data))
-                    if len(data) < SOCK_CHUNK_BYTES:
-                        break
+
             except SSLWantReadError:
                 break
             except ConnectionError as e:
@@ -761,118 +752,26 @@
                 if six.PY3:
                     break
                 raise
-        return responses
 
-    def receive_bytes(self, data):
-        i = 0
-        n = len(data)
-        responses = []
-        if self._sensors:
-            self._sensors.bytes_received.record(n)
-        while i < n:
-
-            # Not receiving is the state of reading the payload header
-            if not self._receiving:
-                bytes_to_read = min(4 - self._header.tell(), n - i)
-                self._header.write(data[i:i+bytes_to_read])
-                i += bytes_to_read
-
-                if self._header.tell() == 4:
-                    self._header.seek(0)
-                    nbytes = Int32.decode(self._header)
-                    # reset buffer and switch state to receiving payload bytes
-                    self._rbuffer = KafkaBytes(nbytes)
-                    self._receiving = True
-                elif self._header.tell() > 4:
-                    raise Errors.KafkaError('this should not happen - are you threading?')
-
-
-            if self._receiving:
-                total_bytes = len(self._rbuffer)
-                staged_bytes = self._rbuffer.tell()
-                bytes_to_read = min(total_bytes - staged_bytes, n - i)
-                self._rbuffer.write(data[i:i+bytes_to_read])
-                i += bytes_to_read
-
-                staged_bytes = self._rbuffer.tell()
-                if staged_bytes > total_bytes:
-                    self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
-
-                if staged_bytes != total_bytes:
-                    break
+            if self._sensors:
+                self._sensors.bytes_received.record(len(data))
 
-                self._receiving = False
-                self._rbuffer.seek(0)
-                resp = self._process_response(self._rbuffer)
-                if resp is not None:
-                    responses.append(resp)
-                self._reset_buffer()
-        return responses
+            try:
+                more_responses = self._protocol.receive_bytes(data)
+            except Errors.KafkaProtocolError as e:
+                self.close(e)
+                break
+            else:
+                responses.extend([resp for (_, resp) in more_responses])
 
-    def _process_response(self, read_buffer):
-        assert not self._processing, 'Recursion not supported'
-        self._processing = True
-        recv_correlation_id = Int32.decode(read_buffer)
-
-        if not self.in_flight_requests:
-            error = Errors.CorrelationIdError(
-                '%s: No in-flight-request found for server response'
-                ' with correlation ID %d'
-                % (self, recv_correlation_id))
-            self.close(error)
-            self._processing = False
-            return None
-        else:
-            ifr = self.in_flight_requests.popleft()
-
-        if self._sensors:
-            self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
-
-        # verify send/recv correlation ids match
-
-        # 0.8.2 quirk
-        if (self.config['api_version'] == (0, 8, 2) and
-            ifr.response_type is GroupCoordinatorResponse[0] and
-            ifr.correlation_id != 0 and
-            recv_correlation_id == 0):
-            log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
-                        ' Correlation ID does not match request. This'
-                        ' should go away once at least one topic has been'
-                        ' initialized on the broker.')
-
-        elif ifr.correlation_id != recv_correlation_id:
-            error = Errors.CorrelationIdError(
-                '%s: Correlation IDs do not match: sent %d, recv %d'
-                % (self, ifr.correlation_id, recv_correlation_id))
-            ifr.future.failure(error)
-            self.close(error)
-            self._processing = False
-            return None
-
-        # decode response
-        try:
-            response = ifr.response_type.decode(read_buffer)
-        except ValueError:
-            read_buffer.seek(0)
-            buf = read_buffer.read()
-            log.error('%s Response %d [ResponseType: %s Request: %s]:'
-                      ' Unable to decode %d-byte buffer: %r', self,
-                      ifr.correlation_id, ifr.response_type,
-                      ifr.request, len(buf), buf)
-            error = Errors.UnknownError('Unable to decode response')
-            ifr.future.failure(error)
-            self.close(error)
-            self._processing = False
-            return None
-
-        log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
-        ifr.future.success(response)
-        self._processing = False
-        return response
+            if len(data) < SOCK_CHUNK_BYTES:
+                break
+
+        return responses
 
     def requests_timed_out(self):
         if self.in_flight_requests:
-            oldest_at = self.in_flight_requests[0].timestamp
+            (_, _, oldest_at) = self.in_flight_requests[0]
             timeout = self.config['request_timeout_ms'] / 1000.0
             if time.time() >= oldest_at + timeout:
                 return True
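
For context, a rough sketch of the flow this refactor produces: KafkaProtocol (kafka/protocol/parser.py) now owns correlation IDs and wire framing via send_request()/send_bytes()/receive_bytes(), while the connection keeps only (correlation_id, future, timestamp) tuples in flight. The demo_roundtrip() helper and its sock argument below are illustrative stand-ins and not part of the patch; the kafka.* imports and KafkaProtocol calls are the ones used in the diff above.

import collections
import time

import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol


def demo_roundtrip(sock):
    # Hypothetical helper: `sock` is assumed to be a socket already
    # connected to a broker; the rest mirrors the refactored send/recv path.
    protocol = KafkaProtocol(client_id='demo-client', api_version=(0, 10))
    in_flight = collections.deque()  # (correlation_id, future, timestamp)

    # Send path: the protocol object assigns the correlation id and
    # produces the length-prefixed request bytes.
    request = MetadataRequest[0]([])  # v0 metadata request, all topics
    correlation_id = protocol.send_request(request)
    sock.sendall(protocol.send_bytes())
    future = Future()
    in_flight.append((correlation_id, future, time.time()))

    # Receive path: raw socket bytes go straight into the parser, which
    # yields zero or more (correlation_id, response) pairs per chunk.
    while in_flight:
        data = sock.recv(4096)
        if not data:
            raise Errors.ConnectionError('socket disconnected')
        # The real connection closes itself on KafkaProtocolError;
        # here we simply let it propagate.
        for _, response in protocol.receive_bytes(data):
            corr_id, fut, sent_at = in_flight.popleft()
            fut.success(response)
            print('Response %d after %.1f ms'
                  % (corr_id, (time.time() - sent_at) * 1000))
    return future.value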