summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client_async.py7
-rw-r--r--kafka/conn.py10
-rw-r--r--kafka/producer/sender.py2
3 files changed, 11 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 96c0647..975202e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -267,9 +267,9 @@ class KafkaClient(object):
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
- self._selector.register(sock, selectors.EVENT_WRITE)
+ self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
- self._selector.modify(sock, selectors.EVENT_WRITE)
+ self._selector.modify(sock, selectors.EVENT_WRITE, conn)
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
@@ -624,6 +624,9 @@ class KafkaClient(object):
self._clear_wake_fd()
continue
elif not (events & selectors.EVENT_READ):
+ conn = key.data
+ if conn.node_id in self._connecting:
+ self._maybe_connect(conn.node_id)
continue
conn = key.data
processed.add(conn)
diff --git a/kafka/conn.py b/kafka/conn.py
index 5ef141c..07b667a 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -748,16 +748,16 @@ class BrokerConnection(object):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
- the reconnect backoff time. When connecting, returns 0 to allow
- non-blocking connect to finish. When connected, returns a very large
- number to handle slow/stalled connections.
+ the reconnect backoff time. When connecting or connected, returns a very
+ large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
- elif self.connecting():
- return 0
else:
+ # When connecting or connected, we should be able to delay
+ # indefinitely since other events (connection or data acked) will
+ # cause a wakeup once data can be sent.
return float('inf')
def connected(self):
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 064fee4..88ec07c 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -157,7 +157,7 @@ class Sender(threading.Thread):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
- self._client.poll(poll_timeout_ms)
+ self._client.poll(timeout_ms=poll_timeout_ms)
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""