summaryrefslogtreecommitdiff
path: root/kafka/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol')
-rw-r--r--kafka/protocol/parser.py177
1 files changed, 177 insertions, 0 deletions
diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py
new file mode 100644
index 0000000..4d77bb3
--- /dev/null
+++ b/kafka/protocol/parser.py
@@ -0,0 +1,177 @@
+from __future__ import absolute_import
+
+import collections
+import logging
+
+import kafka.errors as Errors
+from kafka.protocol.api import RequestHeader
+from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.frame import KafkaBytes
+from kafka.protocol.types import Int32
+from kafka.version import __version__
+
+log = logging.getLogger(__name__)
+
+
+class KafkaProtocol(object):
+ """Manage the kafka network protocol
+
+ Use an instance of KafkaProtocol to manage bytes send/recv'd
+ from a network socket to a broker.
+ """
+ def __init__(self, client_id=None, api_version=None):
+ if client_id is None:
+ client_id = self._gen_client_id()
+ self._client_id = client_id
+ self._api_version = api_version
+ self._correlation_id = 0
+ self._header = KafkaBytes(4)
+ self._rbuffer = None
+ self._receiving = False
+ self.in_flight_requests = collections.deque()
+ self.bytes_to_send = []
+
+ def _next_correlation_id(self):
+ self._correlation_id = (self._correlation_id + 1) % 2**31
+ return self._correlation_id
+
+ def _gen_client_id(self):
+ return 'kafka-python' + __version__
+
+ def send_request(self, request, correlation_id=None):
+ """Encode and queue a kafka api request for sending.
+
+ Arguments:
+ request (object): An un-encoded kafka request.
+ correlation_id (int, optional): Optionally specify an ID to
+ correlate requests with responses. If not provided, an ID will
+ be generated automatically.
+
+ Returns:
+ correlation_id
+ """
+ log.debug('Sending request %s', request)
+ if correlation_id is None:
+ correlation_id = self._next_correlation_id()
+ header = RequestHeader(request,
+ correlation_id=correlation_id,
+ client_id=self._client_id)
+ message = b''.join([header.encode(), request.encode()])
+ size = Int32.encode(len(message))
+ data = size + message
+ self.bytes_to_send.append(data)
+ if request.expect_response():
+ ifr = (correlation_id, request)
+ self.in_flight_requests.append(ifr)
+ return correlation_id
+
+ def send_bytes(self):
+ """Retrieve all pending bytes to send on the network"""
+ data = b''.join(self.bytes_to_send)
+ self.bytes_to_send = []
+ return data
+
+ def receive_bytes(self, data):
+ """Process bytes received from the network.
+
+ Arguments:
+ data (bytes): any length bytes received from a network connection
+ to a kafka broker.
+
+ Returns:
+ responses (list of (correlation_id, response)): any/all completed
+ responses, decoded from bytes to python objects.
+
+ Raises:
+ KafkaProtocolError: if the bytes received could not be decoded.
+ CorrelationIdError: if the response does not match the request
+ correlation id.
+ """
+ i = 0
+ n = len(data)
+ responses = []
+ while i < n:
+
+ # Not receiving is the state of reading the payload header
+ if not self._receiving:
+ bytes_to_read = min(4 - self._header.tell(), n - i)
+ self._header.write(data[i:i+bytes_to_read])
+ i += bytes_to_read
+
+ if self._header.tell() == 4:
+ self._header.seek(0)
+ nbytes = Int32.decode(self._header)
+ # reset buffer and switch state to receiving payload bytes
+ self._rbuffer = KafkaBytes(nbytes)
+ self._receiving = True
+ elif self._header.tell() > 4:
+ raise Errors.KafkaError('this should not happen - are you threading?')
+
+ if self._receiving:
+ total_bytes = len(self._rbuffer)
+ staged_bytes = self._rbuffer.tell()
+ bytes_to_read = min(total_bytes - staged_bytes, n - i)
+ self._rbuffer.write(data[i:i+bytes_to_read])
+ i += bytes_to_read
+
+ staged_bytes = self._rbuffer.tell()
+ if staged_bytes > total_bytes:
+ raise Errors.KafkaError('Receive buffer has more bytes than expected?')
+
+ if staged_bytes != total_bytes:
+ break
+
+ self._receiving = False
+ self._rbuffer.seek(0)
+ resp = self._process_response(self._rbuffer)
+ responses.append(resp)
+ self._reset_buffer()
+ return responses
+
+ def _process_response(self, read_buffer):
+ recv_correlation_id = Int32.decode(read_buffer)
+ log.debug('Received correlation id: %d', recv_correlation_id)
+
+ if not self.in_flight_requests:
+ raise Errors.CorrelationIdError(
+ 'No in-flight-request found for server response'
+ ' with correlation ID %d'
+ % recv_correlation_id)
+
+ (correlation_id, request) = self.in_flight_requests.popleft()
+
+ # 0.8.2 quirk
+ if (self._api_version == (0, 8, 2) and
+ request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and
+ correlation_id != 0 and
+ recv_correlation_id == 0):
+ log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
+ ' Correlation ID does not match request. This'
+ ' should go away once at least one topic has been'
+ ' initialized on the broker.')
+
+ elif correlation_id != recv_correlation_id:
+ # return or raise?
+ raise Errors.CorrelationIdError(
+ 'Correlation IDs do not match: sent %d, recv %d'
+ % (correlation_id, recv_correlation_id))
+
+ # decode response
+ log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
+ try:
+ response = request.RESPONSE_TYPE.decode(read_buffer)
+ except ValueError:
+ read_buffer.seek(0)
+ buf = read_buffer.read()
+ log.error('Response %d [ResponseType: %s Request: %s]:'
+ ' Unable to decode %d-byte buffer: %r',
+ correlation_id, request.RESPONSE_TYPE,
+ request, len(buf), buf)
+ raise Errors.KafkaProtocolError('Unable to decode response')
+
+ return (correlation_id, response)
+
+ def _reset_buffer(self):
+ self._receiving = False
+ self._header.seek(0)
+ self._rbuffer = None