author    | Omar Ghishan <omar.ghishan@rd.io> | 2013-12-19 13:35:53 -0800
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:50 -0800
commit    | c1ba5101b7a54382b2a68b23ba777785104e9877 (patch)
tree      | c0f190372e59b8eeb14b7d3842e252448bbe3a9c
parent    | dc4198bddc9f721ef18b41d8d7714bfa968eec7d (diff)
download  | kafka-python-c1ba5101b7a54382b2a68b23ba777785104e9877.tar.gz
Add comments and maintain 80 character line limit
-rw-r--r-- | kafka/consumer.py | 30
1 file changed, 23 insertions, 7 deletions
```diff
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 5fa7332..ff08da4 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -302,8 +302,9 @@ class SimpleConsumer(Consumer):

         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If block is True, the function will block for the specified time (in seconds)
-                 until count messages is fetched. If None, it will block forever.
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages is fetched. If None,
+                 it will block forever.
         """
         messages = []
         if timeout:
@@ -315,16 +316,20 @@ class SimpleConsumer(Consumer):
                     messages.append(message)
                     count -= 1
                 else:
-                    # Ran out of messages for the last request. If we're not blocking, break.
+                    # Ran out of messages for the last request.
                     if not block:
+                        # If we're not blocking, break.
                         break
                     if timeout:
+                        # If we're blocking and have a timeout, reduce it to the
+                        # appropriate value
                         timeout = max_time - time.time()

         return messages

     def get_message(self, block=True, timeout=0.1):
         if self.queue.empty():
+            # We're out of messages, go grab some more.
             with FetchContext(self, block, timeout):
                 self._fetch()
         try:
@@ -351,29 +356,39 @@ class SimpleConsumer(Consumer):
                 break

     def _fetch(self):
+        # Create fetch request payloads for all the partitions
         requests = []
         partitions = self.offsets.keys()
         for partition in partitions:
-            requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size))
+            requests.append(FetchRequest(self.topic, partition,
+                                         self.offsets[partition],
+                                         self.buffer_size))

+        # Send request
         responses = self.client.send_fetch_request(
             requests,
             max_wait_time=int(self.fetch_max_wait_time),
             min_bytes=self.fetch_min_bytes)
+
         for resp in responses:
             partition = resp.partition
             try:
                 for message in resp.messages:
+                    # Update partition offset
                     self.offsets[partition] = message.offset + 1
+                    # Count, check and commit messages if necessary
                     self.count_since_commit += 1
                     self._auto_commit()
+
+                    # Put the message in our queue
                     if self.partition_info:
                         self.queue.put((partition, message))
                     else:
                         self.queue.put(message)
             except ConsumerFetchSizeTooSmall, e:
                 self.buffer_size *= 2
-                log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size)
+                log.warn("Fetch size too small, increase to %d (2x) and retry",
+                         self.buffer_size)
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
             except StopIteration:
@@ -560,8 +575,9 @@ class MultiProcessConsumer(Consumer):

         count: Indicates the maximum number of messages to be fetched
         block: If True, the API will block till some messages are fetched.
-        timeout: If block is True, the function will block for the specified time (in seconds)
-                 until count messages is fetched. If None, it will block forever.
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages is fetched. If None,
+                 it will block forever.
         """
         messages = []
```
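The reworded docstring describes how `block` and `timeout` interact in `get_messages`: with `block=True` the call waits up to `timeout` seconds for `count` messages, with `timeout=None` it waits indefinitely, and with `block=False` it returns whatever is already buffered. A minimal usage sketch is below; the broker address, group, and topic are placeholders, and the `KafkaClient`/`SimpleConsumer` constructor signatures are assumed from the kafka-python README of this era, so verify them against your checkout.

```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Placeholder broker/group/topic names; adjust for your environment.
kafka = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

# Block for up to 5 seconds and return at most 10 messages.
# timeout=None would block until all 10 arrive; block=False would
# return only what is already in the consumer's internal queue.
messages = consumer.get_messages(count=10, block=True, timeout=5)
for message in messages:
    print(message)

kafka.close()
```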