diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cba187b..bb96578 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -453,7 +453,7 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) - def poll(self, timeout_ms=None, future=None, sleep=True): + def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh @@ -488,14 +488,15 @@ class KafkaClient(object): metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks - for task, task_future in self._delayed_tasks.pop_ready(): - try: - result = task() - except Exception as e: - log.error("Task %s failed: %s", task, e) - task_future.failure(e) - else: - task_future.success(result) + if delayed_tasks: + for task, task_future in self._delayed_tasks.pop_ready(): + try: + result = task() + except Exception as e: + log.error("Task %s failed: %s", task, e) + task_future.failure(e) + else: + task_future.success(result) # If we got a future that is already done, don't block in _poll if future and future.is_done: |