diff options
Diffstat (limited to 'tests/integration/test_sqlite.py')
| -rw-r--r-- | tests/integration/test_sqlite.py | 136 |
1 files changed, 60 insertions, 76 deletions
diff --git a/tests/integration/test_sqlite.py b/tests/integration/test_sqlite.py index a6fd50d..63c67ec 100644 --- a/tests/integration/test_sqlite.py +++ b/tests/integration/test_sqlite.py @@ -1,110 +1,94 @@ import os -import unittest from threading import Thread from unittest.mock import patch -from requests_cache.backends.sqlite import DbDict, DbPickleDict -from tests.integration.test_backends import BaseStorageTestCase +from requests_cache.backends.sqlite import DbCache, DbDict, DbPickleDict +from tests.integration.test_backends import CACHE_NAME, BaseCacheTest, BaseStorageTest -class SQLiteTestCase(BaseStorageTestCase): - def tearDown(self): +class SQLiteTestCase(BaseStorageTest): + @classmethod + def teardown_class(cls): try: - os.unlink(self.NAMESPACE) + os.unlink(CACHE_NAME) except Exception: pass def test_bulk_commit(self): - d = self.storage_class(self.NAMESPACE, self.TABLES[0]) - with d.bulk_commit(): + cache = self.init_cache() + with cache.bulk_commit(): pass - d.clear() + n = 1000 - with d.bulk_commit(): + with cache.bulk_commit(): for i in range(n): - d[i] = i - assert list(d.keys()) == list(range(n)) + cache[i] = i + assert list(cache.keys()) == list(range(n)) def test_switch_commit(self): - d = self.storage_class(self.NAMESPACE) - d.clear() - d[1] = 1 - d = self.storage_class(self.NAMESPACE) - assert 1 in d + cache = self.init_cache() + cache.clear() + cache['key_1'] = 'value_1' + cache = self.init_cache(clear=False) + assert 'key_1' in cache - d._can_commit = False - d[2] = 2 + cache._can_commit = False + cache['key_2'] = 'value_2' - d = self.storage_class(self.NAMESPACE) - assert 2 not in d - assert d._can_commit is True + cache = self.init_cache(clear=False) + assert 2 not in cache + assert cache._can_commit is True def test_fast_save(self): - d1 = self.storage_class(self.NAMESPACE, fast_save=True) - d2 = self.storage_class(self.NAMESPACE, self.TABLES[1], fast_save=True) - d1.clear() + cache_1 = self.init_cache(1, fast_save=True) + cache_2 = self.init_cache(2, fast_save=True) + n = 1000 for i in range(n): - d1[i] = i - d2[i * 2] = i - # HACK if we will not sort, fast save can produce different order of records - assert sorted(d1.keys()) == list(range(n)) - assert sorted(d2.values()) == list(range(n)) - - def test_usage_with_threads(self): - def do_test_for(d, n_threads=5): - d.clear() - - def do_inserts(values): - for v in values: - d[v] = v - - def values(x, n): - return [i * x for i in range(n)] - - threads = [Thread(target=do_inserts, args=(values(i, n_threads),)) for i in range(n_threads)] - for t in threads: - t.start() - for t in threads: - t.join() - - for i in range(n_threads): - for x in values(i, n_threads): - assert d[x] == x - - do_test_for(self.storage_class(self.NAMESPACE)) - do_test_for(self.storage_class(self.NAMESPACE, fast_save=True), 20) - do_test_for(self.storage_class(self.NAMESPACE, fast_save=True)) - do_test_for(self.storage_class(self.NAMESPACE, self.TABLES[1], fast_save=True)) + cache_1[i] = i + cache_2[i * 2] = i + + assert set(cache_1.keys()) == set(range(n)) + assert set(cache_2.values()) == set(range(n)) def test_noop(self): - def do_noop_bulk(d): - with d.bulk_commit(): + def do_noop_bulk(cache): + with cache.bulk_commit(): pass - del d + del cache - d = self.storage_class(self.NAMESPACE) - t = Thread(target=do_noop_bulk, args=(d,)) - t.start() - t.join() + cache = self.init_cache() + thread = Thread(target=do_noop_bulk, args=(cache,)) + thread.start() + thread.join() # make sure connection is not closed by the thread - d[0] = 0 - assert str(d) == "{0: 0}" + cache['key_1'] = 'value_1' + assert list(cache.keys()) == ['key_1'] + + @patch('requests_cache.backends.sqlite.sqlite3') + def test_connection_kwargs(self, mock_sqlite): + """A spot check to make sure optional connection kwargs gets passed to connection""" + cache = self.storage_class('test', timeout=0.5, invalid_kwarg='???') + mock_sqlite.connect.assert_called_with(cache.db_path, timeout=0.5) -class DbDictTestCase(SQLiteTestCase, unittest.TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, storage_class=DbDict, **kwargs) +class TestDbDict(SQLiteTestCase): + storage_class = DbDict -class DbPickleDictTestCase(SQLiteTestCase, unittest.TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, storage_class=DbPickleDict, picklable=True, **kwargs) +class TestDbPickleDict(SQLiteTestCase): + storage_class = DbPickleDict + picklable = True -@patch('requests_cache.backends.sqlite.sqlite3') -def test_connection_kwargs(mock_sqlite): - """A spot check to make sure optional connection kwargs gets passed to connection""" - DbDict('test', timeout=0.5, invalid_kwarg='???') - mock_sqlite.connect.assert_called_with('test', timeout=0.5) +class TestDbCache(BaseCacheTest): + backend_class = DbCache + init_kwargs = {'use_temp': True} + + @classmethod + def teardown_class(cls): + try: + os.unlink(CACHE_NAME) + except Exception: + pass |
