summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-30 00:28:00 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commitb6d98c07b418b16061ae92392947d5dd6958a708 (patch)
treee777fcf3019ef0ddc6c278ef733c487f5b0532c3 /kafka/consumer.py
parent3499e2f6ead76e1c2db6ac754358bd57f9a15268 (diff)
downloadkafka-python-b6d98c07b418b16061ae92392947d5dd6958a708.tar.gz
Big code re-org
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
new file mode 100644
index 0000000..c6aafce
--- /dev/null
+++ b/kafka/consumer.py
@@ -0,0 +1,159 @@
+import logging
+from threading import Lock
+
+from kafka.common import (
+ ErrorMapping, FetchRequest,
+ OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
+)
+
+log = logging.getLogger("kafka")
+
+class SimpleConsumer(object):
+ """
+ A simple consumer implementation that consumes all partitions for a topic
+
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+
+ auto_commit: default True. Whether or not to auto commit the offsets
+ auto_commit_every_n: default 100. How many messages to consume before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
+
+ Auto commit details:
+ If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another
+ when one is triggered. These triggers simply call the commit method on this class. A
+ manual call to commit will also reset these triggers
+
+ """
+ def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None):
+ self.client = client
+ self.topic = topic
+ self.group = group
+ self.client.load_metadata_for_topics(topic)
+ self.offsets = {}
+
+ # Set up the auto-commit timer
+ if auto_commit is True:
+ if auto_commit_every_t is not None:
+ self.commit_timer = ReentrantTimer(auto_commit_every_t, self.commit)
+ self.commit_timer.start()
+
+ self.commit_lock = Lock()
+ self.count_since_commit = 0
+ self.auto_commit = auto_commit
+ self.auto_commit_every_n = auto_commit_every_n
+ self.auto_commit_every_t = auto_commit_every_t
+
+ def get_or_init_offset_callback(resp):
+ if resp.error == ErrorMapping.NO_ERROR:
+ return resp.offset
+ elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ return 0
+ else:
+ raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
+ resp.topic, resp.partition, resp.error))
+
+ for partition in self.client.topic_partitions[topic]:
+ req = OffsetFetchRequest(topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(group, [req],
+ callback=get_or_init_offset_callback, fail_on_error=False)
+ self.offsets[partition] = offset
+ print self.offsets
+
+ def seek(self, offset, whence):
+ """
+ Alter the current offset in the consumer, similar to fseek
+
+ offset: how much to modify the offset
+ whence: where to modify it from
+ 0 is relative to the earliest available offset (head)
+ 1 is relative to the current offset
+ 2 is relative to the latest known offset (tail)
+ """
+ if whence == 1:
+ # relative to current position
+ for partition, _offset in self.offsets.items():
+ self.offset[partition] = _offset + offset
+ elif whence in (0, 2):
+ # relative to beginning or end
+ reqs = []
+ for partition in offsets.keys():
+ if whence == 0:
+ reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+ elif whence == 2:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ else:
+ pass
+ resps = self.client.send_offset_request([req])
+ for resp in resps:
+ self.offsets[resp.partition] = resp.offsets[0] + offset
+ else:
+ raise
+
+ def commit(self, partitions=[]):
+ """
+ Commit offsets for this consumer
+
+ partitions: list of partitions to commit, default is to commit all of them
+ """
+
+ # short circuit if nothing happened
+ if self.count_since_commit == 0:
+ return
+
+ with self.commit_lock:
+ reqs = []
+ if len(partitions) == 0: # commit all partitions
+ for partition, offset in self.offsets.items():
+ log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
+ offset, self.group, self.topic, partition))
+ reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
+ else:
+ for partition in partitions:
+ offset = self.offsets[partition]
+ log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
+ offset, self.group, self.topic, partition))
+ reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
+ resps = self.send_offset_commit_request(self.group, reqs)
+ for resp in resps:
+ assert resp.error == 0
+ self.count_since_commit = 0
+
+ def __iter__(self):
+ iters = {}
+ for partition, offset in self.offsets.items():
+ iters[partition] = self.__iter_partition__(partition, offset)
+
+ while True:
+ for it in iters.values():
+ yield it.next()
+ self.count_since_commit += 1
+ # deal with auto commits
+ if self.auto_commit is True:
+ if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
+ if self.commit_timer is not None:
+ self.commit_timer.stop()
+ self.commit()
+ self.commit_timer.start()
+ else:
+ self.commit()
+
+ def __iter_partition__(self, partition, offset):
+ while True:
+ req = FetchRequest(self.topic, partition, offset, 1024)
+ (resp,) = self.client.send_fetch_request([req])
+ assert resp.topic == self.topic
+ assert resp.partition == partition
+ next_offset = None
+ for message in resp.messages:
+ next_offset = message.offset
+ print partition, message, message.offset
+ yield message
+ # update the internal state _after_ we yield the message
+ self.offsets[partition] = message.offset
+ print partition, next_offset
+ if next_offset is None:
+ break
+ else:
+ offset = next_offset + 1