diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-12-02 18:25:28 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-12-02 18:25:28 -0800 |
commit | 010ebb53a9e3b1c4e8d69a623e4a082b5a2b9baa (patch) | |
tree | 8d1d172b983c7cb4bdc70418d4e971d99b2b30d6 /kafka/client_async.py | |
parent | 9b59c5d755af73c2e7863e98b84b5882c297afda (diff) | |
download | kafka-python-010ebb53a9e3b1c4e8d69a623e4a082b5a2b9baa.tar.gz |
Fix possible request draining in ensure_active_group (#896)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cba187b..bb96578 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -453,7 +453,7 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) - def poll(self, timeout_ms=None, future=None, sleep=True): + def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh @@ -488,14 +488,15 @@ class KafkaClient(object): metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks - for task, task_future in self._delayed_tasks.pop_ready(): - try: - result = task() - except Exception as e: - log.error("Task %s failed: %s", task, e) - task_future.failure(e) - else: - task_future.success(result) + if delayed_tasks: + for task, task_future in self._delayed_tasks.pop_ready(): + try: + result = task() + except Exception as e: + log.error("Task %s failed: %s", task, e) + task_future.failure(e) + else: + task_future.success(result) # If we got a future that is already done, don't block in _poll if future and future.is_done: |