diff options
author | Lorry Tar Creator <lorry-tar-importer@baserock.org> | 2014-08-04 05:45:35 +0000 |
---|---|---|
committer | <> | 2014-12-10 05:33:45 +0000 |
commit | afcc4ea312255a2545f9c67d7c34ffefb00c80c0 (patch) | |
tree | 5ca5269e5d4fa6263242a7a96b713616e5f389e0 /test/test_pidfile.py | |
parent | 02378192d5bb4b16498d87ace57da425166426bf (diff) | |
download | python-daemon-master.tar.gz |
Imported from /home/lorry/working-area/delta_python-packages_python-daemon/python-daemon-1.6.1.tar.gz.HEADpython-daemon-1.6.1master
Diffstat (limited to 'test/test_pidfile.py')
-rw-r--r-- | test/test_pidfile.py | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/test/test_pidfile.py b/test/test_pidfile.py new file mode 100644 index 0000000..6c31a0e --- /dev/null +++ b/test/test_pidfile.py @@ -0,0 +1,407 @@ +# -*- coding: utf-8 -*- +# +# test/test_pidfile.py +# Part of ‘python-daemon’, an implementation of PEP 3143. +# +# Copyright © 2008–2014 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Apache License, version 2.0 as published by the +# Apache Software Foundation. +# No warranty expressed or implied. See the file LICENSE.ASF-2 for details. + +""" Unit test for ‘pidfile’ module. + """ + +from __future__ import unicode_literals + +import __builtin__ as builtins +import os +from StringIO import StringIO +import itertools +import tempfile +import errno + +import lockfile +from lockfile import pidlockfile + +import scaffold + +import daemon.pidfile + + +class FakeFileDescriptorStringIO(StringIO, object): + """ A StringIO class that fakes a file descriptor. """ + + _fileno_generator = itertools.count() + + def __init__(self, *args, **kwargs): + self._fileno = self._fileno_generator.next() + super_instance = super(FakeFileDescriptorStringIO, self) + super_instance.__init__(*args, **kwargs) + + def fileno(self): + return self._fileno + + +def make_pidlockfile_scenarios(): + """ Make a collection of scenarios for testing PIDLockFile instances. """ + + mock_current_pid = 235 + mock_other_pid = 8642 + mock_pidfile_path = tempfile.mktemp() + + mock_pidfile_empty = FakeFileDescriptorStringIO() + mock_pidfile_current_pid = FakeFileDescriptorStringIO( + "%(mock_current_pid)d\n" % vars()) + mock_pidfile_other_pid = FakeFileDescriptorStringIO( + "%(mock_other_pid)d\n" % vars()) + mock_pidfile_bogus = FakeFileDescriptorStringIO( + "b0gUs") + + scenarios = { + 'simple': {}, + 'not-exist': { + 'open_func_name': 'mock_open_nonexist', + 'os_open_func_name': 'mock_os_open_nonexist', + }, + 'not-exist-write-denied': { + 'open_func_name': 'mock_open_nonexist', + 'os_open_func_name': 'mock_os_open_nonexist', + }, + 'not-exist-write-busy': { + 'open_func_name': 'mock_open_nonexist', + 'os_open_func_name': 'mock_os_open_nonexist', + }, + 'exist-read-denied': { + 'open_func_name': 'mock_open_read_denied', + 'os_open_func_name': 'mock_os_open_read_denied', + }, + 'exist-locked-read-denied': { + 'locking_pid': mock_other_pid, + 'open_func_name': 'mock_open_read_denied', + 'os_open_func_name': 'mock_os_open_read_denied', + }, + 'exist-empty': {}, + 'exist-invalid': { + 'pidfile': mock_pidfile_bogus, + }, + 'exist-current-pid': { + 'pidfile': mock_pidfile_current_pid, + 'pidfile_pid': mock_current_pid, + }, + 'exist-current-pid-locked': { + 'pidfile': mock_pidfile_current_pid, + 'pidfile_pid': mock_current_pid, + 'locking_pid': mock_current_pid, + }, + 'exist-other-pid': { + 'pidfile': mock_pidfile_other_pid, + 'pidfile_pid': mock_other_pid, + }, + 'exist-other-pid-locked': { + 'pidfile': mock_pidfile_other_pid, + 'pidfile_pid': mock_other_pid, + 'locking_pid': mock_other_pid, + }, + } + + for scenario in scenarios.values(): + scenario['pid'] = mock_current_pid + scenario['path'] = mock_pidfile_path + if 'pidfile' not in scenario: + scenario['pidfile'] = mock_pidfile_empty + if 'pidfile_pid' not in scenario: + scenario['pidfile_pid'] = None + if 'locking_pid' not in scenario: + scenario['locking_pid'] = None + if 'open_func_name' not in scenario: + scenario['open_func_name'] = 'mock_open_okay' + if 'os_open_func_name' not in scenario: + scenario['os_open_func_name'] = 'mock_os_open_okay' + + return scenarios + + +def setup_pidfile_fixtures(testcase): + """ Set up common fixtures for PID file test cases. """ + testcase.mock_tracker = scaffold.MockTracker() + + scenarios = make_pidlockfile_scenarios() + testcase.pidlockfile_scenarios = scenarios + + def get_scenario_option(testcase, key, default=None): + value = default + try: + value = testcase.scenario[key] + except (NameError, TypeError, AttributeError, KeyError): + pass + return value + + scaffold.mock( + "os.getpid", + returns=scenarios['simple']['pid'], + tracker=testcase.mock_tracker) + + def make_mock_open_funcs(testcase): + + def mock_open_nonexist(filename, mode, buffering): + if 'r' in mode: + raise IOError( + errno.ENOENT, "No such file %(filename)r" % vars()) + else: + result = testcase.scenario['pidfile'] + return result + + def mock_open_read_denied(filename, mode, buffering): + if 'r' in mode: + raise IOError( + errno.EPERM, "Read denied on %(filename)r" % vars()) + else: + result = testcase.scenario['pidfile'] + return result + + def mock_open_okay(filename, mode, buffering): + result = testcase.scenario['pidfile'] + return result + + def mock_os_open_nonexist(filename, flags, mode): + if (flags & os.O_CREAT): + result = testcase.scenario['pidfile'].fileno() + else: + raise OSError( + errno.ENOENT, "No such file %(filename)r" % vars()) + return result + + def mock_os_open_read_denied(filename, flags, mode): + if (flags & os.O_CREAT): + result = testcase.scenario['pidfile'].fileno() + else: + raise OSError( + errno.EPERM, "Read denied on %(filename)r" % vars()) + return result + + def mock_os_open_okay(filename, flags, mode): + result = testcase.scenario['pidfile'].fileno() + return result + + funcs = dict( + (name, obj) for (name, obj) in vars().items() + if hasattr(obj, '__call__')) + + return funcs + + testcase.mock_pidfile_open_funcs = make_mock_open_funcs(testcase) + + def mock_open(filename, mode='r', buffering=None): + scenario_path = get_scenario_option(testcase, 'path') + if filename == scenario_path: + func_name = testcase.scenario['open_func_name'] + mock_open_func = testcase.mock_pidfile_open_funcs[func_name] + result = mock_open_func(filename, mode, buffering) + else: + result = FakeFileDescriptorStringIO() + return result + + scaffold.mock( + "builtins.open", + returns_func=mock_open, + tracker=testcase.mock_tracker) + + def mock_os_open(filename, flags, mode=None): + scenario_path = get_scenario_option(testcase, 'path') + if filename == scenario_path: + func_name = testcase.scenario['os_open_func_name'] + mock_os_open_func = testcase.mock_pidfile_open_funcs[func_name] + result = mock_os_open_func(filename, flags, mode) + else: + result = FakeFileDescriptorStringIO().fileno() + return result + + scaffold.mock( + "os.open", + returns_func=mock_os_open, + tracker=testcase.mock_tracker) + + def mock_os_fdopen(fd, mode='r', buffering=None): + scenario_pidfile = get_scenario_option( + testcase, 'pidfile', FakeFileDescriptorStringIO()) + if fd == testcase.scenario['pidfile'].fileno(): + result = testcase.scenario['pidfile'] + else: + raise OSError(errno.EBADF, "Bad file descriptor") + return result + + scaffold.mock( + "os.fdopen", + returns_func=mock_os_fdopen, + tracker=testcase.mock_tracker) + + testcase.scenario = NotImplemented + + +def setup_lockfile_method_mocks(testcase, scenario, class_name): + """ Set up common mock methods for lockfile class. """ + + def mock_read_pid(): + return scenario['pidfile_pid'] + def mock_is_locked(): + return (scenario['locking_pid'] is not None) + def mock_i_am_locking(): + return ( + scenario['locking_pid'] == scenario['pid']) + def mock_acquire(timeout=None): + if scenario['locking_pid'] is not None: + raise lockfile.AlreadyLocked() + scenario['locking_pid'] = scenario['pid'] + def mock_release(): + if scenario['locking_pid'] is None: + raise lockfile.NotLocked() + if scenario['locking_pid'] != scenario['pid']: + raise lockfile.NotMyLock() + scenario['locking_pid'] = None + def mock_break_lock(): + scenario['locking_pid'] = None + + for func_name in [ + 'read_pid', + 'is_locked', 'i_am_locking', + 'acquire', 'release', 'break_lock', + ]: + mock_func = vars()["mock_%(func_name)s" % vars()] + lockfile_func_name = "%(class_name)s.%(func_name)s" % vars() + mock_lockfile_func = scaffold.Mock( + lockfile_func_name, + returns_func=mock_func, + tracker=testcase.mock_tracker) + try: + scaffold.mock( + lockfile_func_name, + mock_obj=mock_lockfile_func, + tracker=testcase.mock_tracker) + except NameError: + pass + + +def setup_pidlockfile_fixtures(testcase, scenario_name=None): + """ Set up common fixtures for PIDLockFile test cases. """ + + setup_pidfile_fixtures(testcase) + + scaffold.mock( + "pidlockfile.write_pid_to_pidfile", + tracker=testcase.mock_tracker) + scaffold.mock( + "pidlockfile.remove_existing_pidfile", + tracker=testcase.mock_tracker) + + if scenario_name is not None: + set_pidlockfile_scenario(testcase, scenario_name, clear_tracker=False) + + +def set_pidlockfile_scenario(testcase, scenario_name, clear_tracker=True): + """ Set up the test case to the specified scenario. """ + testcase.scenario = testcase.pidlockfile_scenarios[scenario_name] + setup_lockfile_method_mocks( + testcase, testcase.scenario, "lockfile.LinkLockFile") + testcase.pidlockfile_args = dict( + path=testcase.scenario['path'], + ) + testcase.test_instance = pidlockfile.PIDLockFile( + **testcase.pidlockfile_args) + if clear_tracker: + testcase.mock_tracker.clear() + + +class TimeoutPIDLockFile_TestCase(scaffold.TestCase): + """ Test cases for ‘TimeoutPIDLockFile’ class. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + pidlockfile_scenarios = make_pidlockfile_scenarios() + self.pidlockfile_scenario = pidlockfile_scenarios['simple'] + pidfile_path = self.pidlockfile_scenario['path'] + + scaffold.mock( + "pidlockfile.PIDLockFile.__init__", + tracker=self.mock_tracker) + scaffold.mock( + "pidlockfile.PIDLockFile.acquire", + tracker=self.mock_tracker) + + self.scenario = { + 'pidfile_path': self.pidlockfile_scenario['path'], + 'acquire_timeout': object(), + } + + self.test_kwargs = dict( + path=self.scenario['pidfile_path'], + acquire_timeout=self.scenario['acquire_timeout'], + ) + self.test_instance = daemon.pidfile.TimeoutPIDLockFile( + **self.test_kwargs) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_inherits_from_pidlockfile(self): + """ Should inherit from PIDLockFile. """ + instance = self.test_instance + self.failUnlessIsInstance(instance, pidlockfile.PIDLockFile) + + def test_init_has_expected_signature(self): + """ Should have expected signature for ‘__init__’. """ + def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass + test_func.__name__ = b'__init__' + self.failUnlessFunctionSignatureMatch( + test_func, + daemon.pidfile.TimeoutPIDLockFile.__init__) + + def test_has_specified_acquire_timeout(self): + """ Should have specified ‘acquire_timeout’ value. """ + instance = self.test_instance + expect_timeout = self.test_kwargs['acquire_timeout'] + self.failUnlessEqual(expect_timeout, instance.acquire_timeout) + + def test_calls_superclass_init(self): + """ Should call the superclass ‘__init__’. """ + expect_path = self.test_kwargs['path'] + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.__init__( + %(expect_path)r) + """ % vars() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_acquire_uses_specified_timeout(self): + """ Should call the superclass ‘acquire’ with specified timeout. """ + instance = self.test_instance + test_timeout = object() + expect_timeout = test_timeout + self.mock_tracker.clear() + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.acquire(%(expect_timeout)r) + """ % vars() + instance.acquire(test_timeout) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_acquire_uses_stored_timeout_by_default(self): + """ Should call superclass ‘acquire’ with stored timeout by default. """ + instance = self.test_instance + test_timeout = self.test_kwargs['acquire_timeout'] + expect_timeout = test_timeout + self.mock_tracker.clear() + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.acquire(%(expect_timeout)r) + """ % vars() + instance.acquire() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +# Local variables: +# coding: utf-8 +# mode: python +# End: +# vim: fileencoding=utf-8 filetype=python : |