diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-06 06:44:25 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-03-07 16:52:04 -0800 |
commit | 957c62d6ded7a3652e7897db20a23e070a6ad852 (patch) | |
tree | 524d8ee0c42ddc40459aea78e3e9cd1508a14675 | |
parent | 23132863d0e00bd8aabc0e19c7e1822dabfb05b9 (diff) | |
download | kafka-python-957c62d6ded7a3652e7897db20a23e070a6ad852.tar.gz |
Move all network connection IO into KafkaClient.poll()
-rw-r--r-- | kafka/client_async.py | 30 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 4 | ||||
-rw-r--r-- | test/fixtures.py | 7 | ||||
-rw-r--r-- | test/test_client_async.py | 9 |
4 files changed, 32 insertions, 18 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index c1bdd82..d608e6a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -304,7 +304,10 @@ class KafkaClient(object): # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) + try: self._selector.register(conn._sock, selectors.EVENT_WRITE) + except KeyError: + self._selector.modify(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) @@ -312,10 +315,10 @@ class KafkaClient(object): self._connecting.remove(node_id) try: - self._selector.unregister(conn._sock) + self._selector.modify(conn._sock, selectors.EVENT_READ, conn) except KeyError: - pass - self._selector.register(conn._sock, selectors.EVENT_READ, conn) + self._selector.register(conn._sock, selectors.EVENT_READ, conn) + if self._sensors: self._sensors.connection_created.record() @@ -336,6 +339,7 @@ class KafkaClient(object): self._selector.unregister(conn._sock) except KeyError: pass + if self._sensors: self._sensors.connection_closed.record() @@ -348,6 +352,17 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() + def maybe_connect(self, node_id): + """Queues a node for asynchronous connection during the next .poll()""" + if self._can_connect(node_id): + self._connecting.add(node_id) + # Wakeup signal is useful in case another thread is + # blocked waiting for incoming network traffic while holding + # the client lock in poll(). + self.wakeup() + return True + return False + def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" with self._lock: @@ -397,7 +412,7 @@ class KafkaClient(object): Returns: bool: True if we are ready to send to the given node """ - self._maybe_connect(node_id) + self.maybe_connect(node_id) return self.is_ready(node_id, metadata_priority=metadata_priority) def connected(self, node_id): @@ -520,8 +535,8 @@ class KafkaClient(object): Future: resolves to Response struct or Error """ if not self._can_send_request(node_id): - if not self._maybe_connect(node_id): - return Future().failure(Errors.NodeNotReadyError(node_id)) + self.maybe_connect(node_id) + return Future().failure(Errors.NodeNotReadyError(node_id)) # conn.send will queue the request internally # we will need to call send_pending_requests() @@ -814,9 +829,8 @@ class KafkaClient(object): # have such application level configuration, using request timeout instead. return self.config['request_timeout_ms'] - if self._can_connect(node_id): + if self.maybe_connect(node_id): log.debug("Initializing connection to node %s for metadata request", node_id) - self._maybe_connect(node_id) return self.config['reconnect_backoff_ms'] # connected but can't send more, OR connecting diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1435183..664e8d2 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -252,7 +252,7 @@ class BaseCoordinator(object): if self.config['api_version'] < (0, 8, 2): self.coordinator_id = self._client.least_loaded_node() if self.coordinator_id is not None: - self._client.ready(self.coordinator_id) + self._client.maybe_connect(self.coordinator_id) continue future = self.lookup_coordinator() @@ -686,7 +686,7 @@ class BaseCoordinator(object): self.coordinator_id = response.coordinator_id log.info("Discovered coordinator %s for group %s", self.coordinator_id, self.group_id) - self._client.ready(self.coordinator_id) + self._client.maybe_connect(self.coordinator_id) self.heartbeat.reset_timeouts() future.success(self.coordinator_id) diff --git a/test/fixtures.py b/test/fixtures.py index 34373e6..8b156e6 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -405,10 +405,11 @@ class KafkaFixture(Fixture): retries = 10 while True: node_id = self._client.least_loaded_node() - for ready_retry in range(40): - if self._client.ready(node_id, False): + for connect_retry in range(40): + self._client.maybe_connect(node_id) + if self._client.connected(node_id): break - time.sleep(.1) + self._client.poll(timeout_ms=100) else: raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) diff --git a/test/test_client_async.py b/test/test_client_async.py index 09781ac..1c8a50f 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn): conn.state = ConnectionStates.CONNECTED cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting - sel.unregister.assert_called_with(conn._sock) - sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn) + sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False @@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn): def test_ready(mocker, cli, conn): - maybe_connect = mocker.patch.object(cli, '_maybe_connect') + maybe_connect = mocker.patch.object(cli, 'maybe_connect') node_id = 1 cli.ready(node_id) maybe_connect.assert_called_with(node_id) @@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) + mocker.patch.object(client, 'maybe_connect', return_value=True) now = time.time() t = mocker.patch('time.time') @@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) client._poll.assert_called_with(2.222) # reconnect backoff - client._can_connect.assert_called_once_with('foobar') - client._maybe_connect.assert_called_once_with('foobar') + client.maybe_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection client._connecting.add('foobar') |