1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
import os
from os.path import join
from tempfile import NamedTemporaryFile, gettempdir
from threading import Thread
from unittest.mock import patch
import pytest
from appdirs import user_cache_dir
from requests_cache.backends.base import BaseCache
from requests_cache.backends.sqlite import MEMORY_URI, SQLiteCache, SQLiteDict, SQLitePickleDict
from tests.integration.base_cache_test import BaseCacheTest
from tests.integration.base_storage_test import CACHE_NAME, BaseStorageTest
class SQLiteTestCase(BaseStorageTest):
    """Storage tests specific to the SQLite backend.

    Concrete subclasses set ``storage_class`` to the SQLite storage class under test.
    """

    init_kwargs = {'use_temp': True}

    @classmethod
    def teardown_class(cls):
        """Best-effort removal of ``{CACHE_NAME}.sqlite`` from the working directory."""
        # Presumably left behind by tests that build a storage object without ``use_temp``.
        try:
            os.unlink(f'{CACHE_NAME}.sqlite')
        except OSError:
            # Nothing to clean up (or the file can't be removed); never fail teardown over it
            pass

    def test_use_cache_dir(self):
        """``use_cache_dir=True`` should place the database under the user cache dir"""
        relative_path = self.storage_class(CACHE_NAME).db_path
        cache_dir_path = self.storage_class(CACHE_NAME, use_cache_dir=True).db_path
        assert not str(relative_path).startswith(user_cache_dir())
        assert str(cache_dir_path).startswith(user_cache_dir())

    def test_use_temp(self):
        """``use_temp=True`` should place the database under the system temp dir"""
        relative_path = self.storage_class(CACHE_NAME).db_path
        temp_path = self.storage_class(CACHE_NAME, use_temp=True).db_path
        assert not str(relative_path).startswith(gettempdir())
        assert str(temp_path).startswith(gettempdir())

    def test_use_memory(self):
        """``use_memory=True`` should use ``MEMORY_URI`` and support basic dict operations"""
        cache = self.init_cache(use_memory=True)
        assert cache.db_path == MEMORY_URI
        for i in range(20):
            cache[f'key_{i}'] = f'value_{i}'
        for i in range(5):
            del cache[f'key_{i}']

        assert len(cache) == 15
        assert set(cache.keys()) == {f'key_{i}' for i in range(5, 20)}
        assert set(cache.values()) == {f'value_{i}' for i in range(5, 20)}

        cache.clear()
        assert len(cache) == 0

    def test_use_memory__uri(self):
        """A literal ``':memory:'`` path should be kept as-is"""
        # The original comparison was a bare expression, so nothing was actually checked
        assert self.init_cache(':memory:').db_path == ':memory:'

    def test_non_dir_parent_exists(self):
        """Expect a custom error message if a parent path already exists but isn't a directory"""
        with NamedTemporaryFile() as tmp:
            with pytest.raises(FileExistsError) as exc_info:
                self.storage_class(join(tmp.name, 'invalid_path'))
            # Must run after the ``raises`` block exits; inside it, this line would be skipped
            assert 'not a directory' in str(exc_info.value)

    def test_bulk_commit(self):
        """Items written inside ``bulk_commit()`` should all be readable afterwards"""
        cache = self.init_cache()
        # An empty bulk_commit block should be harmless
        with cache.bulk_commit():
            pass

        n_items = 1000
        with cache.bulk_commit():
            for i in range(n_items):
                cache[f'key_{i}'] = f'value_{i}'

        assert set(cache.keys()) == {f'key_{i}' for i in range(n_items)}
        assert set(cache.values()) == {f'value_{i}' for i in range(n_items)}

    def test_chunked_bulk_delete(self):
        """When deleting more items than SQLite can handle in a single statement, it should be
        chunked into multiple smaller statements
        """
        # Populate the cache with more items than can fit in a single delete statement
        cache = self.init_cache()
        with cache.bulk_commit():
            for i in range(2000):
                cache[f'key_{i}'] = f'value_{i}'

        keys = list(cache.keys())

        # First pass to ensure that bulk_delete is split across three statements
        with patch.object(cache, 'connection') as mock_connection:
            con = mock_connection().__enter__.return_value
            cache.bulk_delete(keys)
        assert con.execute.call_count == 3

        # Second pass to actually delete keys and make sure it doesn't explode
        cache.bulk_delete(keys)
        assert len(cache) == 0

    def test_switch_commit(self):
        """Writes made while ``_can_commit`` is False should not be visible to a new instance"""
        cache = self.init_cache()
        cache.clear()
        cache['key_1'] = 'value_1'
        cache = self.init_cache(clear=False)
        assert 'key_1' in cache

        cache._can_commit = False
        cache['key_2'] = 'value_2'

        cache = self.init_cache(clear=False)
        # Check the key that was actually written; the integer 2 was never a key here
        assert 'key_2' not in cache
        assert cache._can_commit is True

    def test_fast_save(self):
        """Two caches created with ``fast_save=True`` should retain everything written"""
        cache_1 = self.init_cache(1, fast_save=True)
        cache_2 = self.init_cache(2, fast_save=True)

        n = 1000
        for i in range(n):
            cache_1[i] = i
            cache_2[i * 2] = i

        assert set(cache_1.keys()) == set(range(n))
        assert set(cache_2.values()) == set(range(n))

    def test_noop(self):
        """An empty ``bulk_commit()`` in another thread should leave the cache usable"""

        def do_noop_bulk(cache):
            with cache.bulk_commit():
                pass
            del cache

        cache = self.init_cache()
        thread = Thread(target=do_noop_bulk, args=(cache,))
        thread.start()
        thread.join()

        # make sure connection is not closed by the thread
        cache['key_1'] = 'value_1'
        assert list(cache.keys()) == ['key_1']

    @patch('requests_cache.backends.sqlite.sqlite3')
    def test_connection_kwargs(self, mock_sqlite):
        """A spot check to make sure optional connection kwargs gets passed to connection"""
        cache = self.storage_class('test', use_temp=True, timeout=0.5, invalid_kwarg='???')
        # ``invalid_kwarg`` is expected to be dropped; only ``timeout`` reaches connect()
        mock_sqlite.connect.assert_called_with(cache.db_path, timeout=0.5)
