summaryrefslogtreecommitdiff
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test__xxsubinterpreters.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 8a368dc113..44f4d3fa0f 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -1207,6 +1207,185 @@ class ChannelTests(TestBase):
self.assertEqual(cid2, int(cid1) + 1)
+ def test_channel_list_interpreters_none(self):
+ """Test listing interpreters for a channel with no associations."""
+ # Test for channel with no associated interpreters.
+ cid = interpreters.channel_create()
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(send_interps, [])
+ self.assertEqual(recv_interps, [])
+
+ def test_channel_list_interpreters_basic(self):
+ """Test basic listing channel interpreters."""
+ interp0 = interpreters.get_main()
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, "send")
+ # Test for a channel that has one end associated to an interpreter.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(send_interps, [interp0])
+ self.assertEqual(recv_interps, [])
+
+ interp1 = interpreters.create()
+ _run_output(interp1, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ """))
+ # Test for channel that has boths ends associated to an interpreter.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(send_interps, [interp0])
+ self.assertEqual(recv_interps, [interp1])
+
+ def test_channel_list_interpreters_multiple(self):
+ """Test listing interpreters for a channel with many associations."""
+ interp0 = interpreters.get_main()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+ interp3 = interpreters.create()
+ cid = interpreters.channel_create()
+
+ interpreters.channel_send(cid, "send")
+ _run_output(interp1, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ _interpreters.channel_send({cid}, "send")
+ """))
+ _run_output(interp2, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ """))
+ _run_output(interp3, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ """))
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(set(send_interps), {interp0, interp1})
+ self.assertEqual(set(recv_interps), {interp2, interp3})
+
+ def test_channel_list_interpreters_destroyed(self):
+ """Test listing channel interpreters with a destroyed interpreter."""
+ interp0 = interpreters.get_main()
+ interp1 = interpreters.create()
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, "send")
+ _run_output(interp1, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ """))
+ # Should be one interpreter associated with each end.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(send_interps, [interp0])
+ self.assertEqual(recv_interps, [interp1])
+
+ interpreters.destroy(interp1)
+ # Destroyed interpreter should not be listed.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(send_interps, [interp0])
+ self.assertEqual(recv_interps, [])
+
+ def test_channel_list_interpreters_released(self):
+ """Test listing channel interpreters with a released channel."""
+ # Set up one channel with main interpreter on the send end and two
+ # subinterpreters on the receive end.
+ interp0 = interpreters.get_main()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, "data")
+ _run_output(interp1, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ """))
+ interpreters.channel_send(cid, "data")
+ _run_output(interp2, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ """))
+ # Check the setup.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(len(send_interps), 1)
+ self.assertEqual(len(recv_interps), 2)
+
+ # Release the main interpreter from the send end.
+ interpreters.channel_release(cid, send=True)
+ # Send end should have no associated interpreters.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(len(send_interps), 0)
+ self.assertEqual(len(recv_interps), 2)
+
+ # Release one of the subinterpreters from the receive end.
+ _run_output(interp2, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ _interpreters.channel_release({cid})
+ """))
+ # Receive end should have the released interpreter removed.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(len(send_interps), 0)
+ self.assertEqual(recv_interps, [interp1])
+
+ def test_channel_list_interpreters_closed(self):
+ """Test listing channel interpreters with a closed channel."""
+ interp0 = interpreters.get_main()
+ interp1 = interpreters.create()
+ cid = interpreters.channel_create()
+ # Put something in the channel so that it's not empty.
+ interpreters.channel_send(cid, "send")
+
+ # Check initial state.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(len(send_interps), 1)
+ self.assertEqual(len(recv_interps), 0)
+
+ # Force close the channel.
+ interpreters.channel_close(cid, force=True)
+ # Both ends should raise an error.
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_list_interpreters(cid, send=True)
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_list_interpreters(cid, send=False)
+
+ def test_channel_list_interpreters_closed_send_end(self):
+ """Test listing channel interpreters with a channel's send end closed."""
+ interp0 = interpreters.get_main()
+ interp1 = interpreters.create()
+ cid = interpreters.channel_create()
+ # Put something in the channel so that it's not empty.
+ interpreters.channel_send(cid, "send")
+
+ # Check initial state.
+ send_interps = interpreters.channel_list_interpreters(cid, send=True)
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(len(send_interps), 1)
+ self.assertEqual(len(recv_interps), 0)
+
+ # Close the send end of the channel.
+ interpreters.channel_close(cid, send=True)
+ # Send end should raise an error.
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_list_interpreters(cid, send=True)
+ # Receive end should not be closed (since channel is not empty).
+ recv_interps = interpreters.channel_list_interpreters(cid, send=False)
+ self.assertEqual(len(recv_interps), 0)
+
+ # Close the receive end of the channel from a subinterpreter.
+ _run_output(interp1, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ _interpreters.channel_close({cid}, force=True)
+ """))
+ # Both ends should raise an error.
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_list_interpreters(cid, send=True)
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_list_interpreters(cid, send=False)
+
####################
def test_send_recv_main(self):
@@ -1540,6 +1719,23 @@ class ChannelTests(TestBase):
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_recv(cid)
+ def test_channel_list_interpreters_invalid_channel(self):
+ cid = interpreters.channel_create()
+ # Test for invalid channel ID.
+ with self.assertRaises(interpreters.ChannelNotFoundError):
+ interpreters.channel_list_interpreters(1000, send=True)
+
+ interpreters.channel_close(cid)
+ # Test for a channel that has been closed.
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_list_interpreters(cid, send=True)
+
+ def test_channel_list_interpreters_invalid_args(self):
+ # Tests for invalid arguments passed to the API.
+ cid = interpreters.channel_create()
+ with self.assertRaises(TypeError):
+ interpreters.channel_list_interpreters(cid)
+
class ChannelReleaseTests(TestBase):