-rw-r--r-- | kafka/client08.py | 194
-rw-r--r-- | kafka/util.py     |   9
2 files changed, 138 insertions, 65 deletions
diff --git a/kafka/client08.py b/kafka/client08.py
index 11910d1..b048d68 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,6 +14,7 @@ from .codec import snappy_encode, snappy_decode
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
+from .util import group_list_by_key
 from .util import BufferUnderflowError, ChecksumError

 log = logging.getLogger("kafka")
@@ -217,7 +218,7 @@ class KafkaProtocol(object):
     @classmethod
-    def encode_produce_request(self, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
+    def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
         """
         Encode some ProduceRequest structs
@@ -236,8 +237,7 @@
         payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
         message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
             for payload in payloads:
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
@@ -280,8 +280,7 @@
         payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
         message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
@@ -312,8 +311,7 @@
         payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
         message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
@@ -406,8 +404,7 @@
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
         message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
@@ -450,8 +447,7 @@
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
         message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payload in payloads_by_topic:
-            payloads = list(payloads)
+        for topic, payloads in payloads_by_topic.items():
             message += write_short_string(topic)
             message += struct.pack('>i', len(payloads))
             for payload in payloads:
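Each of the encode_* hunks above makes the same fix: group_list_by_key now returns a dict of topic -> list (see the kafka/util.py hunk at the end of this diff), so the encoders iterate .items() instead of unpacking raw groupby() output whose group iterators are already exhausted. A minimal sketch of the corrected pattern, using a stand-in namedtuple rather than the real ProduceRequest struct:

    from collections import namedtuple
    from itertools import groupby
    from operator import attrgetter

    # Stand-in for the real ProduceRequest struct (illustrative only)
    Payload = namedtuple("Payload", ["topic", "partition", "messages"])

    def group_list_by_key(it, key):
        # Same shape as the new kafka/util.py helper: dict of key -> list
        out = {}
        for k, group in groupby(sorted(it, key=key), key=key):
            out[k] = list(group)
        return out

    payloads = [Payload("foo", 0, ["m1"]),
                Payload("bar", 0, ["m2"]),
                Payload("foo", 1, ["m3"])]
    payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))

    # The encoders can now take len() of each group and walk it safely
    for topic, topic_payloads in payloads_by_topic.items():
        print("%s: %d payloads" % (topic, len(topic_payloads)))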
@@ -582,13 +578,14 @@ class KafkaClient(object):
     def load_metadata_for_topics(self, *topics):
         """
-        Discover brokers and metadata for a set of topics
+        Discover brokers and metadata for a set of topics. This method will
+        recurse in the event of a retry.
         """
         requestId = self.next_id()
         request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
-        conn = self.conns.values()[0] # Just get the first one in the list
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            raise Exception("All servers failed to process request")
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
@@ -600,7 +597,6 @@
                 log.info("Partition is unassigned, delay for 1s and retry")
                 time.sleep(1)
                 self.load_metadata_for_topics(topic)
-                return
             else:
                 self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
@@ -608,18 +604,44 @@
         key = TopicAndPartition(topic, partition)
         if key not in self.topics_to_brokers:
             self.load_metadata_for_topics(topic)
+        if key not in self.topics_to_brokers:
+            raise Exception("Partition does not exist: %s" % str(key))
         return self.topics_to_brokers[key]

     def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
+        """
+        Encode and send some ProduceRequests
+
+        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
+        broker. Output is a list of responses in the same order as the list of payloads
+        specified
+
+        Params
+        ======
+        payloads: list of ProduceRequest
+        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
+        callback: function, instead of returning the ProduceResponse, first pass it through this function
+
+        Return
+        ======
+        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        """
+        key_fn = lambda x: (x.topic, x.partition)
+
+        # Note the order of the incoming payloads
+        original_keys = [key_fn(payload) for payload in payloads]
+
         # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
+        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)

         # Group the produce requests by which broker they go to
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payload in payloads_by_topic_and_partition:
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
+        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
+            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+
+        # Accumulate the responses in a dictionary, keyed by key_fn
+        acc = {}

-        out = []
         # For each broker, send the list of request payloads
@@ -635,10 +657,13 @@
                         (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
                 # Run the callback
                 if callback is not None:
-                    out.append(callback(produce_response))
+                    acc[key_fn(produce_response)] = callback(produce_response)
                 else:
-                    out.append(produce_response)
-        return out
+                    acc[key_fn(produce_response)] = produce_response
+
+        print(acc)
+        # Order the accumulated responses by the original key order
+        return (acc[k] for k in original_keys)
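send_produce_request (and send_fetch_request in the next hunk) now share an accumulate-then-reorder idiom: record the (topic, partition) key of each incoming payload, collect responses keyed the same way, and hand them back in input order even though they arrive grouped by broker. A self-contained sketch of just that idiom, with plain tuples standing in for the request and response structs:

    key_fn = lambda x: (x[0], x[1])  # (topic, partition), as in the patch

    # Incoming payloads, in caller order
    payloads = [("foo", 1, "req-a"), ("bar", 0, "req-b"), ("foo", 0, "req-c")]
    original_keys = [key_fn(p) for p in payloads]

    # Responses come back grouped by broker, i.e. in arbitrary order
    responses = [("bar", 0, "resp-b"), ("foo", 0, "resp-c"), ("foo", 1, "resp-a")]

    acc = {}
    for resp in responses:
        acc[key_fn(resp)] = resp

    # Restore input order; the patch returns this as a generator
    print(list(acc[k] for k in original_keys))
    # -> [('foo', 1, 'resp-a'), ('bar', 0, 'resp-b'), ('foo', 0, 'resp-c')]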
@@ -647,15 +672,22 @@
     def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         """
         Encode and send a FetchRequest

         Payloads are grouped by topic and partition so they can be pipelined to the
         same brokers.
         """
+        key_fn = lambda x: (x.topic, x.partition)
+
+        # Note the order of the incoming payloads
+        original_keys = [key_fn(payload) for payload in payloads]
+
         # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
+        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)

         # Group the produce requests by which broker they go to
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payload in payloads_by_topic_and_partition:
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
+        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
+            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+
+        # Accumulate the responses in a dictionary, keyed by key_fn
+        acc = {}

-        out = []
         # For each broker, send the list of request payloads
@@ -667,21 +699,41 @@
             for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
                 # Check for errors
                 if fail_on_error == True and fetch_response.error != 0:
-                    raise Exception("FetchRequest %s failed with errorcode=%d",
+                    raise Exception("FetchRequest %s failed with errorcode=%d" %
                         (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
                 # Run the callback
                 if callback is not None:
-                    out.append(callback(fetch_response))
+                    acc[key_fn(fetch_response)] = callback(fetch_response)
                 else:
-                    out.append(fetch_response)
-        return out
+                    acc[key_fn(fetch_response)] = fetch_response
+
+        # Order the accumulated responses by the original key order
+        return (acc[k] for k in original_keys)
+
+    def try_send_request(self, requestId, request):
+        """
+        Attempt to send a broker-agnostic request to one of the available brokers.
+        Keep trying until you succeed.
+        """
+        for conn in self.conns.values():
+            try:
+                conn.send(requestId, request)
+                response = conn.recv(requestId)
+                return response
+            except Exception:
+                log.warning("Could not commit offset to server %s, trying next server", conn)
+                continue
+        return None

     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        conn = self.conns.values()[0] # Just get the first one in the list
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             if fail_on_error == True and offset_commit_response.error != 0:
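try_send_request above is a plain failover loop: try every known broker connection in turn, return the first response, and fall out with None when all of them fail (the callers then raise or soft-fail depending on fail_on_error). A runnable sketch of the same control flow against hypothetical connection objects:

    import logging

    log = logging.getLogger("kafka")

    def try_send_request(conns, request_id, request):
        # conns: connection-like objects with send()/recv(), as in KafkaClient.conns
        for conn in conns:
            try:
                conn.send(request_id, request)
                return conn.recv(request_id)
            except Exception:
                log.warning("Could not send request to %s, trying next server", conn)
        return None  # every broker failed; the caller decides whether to raise

    class FakeConn(object):  # hypothetical stand-in for a Kafka connection
        def __init__(self, name, fail):
            self.name, self.fail = name, fail
        def send(self, request_id, request):
            if self.fail:
                raise IOError("connection refused")
        def recv(self, request_id):
            return "response via %s" % self.name

    conns = [FakeConn("broker1", fail=True), FakeConn("broker2", fail=False)]
    print(try_send_request(conns, 1, "payload"))  # -> response via broker2

Note that the patch's warning still reads "Could not commit offset" even when the helper is used for metadata requests, a leftover from the offset-commit path it was extracted from.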
@@ -693,15 +745,19 @@
         return out

     def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        conn = self.conns.values()[0] # Just get the first one in the list
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        conn.send(requestId, request)
-        response = conn.recv(requestId)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
         out = []
         for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
             if fail_on_error == True and offset_fetch_response.error != 0:
-                raise Exception("OffsetFetchRequest failed with errorcode=%s", offset_fetch_response.error)
+                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
+                    offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
             if callback is not None:
                 out.append(callback(offset_fetch_response))
             else:
@@ -709,20 +765,22 @@
                 out.append(offset_fetch_response)
         return out

-
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG)
+
+    topic = "foo8"
+
     # Bootstrap connection
     conn = KafkaClient("localhost", 9092)

     # Create some Messages
-    messages = (KafkaProtocol.create_gzip_message("GZIPPed"),
+    messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
                 KafkaProtocol.create_message("not-gzipped"))

-    # Create a ProduceRequest
-    produce = ProduceRequest(topic="foo5", partition=0, messages=messages)
+    produce1 = ProduceRequest(topic=topic, partition=0, messages=messages)
+    produce2 = ProduceRequest(topic=topic, partition=1, messages=messages)

     # Send the ProduceRequest
-    produce_resp = conn.send_produce_request(payloads=[produce])
+    produce_resp = conn.send_produce_request(payloads=[produce1, produce2])

     # Check for errors
     for resp in produce_resp:
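Because the offset commit/fetch paths now route through try_send_request, passing fail_on_error=False turns total broker failure into a None return instead of an exception. A usage sketch, assuming a reachable broker and that the names below are importable from kafka/client08.py as used in the __main__ demo:

    from kafka.client08 import KafkaClient, OffsetFetchRequest

    conn = KafkaClient("localhost", 9092)
    resp = conn.send_offset_fetch_request(group="group1",
                                          payloads=[OffsetFetchRequest("foo8", 0)],
                                          fail_on_error=False)
    if resp is None:
        # All brokers were unreachable; back off and retry rather than crash
        pass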
@@ -734,29 +792,41 @@ if __name__ == "__main__":
     #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")])
     #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)])

-    print conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("foo5", 0)])
-
-    offset = 0
-    done = False
-    while not done:
-        print offset
-        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic="foo5", partition=0, offset=offset, max_bytes=4096)]):
+    def init_offsets(offset_response):
+        if offset_response.error not in (ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON):
+            raise Exception("OffsetFetch failed: %s" % (offset_response))
+        elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+            return 0
+        else:
+            return offset_response.offset
+
+    # Load offsets
+    (offset1, offset2) = conn.send_offset_fetch_request(
+        group="group1",
+        payloads=[OffsetFetchRequest(topic, 0), OffsetFetchRequest(topic, 1)],
+        fail_on_error=False,
+        callback=init_offsets
+    )
+    print offset1, offset2
+
+    while True:
+        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]):
             i = 0
             for msg in resp.messages:
-                print conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("foo5", 0, offset, "")])
-                print msg, offset
-                offset = msg.offset+1
+                print msg
+                offset1 = msg.offset+1
+                print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")])
                 i += 1
             if i == 0:
                 raise StopIteration("no more messages")

-class Consumer(object):
-    def __init__(self, conn):
-        self._conn = conn
-
-
-
-class Producer(object):
-    pass
-
-
+        for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]):
+            i = 0
+            for msg in resp.messages:
+                print msg
+                offset2 = msg.offset+1
+                print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")])
+                i += 1
+            if i == 0:
+                raise StopIteration("no more messages")
diff --git a/kafka/util.py b/kafka/util.py
index 0623f35..cb8f7f5 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -44,9 +44,12 @@ def relative_unpack(fmt, data, cur):
     out = struct.unpack(fmt, data[cur:cur+size])
     return (out, cur+size)

-def group_list_by_key(l, key):
-    sorted_l = sorted(l, key=key)
-    return list(groupby(sorted_l, key=key))
+def group_list_by_key(it, key):
+    sorted_it = sorted(it, key=key)
+    out = {}
+    for k, group in groupby(sorted_it, key=key):
+        out[k] = list(group)
+    return out

 class BufferUnderflowError(Exception):
     pass
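The kafka/util.py change is the root-cause fix behind all the .items() migrations above: the old group_list_by_key returned groupby()'s (key, grouper) pairs directly, and those groupers are lazy views over one shared iterator, so they are already empty by the time the caller loops over the outer list. Materializing each group into a dict of lists avoids the trap. A before/after sketch:

    from itertools import groupby

    data = ["apple", "avocado", "banana", "cherry", "coconut"]
    first_letter = lambda s: s[0]

    # Old behaviour: listing the pairs exhausts the shared underlying iterator
    old = list(groupby(sorted(data, key=first_letter), key=first_letter))
    print([(k, list(g)) for k, g in old])  # -> [('a', []), ('b', []), ('c', [])]

    # New behaviour: each group is copied into a real list while still live
    new = {}
    for k, group in groupby(sorted(data, key=first_letter), key=first_letter):
        new[k] = list(group)
    print(new)  # -> {'a': ['apple', 'avocado'], 'b': ['banana'], 'c': ['cherry', 'coconut']}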