summaryrefslogtreecommitdiff
path: root/yoyo/tests/test_backends.py
blob: 406d4ae9e55df5739162d058a2e3839668f2d796 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
from tempfile import NamedTemporaryFile
from threading import Thread
import time

from mock import Mock
from mock import call
from mock import patch
import pytest

from yoyo import backends
from yoyo import read_migrations
from yoyo import exceptions
from yoyo.connections import get_backend
from yoyo.tests import get_test_backends
from yoyo.tests import get_test_dburis
from yoyo.tests import with_migrations
from yoyo.tests import migrations_dir


class TestTransactionHandling(object):
    def test_it_commits(self, backend):
        with backend.transaction():
            backend.execute("INSERT INTO yoyo_t values ('A')")

        with backend.transaction():
            rows = list(backend.execute("SELECT * FROM yoyo_t").fetchall())
            assert rows == [("A",)]

    def test_it_rolls_back(self, backend):
        with pytest.raises(backend.DatabaseError):
            with backend.transaction():
                backend.execute("INSERT INTO yoyo_t values ('A')")
                # Invalid SQL to produce an error
                backend.execute("INSERT INTO nonexistant values ('A')")

        with backend.transaction():
            rows = list(backend.execute("SELECT * FROM yoyo_t").fetchall())
            assert rows == []

    def test_it_nests_transactions(self, backend):
        if "redshift" in backend.uri.scheme:
            pytest.skip("Nested transactions not supported for Redshift")

        with backend.transaction():
            backend.execute("INSERT INTO yoyo_t values ('A')")

            with backend.transaction() as trans:
                backend.execute("INSERT INTO yoyo_t values ('B')")
                trans.rollback()

            with backend.transaction() as trans:
                backend.execute("INSERT INTO yoyo_t values ('C')")

        with backend.transaction():
            rows = list(backend.execute("SELECT * FROM yoyo_t").fetchall())
            assert rows == [("A",), ("C",)]

    def test_redshift_nested_transactions(self, backend):
        if "redshift" not in backend.uri.scheme:
            pytest.skip("Redshift only test")

        with backend.transaction():
            backend.execute("INSERT INTO yoyo_t values ('A')")

            with backend.transaction() as trans:
                backend.execute("INSERT INTO yoyo_t values ('B')")
                trans.rollback()

            with backend.transaction() as trans:
                backend.execute("INSERT INTO yoyo_t values ('C')")

        with backend.transaction():
            rows = list(backend.execute("SELECT * FROM yoyo_t").fetchall())
            assert rows == [("C",)]

    def test_backend_detects_transactional_ddl(self, backend):
        expected = {
            backends.PostgresqlBackend: True,
            backends.RedshiftBackend: True,
            backends.SQLiteBackend: True,
            backends.MySQLBackend: False,
        }
        if backend.__class__ in expected:
            assert backend.has_transactional_ddl is expected[backend.__class__]

    def test_non_transactional_ddl_behaviour(self, backend):
        """
        DDL queries in MySQL commit the current transaction,
        but it still seems to respect a subsequent rollback.

        We don't rely on this behaviour, but it's weird and worth having
        a test to document how it works and flag up in future should a new
        backend do things differently
        """
        if backend.has_transactional_ddl:
            return

        with backend.transaction() as trans:
            backend.execute("CREATE TABLE yoyo_a (id INT)")  # implicit commit
            backend.execute("INSERT INTO yoyo_a VALUES (1)")
            backend.execute("CREATE TABLE yoyo_b (id INT)")  # implicit commit
            backend.execute("INSERT INTO yoyo_b VALUES (1)")
            trans.rollback()

        count_a = backend.execute("SELECT COUNT(1) FROM yoyo_a").fetchall()[0][
            0
        ]
        assert count_a == 1

        count_b = backend.execute("SELECT COUNT(1) FROM yoyo_b").fetchall()[0][
            0
        ]
        assert count_b == 0

    @with_migrations(
        a="""
        __transactional__ = False
        step('CREATE DATABASE yoyo_test_tmp',
             'DROP DATABASE yoyo_test_tmp',
             )
    """
    )
    def test_statements_requiring_no_transaction(self, tmpdir):
        """
        PostgreSQL will error if certain statements (eg CREATE DATABASE)
        are run within a transaction block.

        As far as I know this behavior is PostgreSQL specific. We can't run
        this test in sqlite or oracle as they do not support CREATE DATABASE.
        """
        for backend in get_test_backends(exclude={"sqlite", "oracle"}):
            migrations = read_migrations(tmpdir)
            backend.apply_migrations(migrations)
            backend.rollback_migrations(migrations)

    @with_migrations(
        a="""
        __transactional__ = False
        def reopen_db(conn):
            import sqlite3
            for _, db, filename in conn.execute('PRAGMA database_list'):
                if db == 'main':
                     reconn = sqlite3.connect(filename)
                     reconn.execute("CREATE TABLE yoyo_test_b (id int)")
                     break
            else:
                raise AssertionError("sqlite main database not found")

        step('CREATE TABLE yoyo_test_a (id int)')
        step(reopen_db)
        step('CREATE TABLE yoyo_test_c (id int)')
    """
    )
    def test_disabling_transactions_in_sqlite(self, tmpdir):
        """
        Transactions cause sqlite databases to become locked, preventing
        other tools from accessing them:

        https://bitbucket.org/ollyc/yoyo/issues/43/run-step-outside-of-transaction
        """
        with NamedTemporaryFile() as tmp:
            backend = get_backend("sqlite:///" + tmp.name)
            backend.apply_migrations(read_migrations(tmpdir))
            assert "yoyo_test_a" in backend.list_tables()
            assert "yoyo_test_b" in backend.list_tables()
            assert "yoyo_test_c" in backend.list_tables()


