diff options
author | David Arthur <mumrah@gmail.com> | 2013-04-01 14:56:59 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | 0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c (patch) | |
tree | d4011fc89717f1eb9884787ae333be5b525bacd4 /kafka/consumer.py | |
parent | b6d98c07b418b16061ae92392947d5dd6958a708 (diff) | |
download | kafka-python-0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c.tar.gz |
Refactoring a bit, cleanup for 0.8
Marking some stuff as not compatible for 0.8 (will be added in 0.8.1)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 71 |
1 files changed, 51 insertions, 20 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c6aafce..4ce62e2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,3 +1,4 @@ +from itertools import izip_longest, repeat import logging from threading import Lock @@ -30,7 +31,7 @@ class SimpleConsumer(object): self.client = client self.topic = topic self.group = group - self.client.load_metadata_for_topics(topic) + self.client._load_metadata_for_topics(topic) self.offsets = {} # Set up the auto-commit timer @@ -54,12 +55,16 @@ class SimpleConsumer(object): raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) + # Uncomment for 0.8.1 + # + #for partition in self.client.topic_partitions[topic]: + # req = OffsetFetchRequest(topic, partition) + # (offset,) = self.client.send_offset_fetch_request(group, [req], + # callback=get_or_init_offset_callback, fail_on_error=False) + # self.offsets[partition] = offset + for partition in self.client.topic_partitions[topic]: - req = OffsetFetchRequest(topic, partition) - (offset,) = self.client.send_offset_fetch_request(group, [req], - callback=get_or_init_offset_callback, fail_on_error=False) - self.offsets[partition] = offset - print self.offsets + self.offsets[partition] = 0 def seek(self, offset, whence): """ @@ -71,25 +76,30 @@ class SimpleConsumer(object): 1 is relative to the current offset 2 is relative to the latest known offset (tail) """ - if whence == 1: - # relative to current position + if whence == 1: # relative to current position for partition, _offset in self.offsets.items(): self.offset[partition] = _offset + offset - elif whence in (0, 2): - # relative to beginning or end + elif whence in (0, 2): # relative to beginning or end + # divide the request offset by number of partitions, distribute the remained evenly + (delta, rem) = divmod(offset, len(self.offsets)) + deltas = {} + for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0): + deltas[partition] = delta + r + reqs = [] - for partition in offsets.keys(): + for partition in self.offsets.keys(): if whence == 0: reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: reqs.append(OffsetRequest(self.topic, partition, -1, 1)) else: pass - resps = self.client.send_offset_request([req]) + + resps = self.client.send_offset_request(reqs) for resp in resps: - self.offsets[resp.partition] = resp.offsets[0] + offset + self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition] else: - raise + raise ValueError("Unexpected value for `whence`, %d" % whence) def commit(self, partitions=[]): """ @@ -98,6 +108,8 @@ class SimpleConsumer(object): partitions: list of partitions to commit, default is to commit all of them """ + raise NotImplementedError("Broker-managed offsets not supported in 0.8") + # short circuit if nothing happened if self.count_since_commit == 0: return @@ -121,15 +133,31 @@ class SimpleConsumer(object): self.count_since_commit = 0 def __iter__(self): + """ + Create an iterate per partition. Iterate through them calling next() until they are + all exhausted. + """ iters = {} for partition, offset in self.offsets.items(): iters[partition] = self.__iter_partition__(partition, offset) + if len(iters) == 0: + return + while True: - for it in iters.values(): - yield it.next() + if len(iters) == 0: + break + + for partition, it in iters.items(): + try: + yield it.next() + except StopIteration: + log.debug("Done iterating over partition %s" % partition) + del iters[partition] + continue # skip auto-commit since we didn't yield anything + + # auto commit logic self.count_since_commit += 1 - # deal with auto commits if self.auto_commit is True: if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n: if self.commit_timer is not None: @@ -140,19 +168,22 @@ class SimpleConsumer(object): self.commit() def __iter_partition__(self, partition, offset): + """ + Iterate over the messages in a partition. Create a FetchRequest to get back + a batch of messages, yield them one at a time. After a batch is exhausted, + start a new batch unless we've reached the end of ths partition. + """ while True: - req = FetchRequest(self.topic, partition, offset, 1024) + req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size (resp,) = self.client.send_fetch_request([req]) assert resp.topic == self.topic assert resp.partition == partition next_offset = None for message in resp.messages: next_offset = message.offset - print partition, message, message.offset yield message # update the internal state _after_ we yield the message self.offsets[partition] = message.offset - print partition, next_offset if next_offset is None: break else: |