diff options
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/consumer.py | 41 |
2 files changed, 35 insertions, 12 deletions
diff --git a/kafka/client.py b/kafka/client.py index 9893737..eb2c25c 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -65,6 +65,12 @@ class KafkaClient(object): self.brokers.update(brokers) self.topics_to_brokers = {} for topic, partitions in topics.items(): + if not partitions: + log.info("Partition is unassigned, delay for 1s and retry") + time.sleep(1) + self._load_metadata_for_topics(topic) + break + for partition, meta in partitions.items(): if meta.leader == -1: log.info("Partition is unassigned, delay for 1s and retry") diff --git a/kafka/consumer.py b/kafka/consumer.py index 93da316..7f6a6f0 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -7,8 +7,15 @@ from kafka.common import ( OffsetRequest, OffsetFetchRequest, OffsetCommitRequest ) +from kafka.util import ( + ReentrantTimer +) + log = logging.getLogger("kafka") +AUTO_COMMIT_MSG_COUNT = 100 +AUTO_COMMIT_INTERVAL = 5000 + class SimpleConsumer(object): """ A simple consumer implementation that consumes all partitions for a topic @@ -27,7 +34,9 @@ class SimpleConsumer(object): manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None): + def __init__(self, client, group, topic, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client self.topic = topic self.group = group @@ -151,19 +160,21 @@ class SimpleConsumer(object): with self.commit_lock: reqs = [] if len(partitions) == 0: # commit all partitions - for partition, offset in self.offsets.items(): - log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % ( - offset, self.group, self.topic, partition)) - reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - else: - for partition in partitions: - offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % ( - offset, self.group, self.topic, partition)) - reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.send_offset_commit_request(self.group, reqs) + partitions = self.offsets.keys() + + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + resps = self.client.send_offset_commit_request(self.group, reqs) for resp in resps: assert resp.error == 0 + self.count_since_commit = 0 def __iter__(self): @@ -207,6 +218,12 @@ class SimpleConsumer(object): a batch of messages, yield them one at a time. After a batch is exhausted, start a new batch unless we've reached the end of ths partition. """ + + # Unless it is the first message in the queue, we have to fetch + # the next one + if offset != 0: + offset += 1 + while True: req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size (resp,) = self.client.send_fetch_request([req]) |