"""Tests for the file-descriptor helpers exported by ``systemd.daemon``."""

import contextlib
import os
import posix
import socket
import sys

import pytest

from systemd.daemon import (booted,
                            is_fifo, _is_fifo,
                            is_socket, _is_socket,
                            is_socket_inet, _is_socket_inet,
                            is_socket_unix, _is_socket_unix,
                            is_mq, _is_mq,
                            listen_fds)


@contextlib.contextmanager
def closing_socketpair(family):
    """Yield a connected socket pair of *family* and close both ends on exit.

    Both sockets are closed even if the body of the ``with`` block raises.
    """
    pair = socket.socketpair(family)
    try:
        yield pair
    finally:
        pair[0].close()
        pair[1].close()


def test_booted():
    """booted() must be true when /run/systemd exists, and a bool otherwise."""
    if os.path.exists('/run/systemd'):
        # assume we are running under systemd
        assert booted()
    else:
        # don't assume anything
        assert booted() in {False, True}


def test__is_fifo(tmpdir):
    """_is_fifo() accepts a descriptor opened on a FIFO, with or without path."""
    path = tmpdir.join('test.fifo').strpath
    posix.mkfifo(path)
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        assert _is_fifo(fd, None)
        assert _is_fifo(fd, path)
    finally:
        # Do not leak the descriptor into the rest of the test session.
        os.close(fd)


def test__is_fifo_file(tmpdir):
    """_is_fifo() rejects a descriptor opened on a regular file."""
    regular = tmpdir.join('test.fifo')
    regular.write('boo')
    path = regular.strpath
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        assert not _is_fifo(fd, None)
        assert not _is_fifo(fd, path)
    finally:
        # Do not leak the descriptor into the rest of the test session.
        os.close(fd)


def test__is_fifo_bad_fd(tmpdir):
    """_is_fifo() raises OSError for an invalid descriptor."""
    path = tmpdir.join('test.fifo').strpath

    with pytest.raises(OSError):
        assert not _is_fifo(-1, None)

    with pytest.raises(OSError):
        assert not _is_fifo(-1, path)


def test_is_fifo(tmpdir):
    """is_fifo() accepts both a file object and a raw descriptor on a FIFO."""
    path = tmpdir.join('test.fifo').strpath
    posix.mkfifo(path)
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    # The file object takes ownership of fd; leaving the block closes both.
    with os.fdopen(fd, 'r') as file:
        assert is_fifo(file, None)
        assert is_fifo(file, path)
        assert is_fifo(fd, None)
        assert is_fifo(fd, path)


def test_is_fifo_file(tmpdir):
    """is_fifo() rejects a file object or descriptor on a regular file."""
    regular = tmpdir.join('test.fifo')
    regular.write('boo')
    path = regular.strpath
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    # The file object takes ownership of fd; leaving the block closes both.
    with os.fdopen(fd, 'r') as file:
        assert not is_fifo(file, None)
        assert not is_fifo(file, path)
        assert not is_fifo(fd, None)
        assert not is_fifo(fd, path)


def test_is_fifo_bad_fd(tmpdir):
    """is_fifo() raises OSError for an invalid descriptor."""
    path = tmpdir.join('test.fifo').strpath

    with pytest.raises(OSError):
        assert not is_fifo(-1, None)

    with pytest.raises(OSError):
        assert not is_fifo(-1, path)


def test_no_mismatch():
    """A unix socket must not be reported as a FIFO, a mq or an inet socket."""
    with closing_socketpair(socket.AF_UNIX) as pair:
        for sock in pair:
            assert not is_fifo(sock)
            assert not is_mq(sock)
            assert not is_socket_inet(sock)

            fd = sock.fileno()
            assert not is_fifo(fd)
            assert not is_mq(fd)
            assert not is_socket_inet(fd)

            assert not _is_fifo(fd)
            assert not _is_mq(fd)
            assert not _is_socket_inet(fd)


