author    Dana Powers <dana.powers@gmail.com>  2015-12-02 15:50:05 -0800
committer Dana Powers <dana.powers@gmail.com>  2015-12-02 15:50:05 -0800
commit    3e28b42e6691b864b6f940034a4ccdce0b69d406 (patch)
tree      b2badd68fc5ac37ab50016f25a0efdb2abb76c55
parent    ee6b9cb5b1310c48a3ed5b66be0dd0c4dd16dc43 (diff)
parent    da03827d12520bd9c8c5b35bb43e35168f09771a (diff)
download  kafka-python-3e28b42e6691b864b6f940034a4ccdce0b69d406.tar.gz
Merge pull request #420 from toddpalino/master
Initial support for consumer coordinator
-rw-r--r--  kafka/client.py       | 128
-rw-r--r--  kafka/common.py       |  22
-rw-r--r--  kafka/protocol.py     |  53
-rw-r--r--  test/test_protocol.py |  30
4 files changed, 226 insertions(+), 7 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 64b814b..810fa46 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -98,6 +98,26 @@ class KafkaClient(object):
         # Otherwise return the BrokerMetadata
         return self.brokers[meta.leader]

+    def _get_coordinator_for_group(self, group):
+        """
+        Returns the coordinator broker for a consumer group.
+
+        ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+        does not currently exist for the group.
+
+        OffsetsLoadInProgressCode is raised if the coordinator is available
+        but is still loading offsets from the internal topic.
+        """
+
+        resp = self.send_consumer_metadata_request(group)
+
+        # If there's a problem with finding the coordinator, raise the
+        # provided error
+        kafka.common.check_error(resp)
+
+        # Otherwise return the BrokerMetadata
+        return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+
     def _next_id(self):
         """Generate a new correlation id"""
         # modulo to keep w/i int32
@@ -254,6 +274,96 @@ class KafkaClient(object):
         # Return responses in the same order as provided
         return [responses[tp] for tp in original_ordering]

+    def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
+        """
+        Send a list of requests to the consumer coordinator for the group
+        specified using the supplied encode/decode functions. As the payloads
+        that use consumer-aware requests do not contain the group (e.g.
+        OffsetFetchRequest), all payloads must be for a single group.
+
+        Arguments:
+
+        group: the name of the consumer group (str) the payloads are for
+        payloads: list of object-like entities with topic (str) and
+            partition (int) attributes; payloads with duplicate
+            topic+partition are not supported.
+
+        encoder_fn: a method to encode the list of payloads to a request body,
+            must accept client_id, correlation_id, and payloads as
+            keyword arguments
+
+        decoder_fn: a method to decode a response body into response objects.
+            The response objects must be object-like and have topic
+            and partition attributes
+
+        Returns:
+
+        List of response objects in the same order as the supplied payloads
+        """
+        # encoders / decoders do not maintain ordering currently
+        # so we need to keep this so we can rebuild order before returning
+        original_ordering = [(p.topic, p.partition) for p in payloads]
+
+        broker = self._get_coordinator_for_group(group)
+
+        # Send the list of request payloads and collect the responses and
+        # errors
+        responses = {}
+        requestId = self._next_id()
+        log.debug('Request %s to %s: %s', requestId, broker, payloads)
+        request = encoder_fn(client_id=self.client_id,
+                             correlation_id=requestId, payloads=payloads)
+
+        # Send the request, recv the response
+        try:
+            conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+            conn.send(requestId, request)
+
+        except ConnectionError as e:
+            log.warning('ConnectionError attempting to send request %s '
+                        'to server %s: %s', requestId, broker, e)
+
+            for payload in payloads:
+                topic_partition = (payload.topic, payload.partition)
+                responses[topic_partition] = FailedPayloadsError(payload)
+
+        # No exception, try to get response
+        else:
+
+            # decoder_fn=None signals that the server is expected to not
+            # send a response. This probably only applies to
+            # ProduceRequest w/ acks = 0
+            if decoder_fn is None:
+                log.debug('Request %s does not expect a response '
+                          '(skipping conn.recv)', requestId)
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = None
+                return []
+
+            try:
+                response = conn.recv(requestId)
+            except ConnectionError as e:
+                log.warning('ConnectionError attempting to receive a '
+                            'response to request %s from server %s: %s',
+                            requestId, broker, e)
+
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = FailedPayloadsError(payload)
+
+            else:
+                _resps = []
+                for payload_response in decoder_fn(response):
+                    topic_partition = (payload_response.topic,
+                                       payload_response.partition)
+                    responses[topic_partition] = payload_response
+                    _resps.append(payload_response)
+                log.debug('Response %s: %s', requestId, _resps)
+
+        # Return responses in the same order as provided
+        return [responses[tp] for tp in original_ordering]
+
     def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
@@ -446,6 +556,13 @@ class KafkaClient(object):
         return self._send_broker_unaware_request(payloads, encoder, decoder)

+    def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
+                                       callback=None):
+        encoder = KafkaProtocol.encode_consumer_metadata_request
+        decoder = KafkaProtocol.decode_consumer_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
+
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
@@ -550,3 +667,14 @@ class KafkaClient(object):
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
+
+    def send_offset_fetch_request_kafka(self, group, payloads=[],
+                                        fail_on_error=True, callback=None):
+
+        encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
+                                    group=group, from_kafka=True)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
+
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
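A minimal usage sketch (not part of this patch) of the new Kafka-committed
offset fetch path. The broker address, group, and topic names here are
assumptions for illustration; OffsetFetchRequest is the (topic, partition)
payload namedtuple from kafka.common, and the responses carry topic,
partition, and offset attributes.

    from kafka import KafkaClient
    from kafka.common import OffsetFetchRequest

    client = KafkaClient('localhost:9092')

    # All payloads must belong to a single consumer group; duplicate
    # topic+partition pairs are not supported.
    payloads = [OffsetFetchRequest(b'my-topic', 0),
                OffsetFetchRequest(b'my-topic', 1)]

    # Routed to the group's coordinator and encoded with from_kafka=True,
    # so offsets come from Kafka itself rather than Zookeeper.
    for resp in client.send_offset_fetch_request_kafka(b'my-group', payloads):
        print(resp.topic, resp.partition, resp.offset)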
diff --git a/kafka/common.py b/kafka/common.py
index 66987ff..a7d8164 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -13,6 +13,13 @@ MetadataRequest = namedtuple("MetadataRequest",
 MetadataResponse = namedtuple("MetadataResponse",
     ["brokers", "topics"])

+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
+ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
+    ["groups"])
+
+ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
+    ["error", "nodeId", "host", "port"])
+
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
 ProduceRequest = namedtuple("ProduceRequest",
     ["topic", "partition", "messages"])
@@ -160,6 +167,21 @@ class StaleLeaderEpochCodeError(BrokerResponseError):
     message = 'STALE_LEADER_EPOCH_CODE'


+class OffsetsLoadInProgressCode(BrokerResponseError):
+    errno = 14
+    message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
+
+
+class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
+    errno = 15
+    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
+
+
+class NotCoordinatorForConsumerCode(BrokerResponseError):
+    errno = 16
+    message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
+
+
 class KafkaUnavailableError(KafkaError):
     pass
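A hedged sketch (not in this patch) of how a caller might react to the new
error codes while locating a coordinator. The helper name and retry policy
are illustrative only, and it leans on the private _get_coordinator_for_group
added above, which raises these errors via kafka.common.check_error.

    import time

    from kafka.common import (
        ConsumerCoordinatorNotAvailableCode, OffsetsLoadInProgressCode,
        KafkaUnavailableError)

    def wait_for_coordinator(client, group, retries=5, backoff=0.5):
        """Poll until the group's coordinator is usable, or give up."""
        for attempt in range(retries):
            try:
                return client._get_coordinator_for_group(group)
            except (ConsumerCoordinatorNotAvailableCode,   # errno 15
                    OffsetsLoadInProgressCode):            # errno 14
                # Coordinator missing, or present but still loading
                # offsets from the internal topic; back off and retry.
                time.sleep(backoff * (attempt + 1))
        raise KafkaUnavailableError('no coordinator for group %r' % group)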
diff --git a/kafka/protocol.py b/kafka/protocol.py
index a916974..412a957 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -14,7 +14,8 @@ from kafka.common import (
     MetadataResponse, ProduceResponse, FetchResponse,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProtocolError, BufferUnderflowError, ChecksumError,
-    ConsumerFetchSizeTooSmall, UnsupportedCodecError
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+    ConsumerMetadataResponse
 )
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
@@ -43,19 +44,21 @@ class KafkaProtocol(object):
     METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 8
     OFFSET_FETCH_KEY = 9
+    CONSUMER_METADATA_KEY = 10

     ###################
     #   Private API   #
     ###################

     @classmethod
-    def _encode_message_header(cls, client_id, correlation_id, request_key):
+    def _encode_message_header(cls, client_id, correlation_id, request_key,
+                               version=0):
         """
         Encode the common request envelope
         """
         return struct.pack('>hhih%ds' % len(client_id),
                            request_key,          # ApiKey
-                           0,                    # ApiVersion
+                           version,              # ApiVersion
                            correlation_id,       # CorrelationId
                            len(client_id),       # ClientId size
                            client_id)            # ClientId
@@ -430,6 +433,38 @@ class KafkaProtocol(object):
         return MetadataResponse(brokers, topic_metadata)

     @classmethod
+    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+        """
+        Encode a ConsumerMetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: string (consumer group)
+        """
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
+        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_consumer_metadata_response(cls, data):
+        """
+        Decode bytes to a ConsumerMetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+        (host, cur) = read_short_string(data, cur)
+        ((port,), cur) = relative_unpack('>i', data, cur)
+
+        return ConsumerMetadataResponse(error, nodeId, host, port)
+
+    @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
                                      group, payloads):
         """
@@ -481,21 +516,27 @@ class KafkaProtocol(object):
     @classmethod
     def encode_offset_fetch_request(cls, client_id, correlation_id,
-                                    group, payloads):
+                                    group, payloads, from_kafka=False):
         """
-        Encode some OffsetFetchRequest structs
+        Encode some OffsetFetchRequest structs. The request is encoded using
+        version 0 if from_kafka is False, indicating a request for Zookeeper
+        offsets. It is encoded using version 1 otherwise, indicating a request
+        for Kafka offsets.

         Arguments:
             client_id: string
             correlation_id: int
             group: string, the consumer group you are fetching offsets for
             payloads: list of OffsetFetchRequest
+            from_kafka: bool, default False, set True for Kafka-committed offsets
         """
         grouped_payloads = group_by_topic_and_partition(payloads)

         message = []
+        reqver = 1 if from_kafka else 0
         message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.OFFSET_FETCH_KEY))
+                                                  KafkaProtocol.OFFSET_FETCH_KEY,
+                                                  version=reqver))

         message.append(write_short_string(group))
         message.append(struct.pack('>i', len(grouped_payloads)))
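The version bump is the only wire-level difference between the two
offset-fetch encodings, which a short check makes concrete (an illustrative
snippet, not part of the patch): bytes 0-3 of the encoded request are the
length prefix, bytes 4-5 the ApiKey, and bytes 6-7 the ApiVersion.

    import struct

    from kafka.common import OffsetFetchRequest
    from kafka.protocol import KafkaProtocol

    payloads = [OffsetFetchRequest(b'topic', 0)]
    v0 = KafkaProtocol.encode_offset_fetch_request(b'cid', 1, b'grp', payloads)
    v1 = KafkaProtocol.encode_offset_fetch_request(b'cid', 1, b'grp', payloads,
                                                   from_kafka=True)

    assert struct.unpack('>h', v0[6:8])[0] == 0  # Zookeeper offsets
    assert struct.unpack('>h', v1[6:8])[0] == 1  # Kafka-committed offsets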
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 0938228..ac7bea6 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -13,7 +13,7 @@ from kafka.common import (
     ProduceResponse, FetchResponse, OffsetAndMessage,
     BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
     KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
-    ProtocolError
+    ProtocolError, ConsumerMetadataResponse
 )
 from kafka.protocol import (
     ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -560,6 +560,34 @@ class TestProtocol(unittest.TestCase):
         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))

+    def test_encode_consumer_metadata_request(self):
+        expected = b"".join([
+            struct.pack(">i", 17),           # Total length of the request
+            struct.pack('>h', 10),           # API key consumer metadata
+            struct.pack('>h', 0),            # API version
+            struct.pack('>i', 4),            # Correlation ID
+            struct.pack('>h3s', 3, b"cid"),  # The client ID
+            struct.pack('>h2s', 2, b"g1"),   # Group "g1"
+        ])
+
+        encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
+
+        self.assertEqual(encoded, expected)
+
+    def test_decode_consumer_metadata_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),           # Correlation ID
+            struct.pack(">h", 0),            # No Error
+            struct.pack(">i", 1),            # Broker ID
+            struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"),  # Broker Host
+            struct.pack(">i", 1000),         # Broker Port
+        ])
+
+        results = KafkaProtocol.decode_consumer_metadata_response(encoded)
+        self.assertEqual(results,
+                         ConsumerMetadataResponse(error=0, nodeId=1,
+                                                  host=b'brokers1.kafka.rdio.com',
+                                                  port=1000))
+
     def test_encode_offset_request(self):
         expected = b"".join([
             struct.pack(">i", 21),           # Total length of the request