From 60404ed3dbbd1683d49a966caa39cd5bee0a821c Mon Sep 17 00:00:00 2001 From: Olly Cope Date: Sun, 24 Jan 2021 15:18:48 +0000 Subject: tests: replace `@with_migrations` decorator with equivalent context manager This has been causing problems because pytest fixtures don't work well with function decorators. If `with_migrations` doesn't correctly wrap the decorator using `functools.wraps`, pytest fixtures cannot be used by the decorated test function. But if we do use `functools.wraps`, pytest fixtures assumes the extra function argument is a fixture name and then fails either because it conflicts with the builtin `tmpdir` fixture or, if we rename it, because such a fixture doesn't exist. --- yoyo/tests/__init__.py | 1 - yoyo/tests/test_backends.py | 68 ++++--- yoyo/tests/test_cli_script.py | 329 ++++++++++++++++---------------- yoyo/tests/test_migrations.py | 430 +++++++++++++++++++++--------------------- 4 files changed, 416 insertions(+), 412 deletions(-) diff --git a/yoyo/tests/__init__.py b/yoyo/tests/__init__.py index e74a1d5..f00efd1 100644 --- a/yoyo/tests/__init__.py +++ b/yoyo/tests/__init__.py @@ -100,5 +100,4 @@ def tempdir(): rmtree(tmpdir) -with_migrations = MigrationsContextManager migrations_dir = MigrationsContextManager diff --git a/yoyo/tests/test_backends.py b/yoyo/tests/test_backends.py index 406d4ae..f89861a 100644 --- a/yoyo/tests/test_backends.py +++ b/yoyo/tests/test_backends.py @@ -13,7 +13,6 @@ from yoyo import exceptions from yoyo.connections import get_backend from yoyo.tests import get_test_backends from yoyo.tests import get_test_dburis -from yoyo.tests import with_migrations from yoyo.tests import migrations_dir @@ -112,15 +111,7 @@ class TestTransactionHandling(object): ] assert count_b == 0 - @with_migrations( - a=""" - __transactional__ = False - step('CREATE DATABASE yoyo_test_tmp', - 'DROP DATABASE yoyo_test_tmp', - ) - """ - ) - def test_statements_requiring_no_transaction(self, tmpdir): + def test_statements_requiring_no_transaction(self): """ PostgreSQL will error if certain statements (eg CREATE DATABASE) are run within a transaction block. @@ -128,37 +119,44 @@ class TestTransactionHandling(object): As far as I know this behavior is PostgreSQL specific. We can't run this test in sqlite or oracle as they do not support CREATE DATABASE. """ - for backend in get_test_backends(exclude={"sqlite", "oracle"}): - migrations = read_migrations(tmpdir) - backend.apply_migrations(migrations) - backend.rollback_migrations(migrations) - - @with_migrations( - a=""" - __transactional__ = False - def reopen_db(conn): - import sqlite3 - for _, db, filename in conn.execute('PRAGMA database_list'): - if db == 'main': - reconn = sqlite3.connect(filename) - reconn.execute("CREATE TABLE yoyo_test_b (id int)") - break - else: - raise AssertionError("sqlite main database not found") - - step('CREATE TABLE yoyo_test_a (id int)') - step(reopen_db) - step('CREATE TABLE yoyo_test_c (id int)') - """ - ) - def test_disabling_transactions_in_sqlite(self, tmpdir): + with migrations_dir( + a=""" + __transactional__ = False + step('CREATE DATABASE yoyo_test_tmp', + 'DROP DATABASE yoyo_test_tmp', + ) + """ + ) as tmpdir: + for backend in get_test_backends(exclude={"sqlite", "oracle"}): + migrations = read_migrations(tmpdir) + backend.apply_migrations(migrations) + backend.rollback_migrations(migrations) + + def test_disabling_transactions_in_sqlite(self): """ Transactions cause sqlite databases to become locked, preventing other tools from accessing them: https://bitbucket.org/ollyc/yoyo/issues/43/run-step-outside-of-transaction """ - with NamedTemporaryFile() as tmp: + with migrations_dir( + a=""" + __transactional__ = False + def reopen_db(conn): + import sqlite3 + for _, db, filename in conn.execute('PRAGMA database_list'): + if db == 'main': + reconn = sqlite3.connect(filename) + reconn.execute("CREATE TABLE yoyo_test_b (id int)") + break + else: + raise AssertionError("sqlite main database not found") + + step('CREATE TABLE yoyo_test_a (id int)') + step(reopen_db) + step('CREATE TABLE yoyo_test_c (id int)') + """ + ) as tmpdir, NamedTemporaryFile() as tmp: backend = get_backend("sqlite:///" + tmp.name) backend.apply_migrations(read_migrations(tmpdir)) assert "yoyo_test_a" in backend.list_tables() diff --git a/yoyo/tests/test_cli_script.py b/yoyo/tests/test_cli_script.py index 9c59b22..3b6afe9 100644 --- a/yoyo/tests/test_cli_script.py +++ b/yoyo/tests/test_cli_script.py @@ -32,7 +32,6 @@ import tms from yoyo import read_migrations from yoyo.config import get_configparser from yoyo.tests import dburi_sqlite3 -from yoyo.tests import with_migrations from yoyo.tests import migrations_dir from yoyo.tests import get_backend from yoyo.scripts.main import main, parse_args, LEGACY_CONFIG_FILENAME @@ -86,91 +85,93 @@ class TestInteractiveScript(object): class TestYoyoScript(TestInteractiveScript): - @with_migrations() def test_it_sets_verbosity_level(self, tmpdir): with patch("yoyo.scripts.main.configure_logging") as m: - main(["apply", tmpdir, "--database", dburi_sqlite3]) + main(["apply", str(tmpdir), "--database", dburi_sqlite3]) assert m.call_args == call(0) - main(["-vvv", "apply", tmpdir, "--database", dburi_sqlite3]) + main(["-vvv", "apply", str(tmpdir), "--database", dburi_sqlite3]) assert m.call_args == call(3) - @with_migrations() def test_it_prompts_to_create_config_file(self, tmpdir): - main(["apply", tmpdir, "--database", dburi_sqlite3]) + main(["apply", str(tmpdir), "--database", dburi_sqlite3]) assert "save migration config" in self.confirm.call_args[0][0].lower() - @with_migrations() def test_it_creates_config_file(self, tmpdir): self.confirm.return_value = True - main(["apply", tmpdir, "--database", dburi_sqlite3]) + main(["apply", str(tmpdir), "--database", dburi_sqlite3]) assert os.path.exists("yoyo.ini") with open("yoyo.ini") as f: assert "database = {0}".format(dburi_sqlite3) in f.read() - @with_migrations() def test_it_uses_config_file(self, tmpdir): self.writeconfig(batch_mode="on") with patch("yoyo.scripts.migrate.apply") as apply: - main(["apply", tmpdir, "--database", dburi_sqlite3]) + main(["apply", str(tmpdir), "--database", dburi_sqlite3]) args_used = apply.call_args[0][0] assert args_used.batch_mode is True - @with_migrations() def test_it_ignores_config_file(self, tmpdir): self.writeconfig(batch_mode="on") with patch("yoyo.scripts.migrate.apply") as apply: - main(["apply", "--no-config-file", tmpdir, "--database", dburi_sqlite3]) + main( + [ + "apply", + "--no-config-file", + str(tmpdir), + "--database", + dburi_sqlite3, + ] + ) args_used = apply.call_args[0][0] assert args_used.batch_mode is False - @with_migrations() def test_it_prompts_password(self, tmpdir): dburi = "sqlite://user@/:memory" with patch( "yoyo.scripts.main.getpass", return_value="fish" ) as getpass, patch("yoyo.connections.get_backend") as get_backend: - main(["apply", tmpdir, "--database", dburi, "--prompt-password"]) + main(["apply", str(tmpdir), "--database", dburi, "--prompt-password"]) assert getpass.call_count == 1 assert get_backend.call_args == call( "sqlite://user:fish@/:memory", "_yoyo_migration" ) - @with_migrations() def test_it_prompts_migrations(self, tmpdir): with patch( "yoyo.scripts.migrate.prompt_migrations" ) as prompt_migrations, patch( "yoyo.scripts.migrate.get_backend" ) as get_backend: - main(["apply", tmpdir, "--database", dburi_sqlite3]) + main(["apply", str(tmpdir), "--database", dburi_sqlite3]) migrations = get_backend().to_apply() assert migrations in prompt_migrations.call_args[0] - @with_migrations() def test_it_applies_migrations(self, tmpdir): with patch("yoyo.scripts.migrate.get_backend") as get_backend: - main(["-b", "apply", tmpdir, "--database", dburi_sqlite3]) + main(["-b", "apply", str(tmpdir), "--database", dburi_sqlite3]) assert get_backend().rollback_migrations.call_count == 0 assert get_backend().apply_migrations.call_count == 1 - @with_migrations() def test_it_rollsback_migrations(self, tmpdir): with patch("yoyo.scripts.migrate.get_backend") as get_backend: - main(["-b", "rollback", tmpdir, "--database", dburi_sqlite3]) + main(["-b", "rollback", str(tmpdir), "--database", dburi_sqlite3]) assert get_backend().rollback_migrations.call_count == 1 assert get_backend().apply_migrations.call_count == 0 - @with_migrations() def test_it_reapplies_migrations(self, tmpdir): with patch("yoyo.scripts.migrate.get_backend") as get_backend: - main(["-b", "reapply", tmpdir, "--database", dburi_sqlite3]) + main(["-b", "reapply", str(tmpdir), "--database", dburi_sqlite3]) assert get_backend().rollback_migrations.call_count == 1 assert get_backend().apply_migrations.call_count == 1 - @with_migrations(m1='step("CREATE TABLE yoyo_test1 (id INT)")') - @with_migrations(m2='step("CREATE TABLE yoyo_test2 (id INT)")') - def test_it_applies_from_multiple_sources(self, t1, t2): - with patch("yoyo.backends.DatabaseBackend.apply_migrations") as apply: + def test_it_applies_from_multiple_sources(self): + with migrations_dir( + m1='step("CREATE TABLE yoyo_test1 (id INT)")' + ) as t1, migrations_dir( + m2='step("CREATE TABLE yoyo_test2 (id INT)")' + ) as t2, patch( + "yoyo.backends.DatabaseBackend.apply_migrations" + ) as apply: main(["-b", "apply", t1, t2, "--database", dburi_sqlite3]) call_posargs, call_kwargs = apply.call_args migrations, _ = call_posargs @@ -179,16 +180,15 @@ class TestYoyoScript(TestInteractiveScript): os.path.join(t2, "m2.py"), ] - @with_migrations() def test_it_offers_to_upgrade(self, tmpdir): - legacy_config_path = os.path.join(tmpdir, LEGACY_CONFIG_FILENAME) + legacy_config_path = os.path.join(str(tmpdir), LEGACY_CONFIG_FILENAME) with io.open(legacy_config_path, "w", encoding="utf-8") as f: f.write("[DEFAULT]\n") f.write("migration_table=_yoyo_migration\n") f.write("dburi=sqlite:///\n") self.confirm.return_value = True - main(["apply", tmpdir]) + main(["apply", str(tmpdir)]) prompts = [ args[0].lower() for args, kwargs in self.confirm.call_args_list ] @@ -204,24 +204,22 @@ class TestYoyoScript(TestInteractiveScript): assert "batch_mode = off\n" in config assert "verbosity = 0\n" in config - @with_migrations() def test_it_upgrades_migration_table_None(self, tmpdir): - legacy_config_path = os.path.join(tmpdir, LEGACY_CONFIG_FILENAME) + legacy_config_path = os.path.join(str(tmpdir), LEGACY_CONFIG_FILENAME) with io.open(legacy_config_path, "w", encoding="utf-8") as f: f.write("[DEFAULT]\n") f.write("migration_table=None\n") f.write("dburi=sqlite:///\n") self.confirm.return_value = True - main(["apply", tmpdir]) + main(["apply", str(tmpdir)]) with open("yoyo.ini", "r") as f: config = f.read() assert "migration_table = _yoyo_migration\n" in config - @with_migrations() def test_it_forces_batch_mode_if_not_running_in_a_tty(self, tmpdir): with patch("sys.stdout", isatty=lambda: False): - main(["apply", tmpdir, "--database", dburi_sqlite3]) + main(["apply", str(tmpdir), "--database", dburi_sqlite3]) assert self.prompt.call_count == 0 assert self.confirm.call_count == 0 @@ -273,7 +271,7 @@ class TestYoyoScript(TestInteractiveScript): ).fetchone()[0] assert lock_count == 0 - def test_it_prompts_password_on_break_lock(self, tmpdir): + def test_it_prompts_password_on_break_lock(self): dburi = "sqlite://user@/:memory" with patch( "yoyo.scripts.main.getpass", return_value="fish" @@ -321,81 +319,83 @@ class TestArgParsing(TestInteractiveScript): class TestMarkCommand(TestInteractiveScript): - @with_migrations( - m1='step("CREATE TABLE test1 (id INT)")', - m2='step("CREATE TABLE test2 (id INT)")', - m3='step("CREATE TABLE test3 (id INT)")', - ) - def test_it_prompts_only_unapplied(self, tmpdir): - from yoyo.connections import get_backend - - migrations = read_migrations(tmpdir) - backend = get_backend(self.dburi) - backend.apply_migrations(migrations[:1]) - - with patch( - "yoyo.scripts.migrate.prompt_migrations" - ) as prompt_migrations: - main(["mark", tmpdir, "--database", self.dburi]) - _, prompted, _ = prompt_migrations.call_args[0] - prompted = [m.id for m in prompted] - assert prompted == ["m2", "m3"] - - @with_migrations( - m1='step("INSERT INTO t VALUES (1)")', - m2='__depends__=["m1"]; step("INSERT INTO t VALUES (2)")', - m3='step("INSERT INTO t VALUES (2)")', - ) - def test_it_marks_at_selected_version(self, tmpdir): - from yoyo.connections import get_backend + def test_it_prompts_only_unapplied(self): + with migrations_dir( + m1='step("CREATE TABLE test1 (id INT)")', + m2='step("CREATE TABLE test2 (id INT)")', + m3='step("CREATE TABLE test3 (id INT)")', + ) as tmpdir: + from yoyo.connections import get_backend - self.confirm.return_value = True - migrations = read_migrations(tmpdir) - backend = get_backend(self.dburi) - with backend.transaction(): - backend.execute("CREATE TABLE t (id INT)") + migrations = read_migrations(tmpdir) + backend = get_backend(self.dburi) + backend.apply_migrations(migrations[:1]) - main(["mark", "-r", "m2", tmpdir, "--database", self.dburi]) - assert backend.is_applied(migrations[0]) - assert backend.is_applied(migrations[1]) - assert not backend.is_applied(migrations[2]) + with patch( + "yoyo.scripts.migrate.prompt_migrations" + ) as prompt_migrations: + main(["mark", tmpdir, "--database", self.dburi]) + _, prompted, _ = prompt_migrations.call_args[0] + prompted = [m.id for m in prompted] + assert prompted == ["m2", "m3"] - # Check that migration steps have not been applied - c = backend.execute("SELECT * FROM t") - assert len(c.fetchall()) == 0 + def test_it_marks_at_selected_version(self): + with migrations_dir( + m1='step("INSERT INTO t VALUES (1)")', + m2='__depends__=["m1"]; step("INSERT INTO t VALUES (2)")', + m3='step("INSERT INTO t VALUES (2)")', + ) as tmpdir: + from yoyo.connections import get_backend + self.confirm.return_value = True + migrations = read_migrations(tmpdir) + backend = get_backend(self.dburi) + with backend.transaction(): + backend.execute("CREATE TABLE t (id INT)") -class TestUnmarkCommand(TestInteractiveScript): - @with_migrations(m1="", m2="", m3="") - def test_it_prompts_only_applied(self, tmpdir): - from yoyo.connections import get_backend + main(["mark", "-r", "m2", tmpdir, "--database", self.dburi]) + assert backend.is_applied(migrations[0]) + assert backend.is_applied(migrations[1]) + assert not backend.is_applied(migrations[2]) - migrations = read_migrations(tmpdir) - backend = get_backend(self.dburi) - backend.apply_migrations(migrations[:2]) - assert len(backend.get_applied_migration_hashes()) == 2 + # Check that migration steps have not been applied + c = backend.execute("SELECT * FROM t") + assert len(c.fetchall()) == 0 - with patch( - "yoyo.scripts.migrate.prompt_migrations" - ) as prompt_migrations: - main(["unmark", tmpdir, "--database", self.dburi]) - _, prompted, _ = prompt_migrations.call_args[0] - prompted = [m.id for m in prompted] - assert prompted == ["m2", "m1"] - @with_migrations(m1="", m2='__depends__=["m1"]', m3='__depends__=["m2"]') - def test_it_unmarks_to_selected_revision(self, tmpdir): - from yoyo.connections import get_backend +class TestUnmarkCommand(TestInteractiveScript): + def test_it_prompts_only_applied(self): + with migrations_dir(m1="", m2="", m3="") as tmpdir: + from yoyo.connections import get_backend + + migrations = read_migrations(tmpdir) + backend = get_backend(self.dburi) + backend.apply_migrations(migrations[:2]) + assert len(backend.get_applied_migration_hashes()) == 2 + + with patch( + "yoyo.scripts.migrate.prompt_migrations" + ) as prompt_migrations: + main(["unmark", tmpdir, "--database", self.dburi]) + _, prompted, _ = prompt_migrations.call_args[0] + prompted = [m.id for m in prompted] + assert prompted == ["m2", "m1"] + + def test_it_unmarks_to_selected_revision(self): + with migrations_dir( + m1="", m2='__depends__=["m1"]', m3='__depends__=["m2"]' + ) as tmpdir: + from yoyo.connections import get_backend - self.confirm.return_value = True - migrations = read_migrations(tmpdir) - backend = get_backend(self.dburi) - backend.apply_migrations(migrations) + self.confirm.return_value = True + migrations = read_migrations(tmpdir) + backend = get_backend(self.dburi) + backend.apply_migrations(migrations) - main(["unmark", "-r", "m2", tmpdir, "--database", self.dburi]) - assert backend.is_applied(migrations[0]) - assert not backend.is_applied(migrations[1]) - assert not backend.is_applied(migrations[2]) + main(["unmark", "-r", "m2", tmpdir, "--database", self.dburi]) + assert backend.is_applied(migrations[0]) + assert not backend.is_applied(migrations[1]) + assert not backend.is_applied(migrations[2]) class TestNewMigration(TestInteractiveScript): @@ -415,61 +415,61 @@ class TestNewMigration(TestInteractiveScript): self.subprocess_patch.stop() self.stat_patch.stop() - @with_migrations() def test_it_creates_an_empty_migration(self, tmpdir): - main(["new", "-b", "-m", "foo", tmpdir, "--database", dburi_sqlite3]) - assert any("-foo.py" in f for f in os.listdir(tmpdir)) - - @with_migrations( - m1="", m2='__depends__=["m1"]; step("INSERT INTO t VALUES (2)")', m3="" - ) - def test_it_depends_on_all_current_heads(self, tmpdir): - main(["new", "-b", "-m", "foo", tmpdir, "--database", dburi_sqlite3]) - m = next(f for f in os.listdir(tmpdir) if "-foo.py" in f) - with io.open(os.path.join(tmpdir, m), encoding="utf-8") as f: - assert "__depends__ = {'m2', 'm3'}" in f.read() - - @with_migrations() + main(["new", "-b", "-m", "foo", str(tmpdir), "--database", dburi_sqlite3]) + assert any("-foo.py" in f for f in os.listdir(str(tmpdir))) + + def test_it_depends_on_all_current_heads(self): + with migrations_dir( + m1="", m2='__depends__=["m1"]; step("INSERT INTO t VALUES (2)")', m3="" + ) as tmpdir: + main(["new", "-b", "-m", "foo", tmpdir, "--database", dburi_sqlite3]) + m = next(f for f in os.listdir(tmpdir) if "-foo.py" in f) + with io.open(os.path.join(tmpdir, m), encoding="utf-8") as f: + assert "__depends__ = {'m2', 'm3'}" in f.read() + def test_it_names_file_by_date_and_sequence(self, tmpdir): with frozendate.freeze(2001, 1, 1): - main(["new", "-b", "-m", "foo", tmpdir, "--database", dburi_sqlite3]) - main(["new", "-b", "-m", "bar", tmpdir, "--database", dburi_sqlite3]) - names = [n for n in sorted(os.listdir(tmpdir)) if n.endswith(".py")] + main( + ["new", "-b", "-m", "foo", str(tmpdir), "--database", dburi_sqlite3] + ) + main( + ["new", "-b", "-m", "bar", str(tmpdir), "--database", dburi_sqlite3] + ) + names = [n for n in sorted(os.listdir(str(tmpdir))) if n.endswith(".py")] assert names[0].startswith("20010101_01_") assert names[0].endswith("-foo.py") assert names[1].startswith("20010101_02_") assert names[1].endswith("-bar.py") - @with_migrations() def test_it_invokes_correct_editor_binary_from_config(self, tmpdir): self.writeconfig(editor="vim {} -c +10") - main(["new", tmpdir, "--database", dburi_sqlite3]) + main(["new", str(tmpdir), "--database", dburi_sqlite3]) assert self.subprocess.call.call_args == call( [ "vim", - tms.Matcher(partial(is_tmpfile, directory=tmpdir)), + tms.Matcher(partial(is_tmpfile, directory=str(tmpdir))), "-c", "+10", ] ) - @with_migrations() def test_it_invokes_correct_editor_binary_from_env(self, tmpdir): # default to $VISUAL with patch("os.environ", {"EDITOR": "ed", "VISUAL": "visualed"}): - main(["new", tmpdir, "--database", dburi_sqlite3]) + main(["new", str(tmpdir), "--database", dburi_sqlite3]) assert self.subprocess.call.call_args == call( ["visualed", tms.Unicode()] ) # fallback to $EDITOR with patch("os.environ", {"EDITOR": "ed"}): - main(["new", tmpdir, "--database", dburi_sqlite3]) + main(["new", str(tmpdir), "--database", dburi_sqlite3]) assert self.subprocess.call.call_args == call(["ed", tms.Unicode()]) # Otherwise, vi with patch("os.environ", {}): - main(["new", tmpdir, "--database", dburi_sqlite3]) == call( + main(["new", str(tmpdir), "--database", dburi_sqlite3]) == call( ["vi", tms.Unicode()] ) @@ -477,44 +477,50 @@ class TestNewMigration(TestInteractiveScript): # file, which should not be the case. assert self.prompt.call_args_list == [] - @with_migrations() def test_it_pulls_message_from_docstring(self, tmpdir): def write_migration(argv): with io.open(argv[-1], "w", encoding="utf8") as f: f.write('"""\ntest docstring\nsplit over\n\nlines\n"""\n') self.subprocess.call = write_migration - main(["new", tmpdir, "--database", dburi_sqlite3]) - names = [n for n in sorted(os.listdir(tmpdir)) if n.endswith(".py")] + main(["new", str(tmpdir), "--database", dburi_sqlite3]) + names = [n for n in sorted(os.listdir(str(tmpdir))) if n.endswith(".py")] assert "test-docstring" in names[0] - @with_migrations() def test_it_prompts_to_reedit_bad_migration(self, tmpdir): def write_migration(argv): with io.open(argv[-1], "w", encoding="utf8") as f: f.write("this is not valid python!") self.subprocess.call = write_migration - main(["new", tmpdir, "--database", dburi_sqlite3]) + main(["new", str(tmpdir), "--database", dburi_sqlite3]) prompts = [ args[0].lower() for args, kwargs in self.prompt.call_args_list ] assert "retry editing?" in prompts[0] - @with_migrations() def test_it_defaults_docstring_to_message(self, tmpdir): - main(["new", "-b", "-m", "your ad here", tmpdir, "--database", dburi_sqlite3]) + main( + [ + "new", + "-b", + "-m", + "your ad here", + str(tmpdir), + "--database", + dburi_sqlite3, + ] + ) names = [n for n in sorted(os.listdir(tmpdir)) if n.endswith(".py")] with io.open( - os.path.join(tmpdir, names[0]), "r", encoding="utf-8" + os.path.join(str(tmpdir), names[0]), "r", encoding="utf-8" ) as f: assert "your ad here" in f.read() - @with_migrations() def test_it_calls_post_create_command(self, tmpdir): self.writeconfig(post_create_command="/bin/ls -l {} {}") with frozendate.freeze(2001, 1, 1): - main(["new", "-b", tmpdir, "--database", dburi_sqlite3]) + main(["new", "-b", str(tmpdir), "--database", dburi_sqlite3]) is_filename = tms.Str( lambda s: os.path.basename(s).startswith("20010101_01_") ) @@ -522,36 +528,35 @@ class TestNewMigration(TestInteractiveScript): ["/bin/ls", "-l", is_filename, is_filename] ) - @with_migrations() def test_it_uses_configured_prefix(self, tmpdir): self.writeconfig(prefix="foo_") - main(["new", "-b", "-m", "bar", tmpdir, "--database", dburi_sqlite3]) - names = [n for n in sorted(os.listdir(tmpdir)) if n.endswith(".py")] + main(["new", "-b", "-m", "bar", str(tmpdir), "--database", dburi_sqlite3]) + names = [n for n in sorted(os.listdir(str(tmpdir))) if n.endswith(".py")] assert re.match("foo_.*-bar", names[0]) is not None - @with_migrations(m1="") - def test_it_creates_sql_file(self, tmpdir): - main( - [ - "new", - "-b", - "-m", - "comment", - "--sql", - tmpdir, - "--database", - dburi_sqlite3, - ] - ) - name = next(n for n in sorted(os.listdir(tmpdir)) if n.endswith(".sql")) - with io.open(os.path.join(tmpdir, name), "r", encoding="utf-8") as f: - assert f.read() == textwrap.dedent( - """\ - -- comment - -- depends: m1 - - """ + def test_it_creates_sql_file(self): + with migrations_dir(m1="") as tmpdir: + main( + [ + "new", + "-b", + "-m", + "comment", + "--sql", + tmpdir, + "--database", + dburi_sqlite3, + ] ) + name = next(n for n in sorted(os.listdir(tmpdir)) if n.endswith(".sql")) + with io.open(os.path.join(tmpdir, name), "r", encoding="utf-8") as f: + assert f.read() == textwrap.dedent( + """\ + -- comment + -- depends: m1 + + """ + ) class TestList(TestInteractiveScript): @@ -561,7 +566,7 @@ class TestList(TestInteractiveScript): main(["list", tmpdir, "--database", dburi_sqlite3]) return captured.getvalue() - def test_it_lists_migrations(self, tmpdir): + def test_it_lists_migrations(self): output = self.get_output({"m1": "", "m2": ""}) assert re.search(r"^U\s+m1", output, re.M) assert re.search(r"^U\s+m2", output, re.M) @@ -576,7 +581,7 @@ class TestDevelopCommand(TestInteractiveScript): ("m2", "apply"), ] - def test_it_reapplies_last_migration(self, tmpdir): + def test_it_reapplies_last_migration(self): with migrations_dir( m1='step("CREATE TABLE yoyo_test1 (id INT)", "DROP TABLE yoyo_test1")', m2='step("CREATE TABLE yoyo_test2 (id INT)", "DROP TABLE yoyo_test2")', @@ -594,7 +599,7 @@ class TestDevelopCommand(TestInteractiveScript): ("m2", "apply"), ] - def test_it_reapplies_last_n_migrations(self, tmpdir): + def test_it_reapplies_last_n_migrations(self): with migrations_dir( m1='step("CREATE TABLE yoyo_test1 (id INT)", "DROP TABLE yoyo_test1")', m2='step("CREATE TABLE yoyo_test2 (id INT)", "DROP TABLE yoyo_test2")', diff --git a/yoyo/tests/test_migrations.py b/yoyo/tests/test_migrations.py index 1568626..68c7653 100644 --- a/yoyo/tests/test_migrations.py +++ b/yoyo/tests/test_migrations.py @@ -26,119 +26,122 @@ from yoyo import read_migrations from yoyo import exceptions from yoyo import ancestors, descendants -from yoyo.tests import with_migrations, migrations_dir, dburi_sqlite3 +from yoyo.tests import migrations_dir, dburi_sqlite3 from yoyo.tests import clear_database from yoyo.tests import tempdir from yoyo.migrations import topological_sort, MigrationList from yoyo.scripts import newmigration -@with_migrations( - """ - step("CREATE TABLE yoyo_test (id INT)") - """, - """ -step("INSERT INTO yoyo_test VALUES (1)") -step("INSERT INTO yoyo_test VALUES ('x', 'y')") - """, -) -def test_transaction_is_not_committed_on_error(tmpdir): - backend = get_backend(dburi_sqlite3) - try: - migrations = read_migrations(tmpdir) - with pytest.raises(backend.DatabaseError): +def test_transaction_is_not_committed_on_error(): + with migrations_dir( + 'step("CREATE TABLE yoyo_test (id INT)")', + """ + step("INSERT INTO yoyo_test VALUES (1)") + step("INSERT INTO yoyo_test VALUES ('x', 'y')") + """, + ) as tmpdir: + backend = get_backend(dburi_sqlite3) + try: + migrations = read_migrations(tmpdir) + with pytest.raises(backend.DatabaseError): + backend.apply_migrations(migrations) + backend.rollback() + cursor = backend.cursor() + cursor.execute("SELECT count(1) FROM yoyo_test") + assert cursor.fetchone() == (0,) + finally: + clear_database(backend) + + +def test_rollbacks_happen_in_reverse(): + with migrations_dir( + 'step("CREATE TABLE yoyo_test (id INT)")', + """ + step( + "INSERT INTO yoyo_test VALUES (1)", "DELETE FROM yoyo_test WHERE id=1" + ) + step( + "UPDATE yoyo_test SET id=2 WHERE id=1", + "UPDATE yoyo_test SET id=1 WHERE id=2" + ) + """, + ) as tmpdir: + try: + backend = get_backend(dburi_sqlite3) + migrations = read_migrations(tmpdir) backend.apply_migrations(migrations) - backend.rollback() - cursor = backend.cursor() - cursor.execute("SELECT count(1) FROM yoyo_test") - assert cursor.fetchone() == (0,) - finally: - clear_database(backend) + cursor = backend.cursor() + cursor.execute("SELECT * FROM yoyo_test") + assert cursor.fetchall() == [(2,)] + backend.rollback_migrations(migrations) + cursor.execute("SELECT * FROM yoyo_test") + assert cursor.fetchall() == [] + finally: + clear_database(backend) -@with_migrations( - 'step("CREATE TABLE yoyo_test (id INT)")', - """ -step("INSERT INTO yoyo_test VALUES (1)", "DELETE FROM yoyo_test WHERE id=1") -step("UPDATE yoyo_test SET id=2 WHERE id=1", "UPDATE yoyo_test SET id=1 WHERE id=2") - """, -) -def test_rollbacks_happen_in_reverse(tmpdir): - try: +def test_execution_continues_with_ignore_errors(): + with migrations_dir( + """ + step("CREATE TABLE yoyo_test (id INT)") + step("INSERT INTO yoyo_test VALUES (1)") + step("INSERT INTO yoyo_test VALUES ('a', 'b')", ignore_errors='all') + step("INSERT INTO yoyo_test VALUES (2)") + """ + ) as tmpdir: + try: + backend = get_backend(dburi_sqlite3) + migrations = read_migrations(tmpdir) + backend.apply_migrations(migrations) + cursor = backend.cursor() + cursor.execute("SELECT * FROM yoyo_test") + assert cursor.fetchall() == [(1,), (2,)] + finally: + clear_database(backend) + + +def test_execution_continues_with_ignore_errors_in_transaction(): + with migrations_dir( + """ + from yoyo import step, group + step("CREATE TABLE yoyo_test (id INT)") + group( + step("INSERT INTO yoyo_test VALUES (1)"), + step("INSERT INTO yoyo_test VALUES ('a', 'b')"), + ignore_errors='all' + ) + step("INSERT INTO yoyo_test VALUES (2)") + """ + ) as tmpdir: backend = get_backend(dburi_sqlite3) migrations = read_migrations(tmpdir) backend.apply_migrations(migrations) cursor = backend.cursor() cursor.execute("SELECT * FROM yoyo_test") assert cursor.fetchall() == [(2,)] - backend.rollback_migrations(migrations) - cursor.execute("SELECT * FROM yoyo_test") - assert cursor.fetchall() == [] - finally: - clear_database(backend) -@with_migrations( - """ - step("CREATE TABLE yoyo_test (id INT)") - step("INSERT INTO yoyo_test VALUES (1)") - step("INSERT INTO yoyo_test VALUES ('a', 'b')", ignore_errors='all') - step("INSERT INTO yoyo_test VALUES (2)") - """ -) -def test_execution_continues_with_ignore_errors(tmpdir): - try: +def test_rollbackignores_errors(): + with migrations_dir( + """ + step("CREATE TABLE yoyo_test (id INT)") + step("INSERT INTO yoyo_test VALUES (1)", + "DELETE FROM yoyo_test WHERE id=2") + step("UPDATE yoyo_test SET id=2 WHERE id=1", + "SELECT nonexistent FROM imaginary", ignore_errors='rollback') + """ + ) as tmpdir: backend = get_backend(dburi_sqlite3) migrations = read_migrations(tmpdir) backend.apply_migrations(migrations) cursor = backend.cursor() cursor.execute("SELECT * FROM yoyo_test") - assert cursor.fetchall() == [(1,), (2,)] - finally: - clear_database(backend) - - -@with_migrations( - """ - from yoyo import step, group - step("CREATE TABLE yoyo_test (id INT)") - group( - step("INSERT INTO yoyo_test VALUES (1)"), - step("INSERT INTO yoyo_test VALUES ('a', 'b')"), - ignore_errors='all' - ) - step("INSERT INTO yoyo_test VALUES (2)") - """ -) -def test_execution_continues_with_ignore_errors_in_transaction(tmpdir): - backend = get_backend(dburi_sqlite3) - migrations = read_migrations(tmpdir) - backend.apply_migrations(migrations) - cursor = backend.cursor() - cursor.execute("SELECT * FROM yoyo_test") - assert cursor.fetchall() == [(2,)] - - -@with_migrations( - """ - step("CREATE TABLE yoyo_test (id INT)") - step("INSERT INTO yoyo_test VALUES (1)", - "DELETE FROM yoyo_test WHERE id=2") - step("UPDATE yoyo_test SET id=2 WHERE id=1", - "SELECT nonexistent FROM imaginary", ignore_errors='rollback') - """ -) -def test_rollbackignores_errors(tmpdir): - backend = get_backend(dburi_sqlite3) - migrations = read_migrations(tmpdir) - backend.apply_migrations(migrations) - cursor = backend.cursor() - cursor.execute("SELECT * FROM yoyo_test") - assert cursor.fetchall() == [(2,)] + assert cursor.fetchall() == [(2,)] - backend.rollback_migrations(migrations) - cursor.execute("SELECT * FROM yoyo_test") - assert cursor.fetchall() == [] + backend.rollback_migrations(migrations) + cursor.execute("SELECT * FROM yoyo_test") + assert cursor.fetchall() == [] def test_migration_is_committed(backend): @@ -146,19 +149,20 @@ def test_migration_is_committed(backend): migrations = read_migrations(tmpdir) backend.apply_migrations(migrations) - backend.rollback() - rows = backend.execute("SELECT * FROM yoyo_test").fetchall() - assert list(rows) == [] + backend.rollback() + rows = backend.execute("SELECT * FROM yoyo_test").fetchall() + assert list(rows) == [] def test_rollback_happens_on_step_failure(backend): with migrations_dir( """ - step("", - "CREATE TABLE yoyo_is_rolledback (i INT)"), - step("CREATE TABLE yoyo_test (s VARCHAR(100))", - "DROP TABLE yoyo_test") - step("invalid sql!")""" + step("", + "CREATE TABLE yoyo_is_rolledback (i INT)"), + step("CREATE TABLE yoyo_test (s VARCHAR(100))", + "DROP TABLE yoyo_test") + step("invalid sql!") + """ ) as tmpdir: migrations = read_migrations(tmpdir) with pytest.raises(backend.DatabaseError): @@ -180,87 +184,87 @@ def test_rollback_happens_on_step_failure(backend): assert list(cursor.fetchall()) == [] -@with_migrations( - """ - step("CREATE TABLE yoyo_test (id INT)") - step("DROP TABLE yoyo_test") - """ -) def test_specify_migration_table(tmpdir, dburi): - backend = get_backend(dburi, migration_table="another_migration_table") - migrations = read_migrations(tmpdir) - backend.apply_migrations(migrations) - cursor = backend.cursor() - cursor.execute("SELECT migration_id FROM another_migration_table") - assert cursor.fetchall() == [("0",)] + with migrations_dir( + """ + step("CREATE TABLE yoyo_test (id INT)") + step("DROP TABLE yoyo_test") + """ + ) as tmpdir: + backend = get_backend(dburi, migration_table="another_migration_table") + migrations = read_migrations(tmpdir) + backend.apply_migrations(migrations) + cursor = backend.cursor() + cursor.execute("SELECT migration_id FROM another_migration_table") + assert cursor.fetchall() == [("0",)] -@with_migrations( - """ - def foo(conn): - conn.cursor().execute("CREATE TABLE foo_test (id INT)") - conn.cursor().execute("INSERT INTO foo_test VALUES (1)") - def bar(conn): - foo(conn) - step(bar) - """ -) -def test_migration_functions_have_namespace_access(tmpdir): +def test_migration_functions_have_namespace_access(): """ Test that functions called via step have access to the script namespace """ - backend = get_backend(dburi_sqlite3) - migrations = read_migrations(tmpdir) - backend.apply_migrations(migrations) - cursor = backend.cursor() - cursor.execute("SELECT id FROM foo_test") - assert cursor.fetchall() == [(1,)] + with migrations_dir( + """ + def foo(conn): + conn.cursor().execute("CREATE TABLE foo_test (id INT)") + conn.cursor().execute("INSERT INTO foo_test VALUES (1)") + def bar(conn): + foo(conn) + step(bar) + """ + ) as tmpdir: + backend = get_backend(dburi_sqlite3) + migrations = read_migrations(tmpdir) + backend.apply_migrations(migrations) + cursor = backend.cursor() + cursor.execute("SELECT id FROM foo_test") + assert cursor.fetchall() == [(1,)] -@with_migrations( - """ - from yoyo import group, step - step("CREATE TABLE yoyo_test (id INT)") - group(step("INSERT INTO yoyo_test VALUES (1)")), - """ -) -def test_migrations_can_import_step_and_group(tmpdir): - backend = get_backend(dburi_sqlite3) - migrations = read_migrations(tmpdir) - backend.apply_migrations(migrations) - cursor = backend.cursor() - cursor.execute("SELECT id FROM yoyo_test") - assert cursor.fetchall() == [(1,)] +def test_migrations_can_import_step_and_group(): + with migrations_dir( + """ + from yoyo import group, step + step("CREATE TABLE yoyo_test (id INT)") + group(step("INSERT INTO yoyo_test VALUES (1)")), + """ + ) as tmpdir: + backend = get_backend(dburi_sqlite3) + migrations = read_migrations(tmpdir) + backend.apply_migrations(migrations) + cursor = backend.cursor() + cursor.execute("SELECT id FROM yoyo_test") + assert cursor.fetchall() == [(1,)] -@with_migrations( - """ - step("CREATE TABLE yoyo_test (id INT, c VARCHAR(1))") - step("INSERT INTO yoyo_test VALUES (1, 'a')") - step("INSERT INTO yoyo_test VALUES (2, 'b')") - step("SELECT * FROM yoyo_test") - """ -) -def test_migrations_display_selected_data(tmpdir): - backend = get_backend(dburi_sqlite3) - migrations = read_migrations(tmpdir) - with patch("yoyo.migrations.sys.stdout") as stdout: - backend.apply_migrations(migrations) - written = "".join(a[0] for a, kw in stdout.write.call_args_list) - assert written == ( - " id | c \n" "----+---\n" " 1 | a \n" " 2 | b \n" "(2 rows)\n" - ) +def test_migrations_display_selected_data(): + with migrations_dir( + """ + step("CREATE TABLE yoyo_test (id INT, c VARCHAR(1))") + step("INSERT INTO yoyo_test VALUES (1, 'a')") + step("INSERT INTO yoyo_test VALUES (2, 'b')") + step("SELECT * FROM yoyo_test") + """ + ) as tmpdir: + backend = get_backend(dburi_sqlite3) + migrations = read_migrations(tmpdir) + with patch("yoyo.migrations.sys.stdout") as stdout: + backend.apply_migrations(migrations) + written = "".join(a[0] for a, kw in stdout.write.call_args_list) + assert written == ( + " id | c \n" "----+---\n" " 1 | a \n" " 2 | b \n" "(2 rows)\n" + ) def test_grouped_migrations_can_be_rolled_back(backend): - with with_migrations( + with migrations_dir( a="from yoyo import step\n" 'steps = [step("CREATE TABLE p (n INT)",' ' "DROP TABLE p")]' ) as t1: backend.apply_migrations(read_migrations(t1)) - with with_migrations( + with migrations_dir( b="from yoyo import group\n" "from yoyo import step\n" "steps = [\n" @@ -428,41 +432,43 @@ class TestAncestorsDescendants(object): class TestReadMigrations(object): - @with_migrations(**{newmigration.tempfile_prefix + "test": ""}) - def test_it_ignores_yoyo_new_tmp_files(self, tmpdir): + def test_it_ignores_yoyo_new_tmp_files(self): """ The yoyo new command creates temporary files in the migrations directory. These shouldn't be picked up by yoyo apply etc """ - assert len(read_migrations(tmpdir)) == 0 + with migrations_dir( + **{newmigration.tempfile_prefix + "test": ""} + ) as tmpdir: + assert len(read_migrations(tmpdir)) == 0 - @with_migrations(**{"post-apply": """step('SELECT 1')"""}) - def test_it_loads_post_apply_scripts(self, tmpdir): - migrations = read_migrations(tmpdir) - assert len(migrations) == 0 + def test_it_loads_post_apply_scripts(self): + with migrations_dir(**{"post-apply": "step('SELECT 1')"}) as tmpdir: + migrations = read_migrations(tmpdir) + assert len(migrations) == 0 assert len(migrations.post_apply) == 1 - @with_migrations(**{"a": """step('SELECT 1')"""}) - def test_it_does_not_add_duplicate_steps(self, tmpdir): - m = read_migrations(tmpdir)[0] - m.load() - assert len(m.steps) == 1 + def test_it_does_not_add_duplicate_steps(self): + with migrations_dir("step('SELECT 1')") as tmpdir: + m = read_migrations(tmpdir)[0] + m.load() + assert len(m.steps) == 1 - m = read_migrations(tmpdir)[0] - m.load() - assert len(m.steps) == 1 + m = read_migrations(tmpdir)[0] + m.load() + assert len(m.steps) == 1 - @with_migrations(**{"a": """from yoyo import step; step('SELECT 1')"""}) def test_it_does_not_add_duplicate_steps_with_imported_symbols( self, tmpdir ): - m = read_migrations(tmpdir)[0] - m.load() - assert len(m.steps) == 1 + with migrations_dir(a="from yoyo import step; step('SELECT 1')") as tmpdir: + m = read_migrations(tmpdir)[0] + m.load() + assert len(m.steps) == 1 - m = read_migrations(tmpdir)[0] - m.load() - assert len(m.steps) == 1 + m = read_migrations(tmpdir)[0] + m.load() + assert len(m.steps) == 1 def test_it_reads_from_package_data(self): migrations = read_migrations("package:yoyo:tests/migrations") @@ -594,32 +600,32 @@ class TestPostApplyHooks(object): _apply_migrations() assert count_postapply_calls() == 2 - @with_migrations( - **{ - "a": "step('create table postapply (i int)')", - "post-apply": "step('insert into postapply values (1)')", - "post-apply2": "step('insert into postapply values (2)')", - } - ) - def test_it_runs_multiple_post_apply_hooks(self, tmpdir): - backend = get_backend(dburi_sqlite3) - backend.apply_migrations(backend.to_apply(read_migrations(tmpdir))) - cursor = backend.cursor() - cursor.execute("SELECT * FROM postapply") - assert cursor.fetchall() == [(1,), (2,)] - - @with_migrations( - **{ - "a": "step('create table postapply (i int)')", - "post-apply": "step('insert into postapply values (1)')", - } - ) - def test_apply_migrations_only_does_not_run_hooks(self, tmpdir): - backend = get_backend(dburi_sqlite3) - backend.apply_migrations_only(backend.to_apply(read_migrations(tmpdir))) - cursor = backend.cursor() - cursor.execute("SELECT * FROM postapply") - assert cursor.fetchall() == [] + def test_it_runs_multiple_post_apply_hooks(self): + with migrations_dir( + **{ + "a": "step('create table postapply (i int)')", + "post-apply": "step('insert into postapply values (1)')", + "post-apply2": "step('insert into postapply values (2)')", + } + ) as tmpdir: + backend = get_backend(dburi_sqlite3) + backend.apply_migrations(backend.to_apply(read_migrations(tmpdir))) + cursor = backend.cursor() + cursor.execute("SELECT * FROM postapply") + assert cursor.fetchall() == [(1,), (2,)] + + def test_apply_migrations_only_does_not_run_hooks(self, backend): + with migrations_dir( + **{ + "a": "step('create table postapply (i int)')", + "post-apply": "step('insert into postapply values (1)')", + } + ) as tmpdir: + backend = get_backend(dburi_sqlite3) + backend.apply_migrations_only(backend.to_apply(read_migrations(tmpdir))) + cursor = backend.cursor() + cursor.execute("SELECT * FROM postapply") + assert cursor.fetchall() == [] class TestLogging(object): @@ -639,9 +645,7 @@ class TestLogging(object): return backend.execute("SELECT count(1) FROM _yoyo_log").fetchone()[0] def test_it_logs_apply_and_rollback(self, backend): - with with_migrations( - a='step("CREATE TABLE yoyo_test (id INT)")' - ) as tmpdir: + with migrations_dir(a='step("CREATE TABLE yoyo_test (id INT)")') as tmpdir: migrations = read_migrations(tmpdir) backend.apply_migrations(migrations) assert self.get_log_count(backend) == 1 @@ -661,9 +665,7 @@ class TestLogging(object): assert logged["created_at_utc"] >= apply_time def test_it_logs_mark_and_unmark(self, backend): - with with_migrations( - a='step("CREATE TABLE yoyo_test (id INT)")' - ) as tmpdir: + with migrations_dir(a='step("CREATE TABLE yoyo_test (id INT)")') as tmpdir: migrations = read_migrations(tmpdir) backend.mark_migrations(migrations) assert self.get_log_count(backend) == 1 -- cgit v1.2.1