| author | David Arthur <mumrah@gmail.com> | 2013-04-01 14:56:59 -0400 |
|---|---|---|
| committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
| commit | 0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c (patch) | |
| tree | d4011fc89717f1eb9884787ae333be5b525bacd4 | |
| parent | b6d98c07b418b16061ae92392947d5dd6958a708 (diff) | |
| download | kafka-python-0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c.tar.gz | |
Refactoring a bit, cleanup for 0.8
Marking some features as not supported in 0.8 (they will be added in 0.8.1)
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | example.py | 26 |
| m--------- | kafka-src | 0 |
| -rw-r--r-- | kafka/client.py | 267 |
| -rw-r--r-- | kafka/consumer.py | 71 |
| -rw-r--r-- | kafka/producer.py | 5 |
| -rw-r--r-- | kafka/protocol.py | 2 |
| -rw-r--r-- | kafka/queue.py | 2 |
| -rw-r--r-- | test/integration.py | 25 |
8 files changed, 215 insertions, 183 deletions
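
Taken together, the example.py changes below reduce to the following usage sketch of the refactored public API. This is a condensed paraphrase of the new example.py shown in this diff; the broker address, topic, and group names are the example's own placeholders.

```python
import logging

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

def main():
    client = KafkaClient("localhost", 9092)

    # SimpleProducer round-robins messages across the topic's partitions
    producer = SimpleProducer(client, "my-topic")
    producer.send_message("test")

    # SimpleConsumer is iterable; it yields messages from every partition
    # until all of them are exhausted
    consumer = SimpleConsumer(client, "test-group", "my-topic")
    for message in consumer:
        print(message)

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    main()
```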
diff --git a/example.py b/example.py
@@ -2,27 +2,21 @@ import logging
 from kafka.client import KafkaClient, FetchRequest, ProduceRequest
 from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
 
-def produce_example(kafka):
-    message = kafka.create_message("testing")
-    request = ProduceRequest("my-topic", -1, [message])
-    kafka.send_message_set(request)
+def produce_example(client):
+    producer = SimpleProducer(client, "my-topic")
+    producer.send_message("test")
 
-def consume_example(kafka):
-    request = FetchRequest("my-topic", 0, 0, 1024)
-    (messages, nextRequest) = kafka.get_message_set(request)
-    for message in messages:
-        print("Got Message: %s" % (message,))
-    print(nextRequest)
-
-def produce_gz_example(kafka):
-    message = kafka.create_gzip_message("this message was gzipped", "along with this one")
-    request = ProduceRequest("my-topic", 0, [message])
-    kafka.send_message_set(request)
+def consume_example(client):
+    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    for message in consumer:
+        print(message)
 
 def main():
     client = KafkaClient("localhost", 9092)
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    produce_example(client)
+    consume_example(client)
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
diff --git a/kafka-src b/kafka-src
-Subproject 218e6a53c1385be897d9f8a3a39baa38b68d799
+Subproject 30f992c52dd3aba8cc35c9835df5b5e56581e0d
diff --git a/kafka/client.py b/kafka/client.py
index f7e39a9..862a30e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -28,30 +28,35 @@ class KafkaClient(object):
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
-        self.load_metadata_for_topics()
+        self._load_metadata_for_topics()
+
+    ##################
+    #  Private API   #
+    ##################
 
-    def close(self):
-        for conn in self.conns.values():
-            conn.close()
 
-    def get_conn_for_broker(self, broker):
+    def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
         return self.conns[(broker.host, broker.port)]
 
-    def next_id(self):
-        "Generate a new correlation id"
-        return KafkaClient.ID_GEN.next()
+    def _get_leader_for_partition(self, topic, partition):
+        key = TopicAndPartition(topic, partition)
+        if key not in self.topics_to_brokers:
+            self._load_metadata_for_topics(topic)
+        if key not in self.topics_to_brokers:
+            raise Exception("Partition does not exist: %s" % str(key))
+        return self.topics_to_brokers[key]
 
-    def load_metadata_for_topics(self, *topics):
+    def _load_metadata_for_topics(self, *topics):
         """
         Discover brokers and metadata for a set of topics.  This method will
         recurse in the event of a retry.
""" - requestId = self.next_id() + requestId = self._next_id() request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics) - response = self.try_send_request(requestId, request) + response = self._send_broker_unaware_request(requestId, request) if response is None: raise Exception("All servers failed to process request") (brokers, topics) = KafkaProtocol.decode_metadata_response(response) @@ -64,69 +69,114 @@ class KafkaClient(object): if meta.leader == -1: log.info("Partition is unassigned, delay for 1s and retry") time.sleep(1) - self.load_metadata_for_topics(topic) + self._load_metadata_for_topics(topic) else: self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader] self.topic_partitions[topic].append(partition) - def get_leader_for_partition(self, topic, partition): - key = TopicAndPartition(topic, partition) - if key not in self.topics_to_brokers: - self.load_metadata_for_topics(topic) - if key not in self.topics_to_brokers: - raise Exception("Partition does not exist: %s" % str(key)) - return self.topics_to_brokers[key] + def _next_id(self): + "Generate a new correlation id" + return KafkaClient.ID_GEN.next() - def send_produce_request(self, payloads=[], fail_on_error=True, callback=None): + def _send_broker_unaware_request(self, requestId, request): """ - Encode and send some ProduceRequests + Attempt to send a broker-agnostic request to one of the available brokers. + Keep trying until you succeed. + """ + for conn in self.conns.values(): + try: + conn.send(requestId, request) + response = conn.recv(requestId) + return response + except Exception, e: + log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e)) + continue + return None - ProduceRequests will be grouped by (topic, partition) and then sent to a specific - broker. Output is a list of responses in the same order as the list of payloads - specified + def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): + """ + Group a list of request payloads by topic+partition and send them to the + leader broker for that partition using the supplied encode/decode functions Params ====== - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we encounter an API error? - callback: function, instead of returning the ProduceResponse, first pass it through this function + payloads: list of object-like entities with a topic and partition attribute + encode_fn: a method to encode the list of payloads to a request body, must accept + client_id, correlation_id, and payloads as keyword arguments + decode_fn: a method to decode a response body into response objects. 
+            objects must be object-like and have topic and partition attributes
 
         Return
         ======
-        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        List of response objects in the same order as the supplied payloads
         """
-        # Group the produce requests by which broker they go to
+        # Group the requests by topic+partition
         original_keys = []
         payloads_by_broker = defaultdict(list)
         for payload in payloads:
-            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads
+            payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
             original_keys.append((payload.topic, payload.partition))
-
+
         # Accumulate the responses in a dictionary
         acc = {}
 
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
-            conn = self.get_conn_for_broker(broker)
-            requestId = self.next_id()
-            request = KafkaProtocol.encode_produce_request(KafkaClient.CLIENT_ID, requestId, payloads)
-            # Send the request
+            conn = self._get_conn_for_broker(broker)
+            requestId = self._next_id()
+            request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+
+            # Send the request, recv the response
             conn.send(requestId, request)
             response = conn.recv(requestId)
-            for produce_response in KafkaProtocol.decode_produce_response(response):
-                # Check for errors
-                if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
-                    raise Exception("ProduceRequest for %s failed with errorcode=%d" %
-                            (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
-                # Run the callback
-                if callback is not None:
-                    acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
-                else:
-                    acc[(produce_response.topic, produce_response.partition)] = produce_response
+            for response in decoder_fn(response):
+                acc[(response.topic, response.partition)] = response
 
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
 
+    #################
+    #  Public API   #
+    #################
+
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
+    def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
+        """
+        Encode and send some ProduceRequests
+
+        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
+        broker. Output is a list of responses in the same order as the list of payloads
+        specified
+
+        Params
+        ======
+        payloads: list of ProduceRequest
+        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
+        callback: function, instead of returning the ProduceResponse, first pass it through this function
+
+        Return
+        ======
+        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        """
+        resps = self._send_broker_aware_request(payloads,
+                    KafkaProtocol.encode_produce_request,
+                    KafkaProtocol.decode_produce_response)
+        out = []
+        for resp in resps:
+            # Check for errors
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
+                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            # Run the callback
+            if callback is not None:
+                out.append(callback(resp))
+            else:
+                out.append(resp)
+        return out
+
     def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         """
         Encode and send a FetchRequest
@@ -134,108 +184,63 @@ class KafkaClient(object):
         Payloads are grouped by topic and partition so they can be pipelined to the same brokers.
         """
-        # Group the produce requests by which broker they go to
-        original_keys = []
-        payloads_by_broker = defaultdict(list)
-        for payload in payloads:
-            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
-            original_keys.append((payload.topic, payload.partition))
-
-        # Accumulate the responses in a dictionary, keyed by topic+partition
-        acc = {}
-
-        # For each broker, send the list of request payloads
-        for broker, payloads in payloads_by_broker.items():
-            conn = self.get_conn_for_broker(broker)
-            requestId = self.next_id()
-            request = KafkaProtocol.encode_fetch_request(KafkaClient.CLIENT_ID, requestId, payloads)
-            # Send the request
-            conn.send(requestId, request)
-            response = conn.recv(requestId)
-            for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
-                # Check for errors
-                if fail_on_error == True and fetch_response.error != ErrorMapping.NO_ERROR:
-                    raise Exception("FetchRequest %s failed with errorcode=%d" %
-                            (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
-                # Run the callback
-                if callback is not None:
-                    acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
-                else:
-                    acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
+        resps = self._send_broker_aware_request(payloads,
+                    KafkaProtocol.encode_fetch_request,
+                    KafkaProtocol.decode_fetch_response)
+        out = []
+        for resp in resps:
+            # Check for errors
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("FetchRequest for %s failed with errorcode=%d" %
+                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            # Run the callback
+            if callback is not None:
+                out.append(callback(resp))
+            else:
+                out.append(resp)
+        return out
 
-        # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
-
-    def try_send_request(self, requestId, request):
-        """
-        Attempt to send a broker-agnostic request to one of the available brokers.
-        Keep trying until you succeed.
- """ - for conn in self.conns.values(): - try: - conn.send(requestId, request) - response = conn.recv(requestId) - return response - except Exception, e: - log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e)) - continue - return None def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): - requestId = self.next_id() - request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads) - response = self.try_send_request(requestId, request) - if response is None: - if fail_on_error is True: - raise Exception("All servers failed to process request") - else: - return None + resps = self._send_broker_aware_request(payloads, + KafkaProtocol.encode_offset_request, + KafkaProtocol.decode_offset_response) out = [] - for offset_response in KafkaProtocol.decode_offset_response(response): - if fail_on_error == True and offset_response.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error) + for resp in resps: + if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR: + raise Exception("OffsetRequest failed with errorcode=%s", resp.error) if callback is not None: - out.append(callback(offset_response)) + out.append(callback(resp)) else: - out.append(offset_response) + out.append(resp) return out def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): - requestId = self.next_id() - request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads) - response = self.try_send_request(requestId, request) - if response is None: - if fail_on_error is True: - raise Exception("All servers failed to process request") - else: - return None + raise NotImplementedError("Broker-managed offsets not supported in 0.8") + resps = self._send_broker_aware_request(payloads, + partial(KafkaProtocol.encode_offset_commit_request, group=group), + KafkaProtocol.decode_offset_commit_response) out = [] - for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response): - log.debug(offset_commit_response) - if fail_on_error == True and offset_commit_response.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error) + for resp in resps: + if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR: + raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error) if callback is not None: - out.append(callback(offset_commit_response)) + out.append(callback(resp)) else: - out.append(offset_commit_response) + out.append(resp) return out def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): - requestId = self.next_id() - request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads) - response = self.try_send_request(requestId, request) - if response is None: - if fail_on_error is True: - raise Exception("All servers failed to process request") - else: - return None + raise NotImplementedError("Broker-managed offsets not supported in 0.8") + resps = self._send_broker_aware_request(payloads, + partial(KafkaProtocol.encode_offset_commit_fetch, group=group), + KafkaProtocol.decode_offset_fetch_response) out = [] - for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response): - if fail_on_error == True and offset_fetch_response.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetFetchRequest for 
-                    offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
             if callback is not None:
-                out.append(callback(offset_fetch_response))
+                out.append(callback(resp))
             else:
-                out.append(offset_fetch_response)
+                out.append(resp)
         return out
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c6aafce..4ce62e2 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,3 +1,4 @@
+from itertools import izip_longest, repeat
 import logging
 
 from threading import Lock
@@ -30,7 +31,7 @@ class SimpleConsumer(object):
         self.client = client
         self.topic = topic
         self.group = group
-        self.client.load_metadata_for_topics(topic)
+        self.client._load_metadata_for_topics(topic)
         self.offsets = {}
 
         # Set up the auto-commit timer
@@ -54,12 +55,16 @@ class SimpleConsumer(object):
                 raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
                     resp.topic, resp.partition, resp.error))
 
+        # Uncomment for 0.8.1
+        #
+        #for partition in self.client.topic_partitions[topic]:
+        #    req = OffsetFetchRequest(topic, partition)
+        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
+        #                  callback=get_or_init_offset_callback, fail_on_error=False)
+        #    self.offsets[partition] = offset
+
         for partition in self.client.topic_partitions[topic]:
-            req = OffsetFetchRequest(topic, partition)
-            (offset,) = self.client.send_offset_fetch_request(group, [req],
-                          callback=get_or_init_offset_callback, fail_on_error=False)
-            self.offsets[partition] = offset
-        print self.offsets
+            self.offsets[partition] = 0
 
     def seek(self, offset, whence):
         """
@@ -71,25 +76,30 @@ class SimpleConsumer(object):
         1 is relative to the current offset
         2 is relative to the latest known offset (tail)
         """
-        if whence == 1:
-            # relative to current position
+        if whence == 1: # relative to current position
             for partition, _offset in self.offsets.items():
                 self.offset[partition] = _offset + offset
-        elif whence in (0, 2):
-            # relative to beginning or end
+        elif whence in (0, 2): # relative to beginning or end
+            # divide the request offset by number of partitions, distribute the remainder evenly
+            (delta, rem) = divmod(offset, len(self.offsets))
+            deltas = {}
+            for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+                deltas[partition] = delta + r
+
             reqs = []
-            for partition in offsets.keys():
+            for partition in self.offsets.keys():
                 if whence == 0:
                     reqs.append(OffsetRequest(self.topic, partition, -2, 1))
                 elif whence == 2:
                     reqs.append(OffsetRequest(self.topic, partition, -1, 1))
                 else:
                     pass
-            resps = self.client.send_offset_request([req])
+
+            resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + offset
+                self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
         else:
-            raise
+            raise ValueError("Unexpected value for `whence`, %d" % whence)
 
     def commit(self, partitions=[]):
         """
@@ -98,6 +108,8 @@ class SimpleConsumer(object):
         partitions: list of partitions to commit, default is to commit all of them
         """
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+
         # short circuit if nothing happened
         if self.count_since_commit == 0:
             return
@@ -121,15 +133,31 @@ class SimpleConsumer(object):
         self.count_since_commit = 0
 
     def __iter__(self):
+        """
+        Create an iterator per partition. Iterate through them calling next() until they are
+        all exhausted.
+        """
         iters = {}
         for partition, offset in self.offsets.items():
             iters[partition] = self.__iter_partition__(partition, offset)
 
+        if len(iters) == 0:
+            return
+
         while True:
-            for it in iters.values():
-                yield it.next()
+            if len(iters) == 0:
+                break
+
+            for partition, it in iters.items():
+                try:
+                    yield it.next()
+                except StopIteration:
+                    log.debug("Done iterating over partition %s" % partition)
+                    del iters[partition]
+                    continue # skip auto-commit since we didn't yield anything
+
+            # auto commit logic
             self.count_since_commit += 1
-            # deal with auto commits
             if self.auto_commit is True:
                 if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
                     if self.commit_timer is not None:
@@ -140,19 +168,22 @@ class SimpleConsumer(object):
                         self.commit()
 
     def __iter_partition__(self, partition, offset):
+        """
+        Iterate over the messages in a partition. Create a FetchRequest to get back
+        a batch of messages, yield them one at a time. After a batch is exhausted,
+        start a new batch unless we've reached the end of this partition.
+        """
         while True:
-            req = FetchRequest(self.topic, partition, offset, 1024)
+            req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
             (resp,) = self.client.send_fetch_request([req])
             assert resp.topic == self.topic
             assert resp.partition == partition
             next_offset = None
             for message in resp.messages:
                 next_offset = message.offset
-                print partition, message, message.offset
                 yield message
                 # update the internal state _after_ we yield the message
                 self.offsets[partition] = message.offset
-            print partition, next_offset
             if next_offset is None:
                 break
             else:
diff --git a/kafka/producer.py b/kafka/producer.py
index 823d923..93d6e3b 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -13,10 +13,11 @@ class SimpleProducer(object):
     def __init__(self, client, topic):
         self.client = client
         self.topic = topic
-        self.client.load_metadata_for_topics(topic)
+        self.client._load_metadata_for_topics(topic)
         self.next_partition = cycle(self.client.topic_partitions[topic])
 
     def send_message(self, msg):
         req = ProduceRequest(self.topic, self.next_partition.next(), messages=[create_message(msg)])
-        resp = self.client.send_produce_request([req]).next()
+        resp = self.client.send_produce_request([req])[0]
+        assert resp.error == 0
diff --git a/kafka/protocol.py b/kafka/protocol.py
index ca21db3..fc866ad 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -217,7 +217,7 @@ class KafkaProtocol(object):
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
-    def decode_fetch_response_iter(cls, data):
+    def decode_fetch_response(cls, data):
         """
         Decode bytes to a FetchResponse
diff --git a/kafka/queue.py b/kafka/queue.py
index d4f5b6c..6fe9eaa 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -8,6 +8,8 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
 
 log = logging.getLogger("kafka")
 
+raise NotImplementedError("Still need to refactor this class")
+
 class KafkaConsumerProcess(Process):
     def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
         self.client = copy(client)
diff --git a/test/integration.py b/test/integration.py
index b7ad056..e51b398 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -243,7 +243,7 @@ class TestKafkaClient(unittest.TestCase):
 
     def test_consume_none(self):
         fetch = FetchRequest("test_consume_none", 0, 0, 1024)
-        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        fetch_resp = self.client.send_fetch_request([fetch])[0]
         self.assertEquals(fetch_resp.error, 0)
         self.assertEquals(fetch_resp.topic, "test_consume_none")
         self.assertEquals(fetch_resp.partition, 0)
@@ -263,7 +263,7 @@ class TestKafkaClient(unittest.TestCase):
 
         fetch = FetchRequest("test_produce_consume", 0, 0, 1024)
-        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        fetch_resp = self.client.send_fetch_request([fetch])[0]
         self.assertEquals(fetch_resp.error, 0)
 
         messages = list(fetch_resp.messages)
@@ -343,6 +343,7 @@ class TestKafkaClient(unittest.TestCase):
     #   Offset Tests   #
     ####################
 
+    @unittest.skip("Not supported until 0.8.1")
     def test_commit_fetch_offsets(self):
         req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
         (resp,) = self.client.send_offset_commit_request("group", [req])
@@ -428,22 +429,20 @@ class TestConsumer(unittest.TestCase):
         self.assertEquals(len(all_messages), 200)
         self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
 
-        # Produce more messages
-        produce3 = ProduceRequest("test_consumer", 1, messages=[
-            create_message("Test message 3 %d" % i) for i in range(10)
-        ])
-
-        for resp in self.client.send_produce_request([produce3]):
-            self.assertEquals(resp.error, 0)
-            self.assertEquals(resp.offset, 100)
+        consumer.seek(-10, 2)
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
 
-        # Start a new consumer, make sure we only get the newly produced messages
-        consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+        self.assertEquals(len(all_messages), 10)
+
+        consumer.seek(-13, 2)
         all_messages = []
         for message in consumer:
             all_messages.append(message)
-        self.assertEquals(len(all_messages), 10)
+
+        self.assertEquals(len(all_messages), 13)
+
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
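
Beyond the high-level producer and consumer, the client.py refactor routes every send_*_request call through _send_broker_aware_request, which groups payloads by (topic, partition), sends each group to that partition's leader broker, and returns responses in payload order. A minimal sketch of that call style follows, assuming a local broker; the topic name, fetch size, and callback are illustrative placeholders, not part of this commit.

```python
from kafka.client import KafkaClient, FetchRequest

client = KafkaClient("localhost", 9092)

# Build one FetchRequest per known partition of the topic; the client groups
# these by (topic, partition) and sends each group to the partition's leader.
requests = [FetchRequest("my-topic", partition, 0, 1024)
            for partition in client.topic_partitions["my-topic"]]

# fail_on_error=True (the default) raises on any API error; the optional
# callback transforms each response before it is returned, in payload order.
responses = client.send_fetch_request(requests,
                callback=lambda resp: (resp.partition, list(resp.messages)))

for partition, messages in responses:
    print("partition %d: %d messages" % (partition, len(messages)))
```

Note that broker-managed offsets are disabled by this commit: send_offset_commit_request, send_offset_fetch_request, and SimpleConsumer.commit all raise NotImplementedError, so offset tracking stays client-side until 0.8.1.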