def test_is_socket():
    """is_socket() matches family/type for both socket objects and fds."""
    with closing_socketpair(socket.AF_UNIX) as pair:
        for sock in pair:
            for arg in (sock, sock.fileno()):
                assert is_socket(arg)
                assert is_socket(arg, socket.AF_UNIX)
                assert not is_socket(arg, socket.AF_INET)
                assert is_socket(arg, socket.AF_UNIX, socket.SOCK_STREAM)
                assert not is_socket(arg, socket.AF_INET, socket.SOCK_DGRAM)

                # NOTE(review): the checks are repeated in the original;
                # presumably to confirm repeated calls agree - confirm intent.
                assert is_socket(sock)
                assert is_socket(arg, socket.AF_UNIX)
                assert not is_socket(arg, socket.AF_INET)
                assert is_socket(arg, socket.AF_UNIX, socket.SOCK_STREAM)
                assert not is_socket(arg, socket.AF_INET, socket.SOCK_DGRAM)


def test__is_socket():
    """_is_socket() matches family/type for a raw socket descriptor."""
    with closing_socketpair(socket.AF_UNIX) as pair:
        for sock in pair:
            fd = sock.fileno()
            assert _is_socket(fd)
            assert _is_socket(fd, socket.AF_UNIX)
            assert not _is_socket(fd, socket.AF_INET)
            assert _is_socket(fd, socket.AF_UNIX, socket.SOCK_STREAM)
            assert not _is_socket(fd, socket.AF_INET, socket.SOCK_DGRAM)

            # NOTE(review): the checks are repeated in the original;
            # presumably to confirm repeated calls agree - confirm intent.
            assert _is_socket(fd)
            assert _is_socket(fd, socket.AF_UNIX)
            assert not _is_socket(fd, socket.AF_INET)
            assert _is_socket(fd, socket.AF_UNIX, socket.SOCK_STREAM)
            assert not _is_socket(fd, socket.AF_INET, socket.SOCK_DGRAM)


def test_is_socket_unix():
    """is_socket_unix() matches type and path for socket objects and fds."""
    with closing_socketpair(socket.AF_UNIX) as pair:
        for sock in pair:
            for arg in (sock, sock.fileno()):
                assert is_socket_unix(arg)
                assert not is_socket_unix(arg, path="/no/such/path")
                assert is_socket_unix(arg, socket.SOCK_STREAM)
                assert not is_socket_unix(arg, socket.SOCK_DGRAM)


def test__is_socket_unix():
    """_is_socket_unix() matches type and path for a raw socket descriptor."""
    with closing_socketpair(socket.AF_UNIX) as pair:
        for sock in pair:
            fd = sock.fileno()
            assert _is_socket_unix(fd)
            assert not _is_socket_unix(fd, 0, -1, "/no/such/path")
            assert _is_socket_unix(fd, socket.SOCK_STREAM)
            assert not _is_socket_unix(fd, socket.SOCK_DGRAM)


def test_listen_fds_no_fds():
    """listen_fds() returns an empty list when LISTEN_* are not set."""
    # make sure we have no fds to listen to
    os.unsetenv('LISTEN_FDS')
    os.unsetenv('LISTEN_PID')

    assert listen_fds() == []
    assert listen_fds(True) == []
    assert listen_fds(False) == []


def test_listen_fds():
    """listen_fds(True) returns the fds once; a later call returns nothing."""
    os.environ['LISTEN_FDS'] = '3'
    os.environ['LISTEN_PID'] = str(os.getpid())

    assert listen_fds(False) == [3, 4, 5]
    assert listen_fds(True) == [3, 4, 5]
    assert listen_fds() == []


def test_listen_fds_default_unset():
    """listen_fds() with no argument behaves like listen_fds(True)."""
    os.environ['LISTEN_FDS'] = '1'
    os.environ['LISTEN_PID'] = str(os.getpid())

    assert listen_fds(False) == [3]
    assert listen_fds() == [3]
    assert listen_fds() == []