diff options
author | David Arthur <mumrah@gmail.com> | 2013-03-30 00:28:00 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | b6d98c07b418b16061ae92392947d5dd6958a708 (patch) | |
tree | e777fcf3019ef0ddc6c278ef733c487f5b0532c3 /kafka/consumer.py | |
parent | 3499e2f6ead76e1c2db6ac754358bd57f9a15268 (diff) | |
download | kafka-python-b6d98c07b418b16061ae92392947d5dd6958a708.tar.gz |
Big code re-org
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py new file mode 100644 index 0000000..c6aafce --- /dev/null +++ b/kafka/consumer.py @@ -0,0 +1,159 @@ +import logging +from threading import Lock + +from kafka.common import ( + ErrorMapping, FetchRequest, + OffsetRequest, OffsetFetchRequest, OffsetCommitRequest +) + +log = logging.getLogger("kafka") + +class SimpleConsumer(object): + """ + A simple consumer implementation that consumes all partitions for a topic + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another + when one is triggered. These triggers simply call the commit method on this class. A + manual call to commit will also reset these triggers + + """ + def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None): + self.client = client + self.topic = topic + self.group = group + self.client.load_metadata_for_topics(topic) + self.offsets = {} + + # Set up the auto-commit timer + if auto_commit is True: + if auto_commit_every_t is not None: + self.commit_timer = ReentrantTimer(auto_commit_every_t, self.commit) + self.commit_timer.start() + + self.commit_lock = Lock() + self.count_since_commit = 0 + self.auto_commit = auto_commit + self.auto_commit_every_n = auto_commit_every_n + self.auto_commit_every_t = auto_commit_every_t + + def get_or_init_offset_callback(resp): + if resp.error == ErrorMapping.NO_ERROR: + return resp.offset + elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: + return 0 + else: + raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % ( + resp.topic, resp.partition, resp.error)) + + for partition in self.client.topic_partitions[topic]: + req = OffsetFetchRequest(topic, partition) + (offset,) = self.client.send_offset_fetch_request(group, [req], + callback=get_or_init_offset_callback, fail_on_error=False) + self.offsets[partition] = offset + print self.offsets + + def seek(self, offset, whence): + """ + Alter the current offset in the consumer, similar to fseek + + offset: how much to modify the offset + whence: where to modify it from + 0 is relative to the earliest available offset (head) + 1 is relative to the current offset + 2 is relative to the latest known offset (tail) + """ + if whence == 1: + # relative to current position + for partition, _offset in self.offsets.items(): + self.offset[partition] = _offset + offset + elif whence in (0, 2): + # relative to beginning or end + reqs = [] + for partition in offsets.keys(): + if whence == 0: + reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + elif whence == 2: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + else: + pass + resps = self.client.send_offset_request([req]) + for resp in resps: + self.offsets[resp.partition] = resp.offsets[0] + offset + else: + raise + + def commit(self, partitions=[]): + """ + Commit offsets for this consumer + + partitions: list of partitions to commit, default is to commit all of them + """ + + # short circuit if nothing happened + if self.count_since_commit == 0: + return + + with self.commit_lock: + reqs = [] + if len(partitions) == 0: # commit all partitions + for partition, offset in self.offsets.items(): + log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % ( + offset, self.group, self.topic, partition)) + reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) + else: + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % ( + offset, self.group, self.topic, partition)) + reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) + resps = self.send_offset_commit_request(self.group, reqs) + for resp in resps: + assert resp.error == 0 + self.count_since_commit = 0 + + def __iter__(self): + iters = {} + for partition, offset in self.offsets.items(): + iters[partition] = self.__iter_partition__(partition, offset) + + while True: + for it in iters.values(): + yield it.next() + self.count_since_commit += 1 + # deal with auto commits + if self.auto_commit is True: + if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n: + if self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + self.commit_timer.start() + else: + self.commit() + + def __iter_partition__(self, partition, offset): + while True: + req = FetchRequest(self.topic, partition, offset, 1024) + (resp,) = self.client.send_fetch_request([req]) + assert resp.topic == self.topic + assert resp.partition == partition + next_offset = None + for message in resp.messages: + next_offset = message.offset + print partition, message, message.offset + yield message + # update the internal state _after_ we yield the message + self.offsets[partition] = message.offset + print partition, next_offset + if next_offset is None: + break + else: + offset = next_offset + 1 |