summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-12-01 11:39:14 -0800
committerDana Powers <dana.powers@gmail.com>2016-12-01 11:39:14 -0800
commit229dcc89efdcf828ca929914d1384e9c8a0f141e (patch)
tree8d1d172b983c7cb4bdc70418d4e971d99b2b30d6
parent9b59c5d755af73c2e7863e98b84b5882c297afda (diff)
downloadkafka-python-drain_requests_join_group.tar.gz
Fix possible request draining in ensure_active_groupdrain_requests_join_group
-rw-r--r--kafka/client_async.py19
-rw-r--r--kafka/coordinator/base.py9
2 files changed, 16 insertions, 12 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index cba187b..bb96578 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -453,7 +453,7 @@ class KafkaClient(object):
return self._conns[node_id].send(request, expect_response=expect_response)
- def poll(self, timeout_ms=None, future=None, sleep=True):
+ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
@@ -488,14 +488,15 @@ class KafkaClient(object):
metadata_timeout_ms = self._maybe_refresh_metadata()
# Send scheduled tasks
- for task, task_future in self._delayed_tasks.pop_ready():
- try:
- result = task()
- except Exception as e:
- log.error("Task %s failed: %s", task, e)
- task_future.failure(e)
- else:
- task_future.success(result)
+ if delayed_tasks:
+ for task, task_future in self._delayed_tasks.pop_ready():
+ try:
+ result = task()
+ except Exception as e:
+ log.error("Task %s failed: %s", task, e)
+ task_future.failure(e)
+ else:
+ task_future.success(result)
# If we got a future that is already done, don't block in _poll
if future and future.is_done:
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 22dffb4..e4ebcb0 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -246,9 +246,12 @@ class BaseCoordinator(object):
# This is important in particular to avoid resending a pending
# JoinGroup request.
if self._client.in_flight_request_count(self.coordinator_id):
- while self._client.in_flight_request_count(self.coordinator_id):
- self._client.poll()
- continue
+ while not self.coordinator_unknown():
+ self._client.poll(delayed_tasks=False)
+ if not self._client.in_flight_request_count(self.coordinator_id):
+ break
+ else:
+ continue
future = self._send_join_group_request()
self._client.poll(future=future)