summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/consumer.py41
2 files changed, 35 insertions, 12 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9893737..eb2c25c 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -65,6 +65,12 @@ class KafkaClient(object):
self.brokers.update(brokers)
self.topics_to_brokers = {}
for topic, partitions in topics.items():
+ if not partitions:
+ log.info("Partition is unassigned, delay for 1s and retry")
+ time.sleep(1)
+ self._load_metadata_for_topics(topic)
+ break
+
for partition, meta in partitions.items():
if meta.leader == -1:
log.info("Partition is unassigned, delay for 1s and retry")
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 93da316..7f6a6f0 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -7,8 +7,15 @@ from kafka.common import (
OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
)
+from kafka.util import (
+ ReentrantTimer
+)
+
log = logging.getLogger("kafka")
+AUTO_COMMIT_MSG_COUNT = 100
+AUTO_COMMIT_INTERVAL = 5000
+
class SimpleConsumer(object):
"""
A simple consumer implementation that consumes all partitions for a topic
@@ -27,7 +34,9 @@ class SimpleConsumer(object):
manual call to commit will also reset these triggers
"""
- def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None):
+ def __init__(self, client, group, topic, auto_commit=True,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
self.topic = topic
self.group = group
@@ -151,19 +160,21 @@ class SimpleConsumer(object):
with self.commit_lock:
reqs = []
if len(partitions) == 0: # commit all partitions
- for partition, offset in self.offsets.items():
- log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
- offset, self.group, self.topic, partition))
- reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
- else:
- for partition in partitions:
- offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
- offset, self.group, self.topic, partition))
- reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
- resps = self.send_offset_commit_request(self.group, reqs)
+ partitions = self.offsets.keys()
+
+ for partition in partitions:
+ offset = self.offsets[partition]
+ log.debug("Commit offset %d in SimpleConsumer: "
+ "group=%s, topic=%s, partition=%s" %
+ (offset, self.group, self.topic, partition))
+
+ reqs.append(OffsetCommitRequest(self.topic, partition,
+ offset, None))
+
+ resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
assert resp.error == 0
+
self.count_since_commit = 0
def __iter__(self):
@@ -207,6 +218,12 @@ class SimpleConsumer(object):
a batch of messages, yield them one at a time. After a batch is exhausted,
start a new batch unless we've reached the end of ths partition.
"""
+
+ # Unless it is the first message in the queue, we have to fetch
+ # the next one
+ if offset != 0:
+ offset += 1
+
while True:
req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
(resp,) = self.client.send_fetch_request([req])