summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-10 22:20:42 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-10 22:20:42 -0800
commit829b0379eb6f4df2e20fceb6673ad60e21b348f3 (patch)
tree43692e5c5114f1fc6b30a161c7b2c9e1076cf971
parent5d2886bae36c8336a15e0f58c827556de186350a (diff)
parentecb4d49c06484e8ed9bdb6db35350d104e13b730 (diff)
downloadkafka-python-829b0379eb6f4df2e20fceb6673ad60e21b348f3.tar.gz
Merge pull request #496 from dpkp/idle_sleep
Prevent idle CPU spin by sleeping if there are no sockets to read
-rw-r--r--kafka/client_async.py34
-rw-r--r--kafka/consumer/group.py7
2 files changed, 30 insertions, 11 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 1c74c6f..88b8ec6 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,14 +314,21 @@ class KafkaClient(object):
else:
task_future.success(result)
- timeout = min(
- timeout_ms,
- metadata_timeout_ms,
- self._delayed_tasks.next_at() * 1000,
- self.config['request_timeout_ms'])
- timeout = max(0, timeout / 1000.0)
+ # If we got a future that is already done, dont block in _poll
+ if future and future.is_done:
+ timeout = 0
+ else:
+ timeout = min(
+ timeout_ms,
+ metadata_timeout_ms,
+ self._delayed_tasks.next_at() * 1000,
+ self.config['request_timeout_ms'])
+ timeout = max(0, timeout / 1000.0) # avoid negative timeouts
responses.extend(self._poll(timeout))
+
+ # If all we had was a timeout (future is None) - only do one poll
+ # If we do have a future, we keep looping until it is done
if not future or future.is_done:
break
@@ -334,16 +341,25 @@ class KafkaClient(object):
if (conn.state is ConnectionStates.CONNECTED
and conn.in_flight_requests)])
if not sockets:
+ # if sockets are connecting, we can wake when they are writeable
+ if self._connecting:
+ sockets = [self._conns[node]._sock for node in self._connecting]
+ select.select([], sockets, [], timeout)
+ # otherwise just sleep to prevent CPU spinning
+ else:
+ log.debug('Nothing to do in _poll -- sleeping for %s', timeout)
+ time.sleep(timeout)
return []
ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
responses = []
- # list, not iterator, because inline callbacks may add to self._conns
for sock in ready:
conn = sockets[sock]
- response = conn.recv() # Note: conn.recv runs callbacks / errbacks
- if response:
+ while conn.in_flight_requests:
+ response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+ if not response:
+ break
responses.append(response)
return responses
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 3fb9c8e..704c994 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -441,7 +441,7 @@ class KafkaConsumer(six.Iterator):
return records
self._fetcher.init_fetches()
- self._client.poll(timeout_ms / 1000.0)
+ self._client.poll(timeout_ms)
return self._fetcher.fetched_records()
def position(self, partition):
@@ -628,11 +628,14 @@ class KafkaConsumer(six.Iterator):
# init any new fetches (won't resend pending fetches)
self._fetcher.init_fetches()
- self._client.poll()
+ self._client.poll(
+ max(0, self._consumer_timeout - time.time()) * 1000)
timeout_at = min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
+ if time.time() > timeout_at:
+ continue
for msg in self._fetcher:
yield msg
if time.time() > timeout_at: