diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-12-01 11:39:14 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-12-01 11:39:14 -0800 |
commit | 229dcc89efdcf828ca929914d1384e9c8a0f141e (patch) | |
tree | 8d1d172b983c7cb4bdc70418d4e971d99b2b30d6 /kafka/client_async.py | |
parent | 9b59c5d755af73c2e7863e98b84b5882c297afda (diff) | |
download | kafka-python-drain_requests_join_group.tar.gz |
Fix possible request draining in ensure_active_groupdrain_requests_join_group
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cba187b..bb96578 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -453,7 +453,7 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) - def poll(self, timeout_ms=None, future=None, sleep=True): + def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh @@ -488,14 +488,15 @@ class KafkaClient(object): metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks - for task, task_future in self._delayed_tasks.pop_ready(): - try: - result = task() - except Exception as e: - log.error("Task %s failed: %s", task, e) - task_future.failure(e) - else: - task_future.success(result) + if delayed_tasks: + for task, task_future in self._delayed_tasks.pop_ready(): + try: + result = task() + except Exception as e: + log.error("Task %s failed: %s", task, e) + task_future.failure(e) + else: + task_future.success(result) # If we got a future that is already done, don't block in _poll if future and future.is_done: |