diff options
author | David Arthur <mumrah@gmail.com> | 2013-03-19 14:03:51 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | 3499e2f6ead76e1c2db6ac754358bd57f9a15268 (patch) | |
tree | 8d7f827f2caabedadb68ad37cee4dcc908d9cd11 /kafka/client.py | |
parent | 1b721325fe6b170cdfe001749dbd7b750fe59512 (diff) | |
download | kafka-python-3499e2f6ead76e1c2db6ac754358bd57f9a15268.tar.gz |
Some work on a simple consumer
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/kafka/client.py b/kafka/client.py index 3f1fa39..8dfc4f0 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -685,7 +685,7 @@ class KafkaClient(object): for produce_response in KafkaProtocol.decode_produce_response(response): # Check for errors if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR: - raise Exception("ProduceRequest for %s failed with errorcode=%d", + raise Exception("ProduceRequest for %s failed with errorcode=%d" % (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error)) # Run the callback if callback is not None: @@ -825,6 +825,9 @@ class SimpleProducer(object): resp = self.client.send_produce_request([req]).next() class SimpleConsumer(object): + """ + A simple consumer implementation that consumes all partitions for a topic + """ def __init__(self, client, group, topic): self.client = client self.topic = topic @@ -832,7 +835,7 @@ class SimpleConsumer(object): self.client.load_metadata_for_topics(topic) self.offsets = {} - def get_or_init_offset(resp): + def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: return resp.offset elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: @@ -843,8 +846,33 @@ class SimpleConsumer(object): for partition in self.client.topic_partitions[topic]: req = OffsetFetchRequest(topic, partition) - (offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False) + (offset,) = self.client.send_offset_fetch_request(group, [req], + callback=get_or_init_offset_callback, fail_on_error=False) self.offsets[partition] = offset - print self.offsets + def __iter__(self): + iters = {} + for partition, offset in self.offsets.items(): + iters[partition] = self.__iter_partition__(partition, offset) + + while True: + for it in iters.values(): + yield it.next() + + def __iter_partition__(self, partition, offset): + while True: + req = FetchRequest(self.topic, partition, offset, 1024) + (resp,) = self.client.send_fetch_request([req]) + assert resp.topic == self.topic + assert resp.partition == partition + next_offset = None + for message in resp.messages: + next_offset = message.offset + yield message + if next_offset is None: + raise StopIteration("No more messages") + else: + offset = next_offset + 1 + # Commit offset here? + |