summary | refs | log | tree | commit | diff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
author: Omar Ghishan <omar.ghishan@rd.io> 2013-12-19 13:24:28 -0800
committer: Omar Ghishan <omar.ghishan@rd.io> 2014-01-06 15:14:50 -0800
commit: dc4198bddc9f721ef18b41d8d7714bfa968eec7d (patch)
tree: 802ec9cdeca039795c8907a8209b07a3709f99eb /kafka/consumer.py
parent: 450faeb328fe085a35c70e0dcbf93cbd44277362 (diff)
download: kafka-python-dc4198bddc9f721ef18b41d8d7714bfa968eec7d.tar.gz
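Add an iter_timeout option to SimpleConsumer. If it is not None, the iterator exits when no message arrives within that many seconds; if it is None, the iterator keeps waiting indefinitely.
Also move the hard-coded timeout values into named module-level constants.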
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- kafka/consumer.py 28
1 file changed, 22 insertions, 6 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 7e53f08..5fa7332 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -24,6 +24,9 @@ FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+
class FetchContext(object):
"""
@@ -210,6 +213,9 @@ class SimpleConsumer(Consumer):
fetch_size_bytes: number of bytes to request in a FetchRequest
buffer_size: initial number of bytes to tell kafka we have
available. This will double every time it's not enough
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -221,13 +227,15 @@ class SimpleConsumer(Consumer):
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
fetch_size_bytes=FETCH_MIN_BYTES,
- buffer_size=FETCH_BUFFER_SIZE_BYTES):
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
self.buffer_size = buffer_size
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
+ self.iter_timeout = iter_timeout
self.queue = Queue()
super(SimpleConsumer, self).__init__(
@@ -325,14 +333,22 @@ class SimpleConsumer(Consumer):
return None
def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
+
while True:
- message = self.get_message(True, 100)
+ message = self.get_message(True, timeout)
if message:
yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
else:
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- time.sleep(0.1)
+ # Timed out waiting for a message
+ break
def _fetch(self):
requests = []
@@ -417,7 +433,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
else:
# In case we did not receive any message, give up the CPU for
# a while before we try again
- time.sleep(0.1)
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()