summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-04-01 14:56:59 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commit0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c (patch)
treed4011fc89717f1eb9884787ae333be5b525bacd4 /kafka/consumer.py
parentb6d98c07b418b16061ae92392947d5dd6958a708 (diff)
downloadkafka-python-0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c.tar.gz
Refactoring a bit, cleanup for 0.8
Marking some stuff as not compatible for 0.8 (will be added in 0.8.1)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py71
1 files changed, 51 insertions, 20 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c6aafce..4ce62e2 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,3 +1,4 @@
+from itertools import izip_longest, repeat
import logging
from threading import Lock
@@ -30,7 +31,7 @@ class SimpleConsumer(object):
self.client = client
self.topic = topic
self.group = group
- self.client.load_metadata_for_topics(topic)
+ self.client._load_metadata_for_topics(topic)
self.offsets = {}
# Set up the auto-commit timer
@@ -54,12 +55,16 @@ class SimpleConsumer(object):
raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))
+ # Uncomment for 0.8.1
+ #
+ #for partition in self.client.topic_partitions[topic]:
+ # req = OffsetFetchRequest(topic, partition)
+ # (offset,) = self.client.send_offset_fetch_request(group, [req],
+ # callback=get_or_init_offset_callback, fail_on_error=False)
+ # self.offsets[partition] = offset
+
for partition in self.client.topic_partitions[topic]:
- req = OffsetFetchRequest(topic, partition)
- (offset,) = self.client.send_offset_fetch_request(group, [req],
- callback=get_or_init_offset_callback, fail_on_error=False)
- self.offsets[partition] = offset
- print self.offsets
+ self.offsets[partition] = 0
def seek(self, offset, whence):
"""
@@ -71,25 +76,30 @@ class SimpleConsumer(object):
1 is relative to the current offset
2 is relative to the latest known offset (tail)
"""
- if whence == 1:
- # relative to current position
+ if whence == 1: # relative to current position
for partition, _offset in self.offsets.items():
self.offset[partition] = _offset + offset
- elif whence in (0, 2):
- # relative to beginning or end
+ elif whence in (0, 2): # relative to beginning or end
+ # divide the request offset by number of partitions, distribute the remained evenly
+ (delta, rem) = divmod(offset, len(self.offsets))
+ deltas = {}
+ for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+ deltas[partition] = delta + r
+
reqs = []
- for partition in offsets.keys():
+ for partition in self.offsets.keys():
if whence == 0:
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
else:
pass
- resps = self.client.send_offset_request([req])
+
+ resps = self.client.send_offset_request(reqs)
for resp in resps:
- self.offsets[resp.partition] = resp.offsets[0] + offset
+ self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
else:
- raise
+ raise ValueError("Unexpected value for `whence`, %d" % whence)
def commit(self, partitions=[]):
"""
@@ -98,6 +108,8 @@ class SimpleConsumer(object):
partitions: list of partitions to commit, default is to commit all of them
"""
+ raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+
# short circuit if nothing happened
if self.count_since_commit == 0:
return
@@ -121,15 +133,31 @@ class SimpleConsumer(object):
self.count_since_commit = 0
def __iter__(self):
+ """
+ Create an iterate per partition. Iterate through them calling next() until they are
+ all exhausted.
+ """
iters = {}
for partition, offset in self.offsets.items():
iters[partition] = self.__iter_partition__(partition, offset)
+ if len(iters) == 0:
+ return
+
while True:
- for it in iters.values():
- yield it.next()
+ if len(iters) == 0:
+ break
+
+ for partition, it in iters.items():
+ try:
+ yield it.next()
+ except StopIteration:
+ log.debug("Done iterating over partition %s" % partition)
+ del iters[partition]
+ continue # skip auto-commit since we didn't yield anything
+
+ # auto commit logic
self.count_since_commit += 1
- # deal with auto commits
if self.auto_commit is True:
if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
if self.commit_timer is not None:
@@ -140,19 +168,22 @@ class SimpleConsumer(object):
self.commit()
def __iter_partition__(self, partition, offset):
+ """
+ Iterate over the messages in a partition. Create a FetchRequest to get back
+ a batch of messages, yield them one at a time. After a batch is exhausted,
+ start a new batch unless we've reached the end of ths partition.
+ """
while True:
- req = FetchRequest(self.topic, partition, offset, 1024)
+ req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
(resp,) = self.client.send_fetch_request([req])
assert resp.topic == self.topic
assert resp.partition == partition
next_offset = None
for message in resp.messages:
next_offset = message.offset
- print partition, message, message.offset
yield message
# update the internal state _after_ we yield the message
self.offsets[partition] = message.offset
- print partition, next_offset
if next_offset is None:
break
else: