-rw-r--r-- | kafka/client.py       | 128
-rw-r--r-- | kafka/common.py       |  22
-rw-r--r-- | kafka/protocol.py     |  53
-rw-r--r-- | test/test_protocol.py |  30
4 files changed, 226 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 64b814b..810fa46 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -98,6 +98,26 @@ class KafkaClient(object):
         # Otherwise return the BrokerMetadata
         return self.brokers[meta.leader]
 
+    def _get_coordinator_for_group(self, group):
+        """
+        Returns the coordinator broker for a consumer group.
+
+        ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+        does not currently exist for the group.
+
+        OffsetsLoadInProgressCode is raised if the coordinator is available
+        but is still loading offsets from the internal topic.
+        """
+
+        resp = self.send_consumer_metadata_request(group)
+
+        # If there's a problem with finding the coordinator, raise the
+        # provided error
+        kafka.common.check_error(resp)
+
+        # Otherwise return the BrokerMetadata
+        return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+
     def _next_id(self):
         """Generate a new correlation id"""
         # modulo to keep w/i int32
@@ -254,6 +274,96 @@ class KafkaClient(object):
         # Return responses in the same order as provided
         return [responses[tp] for tp in original_ordering]
 
+    def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
+        """
+        Send a list of requests to the consumer coordinator for the group
+        specified using the supplied encode/decode functions. As the payloads
+        that use consumer-aware requests do not contain the group (e.g.
+        OffsetFetchRequest), all payloads must be for a single group.
+
+        Arguments:
+
+        group: the name of the consumer group (str) the payloads are for
+        payloads: list of object-like entities with topic (str) and
+            partition (int) attributes; payloads with duplicate
+            topic+partition are not supported.
+
+        encoder_fn: a method to encode the list of payloads to a request body,
+            must accept client_id, correlation_id, and payloads as
+            keyword arguments
+
+        decoder_fn: a method to decode a response body into response objects.
+            The response objects must be object-like and have topic
+            and partition attributes
+
+        Returns:
+
+        List of response objects in the same order as the supplied payloads
+        """
+        # encoders / decoders do not maintain ordering currently
+        # so we need to keep this so we can rebuild order before returning
+        original_ordering = [(p.topic, p.partition) for p in payloads]
+
+        broker = self._get_coordinator_for_group(group)
+
+        # Send the list of request payloads and collect the responses and
+        # errors
+        responses = {}
+        requestId = self._next_id()
+        log.debug('Request %s to %s: %s', requestId, broker, payloads)
+        request = encoder_fn(client_id=self.client_id,
+                             correlation_id=requestId, payloads=payloads)
+
+        # Send the request, recv the response
+        try:
+            conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+            conn.send(requestId, request)
+
+        except ConnectionError as e:
+            log.warning('ConnectionError attempting to send request %s '
+                        'to server %s: %s', requestId, broker, e)
+
+            for payload in payloads:
+                topic_partition = (payload.topic, payload.partition)
+                responses[topic_partition] = FailedPayloadsError(payload)
+
+        # No exception, try to get response
+        else:
+
+            # decoder_fn=None signals that the server is expected to not
+            # send a response. This probably only applies to
+            # ProduceRequest w/ acks = 0
+            if decoder_fn is None:
+                log.debug('Request %s does not expect a response '
+                          '(skipping conn.recv)', requestId)
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = None
+                return []
+
+            try:
+                response = conn.recv(requestId)
+            except ConnectionError as e:
+                log.warning('ConnectionError attempting to receive a '
+                            'response to request %s from server %s: %s',
+                            requestId, broker, e)
+
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = FailedPayloadsError(payload)
+
+            else:
+                _resps = []
+                for payload_response in decoder_fn(response):
+                    topic_partition = (payload_response.topic,
+                                       payload_response.partition)
+                    responses[topic_partition] = payload_response
+                    _resps.append(payload_response)
+                log.debug('Response %s: %s', requestId, _resps)
+
+        # Return responses in the same order as provided
+        return [responses[tp] for tp in original_ordering]
+
     def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
@@ -446,6 +556,13 @@ class KafkaClient(object):
 
         return self._send_broker_unaware_request(payloads, encoder, decoder)
 
+    def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
+                                       callback=None):
+        encoder = KafkaProtocol.encode_consumer_metadata_request
+        decoder = KafkaProtocol.decode_consumer_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
+
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
@@ -550,3 +667,14 @@ class KafkaClient(object):
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
+
+    def send_offset_fetch_request_kafka(self, group, payloads=[],
+                                        fail_on_error=True, callback=None):
+
+        encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
+                                    group=group, from_kafka=True)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
+
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
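A minimal usage sketch of the new consumer-aware path (not part of the patch; the broker address, topic, and group names are placeholders):

    # Fetch Kafka-committed offsets (OffsetFetch v1) for a consumer group,
    # rather than the Zookeeper-backed offsets that
    # send_offset_fetch_request returns. Host/topic/group are illustrative.
    from kafka.client import KafkaClient
    from kafka.common import OffsetFetchRequest

    client = KafkaClient('localhost:9092')
    payloads = [OffsetFetchRequest(b'my-topic', 0)]
    for resp in client.send_offset_fetch_request_kafka(b'my-group', payloads):
        print(resp.topic, resp.partition, resp.offset, resp.error)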
diff --git a/kafka/common.py b/kafka/common.py
index 66987ff..a7d8164 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -13,6 +13,13 @@ MetadataRequest = namedtuple("MetadataRequest",
 MetadataResponse = namedtuple("MetadataResponse",
     ["brokers", "topics"])
 
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
+ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
+    ["groups"])
+
+ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
+    ["error", "nodeId", "host", "port"])
+
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
 ProduceRequest = namedtuple("ProduceRequest",
     ["topic", "partition", "messages"])
@@ -160,6 +167,21 @@ class StaleLeaderEpochCodeError(BrokerResponseError):
     message = 'STALE_LEADER_EPOCH_CODE'
 
 
+class OffsetsLoadInProgressCode(BrokerResponseError):
+    errno = 14
+    message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
+
+
+class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
+    errno = 15
+    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
+
+
+class NotCoordinatorForConsumerCode(BrokerResponseError):
+    errno = 16
+    message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
+
+
 class KafkaUnavailableError(KafkaError):
     pass
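Because check_error() resolves broker error codes to these new exception classes, callers can treat errno 14 and 15 as transient. A hedged retry sketch (the helper name and retry policy are illustrative, not part of the patch):

    import time

    from kafka.common import (ConsumerCoordinatorNotAvailableCode,
                              OffsetsLoadInProgressCode)

    def wait_for_coordinator(client, group, retries=5, backoff=1.0):
        # errno 15: no coordinator elected yet for the group;
        # errno 14: coordinator found, but still loading offsets from the
        # internal offsets topic. Both should clear on their own.
        last_error = None
        for _ in range(retries):
            try:
                return client._get_coordinator_for_group(group)
            except (ConsumerCoordinatorNotAvailableCode,
                    OffsetsLoadInProgressCode) as e:
                last_error = e
                time.sleep(backoff)
        raise last_error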
diff --git a/kafka/protocol.py b/kafka/protocol.py
index a916974..412a957 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -14,7 +14,8 @@ from kafka.common import (
     MetadataResponse, ProduceResponse, FetchResponse,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProtocolError, BufferUnderflowError, ChecksumError,
-    ConsumerFetchSizeTooSmall, UnsupportedCodecError
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+    ConsumerMetadataResponse
 )
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
@@ -43,19 +44,21 @@ class KafkaProtocol(object):
     METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 8
     OFFSET_FETCH_KEY = 9
+    CONSUMER_METADATA_KEY = 10
 
     ###################
     #   Private API   #
     ###################
 
     @classmethod
-    def _encode_message_header(cls, client_id, correlation_id, request_key):
+    def _encode_message_header(cls, client_id, correlation_id, request_key,
+                               version=0):
         """
         Encode the common request envelope
         """
         return struct.pack('>hhih%ds' % len(client_id),
                            request_key,          # ApiKey
-                           0,                    # ApiVersion
+                           version,              # ApiVersion
                            correlation_id,       # CorrelationId
                            len(client_id),       # ClientId size
                            client_id)            # ClientId
@@ -430,6 +433,38 @@ class KafkaProtocol(object):
         return MetadataResponse(brokers, topic_metadata)
 
     @classmethod
+    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+        """
+        Encode a ConsumerMetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: string (consumer group)
+        """
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
+        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_consumer_metadata_response(cls, data):
+        """
+        Decode bytes to a ConsumerMetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+        (host, cur) = read_short_string(data, cur)
+        ((port,), cur) = relative_unpack('>i', data, cur)
+
+        return ConsumerMetadataResponse(error, nodeId, host, port)
+
+    @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
                                      group, payloads):
         """
@@ -481,21 +516,27 @@ class KafkaProtocol(object):
 
     @classmethod
     def encode_offset_fetch_request(cls, client_id, correlation_id,
-                                    group, payloads):
+                                    group, payloads, from_kafka=False):
         """
-        Encode some OffsetFetchRequest structs
+        Encode some OffsetFetchRequest structs. The request is encoded using
+        version 0 if from_kafka is False, indicating a request for Zookeeper
+        offsets. It is encoded using version 1 otherwise, indicating a request
+        for Kafka-committed offsets.
 
         Arguments:
            client_id: string
            correlation_id: int
            group: string, the consumer group you are fetching offsets for
            payloads: list of OffsetFetchRequest
+           from_kafka: bool, default False; set True for Kafka-committed offsets
        """
        grouped_payloads = group_by_topic_and_partition(payloads)
 
        message = []
+       reqver = 1 if from_kafka else 0
        message.append(cls._encode_message_header(client_id, correlation_id,
-                                                 KafkaProtocol.OFFSET_FETCH_KEY))
+                                                 KafkaProtocol.OFFSET_FETCH_KEY,
+                                                 version=reqver))
 
        message.append(write_short_string(group))
        message.append(struct.pack('>i', len(grouped_payloads)))
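For reference, the request produced by encode_consumer_metadata_request is the standard envelope followed by one short string. A byte-level sketch using the same values as the unit test below (nothing here is new API):

    import struct

    # ApiKey=10 (consumer metadata), ApiVersion=0, CorrelationId=4,
    # then the length-prefixed client id b'cid' ...
    header = struct.pack('>hhih3s', 10, 0, 4, 3, b'cid')
    body = struct.pack('>h2s', 2, b'g1')   # ... then the group short string
    payload = header + body                # 13 + 4 = 17 bytes
    request = struct.pack('>i', len(payload)) + payload  # int32 size prefix
    assert len(payload) == 17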
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0938228..ac7bea6 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -13,7 +13,7 @@ from kafka.common import (
     ProduceResponse, FetchResponse, OffsetAndMessage,
     BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
     KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
-    ProtocolError
+    ProtocolError, ConsumerMetadataResponse
 )
 from kafka.protocol import (
     ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -560,6 +560,34 @@ class TestProtocol(unittest.TestCase):
         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))
 
+    def test_encode_consumer_metadata_request(self):
+        expected = b"".join([
+            struct.pack(">i", 17),          # Total length of the request
+            struct.pack('>h', 10),          # API key consumer metadata
+            struct.pack('>h', 0),           # API version
+            struct.pack('>i', 4),           # Correlation ID
+            struct.pack('>h3s', 3, b"cid"), # The client ID
+            struct.pack('>h2s', 2, b"g1"),  # Group "g1"
+        ])
+
+        encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
+
+        self.assertEqual(encoded, expected)
+
+    def test_decode_consumer_metadata_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),                                # Correlation ID
+            struct.pack(">h", 0),                                 # No Error
+            struct.pack(">i", 1),                                 # Broker ID
+            struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host
+            struct.pack(">i", 1000),                              # Broker Port
+        ])
+
+        results = KafkaProtocol.decode_consumer_metadata_response(encoded)
+        self.assertEqual(results,
+            ConsumerMetadataResponse(error=0, nodeId=1,
+                host=b'brokers1.kafka.rdio.com', port=1000)
+        )
+
     def test_encode_offset_request(self):
         expected = b"".join([
             struct.pack(">i", 21),  # Total length of the request
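The magic number 17 in test_encode_consumer_metadata_request is plain envelope arithmetic; the int32 size prefix counts only the bytes that follow it. A quick sanity check (illustrative only):

    # ApiKey(2) + ApiVersion(2) + CorrelationId(4)
    #   + client id short string (2 + 3) + group short string (2 + 2) == 17
    assert 2 + 2 + 4 + (2 + len(b'cid')) + (2 + len(b'g1')) == 17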