author     Omar Ghishan <omar.ghishan@rd.io>  2013-12-19 13:35:53 -0800
committer  Omar Ghishan <omar.ghishan@rd.io>  2014-01-06 15:14:50 -0800
commit     c1ba5101b7a54382b2a68b23ba777785104e9877 (patch)
tree       c0f190372e59b8eeb14b7d3842e252448bbe3a9c /kafka/consumer.py
parent     dc4198bddc9f721ef18b41d8d7714bfa968eec7d (diff)
download   kafka-python-c1ba5101b7a54382b2a68b23ba777785104e9877.tar.gz
Add comments and maintain 80 character line limit
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--  kafka/consumer.py  30
1 file changed, 23 insertions(+), 7 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 5fa7332..ff08da4 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -302,8 +302,9 @@ class SimpleConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified time (in seconds)
- until count messages is fetched. If None, it will block forever.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
if timeout:
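
The count/block/timeout semantics documented above boil down to a deadline loop: when a timeout is given, a deadline is computed up front and then shrunk after every blocking attempt. A minimal standalone sketch of that behavior, where get_message is a hypothetical callable standing in for SimpleConsumer.get_message rather than the library API itself:

    import time

    def collect(get_message, count=1, block=True, timeout=None):
        # get_message(block, timeout) returns one message, or None if
        # nothing arrived within the timeout.
        messages = []
        if timeout:
            max_time = time.time() + timeout
        while count > 0 and (timeout is None or timeout > 0):
            message = get_message(block, timeout)
            if message:
                messages.append(message)
                count -= 1
            elif not block:
                # Not blocking and nothing buffered: return what we have.
                break
            elif timeout:
                # Blocking with a deadline: shrink the remaining timeout so
                # the overall call still finishes on time.
                timeout = max_time - time.time()
        return messages
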
@@ -315,16 +316,20 @@ class SimpleConsumer(Consumer):
messages.append(message)
count -= 1
else:
- # Ran out of messages for the last request. If we're not blocking, break.
+ # Ran out of messages for the last request.
if not block:
+ # If we're not blocking, break.
break
if timeout:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
timeout = max_time - time.time()
return messages
def get_message(self, block=True, timeout=0.1):
if self.queue.empty():
+ # We're out of messages, go grab some more.
with FetchContext(self, block, timeout):
self._fetch()
try:
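
The get_message path above follows a refill-then-pop pattern: the internal queue is only refilled (inside a FetchContext that applies the block/timeout settings) when it has run dry, and the subsequent get never blocks. A rough standalone sketch of that pattern; the fetch callable is hypothetical and stands in for SimpleConsumer._fetch:

    from Queue import Empty  # Python 2 stdlib, matching the era of this code

    def get_message_sketch(queue, fetch):
        # `queue` is a Queue.Queue buffering already-fetched messages;
        # `fetch` pulls a fresh batch from the broker and puts it on the queue.
        if queue.empty():
            # We're out of messages, go grab some more.
            fetch()
        try:
            # Non-blocking get: the fetch above either filled the queue or
            # gave up, so there is nothing left to wait for here.
            return queue.get_nowait()
        except Empty:
            return None
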
@@ -351,29 +356,39 @@ class SimpleConsumer(Consumer):
break
def _fetch(self):
+ # Create fetch request payloads for all the partitions
requests = []
partitions = self.offsets.keys()
for partition in partitions:
- requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size))
+ requests.append(FetchRequest(self.topic, partition,
+ self.offsets[partition],
+ self.buffer_size))
+ # Send request
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
min_bytes=self.fetch_min_bytes)
+
for resp in responses:
partition = resp.partition
try:
for message in resp.messages:
+ # Update partition offset
self.offsets[partition] = message.offset + 1
+
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
+
+ # Put the message in our queue
if self.partition_info:
self.queue.put((partition, message))
else:
self.queue.put(message)
except ConsumerFetchSizeTooSmall, e:
self.buffer_size *= 2
- log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) and retry",
+ self.buffer_size)
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
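
The ConsumerFetchSizeTooSmall handler near the end of this hunk is the recovery path for when a partition's next message no longer fits in the fetch buffer: buffer_size is doubled so the next fetch asks for more bytes. A self-contained sketch of that double-and-retry idea (FetchSizeTooSmall is a local stand-in for kafka-python's ConsumerFetchSizeTooSmall, and fetch_once is a hypothetical callable; the real code simply retries on the next _fetch call):

    class FetchSizeTooSmall(Exception):
        """Local stand-in for kafka-python's ConsumerFetchSizeTooSmall."""

    def fetch_with_backoff(fetch_once, buffer_size):
        # fetch_once(buffer_size) performs a single broker fetch and raises
        # FetchSizeTooSmall when the buffer cannot hold a complete message.
        while True:
            try:
                return fetch_once(buffer_size)
            except FetchSizeTooSmall:
                # Same recovery as the diff: double the buffer and retry.
                buffer_size *= 2
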
@@ -560,8 +575,9 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified time (in seconds)
- until count messages is fetched. If None, it will block forever.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
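
Taken together, the documented get_messages semantics can be exercised roughly like this. The broker address, group, and topic names are placeholders, and the constructor forms are assumptions about the kafka-python API of this era rather than anything shown in the diff:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)                    # placeholder broker
    consumer = SimpleConsumer(client, "my-group", "my-topic")  # placeholder names

    # Non-blocking: return whatever is already buffered, up to 10 messages.
    batch = consumer.get_messages(count=10, block=False)

    # Blocking with a deadline: wait at most 5 seconds, returning early
    # once 10 messages have been collected.
    batch = consumer.get_messages(count=10, block=True, timeout=5)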