summaryrefslogtreecommitdiff
path: root/tests/integration/test_sqlite.py
diff options
context:
space:
mode:
authorJordan Cook <JWCook@users.noreply.github.com>2021-04-22 13:27:24 -0500
committerGitHub <noreply@github.com>2021-04-22 13:27:24 -0500
commitf2bf9bbfd8d711e9dfc8ccce83f439d765d484cb (patch)
treefb617be7c7760390b7239899e6522d865956342f /tests/integration/test_sqlite.py
parent38ddcf5425eadc6b174a8b053b2175c4dda00a1f (diff)
parent3ec85f0107caedbe3b3bd8080bd3dac81b7baa17 (diff)
downloadrequests-cache-f2bf9bbfd8d711e9dfc8ccce83f439d765d484cb.tar.gz
Merge pull request #242 from JWCook/integration-tests
Refactor backend integration tests
Diffstat (limited to 'tests/integration/test_sqlite.py')
-rw-r--r--tests/integration/test_sqlite.py136
1 files changed, 60 insertions, 76 deletions
diff --git a/tests/integration/test_sqlite.py b/tests/integration/test_sqlite.py
index a6fd50d..63c67ec 100644
--- a/tests/integration/test_sqlite.py
+++ b/tests/integration/test_sqlite.py
@@ -1,110 +1,94 @@
import os
-import unittest
from threading import Thread
from unittest.mock import patch
-from requests_cache.backends.sqlite import DbDict, DbPickleDict
-from tests.integration.test_backends import BaseStorageTestCase
+from requests_cache.backends.sqlite import DbCache, DbDict, DbPickleDict
+from tests.integration.test_backends import CACHE_NAME, BaseCacheTest, BaseStorageTest
-class SQLiteTestCase(BaseStorageTestCase):
- def tearDown(self):
+class SQLiteTestCase(BaseStorageTest):
+ @classmethod
+ def teardown_class(cls):
try:
- os.unlink(self.NAMESPACE)
+ os.unlink(CACHE_NAME)
except Exception:
pass
def test_bulk_commit(self):
- d = self.storage_class(self.NAMESPACE, self.TABLES[0])
- with d.bulk_commit():
+ cache = self.init_cache()
+ with cache.bulk_commit():
pass
- d.clear()
+
n = 1000
- with d.bulk_commit():
+ with cache.bulk_commit():
for i in range(n):
- d[i] = i
- assert list(d.keys()) == list(range(n))
+ cache[i] = i
+ assert list(cache.keys()) == list(range(n))
def test_switch_commit(self):
- d = self.storage_class(self.NAMESPACE)
- d.clear()
- d[1] = 1
- d = self.storage_class(self.NAMESPACE)
- assert 1 in d
+ cache = self.init_cache()
+ cache.clear()
+ cache['key_1'] = 'value_1'
+ cache = self.init_cache(clear=False)
+ assert 'key_1' in cache
- d._can_commit = False
- d[2] = 2
+ cache._can_commit = False
+ cache['key_2'] = 'value_2'
- d = self.storage_class(self.NAMESPACE)
- assert 2 not in d
- assert d._can_commit is True
+ cache = self.init_cache(clear=False)
+ assert 2 not in cache
+ assert cache._can_commit is True
def test_fast_save(self):
- d1 = self.storage_class(self.NAMESPACE, fast_save=True)
- d2 = self.storage_class(self.NAMESPACE, self.TABLES[1], fast_save=True)
- d1.clear()
+ cache_1 = self.init_cache(1, fast_save=True)
+ cache_2 = self.init_cache(2, fast_save=True)
+
n = 1000
for i in range(n):
- d1[i] = i
- d2[i * 2] = i
- # HACK if we will not sort, fast save can produce different order of records
- assert sorted(d1.keys()) == list(range(n))
- assert sorted(d2.values()) == list(range(n))
-
- def test_usage_with_threads(self):
- def do_test_for(d, n_threads=5):
- d.clear()
-
- def do_inserts(values):
- for v in values:
- d[v] = v
-
- def values(x, n):
- return [i * x for i in range(n)]
-
- threads = [Thread(target=do_inserts, args=(values(i, n_threads),)) for i in range(n_threads)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
-
- for i in range(n_threads):
- for x in values(i, n_threads):
- assert d[x] == x
-
- do_test_for(self.storage_class(self.NAMESPACE))
- do_test_for(self.storage_class(self.NAMESPACE, fast_save=True), 20)
- do_test_for(self.storage_class(self.NAMESPACE, fast_save=True))
- do_test_for(self.storage_class(self.NAMESPACE, self.TABLES[1], fast_save=True))
+ cache_1[i] = i
+ cache_2[i * 2] = i
+
+ assert set(cache_1.keys()) == set(range(n))
+ assert set(cache_2.values()) == set(range(n))
def test_noop(self):
- def do_noop_bulk(d):
- with d.bulk_commit():
+ def do_noop_bulk(cache):
+ with cache.bulk_commit():
pass
- del d
+ del cache
- d = self.storage_class(self.NAMESPACE)
- t = Thread(target=do_noop_bulk, args=(d,))
- t.start()
- t.join()
+ cache = self.init_cache()
+ thread = Thread(target=do_noop_bulk, args=(cache,))
+ thread.start()
+ thread.join()
# make sure connection is not closed by the thread
- d[0] = 0
- assert str(d) == "{0: 0}"
+ cache['key_1'] = 'value_1'
+ assert list(cache.keys()) == ['key_1']
+
+ @patch('requests_cache.backends.sqlite.sqlite3')
+ def test_connection_kwargs(self, mock_sqlite):
+ """A spot check to make sure optional connection kwargs gets passed to connection"""
+ cache = self.storage_class('test', timeout=0.5, invalid_kwarg='???')
+ mock_sqlite.connect.assert_called_with(cache.db_path, timeout=0.5)
-class DbDictTestCase(SQLiteTestCase, unittest.TestCase):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, storage_class=DbDict, **kwargs)
+class TestDbDict(SQLiteTestCase):
+ storage_class = DbDict
-class DbPickleDictTestCase(SQLiteTestCase, unittest.TestCase):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, storage_class=DbPickleDict, picklable=True, **kwargs)
+class TestDbPickleDict(SQLiteTestCase):
+ storage_class = DbPickleDict
+ picklable = True
-@patch('requests_cache.backends.sqlite.sqlite3')
-def test_connection_kwargs(mock_sqlite):
- """A spot check to make sure optional connection kwargs gets passed to connection"""
- DbDict('test', timeout=0.5, invalid_kwarg='???')
- mock_sqlite.connect.assert_called_with('test', timeout=0.5)
+class TestDbCache(BaseCacheTest):
+ backend_class = DbCache
+ init_kwargs = {'use_temp': True}
+
+ @classmethod
+ def teardown_class(cls):
+ try:
+ os.unlink(CACHE_NAME)
+ except Exception:
+ pass