author | David Arthur <mumrah@gmail.com> | 2013-02-22 23:09:25 -0500
---|---|---
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400
commit | 2a3d231aa61642c57537bc2128dd4f2bd30f35dd (patch) |
tree | 6bfdfa13b228481df9c79bcb926c2036b476b891 /kafka/client08.py |
parent | e87c561723be25fcfa2564030367196231aa366e (diff) |
download | kafka-python-2a3d231aa61642c57537bc2128dd4f2bd30f35dd.tar.gz |
Protocol and low-level client done, adding tests
Diffstat (limited to 'kafka/client08.py')
-rw-r--r-- | kafka/client08.py | 180 |
1 file changed, 110 insertions, 70 deletions
```diff
diff --git a/kafka/client08.py b/kafka/client08.py
index b048d68..49d786f 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,7 +14,7 @@ from .codec import snappy_encode, snappy_decode
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
 from .util import BufferUnderflowError, ChecksumError
 
 log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@ OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
 FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
 OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
 OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
     OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
+    CODEC_NONE = 0x00
+    CODEC_GZIP = 0x01
+    CODEC_SNAPPY = 0x02
 
     ###################
     #   Private API   #
     ###################
```
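The new codec constants name the two low bits of the message attribute byte. A standalone sketch of how the mask and constants combine (the `codec_name` helper is illustrative, not part of the commit):

```python
# Mirrors the constants added to KafkaProtocol above; codec_name itself
# is a hypothetical helper for illustration.
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02

def codec_name(att):
    # Only the low two bits of the attribute byte select the codec.
    return {CODEC_NONE: "none",
            CODEC_GZIP: "gzip",
            CODEC_SNAPPY: "snappy"}[att & ATTRIBUTE_CODEC_MASK]

assert codec_name(0x00) == "none"
assert codec_name(0x01) == "gzip"
assert codec_name(0x02) == "snappy"
```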
```diff
@@ -171,13 +174,13 @@ class KafkaProtocol(object):
         (key, cur) = read_int_string(data, cur)
         (value, cur) = read_int_string(data, cur)
-        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+        if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
             yield (offset, Message(magic, att, key, value))
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
             gz = gzip_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
                 yield (offset, message)
-        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+        elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
             snp = snappy_decode(value)
             for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
                 yield (offset, message)
@@ -214,8 +217,25 @@ class KafkaProtocol(object):
         message_set = KafkaProtocol._encode_message_set(
                 [KafkaProtocol.create_message(payload) for payload in payloads])
         gzipped = gzip_encode(message_set)
-        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
+
+    @classmethod
+    def create_snappy_message(cls, payloads, key=None):
+        """
+        Construct a Snappy Message containing multiple Messages
+
+        The given payloads will be encoded, compressed, and sent as a single atomic
+        message to Kafka.
+
+        Params
+        ======
+        payloads: list(bytes), a list of payload to send be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        message_set = KafkaProtocol._encode_message_set(
+            [KafkaProtocol.create_message(payload) for payload in payloads])
+        snapped = snappy_encode(message_set)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
 
     @classmethod
     def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
@@ -234,14 +254,14 @@ class KafkaProtocol(object):
             -1: waits for all replicas to be in sync
         timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
-        message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
-            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
-            for payload in payloads:
+        message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
+            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+            for partition, payload in topic_payloads.items():
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+                message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
         return struct.pack('>i%ds' % len(message), len(message), message)
```
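The encoders now iterate a nested mapping: `grouped_payloads.items()` yields topics, and each `topic_payloads.items()` yields `(partition, payload)` pairs. The real `group_by_topic_and_partition` lives in `kafka/util.py`; below is a sketch consistent with that usage, with an assumed `ProduceRequest` field order:

```python
from collections import defaultdict, namedtuple

# Assumed field order, inferred from how encode_produce_request reads
# payload.messages; the actual namedtuple is defined elsewhere in this file.
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])

def group_by_topic_and_partition(payloads):
    # Presumed shape of the helper imported from kafka/util.py:
    # {topic: {partition: payload}}
    grouped = defaultdict(dict)
    for payload in payloads:
        grouped[payload.topic][payload.partition] = payload
    return grouped

reqs = [ProduceRequest("foo8", 0, []), ProduceRequest("foo8", 1, [])]
grouped = group_by_topic_and_partition(reqs)
assert sorted(grouped["foo8"].keys()) == [0, 1]
```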
```diff
@@ -276,15 +296,15 @@ class KafkaProtocol(object):
         max_wait_time: int, how long to block waiting on min_bytes of data
         min_bytes: int, the minimum number of bytes to accumulate before returning the response
         """
-
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
-        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic))  # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads))  # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
         return struct.pack('>i%ds' % len(message), len(message), message)
@@ -308,14 +328,14 @@ class KafkaProtocol(object):
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
-        message += struct.pack('>ii', -1, len(payloads_by_topic))  # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>ii', -1, len(grouped_payloads))  # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
         return struct.pack('>i%ds' % len(message), len(message), message)
@@ -332,8 +352,12 @@ class KafkaProtocol(object):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
             for i in range(num_partitions):
-                ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
-                yield OffsetResponse(topic, partition, error, offset)
+                ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+                offsets = []
+                for j in range(num_offsets):
+                    ((offset,), cur) = relative_unpack('>q', data, cur)
+                    offsets.append(offset)
+                yield OffsetResponse(topic, partition, error, tuple(offsets))
@@ -400,15 +424,15 @@ class KafkaProtocol(object):
         group: string, the consumer group you are committing offsets for
         payloads: list of OffsetCommitRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iq', payload.partition, payload.offset)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iq', partition, payload.offset)
                 message += write_short_string(payload.metadata)
         return struct.pack('>i%ds' % len(message), len(message), message)
@@ -421,6 +445,7 @@ class KafkaProtocol(object):
         ======
         data: bytes to decode
         """
+        data = data[2:]  # TODO remove me when versionId is removed
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -443,15 +468,15 @@ class KafkaProtocol(object):
         group: string, the consumer group you are fetching offsets for
         payloads: list of OffsetFetchRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>i', payload.partition)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>i', partition)
         return struct.pack('>i%ds' % len(message), len(message), message)
```
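`decode_offset_response` now reads a `(partition, error, num_offsets)` header (`'>ihi'`) followed by `num_offsets` big-endian int64s, which is why `OffsetResponse` grew an `offsets` tuple. A self-contained round-trip of that framing, using plain `struct` in place of the repo's `relative_unpack`:

```python
import struct

# Pack one partition block the way the new decoder expects it:
# partition (int32), error (int16), num_offsets (int32), then int64 offsets.
offsets = [42, 7]
buf = struct.pack('>ihi', 3, 0, len(offsets))
for o in offsets:
    buf += struct.pack('>q', o)

# Walk it back, mimicking the relative_unpack loop in decode_offset_response.
(partition, error, num_offsets) = struct.unpack_from('>ihi', buf, 0)
cur = struct.calcsize('>ihi')
decoded = []
for _ in range(num_offsets):
    (offset,) = struct.unpack_from('>q', buf, cur)
    cur += struct.calcsize('>q')
    decoded.append(offset)

assert (partition, error, decoded) == (3, 0, offsets)
```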
```diff
@@ -493,6 +518,9 @@ class KafkaConnection(object):
         self._sock.connect((host, port))
         self._sock.settimeout(10)
 
+    def __str__(self):
+        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
     ###################
     #   Private API   #
     ###################
@@ -536,6 +564,8 @@ class KafkaConnection(object):
     ##################
     #   Public API   #
     ##################
 
+    # TODO multiplex socket communication to allow for multi-threaded clients
+
     def send(self, requestId, payload):
         "Send a request to Kafka"
         sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ class KafkaClient(object):
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.load_metadata_for_topics()
 
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
     def get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ class KafkaClient(object):
         ======
         list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary
        acc = {}
 
         # For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ class KafkaClient(object):
                         (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
                 # Run the callback
                 if callback is not None:
-                    acc[key_fn(produce_response)] = callback(produce_response)
+                    acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
                 else:
-                    acc[key_fn(produce_response)] = produce_response
+                    acc[(produce_response.topic, produce_response.partition)] = produce_response
 
-        print(acc)
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
```
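`send_produce_request` (and `send_fetch_request` below) replace the old `key_fn` grouping with a simpler pattern: remember the incoming `(topic, partition)` order, batch payloads per leader broker, then re-emit responses in the original order. The pattern in isolation, with the broker lookup and network round trip stubbed out:

```python
from collections import defaultdict

def batch_then_reorder(payloads, leader_for, send_to_broker):
    # leader_for and send_to_broker stand in for
    # KafkaClient.get_leader_for_partition and the per-broker request/response
    # exchange; this illustrates the pattern, it is not library code.
    original_keys = []                      # incoming (topic, partition) order
    payloads_by_broker = defaultdict(list)  # broker -> [payload, ...]
    for p in payloads:
        payloads_by_broker[leader_for(p.topic, p.partition)].append(p)
        original_keys.append((p.topic, p.partition))

    acc = {}  # (topic, partition) -> response
    for broker, broker_payloads in payloads_by_broker.items():
        for resp in send_to_broker(broker, broker_payloads):
            acc[(resp.topic, resp.partition)] = resp

    # Responses come back grouped by broker; re-order them to match the input.
    return (acc[k] for k in original_keys)
```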
""" - key_fn = lambda x: (x.topic, x.partition) - - # Note the order of the incoming payloads - original_keys = [key_fn(payload) for payload in payloads] - - # Group the produce requests by topic+partition - payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn) - # Group the produce requests by which broker they go to + original_keys = [] payloads_by_broker = defaultdict(list) - for (topic, partition), payloads in payloads_by_topic_and_partition.items(): - payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads + for payload in payloads: + payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload) + original_keys.append((payload.topic, payload.partition)) - # Accumulate the responses in a dictionary, keyed by key_fn + # Accumulate the responses in a dictionary, keyed by topic+partition acc = {} # For each broker, send the list of request payloads @@ -703,9 +724,9 @@ class KafkaClient(object): (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error)) # Run the callback if callback is not None: - acc[key_fn(fetch_response)] = callback(fetch_response) + acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response) else: - acc[key_fn(fetch_response)] = fetch_response + acc[(fetch_response.topic, fetch_response.partition)] = fetch_response # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) @@ -720,11 +741,30 @@ class KafkaClient(object): conn.send(requestId, request) response = conn.recv(requestId) return response - except Exception: - log.warning("Could not commit offset to server %s, trying next server", conn) + except Exception, e: + log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e)) continue return None + def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): + requestId = self.next_id() + request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads) + response = self.try_send_request(requestId, request) + if response is None: + if fail_on_error is True: + raise Exception("All servers failed to process request") + else: + return None + out = [] + for offset_response in KafkaProtocol.decode_offset_response(response): + if fail_on_error == True and offset_response.error != 0: + raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error) + if callback is not None: + out.append(callback(offset_response)) + else: + out.append(offset_response) + return out + def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): requestId = self.next_id() request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads) @@ -737,6 +777,7 @@ class KafkaClient(object): out = [] for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response): if fail_on_error == True and offset_commit_response.error != 0: + print(offset_commit_response) raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error) if callback is not None: out.append(callback(offset_commit_response)) @@ -770,7 +811,7 @@ if __name__ == "__main__": topic = "foo8" # Bootstrap connection - conn = KafkaClient("localhost", 9092) + conn = KafkaClient("localhost", 49720) # Create some Messages messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]), @@ -799,7 +840,6 @@ if __name__ == "__main__": return 0 else: return 
```diff
@@ -799,7 +840,6 @@ if __name__ == "__main__":
             return 0
         else:
             return offset_response.offset
-
     # Load offsets
     (offset1, offset2) = conn.send_offset_fetch_request(
         group="group1",
```
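For completeness, the `__main__` demo above reduces to this minimal produce-and-close round. A sketch only: it assumes a broker listening on the bootstrap port and the `ProduceRequest` shape consumed by `encode_produce_request`:

```python
# Condensed from the demo in __main__; names come from kafka/client08.py.
from kafka.client08 import KafkaClient, KafkaProtocol, ProduceRequest

conn = KafkaClient("localhost", 9092)
messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
            KafkaProtocol.create_message("not-gzipped"))
req = ProduceRequest(topic="foo8", partition=0, messages=messages)
for resp in conn.send_produce_request(payloads=[req]):
    print(resp)  # ProduceResponse(topic, partition, error, offset)
conn.close()
```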