diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2013-12-19 13:24:28 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:50 -0800 |
commit | dc4198bddc9f721ef18b41d8d7714bfa968eec7d (patch) | |
tree | 802ec9cdeca039795c8907a8209b07a3709f99eb /kafka/consumer.py | |
parent | 450faeb328fe085a35c70e0dcbf93cbd44277362 (diff) | |
download | kafka-python-dc4198bddc9f721ef18b41d8d7714bfa968eec7d.tar.gz |
Add iter_timeout option to SimpleConsumer. If not None, it causes the iterator to exit when reached.
Also put constant timeout values in pre-defined constants
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 7e53f08..5fa7332 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -24,6 +24,9 @@ FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 FETCH_BUFFER_SIZE_BYTES = 4096 +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 + class FetchContext(object): """ @@ -210,6 +213,9 @@ class SimpleConsumer(Consumer): fetch_size_bytes: number of bytes to request in a FetchRequest buffer_size: initial number of bytes to tell kafka we have available. This will double every time it's not enough + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -221,13 +227,15 @@ class SimpleConsumer(Consumer): auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, - buffer_size=FETCH_BUFFER_SIZE_BYTES): + buffer_size=FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None): self.buffer_size = buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false + self.iter_timeout = iter_timeout self.queue = Queue() super(SimpleConsumer, self).__init__( @@ -325,14 +333,22 @@ class SimpleConsumer(Consumer): return None def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout + while True: - message = self.get_message(True, 100) + message = self.get_message(True, timeout) if message: yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(0.1) + # Timed out waiting for a message + break def _fetch(self): requests = [] @@ -417,7 +433,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): else: # In case we did not receive any message, give up the CPU for # a while before we try again - time.sleep(0.1) + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) consumer.stop() |