import pytest
import time

from redis.exceptions import LockError, LockNotOwnedError
from redis.client import Redis
from redis.lock import Lock
from .conftest import _get_client


@pytest.mark.onlynoncluster
class TestLock:
    """Tests for ``redis.lock.Lock`` obtained through ``client.lock()``.

    The ``r`` argument taken by most tests is a client fixture defined
    outside this file (presumably in conftest -- confirm there).
    """

    @pytest.fixture()
    def r_decoded(self, request):
        """Return a ``Redis`` client built with ``decode_responses=True``."""
        return _get_client(Redis, request=request, decode_responses=True)

    def get_lock(self, redis, *args, **kwargs):
        """Create a lock via ``redis.lock``, always passing ``lock_class=Lock``.

        Any other positional/keyword arguments are forwarded unchanged.
        """
        kwargs['lock_class'] = Lock
        return redis.lock(*args, **kwargs)

    def test_lock(self, r):
        """Acquiring stores the token under the key; releasing removes it."""
        lock = self.get_lock(r, 'foo')
        assert lock.acquire(blocking=False)
        assert r.get('foo') == lock.local.token
        # no timeout was requested, so the key must have no expiry
        assert r.ttl('foo') == -1
        lock.release()
        assert r.get('foo') is None

    def test_lock_token(self, r):
        """A caller-supplied token is used with the default lock settings."""
        lock = self.get_lock(r, 'foo')
        self._test_lock_token(r, lock)

    def test_lock_token_thread_local_false(self, r):
        """A caller-supplied token is used when ``thread_local=False``."""
        lock = self.get_lock(r, 'foo', thread_local=False)
        self._test_lock_token(r, lock)

    def _test_lock_token(self, r, lock):
        """Shared checks: acquire with ``token='test'`` and verify storage.

        The token is expected as ``b'test'`` both in the key and on
        ``lock.local.token``, and is cleared again after release.
        """
        assert lock.acquire(blocking=False, token='test')
        assert r.get('foo') == b'test'
        assert lock.local.token == b'test'
        assert r.ttl('foo') == -1
        lock.release()
        assert r.get('foo') is None
        assert lock.local.token is None

    def test_locked(self, r):
        """``locked()`` is True only between acquire and release."""
        lock = self.get_lock(r, 'foo')
        assert lock.locked() is False
        lock.acquire(blocking=False)
        assert lock.locked() is True
        lock.release()
        assert lock.locked() is False

    def _test_owned(self, client):
        """Shared checks: ``owned()`` is True only for the holding instance."""
        lock = self.get_lock(client, 'foo')
        assert lock.owned() is False
        lock.acquire(blocking=False)
        assert lock.owned() is True
        lock.release()
        assert lock.owned() is False

        # a second lock object on the same key must not affect the first
        lock2 = self.get_lock(client, 'foo')
        assert lock.owned() is False
        assert lock2.owned() is False
        lock2.acquire(blocking=False)
        assert lock.owned() is False
        assert lock2.owned() is True
        lock2.release()
        assert lock.owned() is False
        assert lock2.owned() is False

    def test_owned(self, r):
        """Run the ownership checks against the ``r`` client."""
        self._test_owned(r)

    def test_owned_with_decoded_responses(self, r_decoded):
        """Run the ownership checks against a ``decode_responses`` client."""
        self._test_owned(r_decoded)

    def test_competing_locks(self, r):
        """Two locks on one key cannot both be held at the same time."""
        lock1 = self.get_lock(r, 'foo')
        lock2 = self.get_lock(r, 'foo')
        assert lock1.acquire(blocking=False)
        assert not lock2.acquire(blocking=False)
        lock1.release()
        assert lock2.acquire(blocking=False)
        assert not lock1.acquire(blocking=False)
        lock2.release()

    def test_timeout(self, r):
        """An integer ``timeout`` becomes the key's TTL (in seconds)."""
        lock = self.get_lock(r, 'foo', timeout=10)
        assert lock.acquire(blocking=False)
        assert 8 < r.ttl('foo') <= 10
        lock.release()

    def test_float_timeout(self, r):
        """A float ``timeout`` becomes the key's TTL (checked in ms)."""
        lock = self.get_lock(r, 'foo', timeout=9.5)
        assert lock.acquire(blocking=False)
        # pttl is in milliseconds, so the lower bound must be in ms as well
        assert 8000 < r.pttl('foo') <= 9500
        lock.release()

    def test_blocking_timeout(self, r):
        """A blocking acquire gives up within ``blocking_timeout``."""
        lock1 = self.get_lock(r, 'foo')
        assert lock1.acquire(blocking=False)
        bt = 0.2
        sleep = 0.05
        lock2 = self.get_lock(r, 'foo', sleep=sleep, blocking_timeout=bt)
        start = time.monotonic()
        assert not lock2.acquire()
        # The elapsed duration should be less than the total blocking_timeout
        assert bt > (time.monotonic() - start) > bt - sleep
        lock1.release()

    def test_context_manager(self, r):
        """Using the lock as a context manager acquires and then releases."""
        # blocking_timeout prevents a deadlock if the lock can't be acquired
        # for some reason
        with self.get_lock(r, 'foo', blocking_timeout=0.2) as lock:
            assert r.get('foo') == lock.local.token
        assert r.get('foo') is None

    def test_context_manager_raises_when_locked_not_acquired(self, r):
        """Entering the context raises ``LockError`` if the key is taken."""
        r.set('foo', 'bar')
        with pytest.raises(LockError):
            with self.get_lock(r, 'foo', blocking_timeout=0.1):
                pass

    def test_high_sleep_small_blocking_timeout(self, r):
        """With ``sleep`` > ``blocking_timeout`` acquire fails early."""
        lock1 = self.get_lock(r, 'foo')
        assert lock1.acquire(blocking=False)
        sleep = 60
        bt = 1
        lock2 = self.get_lock(r, 'foo', sleep=sleep, blocking_timeout=bt)
        start = time.monotonic()
        assert not lock2.acquire()
        # the elapsed time is less than the blocking_timeout as the lock is
        # unattainable given the sleep/blocking_timeout configuration
        assert bt > (time.monotonic() - start)
        lock1.release()

    def test_releasing_unlocked_lock_raises_error(self, r):
        """Releasing a lock that was never acquired raises ``LockError``."""
        lock = self.get_lock(r, 'foo')
        with pytest.raises(LockError):
            lock.release()

    def test_releasing_lock_no_longer_owned_raises_error(self, r):
        """Releasing after the key's value changed raises ``LockNotOwnedError``."""
        lock = self.get_lock(r, 'foo')
        lock.acquire(blocking=False)
        # manually change the token
        r.set('foo', 'a')
        with pytest.raises(LockNotOwnedError):
            lock.release()
        # even though we errored, the token is still cleared
        assert lock.local.token is None

    def test_extend_lock(self, r):
        """``extend(10)`` adds 10 seconds on top of the remaining TTL."""
        lock = self.get_lock(r, 'foo', timeout=10)
        assert lock.acquire(blocking=False)
        assert 8000 < r.pttl('foo') <= 10000
        assert lock.extend(10)
        assert 16000 < r.pttl('foo') <= 20000
        lock.release()

    def test_extend_lock_replace_ttl(self, r):
        """``extend(10, replace_ttl=True)`` resets the TTL to 10 seconds."""
        lock = self.get_lock(r, 'foo', timeout=10)
        assert lock.acquire(blocking=False)
        assert 8000 < r.pttl('foo') <= 10000
        assert lock.extend(10, replace_ttl=True)
        assert 8000 < r.pttl('foo') <= 10000
        lock.release()

    def test_extend_lock_float(self, r):
        """Extending works with float timeout and float additional time."""
        lock = self.get_lock(r, 'foo', timeout=10.0)
        assert lock.acquire(blocking=False)
        assert 8000 < r.pttl('foo') <= 10000
        assert lock.extend(10.0)
        assert 16000 < r.pttl('foo') <= 20000
        lock.release()

    def test_extending_unlocked_lock_raises_error(self, r):
        """Extending a lock that is not held raises ``LockError``."""
        lock = self.get_lock(r, 'foo', timeout=10)
        with pytest.raises(LockError):
            lock.extend(10)

    def test_extending_lock_with_no_timeout_raises_error(self, r):
        """Extending a held lock created without a timeout raises ``LockError``."""
        lock = self.get_lock(r, 'foo')
        assert lock.acquire(blocking=False)
        with pytest.raises(LockError):
            lock.extend(10)
        lock.release()

    def test_extending_lock_no_longer_owned_raises_error(self, r):
        """Extending after the key's value changed raises ``LockNotOwnedError``."""
        lock = self.get_lock(r, 'foo', timeout=10)
        assert lock.acquire(blocking=False)
        # overwrite the stored token so the lock no longer matches
        r.set('foo', 'a')
        with pytest.raises(LockNotOwnedError):
            lock.extend(10)

    def test_reacquire_lock(self, r):
        """``reacquire()`` restores the TTL to the lock's original timeout."""
        lock = self.get_lock(r, 'foo', timeout=10)
        assert lock.acquire(blocking=False)
        # shorten the TTL by hand so the reset is observable
        assert r.pexpire('foo', 5000)
        assert r.pttl('foo') <= 5000
        assert lock.reacquire()
        assert 8000 < r.pttl('foo') <= 10000
        lock.release()

    def test_reacquiring_unlocked_lock_raises_error(self, r):
        """Reacquiring a lock that is not held raises ``LockError``."""
        lock = self.get_lock(r, 'foo', timeout=10)
        with pytest.raises(LockError):
            lock.reacquire()

    def test_reacquiring_lock_with_no_timeout_raises_error(self, r):
        """Reacquiring a held lock created without a timeout raises ``LockError``."""
        lock = self.get_lock(r, 'foo')
        assert lock.acquire(blocking=False)
        with pytest.raises(LockError):
            lock.reacquire()
        lock.release()

    def test_reacquiring_lock_no_longer_owned_raises_error(self, r):
        """Reacquiring after the key's value changed raises ``LockNotOwnedError``."""
        lock = self.get_lock(r, 'foo', timeout=10)
        assert lock.acquire(blocking=False)
        # overwrite the stored token so the lock no longer matches
        r.set('foo', 'a')
        with pytest.raises(LockNotOwnedError):
            lock.reacquire()


@pytest.mark.onlynoncluster
class TestLockClassSelection:
    """Tests for choosing the lock implementation via ``lock_class``."""

    def test_lock_class_argument(self, r):
        """``r.lock(..., lock_class=X)`` returns an instance of ``X``."""
        class MyLock:
            def __init__(self, *args, **kwargs):
                pass

        lock = r.lock('foo', lock_class=MyLock)
        assert isinstance(lock, MyLock)