class TestSQLiteDict(SQLiteTestCase):
    """Run the SQLite storage test case against ``SQLiteDict``."""

    storage_class = SQLiteDict
class TestSQLitePickleDict(SQLiteTestCase):
    """Run the SQLite storage test case against ``SQLitePickleDict``."""

    storage_class = SQLitePickleDict
    # NOTE(review): presumably read by the base storage tests to enable
    # pickled-value checks; confirm against BaseStorageTest
    picklable = True
class TestSQLiteCache(BaseCacheTest):
    """Cache-level tests for the SQLite backend, including ``clear()`` failure handling."""

    backend_class = SQLiteCache
    init_kwargs = {'use_temp': True}

    @classmethod
    def teardown_class(cls):
        """Best-effort removal of ``CACHE_NAME`` from the working directory."""
        try:
            os.unlink(CACHE_NAME)
        except OSError:
            # Nothing to clean up (or the file can't be removed); never fail teardown over it
            pass

    @patch.object(BaseCache, 'clear', side_effect=IOError)
    @patch('requests_cache.backends.sqlite.unlink', side_effect=os.unlink)
    def test_clear__failure(self, mock_unlink, mock_clear):
        """When a corrupted cache prevents a normal DROP TABLE, clear() should still succeed"""
        session = self.init_session(clear=False)
        session.cache.responses['key_1'] = 'value_1'
        session.cache.clear()

        assert len(session.cache.responses) == 0
        # The patched ``unlink`` delegates to os.unlink, so the call is both real and counted
        assert mock_unlink.call_count == 1

    @patch.object(BaseCache, 'clear', side_effect=IOError)
    def test_clear__file_already_deleted(self, mock_clear):
        """clear() should still succeed if the database file was already removed"""
        session = self.init_session(clear=False)
        session.cache.responses['key_1'] = 'value_1'
        os.unlink(session.cache.responses.db_path)
        session.cache.clear()

        assert len(session.cache.responses) == 0

    def test_db_path(self):
        """This is just provided as an alias, since both requests and redirects share the same db
        file
        """
        session = self.init_session()
        assert session.cache.db_path == session.cache.responses.db_path
|