class TestConcurrency(object):

    # How long to lock for: long enough to allow a migration to be loaded and
    # started without unduly slowing down the test suite
    lock_duration = 0.5

    def get_lock_sleeper(self, dburi):
        """
        Return a function that acquires the backend lock, then sleeps
        """
        backend = get_backend(dburi)

        def lock_sleep():
            with backend.lock():
                time.sleep(self.lock_duration)

        return lock_sleep

    def skip_if_not_concurrency_safe(self, backend):
        if (
            "sqlite" in backend.uri.scheme
            and backend.uri.database == ":memory:"
        ):
            pytest.skip(
                "Concurrency tests not supported for SQLite "
                "in-memory databases, which cannot be shared "
                "between threads"
            )
        if backend.driver.threadsafety < 1:
            pytest.skip(
                "Concurrency tests not supported for non-threadsafe backends"
            )

    def test_lock(self, dburi):
        """
        Test that :meth:`~yoyo.backends.DatabaseBackend.lock`
        acquires an exclusive lock
        """
        backend = get_backend(dburi)
        self.skip_if_not_concurrency_safe(backend)
        thread = Thread(target=self.get_lock_sleeper(dburi))
        t = time.time()
        thread.start()

        # Give the thread time to acquire the lock, but not enough
        # to complete
        time.sleep(self.lock_duration * 0.6)

        with backend.lock():
            delta = time.time() - t
            assert delta >= self.lock_duration

        thread.join()

    def test_lock_times_out(self, dburi):

        backend = get_backend(dburi)
        self.skip_if_not_concurrency_safe(backend)

        thread = Thread(target=self.get_lock_sleeper(dburi))
        thread.start()
        # Give the thread time to acquire the lock, but not enough
        # to complete
        time.sleep(self.lock_duration * 0.6)
        with pytest.raises(exceptions.LockTimeout):
            with backend.lock(timeout=0.001):
                assert False, "Execution should never reach this point"

        thread.join()


class TestInitConnection(object):
    class MockBackend(backends.DatabaseBackend):
        driver = Mock(DatabaseError=Exception, paramstyle="format")

        def list_tables(self):
            return []

        def connect(self, dburi):
            return Mock()

    def test_it_calls_init_connection(self):

        with patch("yoyo.internalmigrations.upgrade"), patch.object(
            self.MockBackend, "init_connection", Mock()
        ) as mock_init:

            backend = self.MockBackend("", "")
            connection = backend.connection
            assert mock_init.call_args == call(connection)

            mock_init.reset_mock()
            backend.rollback()
            assert mock_init.call_args_list == [call(connection)]

    def test_postgresql_backend_sets_search_path(self):
        class MockPGBackend(backends.PostgresqlBackend):
            driver = Mock(DatabaseError=Exception, paramstyle="format")
            schema = "foo"

            def connect(self, dburi):
                return Mock()

        with patch("yoyo.internalmigrations.upgrade"):
            backend = MockPGBackend("", "")
            backend.rollback()
            assert backend.connection.cursor().execute.call_args == call(
                "SET search_path TO foo"
            )

    def test_postgresql_connects_with_schema(self):
        dburi = next(iter(get_test_dburis(only={"postgresql"})), None)
        if dburi is None:
            pytest.skip("PostgreSQL backend not available")
            return
        backend = get_backend(dburi)
        with backend.transaction():
            backend.execute("CREATE SCHEMA foo")
        try:
            assert get_backend(dburi + "?schema=foo").execute(
                "SHOW search_path"
            ).fetchone() == ("foo",)
        finally:
            with backend.transaction():
                backend.execute("DROP SCHEMA foo CASCADE")

    def test_postgresql_list_table_uses_current_schema(self):
        dburi = next(iter(get_test_dburis(only={"postgresql"})), None)
        if dburi is None:
            pytest.skip("PostgreSQL backend not available")
        backend = get_backend(dburi)
        dbname = backend.uri.database
        with backend.transaction():
            backend.execute(
                "ALTER DATABASE {} SET SEARCH_PATH = custom_schema,public".format(
                    dbname
                )
            )
        try:
            with backend.transaction():
                backend.execute("CREATE SCHEMA custom_schema")
                backend.execute("CREATE TABLE custom_schema.foo (x int)")
            assert "foo" in get_backend(dburi).list_tables()

        finally:
            with backend.transaction():
                backend.execute(
                    "ALTER DATABASE {} RESET SEARCH_PATH".format(dbname)
                )
                backend.execute("DROP SCHEMA custom_schema CASCADE")

    def test_postgresql_migrations_can_change_schema_search_path(self):
        """
        https://todo.sr.ht/~olly/yoyo/72
        """
        dburi = next(iter(get_test_dburis(only={"postgresql"})), None)
        if dburi is None:
            pytest.skip("PostgreSQL backend not available")
        backend = get_backend(dburi)
        with migrations_dir(
            **{
                "1.sql": "SELECT pg_catalog.set_config('search_path', '', false)"
            }
        ) as tmpdir:
            migrations = read_migrations(tmpdir)
            backend.apply_migrations(migrations)
            applied = backend.execute("SELECT migration_id FROM _yoyo_log").fetchall()
            assert applied == [("1",)]