summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/changelog.rst4
-rw-r--r--migrate/__init__.pycbin381 -> 0 bytes
-rw-r--r--migrate/versioning/genmodel.py2
-rw-r--r--migrate/versioning/util/__init__.py16
-rw-r--r--setup.cfg4
-rw-r--r--setup.py2
-rw-r--r--test/fixture/database.py19
-rw-r--r--test/fixture/models.py11
-rw-r--r--test/fixture/shell.py57
-rw-r--r--test/versioning/test_api.py28
-rw-r--r--test/versioning/test_schemadiff.py4
-rw-r--r--test/versioning/test_script.py3
-rw-r--r--test/versioning/test_shell.py529
13 files changed, 310 insertions, 369 deletions
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 9792808..5e37fcb 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,14 +1,14 @@
0.5.5
-----
-- added `populate_default` bool argument to :meth:`Column.create <migrate.changeset.schema.ChangesetColumn.create>` which issues corresponding UPDATE statements to set defaults after column creation
-- url parameter can also be an Engine instance (this usage is discouraged though sometimes necessary)
+- `url` parameter can also be an :class:`Engine` instance (this usage is discouraged though sometimes necessary)
- added support for SQLAlchemy 0.6 (missing oracle and firebird) by Michael Bayer
- alter, create, drop column / rename table / rename index constructs now accept `alter_metadata` parameter. If True, it will modify Column/Table objects according to changes. Otherwise, everything will be untouched.
- complete refactoring of :class:`~migrate.changeset.schema.ColumnDelta` (fixes issue 23)
- added support for :ref:`firebird <firebird-d>`
- fixed bug when :meth:`Column.alter <migrate.changeset.schema.ChangesetColumn.alter>`\(server_default='string') was not properly set
- `server_defaults` passed to :meth:`Column.create <migrate.changeset.schema.ChangesetColumn.create>` are now issued correctly
+- added `populate_default` bool argument to :meth:`Column.create <migrate.changeset.schema.ChangesetColumn.create>` which issues corresponding UPDATE statements to set defaults after column creation
- constraints passed to :meth:`Column.create <migrate.changeset.schema.ChangesetColumn.create>` are correctly interpreted (``ALTER TABLE ADD CONSTRAINT`` is issued after ``ATLER TABLE ADD COLUMN``)
- :meth:`Column.create <migrate.changeset.schema.ChangesetColumn.create>` accepts `primary_key_name`, `unique_name` and `index_name` as string value which is used as contraint name when adding a column
- Constraint classes have `cascade=True` keyword argument to issue ``DROP CASCADE`` where supported
diff --git a/migrate/__init__.pyc b/migrate/__init__.pyc
deleted file mode 100644
index 26f737f..0000000
--- a/migrate/__init__.pyc
+++ /dev/null
Binary files differ
diff --git a/migrate/versioning/genmodel.py b/migrate/versioning/genmodel.py
index ba455b0..114d985 100644
--- a/migrate/versioning/genmodel.py
+++ b/migrate/versioning/genmodel.py
@@ -140,7 +140,7 @@ class ModelGenerator(object):
upgradeCommands.append("%(table)s.create()" % {'table': tableName})
downgradeCommands.append("%(table)s.drop()" % {'table': tableName})
- pre_command = 'meta.bind(migrate_engine)'
+ pre_command = ' meta.bind = migrate_engine'
return (
'\n'.join(decls),
diff --git a/migrate/versioning/util/__init__.py b/migrate/versioning/util/__init__.py
index f5f8edc..70370ac 100644
--- a/migrate/versioning/util/__init__.py
+++ b/migrate/versioning/util/__init__.py
@@ -130,3 +130,19 @@ def construct_engine(engine, **opts):
kwargs[key[11:]] = guess_obj_type(value)
return create_engine(engine, **kwargs)
+
+
+class Memoize:
+ """Memoize(fn) - an instance which acts like fn but memoizes its arguments
+ Will only work on functions with non-mutable arguments
+
+ ActiveState Code 52201
+ """
+ def __init__(self, fn):
+ self.fn = fn
+ self.memo = {}
+
+ def __call__(self, *args):
+ if not self.memo.has_key(args):
+ self.memo[args] = self.fn(*args)
+ return self.memo[args]
diff --git a/setup.cfg b/setup.cfg
index 05ad7a5..1229d67 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -7,8 +7,8 @@ tag_svn_revision = 1
tag_build = .dev
[nosetests]
-#pdb = true
-#pdb-failures = true
+pdb = true
+pdb-failures = true
#stop = true
[aliases]
diff --git a/setup.py b/setup.py
index d3c5295..bb4a149 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@ try:
except ImportError:
pass
-test_requirements = ['nose >= 0.10']
+test_requirements = ['nose >= 0.10', 'ScriptTest']
required_deps = ['sqlalchemy >= 0.5', 'decorator']
readme_file = open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'README'))
diff --git a/test/fixture/database.py b/test/fixture/database.py
index c6360eb..4d051a1 100644
--- a/test/fixture/database.py
+++ b/test/fixture/database.py
@@ -7,23 +7,26 @@ from decorator import decorator
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.orm import create_session
+from migrate.versioning.util import Memoize
from test.fixture.base import Base
from test.fixture.pathed import Pathed
+@Memoize
def readurls():
"""read URLs from config file return a list"""
- filename = 'test_db.cfg'
- ret = list()
# TODO: remove tmpfile since sqlite can store db in memory
- tmpfile = Pathed.tmp()
+ filename = 'test_db.cfg'
fullpath = os.path.join(os.curdir, filename)
try:
fd = open(fullpath)
except IOError:
raise IOError("""You must specify the databases to use for testing!
- Copy %(filename)s.tmpl to %(filename)s and edit your database URLs.""" % locals())
+Copy %(filename)s.tmpl to %(filename)s and edit your database URLs.""" % locals())
+
+ ret = list()
+ tmpfile = Pathed.tmp()
for line in fd:
if line.startswith('#'):
@@ -48,10 +51,6 @@ def is_supported(url, supported, not_supported):
return not (db in not_supported)
return True
-# we make the engines global, which should make the tests run a bit faster
-urls = readurls()
-engines = dict([(url, create_engine(url, echo=True)) for url in urls])
-
def usedb(supported=None, not_supported=None):
"""Decorates tests to be run with a database connection
@@ -66,6 +65,7 @@ def usedb(supported=None, not_supported=None):
if supported is not None and not_supported is not None:
raise AssertionError("Can't specify both supported and not_supported in fixture.db()")
+ urls = readurls()
my_urls = [url for url in urls if is_supported(url, supported, not_supported)]
@decorator
@@ -98,7 +98,7 @@ class DB(Base):
def _connect(self, url):
self.url = url
- self.engine = engines[url]
+ self.engine = create_engine(url, echo=True)
self.meta = MetaData(bind=self.engine)
if self.level < self.CONNECT:
return
@@ -127,6 +127,7 @@ class DB(Base):
return not (db in func.not_supported)
# Neither list assigned; assume all are supported
return True
+
def _not_supported(self, url):
return not self._supported(url)
diff --git a/test/fixture/models.py b/test/fixture/models.py
new file mode 100644
index 0000000..4454bf5
--- /dev/null
+++ b/test/fixture/models.py
@@ -0,0 +1,11 @@
+from sqlalchemy import *
+
+# test rundiffs in shell
+meta_old_rundiffs = MetaData()
+meta_rundiffs = MetaData()
+
+tmp_account_rundiffs = Table('tmp_account_rundiffs', meta_rundiffs,
+ Column('id', Integer, primary_key=True),
+ Column('login', String(40)),
+ Column('passwd', String(40)),
+)
diff --git a/test/fixture/shell.py b/test/fixture/shell.py
index 2579973..ad8262b 100644
--- a/test/fixture/shell.py
+++ b/test/fixture/shell.py
@@ -6,51 +6,22 @@ import shutil
import sys
import types
+from scripttest import TestFileEnvironment
+
from test.fixture.pathed import *
class Shell(Pathed):
"""Base class for command line tests"""
- def execute(self, command, *p, **k):
- """Return the fd of a command; can get output (stdout/err) and exitcode"""
- # We might be passed a file descriptor for some reason; if so, just return it
- if isinstance(command, types.FileType):
- return command
-
- # Redirect stderr to stdout
- # This is a bit of a hack, but I've not found a better way
- py_path = os.environ.get('PYTHONPATH', '')
- py_path_list = py_path.split(':')
- py_path_list.append(os.path.abspath('.'))
- os.environ['PYTHONPATH'] = ':'.join(py_path_list)
- fd = os.popen(command + ' 2>&1')
-
- if py_path:
- py_path = os.environ['PYTHONPATH'] = py_path
- else:
- del os.environ['PYTHONPATH']
- return fd
-
- def output_and_exitcode(self, *p, **k):
- fd=self.execute(*p, **k)
- output = fd.read().strip()
- exitcode = fd.close()
- if k.pop('emit',False):
- print output
- return (output, exitcode)
-
- def exitcode(self, *p, **k):
- """Execute a command and return its exit code
- ...without printing its output/errors
- """
- ret = self.output_and_exitcode(*p, **k)
- return ret[1]
-
- def assertFailure(self, *p, **k):
- output,exitcode = self.output_and_exitcode(*p, **k)
- assert (exitcode), output
-
- def assertSuccess(self, *p, **k):
- output,exitcode = self.output_and_exitcode(*p, **k)
- #self.assert_(not exitcode, output)
- assert (not exitcode), output
+
+ def setUp(self):
+ super(Shell, self).setUp()
+ self.env = TestFileEnvironment(os.path.join(self.temp_usable_dir, 'env'))
+
+ def run_version(self, repos_path):
+ result = self.env.run('migrate version %s' % repos_path)
+ return int(result.stdout.strip())
+
+ def run_db_version(self, url, repos_path):
+ result = self.env.run('migrate db_version %s %s' % (url, repos_path))
+ return int(result.stdout.strip())
diff --git a/test/versioning/test_api.py b/test/versioning/test_api.py
new file mode 100644
index 0000000..f13ea1a
--- /dev/null
+++ b/test/versioning/test_api.py
@@ -0,0 +1,28 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+from migrate.versioning import api
+from migrate.versioning.exceptions import *
+
+from test.fixture.pathed import *
+
+
+class TestAPI(Pathed):
+
+ def test_help(self):
+ self.assertTrue(isinstance(api.help('help'), basestring))
+ self.assertRaises(UsageError, api.help)
+ self.assertRaises(UsageError, api.help, 'foobar')
+ self.assert_(isinstance(api.help('create'), str))
+
+ def test_help_commands(self):
+ pass
+
+ def test_create(self):
+ pass
+
+ def test_script(self):
+ pass
+
+ def test_script_sql(self):
+ pass
diff --git a/test/versioning/test_schemadiff.py b/test/versioning/test_schemadiff.py
index c82e5b1..5ee463a 100644
--- a/test/versioning/test_schemadiff.py
+++ b/test/versioning/test_schemadiff.py
@@ -63,10 +63,10 @@ class TestSchemaDiff(fixture.DB):
)
''')
self.assertEqualsIgnoreWhitespace(upgradeCommands,
- '''meta.bind(migrate_engine)
+ '''meta.bind = migrate_engine
tmp_schemadiff.create()''')
self.assertEqualsIgnoreWhitespace(downgradeCommands,
- '''meta.bind(migrate_engine)
+ '''meta.bind = migrate_engine
tmp_schemadiff.drop()''')
# Create table in database, now model should match database.
diff --git a/test/versioning/test_script.py b/test/versioning/test_script.py
index ee31842..16fa166 100644
--- a/test/versioning/test_script.py
+++ b/test/versioning/test_script.py
@@ -60,7 +60,7 @@ class TestPyScript(fixture.Pathed, fixture.DB):
"""Correctly verify a python migration script: invalid python file"""
path=self.tmp_py()
# Create empty file
- f=open(path,'w')
+ f = open(path,'w')
f.write("def fail")
f.close()
self.assertRaises(Exception,self.cls.verify_module,path)
@@ -176,7 +176,6 @@ User = Table('User', meta,
self.repo = repository.Repository.create(self.repo_path, 'repo')
self.pyscript = PythonScript.create(self.script_path)
-
def write_file(self, path, contents):
f = open(path, 'w')
f.write(contents)
diff --git a/test/versioning/test_shell.py b/test/versioning/test_shell.py
index 3544502..a4d2b64 100644
--- a/test/versioning/test_shell.py
+++ b/test/versioning/test_shell.py
@@ -2,128 +2,40 @@
# -*- coding: utf-8 -*-
import os
-import sys
-import shutil
-import traceback
-from types import FileType
-from StringIO import StringIO
+import tempfile
-from sqlalchemy import MetaData,Table
+from sqlalchemy import MetaData, Table
-from migrate.versioning.repository import Repository
from migrate.versioning import genmodel, shell, api
+from migrate.versioning.repository import Repository
from migrate.versioning.exceptions import *
-from test import fixture
-
-
-class Shell(fixture.Shell):
-
- _cmd = os.path.join(sys.executable + ' migrate', 'versioning', 'shell.py')
-
- @classmethod
- def cmd(cls, *args):
- safe_parameters = map(lambda arg: str(arg), args)
- return ' '.join([cls._cmd] + safe_parameters)
-
- def execute(self, shell_cmd, runshell=None, **kwargs):
- """A crude simulation of a shell command, to speed things up"""
-
- # If we get an fd, the command is already done
- if isinstance(shell_cmd, (FileType, StringIO)):
- return shell_cmd
-
- # Analyze the command; see if we can 'fake' the shell
- try:
- # Forced to run in shell?
- # if runshell or '--runshell' in sys.argv:
- if runshell:
- raise Exception
- # Remove the command prefix
- if not shell_cmd.startswith(self._cmd):
- raise Exception
- cmd = shell_cmd[(len(self._cmd) + 1):]
- params = cmd.split(' ')
- command = params[0]
- except:
- return super(Shell, self).execute(shell_cmd)
-
- # Redirect stdout to an object; redirect stderr to stdout
- fd = StringIO()
- orig_stdout = sys.stdout
- orig_stderr = sys.stderr
- sys.stdout = fd
- sys.stderr = fd
- # Execute this command
- try:
- try:
- shell.main(params, **kwargs)
- except SystemExit, e:
- # Simulate the exit status
- fd_close = fd.close
- def close_():
- fd_close()
- return e.args[0]
- fd.close = close_
- except Exception, e:
- # Print the exception, but don't re-raise it
- traceback.print_exc()
- # Simulate a nonzero exit status
- fd_close = fd.close
- def close_():
- fd_close()
- return 2
- fd.close = close_
- finally:
- # Clean up
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
- fd.seek(0)
- return fd
-
- def cmd_version(self, repos_path):
- fd = self.execute(self.cmd('version', repos_path))
- result = int(fd.read().strip())
- self.assertSuccess(fd)
- return result
-
- def cmd_db_version(self, url, repos_path):
- fd = self.execute(self.cmd('db_version', url, repos_path))
- txt = fd.read()
- #print txt
- ret = int(txt.strip())
- self.assertSuccess(fd)
- return ret
+from test.fixture import *
+
class TestShellCommands(Shell):
"""Tests migrate.py commands"""
def test_help(self):
"""Displays default help dialog"""
- self.assertSuccess(self.cmd('-h'), runshell=True)
- self.assertSuccess(self.cmd('--help'), runshell=True)
- self.assertSuccess(self.cmd('help'), runshell=True)
- self.assertSuccess(self.cmd('help'))
-
- self.assertRaises(UsageError, api.help)
- self.assertRaises(UsageError, api.help, 'foobar')
- self.assert_(isinstance(api.help('create'), str))
+ self.assertEqual(self.env.run('migrate -h').returncode, 0)
+ self.assertEqual(self.env.run('migrate --help').returncode, 0)
+ self.assertEqual(self.env.run('migrate help').returncode, 0)
def test_help_commands(self):
"""Display help on a specific command"""
- for cmd in shell.api.__all__:
- fd = self.execute(self.cmd('help', cmd))
- # Description may change, so best we can do is ensure it shows up
- output = fd.read()
- self.assertNotEquals(output, '')
- self.assertSuccess(fd)
+ # we can only test that we get some output
+ for cmd in api.__all__:
+ result = self.env.run('migrate help %s' % cmd)
+ self.assertTrue(isinstance(result.stdout, basestring))
+ self.assertTrue(result.stdout)
+ self.assertFalse(result.stderr)
def test_create(self):
"""Repositories are created successfully"""
repos = self.tmp_repos()
# Creating a file that doesn't exist should succeed
- cmd = self.cmd('create', repos, 'repository_name')
- self.assertSuccess(cmd)
+ result = self.env.run('migrate create %s repository_name' % repos)
# Files should actually be created
self.assert_(os.path.exists(repos))
@@ -133,241 +45,253 @@ class TestShellCommands(Shell):
self.assertNotEquals(repos_.config.get('db_settings', 'version_table'), 'None')
# Can't create it again: it already exists
- self.assertFailure(cmd)
+ result = self.env.run('migrate create %s repository_name' % repos,
+ expect_error=True)
+ self.assertEqual(result.returncode, 2)
def test_script(self):
"""We can create a migration script via the command line"""
repos = self.tmp_repos()
- self.assertSuccess(self.cmd('create', repos, 'repository_name'))
+ result = self.env.run('migrate create %s repository_name' % repos)
- self.assertSuccess(self.cmd('script', '--repository=%s' % repos, 'Desc'))
+ result = self.env.run('migrate script --repository=%s Desc' % repos)
self.assert_(os.path.exists('%s/versions/001_Desc.py' % repos))
- self.assertSuccess(self.cmd('script', '--repository=%s' % repos, 'More'))
+ result = self.env.run('migrate script More %s' % repos)
self.assert_(os.path.exists('%s/versions/002_More.py' % repos))
- self.assertSuccess(self.cmd('script', '--repository=%s' % repos, '"Some Random name"'), runshell=True)
+ result = self.env.run('migrate script "Some Random name" %s' % repos)
self.assert_(os.path.exists('%s/versions/003_Some_Random_name.py' % repos))
def test_script_sql(self):
"""We can create a migration sql script via the command line"""
repos = self.tmp_repos()
- self.assertSuccess(self.cmd('create', repos, 'repository_name'))
+ result = self.env.run('migrate create %s repository_name' % repos)
- self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'mydb'))
+ result = self.env.run('migrate script_sql mydb %s' % repos)
self.assert_(os.path.exists('%s/versions/001_mydb_upgrade.sql' % repos))
self.assert_(os.path.exists('%s/versions/001_mydb_downgrade.sql' % repos))
# Test creating a second
- self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos, 'postgres'))
+ result = self.env.run('migrate script_sql postgres --repository=%s' % repos)
self.assert_(os.path.exists('%s/versions/002_postgres_upgrade.sql' % repos))
self.assert_(os.path.exists('%s/versions/002_postgres_downgrade.sql' % repos))
+ # TODO: test --previews
+
def test_manage(self):
"""Create a project management script"""
script = self.tmp_py()
self.assert_(not os.path.exists(script))
# No attempt is made to verify correctness of the repository path here
- self.assertSuccess(self.cmd('manage', script, '--repository=/path/to/repository'))
+ result = self.env.run('migrate manage %s --repository=/bla/' % script)
self.assert_(os.path.exists(script))
+
class TestShellRepository(Shell):
"""Shell commands on an existing repository/python script"""
def setUp(self):
"""Create repository, python change script"""
super(TestShellRepository, self).setUp()
- self.path_repos = repos = self.tmp_repos()
- self.assertSuccess(self.cmd('create', repos, 'repository_name'))
+ self.path_repos = self.tmp_repos()
+ result = self.env.run('migrate create %s repository_name' % self.path_repos)
def test_version(self):
"""Correctly detect repository version"""
# Version: 0 (no scripts yet); successful execution
- fd = self.execute(self.cmd('version','--repository=%s' % self.path_repos))
- self.assertEquals(fd.read().strip(), "0")
- self.assertSuccess(fd)
+ result = self.env.run('migrate version --repository=%s' % self.path_repos)
+ self.assertEqual(result.stdout.strip(), "0")
# Also works as a positional param
- fd = self.execute(self.cmd('version', self.path_repos))
- self.assertEquals(fd.read().strip(), "0")
- self.assertSuccess(fd)
+ result = self.env.run('migrate version %s' % self.path_repos)
+ self.assertEqual(result.stdout.strip(), "0")
# Create a script and version should increment
- self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc'))
- fd = self.execute(self.cmd('version',self.path_repos))
- self.assertEquals(fd.read().strip(), "1")
- self.assertSuccess(fd)
+ result = self.env.run('migrate script Desc %s' % self.path_repos)
+ result = self.env.run('migrate version %s' % self.path_repos)
+ self.assertEqual(result.stdout.strip(), "1")
def test_source(self):
"""Correctly fetch a script's source"""
- self.assertSuccess(self.cmd('script', '--repository=%s' % self.path_repos, 'Desc'))
+ result = self.env.run('migrate script Desc --repository=%s' % self.path_repos)
filename = '%s/versions/001_Desc.py' % self.path_repos
source = open(filename).read()
self.assert_(source.find('def upgrade') >= 0)
# Version is now 1
- fd = self.execute(self.cmd('version', self.path_repos))
- self.assert_(fd.read().strip() == "1")
- self.assertSuccess(fd)
+ result = self.env.run('migrate version %s' % self.path_repos)
+ self.assertEqual(result.stdout.strip(), "1")
# Output/verify the source of version 1
- fd = self.execute(self.cmd('source', 1, '--repository=%s' % self.path_repos))
- result = fd.read()
- self.assertSuccess(fd)
- self.assert_(result.strip() == source.strip())
+ result = self.env.run('migrate source 1 --repository=%s' % self.path_repos)
+ self.assertEqual(result.stdout.strip(), source.strip())
# We can also send the source to a file... test that too
- self.assertSuccess(self.cmd('source', 1, filename, '--repository=%s'%self.path_repos))
+ result = self.env.run('migrate source 1 %s --repository=%s' %
+ (filename, self.path_repos))
self.assert_(os.path.exists(filename))
fd = open(filename)
result = fd.read()
self.assert_(result.strip() == source.strip())
-class TestShellDatabase(Shell, fixture.DB):
+
+class TestShellDatabase(Shell, DB):
"""Commands associated with a particular database"""
# We'll need to clean up after ourself, since the shell creates its own txn;
# we need to connect to the DB to see if things worked
- level = fixture.DB.CONNECT
+ level = DB.CONNECT
- @fixture.usedb()
+ @usedb()
def test_version_control(self):
"""Ensure we can set version control on a database"""
path_repos = repos = self.tmp_repos()
- self.assertSuccess(self.cmd('create', path_repos, 'repository_name'))
- self.exitcode(self.cmd('drop_version_control', self.url, path_repos))
- self.assertSuccess(self.cmd('version_control', self.url, path_repos))
+ url = self.url
+ result = self.env.run('migrate create %s repository_name' % repos)
+
+ result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\
+ % locals(), expect_error=True)
+ self.assertEqual(result.returncode, 1)
+ result = self.env.run('migrate version_control %(url)s %(repos)s' % locals())
# Clean up
- self.assertSuccess(self.cmd('drop_version_control',self.url,path_repos))
+ result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals())
# Attempting to drop vc from a database without it should fail
- self.assertFailure(self.cmd('drop_version_control',self.url,path_repos))
+ result = self.env.run('migrate drop_version_control %(url)s %(repos)s'\
+ % locals(), expect_error=True)
+ self.assertEqual(result.returncode, 1)
- @fixture.usedb()
+ @usedb()
def test_wrapped_kwargs(self):
"""Commands with default arguments set by manage.py"""
path_repos = repos = self.tmp_repos()
- self.assertSuccess(self.cmd('create', '--', '--name=repository_name'), repository=path_repos)
- self.exitcode(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
- self.assertSuccess(self.cmd('version_control'), url=self.url, repository=path_repos)
+ url = self.url
+ result = self.env.run('migrate create --name=repository_name %s' % repos)
+ result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals(), expect_error=True)
+ self.assertEqual(result.returncode, 1)
+ result = self.env.run('migrate version_control %(url)s %(repos)s' % locals())
- # Clean up
- self.assertSuccess(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
- # Attempting to drop vc from a database without it should fail
- self.assertFailure(self.cmd('drop_version_control'), url=self.url, repository=path_repos)
+ result = self.env.run('migrate drop_version_control %(url)s %(repos)s' % locals())
- @fixture.usedb()
+ @usedb()
def test_version_control_specified(self):
"""Ensure we can set version control to a particular version"""
path_repos = self.tmp_repos()
- self.assertSuccess(self.cmd('create', path_repos, 'repository_name'))
- self.exitcode(self.cmd('drop_version_control', self.url, path_repos))
+ url = self.url
+ result = self.env.run('migrate create --name=repository_name %s' % path_repos)
+ result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals(), expect_error=True)
+ self.assertEqual(result.returncode, 1)
# Fill the repository
path_script = self.tmp_py()
- version = 1
+ version = 2
for i in range(version):
- self.assertSuccess(self.cmd('script', '--repository=%s' % path_repos, 'Desc'))
+ result = self.env.run('migrate script Desc --repository=%s' % path_repos)
# Repository version is correct
- fd = self.execute(self.cmd('version', path_repos))
- self.assertEquals(fd.read().strip(), str(version))
- self.assertSuccess(fd)
+ result = self.env.run('migrate version %s' % path_repos)
+ self.assertEqual(result.stdout.strip(), str(version))
# Apply versioning to DB
- self.assertSuccess(self.cmd('version_control', self.url, path_repos, version))
+ result = self.env.run('migrate version_control %(url)s %(path_repos)s %(version)s' % locals())
- # Test version number
- fd = self.execute(self.cmd('db_version', self.url, path_repos))
- self.assertEquals(fd.read().strip(), str(version))
- self.assertSuccess(fd)
+ # Test db version number (should start at 2)
+ result = self.env.run('migrate db_version %(url)s %(path_repos)s' % locals())
+ self.assertEqual(result.stdout.strip(), str(version))
# Clean up
- self.assertSuccess(self.cmd('drop_version_control', self.url, path_repos))
+ result = self.env.run('migrate drop_version_control %(url)s %(path_repos)s' % locals())
- @fixture.usedb()
+ @usedb()
def test_upgrade(self):
"""Can upgrade a versioned database"""
# Create a repository
repos_name = 'repos_name'
repos_path = self.tmp()
- self.assertSuccess(self.cmd('create', repos_path,repos_name))
- self.assertEquals(self.cmd_version(repos_path), 0)
+ result = self.env.run('migrate create %(repos_path)s %(repos_name)s' % locals())
+ self.assertEquals(self.run_version(repos_path), 0)
# Version the DB
- self.exitcode(self.cmd('drop_version_control', self.url, repos_path))
- self.assertSuccess(self.cmd('version_control', self.url, repos_path))
+ result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
+ result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
# Upgrades with latest version == 0
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
- self.assertSuccess(self.cmd('upgrade', self.url, repos_path))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
- self.assertSuccess(self.cmd('upgrade', self.url, repos_path, 0))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
- self.assertFailure(self.cmd('upgrade', self.url, repos_path, 1))
- self.assertFailure(self.cmd('upgrade', self.url, repos_path, -1))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate upgrade %s %s 1' % (self.url, repos_path), expect_error=True)
+ self.assertEquals(result.returncode, 1)
+ result = self.env.run('migrate upgrade %s %s -1' % (self.url, repos_path), expect_error=True)
+ self.assertEquals(result.returncode, 2)
# Add a script to the repository; upgrade the db
- self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc'))
- self.assertEquals(self.cmd_version(repos_path), 1)
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate script Desc --repository=%s' % (repos_path))
+ self.assertEquals(self.run_version(repos_path), 1)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
# Test preview
- self.assertSuccess(self.cmd('upgrade', self.url, repos_path, 0, "--preview_sql"))
- self.assertSuccess(self.cmd('upgrade', self.url, repos_path, 0, "--preview_py"))
+ result = self.env.run('migrate upgrade %s %s 0 --preview_sql' % (self.url, repos_path))
+ result = self.env.run('migrate upgrade %s %s 0 --preview_py' % (self.url, repos_path))
- self.assertSuccess(self.cmd('upgrade', self.url, repos_path))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 1)
+ result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 1)
# Downgrade must have a valid version specified
- self.assertFailure(self.cmd('downgrade', self.url, repos_path))
- self.assertFailure(self.cmd('downgrade', self.url, repos_path, '-1', 2))
- #self.assertFailure(self.cmd('downgrade', self.url, repos_path, '1', 2))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 1)
+ result = self.env.run('migrate downgrade %s %s' % (self.url, repos_path), expect_error=True)
+ self.assertEquals(result.returncode, 2)
+ result = self.env.run('migrate downgrade %s %s -1' % (self.url, repos_path), expect_error=True)
+ self.assertEquals(result.returncode, 2)
+ result = self.env.run('migrate downgrade %s %s 2' % (self.url, repos_path), expect_error=True)
+ self.assertEquals(result.returncode, 2)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 1)
- self.assertSuccess(self.cmd('downgrade', self.url, repos_path, 0))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
- self.assertFailure(self.cmd('downgrade',self.url, repos_path, 1))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate downgrade %s %s 1' % (self.url, repos_path), expect_error=True)
+ self.assertEquals(result.returncode, 2)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
- self.assertSuccess(self.cmd('drop_version_control', self.url, repos_path))
+ result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path))
def _run_test_sqlfile(self, upgrade_script, downgrade_script):
# TODO: add test script that checks if db really changed
-
repos_path = self.tmp()
repos_name = 'repos'
- self.assertSuccess(self.cmd('create', repos_path, repos_name))
- self.exitcode(self.cmd('drop_version_control', self.url, repos_path))
- self.assertSuccess(self.cmd('version_control', self.url, repos_path))
- self.assertEquals(self.cmd_version(repos_path), 0)
- self.assertEquals(self.cmd_db_version(self.url,repos_path), 0)
+
+ result = self.env.run('migrate create %s %s' % (repos_path, repos_name))
+ result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
+ result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_version(repos_path), 0)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
beforeCount = len(os.listdir(os.path.join(repos_path, 'versions'))) # hmm, this number changes sometimes based on running from svn
- self.assertSuccess(self.cmd('script_sql', '--repository=%s' % repos_path, 'postgres'))
- self.assertEquals(self.cmd_version(repos_path), 1)
- self.assertEquals(len(os.listdir(os.path.join(repos_path,'versions'))), beforeCount + 2)
+ result = self.env.run('migrate script_sql %s --repository=%s' % ('postgres', repos_path))
+ self.assertEquals(self.run_version(repos_path), 1)
+ self.assertEquals(len(os.listdir(os.path.join(repos_path, 'versions'))), beforeCount + 2)
open('%s/versions/001_postgres_upgrade.sql' % repos_path, 'a').write(upgrade_script)
open('%s/versions/001_postgres_downgrade.sql' % repos_path, 'a').write(downgrade_script)
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
- self.assertSuccess(self.cmd('upgrade', self.url,repos_path))
- self.assertEquals(self.cmd_db_version(self.url,repos_path), 1)
+ result = self.env.run('migrate upgrade %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 1)
self.engine.text('select * from t_table').execute()
- self.assertSuccess(self.cmd('downgrade', self.url, repos_path, 0))
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate downgrade %s %s 0' % (self.url, repos_path))
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
self.assertRaises(Exception, self.engine.text('select * from t_table').execute)
# The tests below are written with some postgres syntax, but the stuff
# being tested (.sql files) ought to work with any db.
- @fixture.usedb(supported='postgres')
+ @usedb(supported='postgres')
def test_sqlfile(self):
upgrade_script = """
create table t_table (
@@ -381,8 +305,7 @@ class TestShellDatabase(Shell, fixture.DB):
self.meta.drop_all()
self._run_test_sqlfile(upgrade_script, downgrade_script)
-
- @fixture.usedb(supported='postgres')
+ @usedb(supported='postgres')
def test_sqlfile_comment(self):
upgrade_script = """
-- Comments in SQL break postgres autocommit
@@ -395,28 +318,28 @@ class TestShellDatabase(Shell, fixture.DB):
-- Comments in SQL break postgres autocommit
drop table t_table;
"""
- self._run_test_sqlfile(upgrade_script,downgrade_script)
+ self._run_test_sqlfile(upgrade_script, downgrade_script)
- @fixture.usedb()
+ @usedb()
def test_command_test(self):
repos_name = 'repos_name'
repos_path = self.tmp()
- self.assertSuccess(self.cmd('create', repos_path, repos_name))
- self.exitcode(self.cmd('drop_version_control', self.url, repos_path))
- self.assertSuccess(self.cmd('version_control', self.url, repos_path))
- self.assertEquals(self.cmd_version(repos_path), 0)
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate create repository_name --repository=%s' % repos_path)
+ result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
+ result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_version(repos_path), 0)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
# Empty script should succeed
- self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc'))
- self.assertSuccess(self.cmd('test', repos_path, self.url))
- self.assertEquals(self.cmd_version(repos_path), 1)
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate script Desc %s' % repos_path)
+ result = self.env.run('migrate test %s %s' % (repos_path, self.url))
+ self.assertEquals(self.run_version(repos_path), 1)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
# Error script should fail
script_path = self.tmp_py()
- script_text="""
+ script_text='''
from sqlalchemy import *
from migrate import *
@@ -427,26 +350,27 @@ class TestShellDatabase(Shell, fixture.DB):
def downgrade():
print 'sdfsgf'
raise Exception()
- """.replace("\n ","\n")
+ '''.replace("\n ", "\n")
file = open(script_path, 'w')
file.write(script_text)
file.close()
- self.assertFailure(self.cmd('test', repos_path, self.url, 'blah blah'))
- self.assertEquals(self.cmd_version(repos_path), 1)
- self.assertEquals(self.cmd_db_version(self.url, repos_path),0)
+ result = self.env.run('migrate test %s %s bla' % (repos_path, self.url), expect_error=True)
+ self.assertEqual(result.returncode, 2)
+ self.assertEquals(self.run_version(repos_path), 1)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
# Nonempty script using migrate_engine should succeed
script_path = self.tmp_py()
- script_text="""
+ script_text = '''
from sqlalchemy import *
from migrate import *
meta = MetaData(migrate_engine)
- account = Table('account',meta,
- Column('id',Integer,primary_key=True),
- Column('login',String(40)),
- Column('passwd',String(40)),
+ account = Table('account', meta,
+ Column('id', Integer, primary_key=True),
+ Column('login', String(40)),
+ Column('passwd', String(40)),
)
def upgrade():
# Upgrade operations go here. Don't create your own engine; use the engine
@@ -456,113 +380,104 @@ class TestShellDatabase(Shell, fixture.DB):
def downgrade():
# Operations to reverse the above upgrade go here.
meta.drop_all()
- """.replace("\n ","\n")
+ '''.replace("\n ", "\n")
file = open(script_path, 'w')
file.write(script_text)
file.close()
- self.assertSuccess(self.cmd('test', repos_path, self.url))
- self.assertEquals(self.cmd_version(repos_path), 1)
- self.assertEquals(self.cmd_db_version(self.url, repos_path), 0)
+ result = self.env.run('migrate test %s %s' % (repos_path, self.url))
+ self.assertEquals(self.run_version(repos_path), 1)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
- @fixture.usedb()
+ @usedb()
def test_rundiffs_in_shell(self):
# This is a variant of the test_schemadiff tests but run through the shell level.
# These shell tests are hard to debug (since they keep forking processes), so they shouldn't replace the lower-level tests.
repos_name = 'repos_name'
repos_path = self.tmp()
script_path = self.tmp_py()
- old_model_path = self.tmp_named('oldtestmodel.py')
- model_path = self.tmp_named('testmodel.py')
+ model_module = 'test.fixture.models:meta_rundiffs'
+ old_model_module = 'test.fixture.models:meta_old_rundiffs'
# Create empty repository.
self.meta = MetaData(self.engine, reflect=True)
+ self.meta.reflect()
self.meta.drop_all() # in case junk tables are lying around in the test database
- self.assertSuccess(self.cmd('create',repos_path,repos_name))
- self.exitcode(self.cmd('drop_version_control',self.url,repos_path))
- self.assertSuccess(self.cmd('version_control',self.url,repos_path))
- self.assertEquals(self.cmd_version(repos_path),0)
- self.assertEquals(self.cmd_db_version(self.url,repos_path),0)
+
+ result = self.env.run('migrate create %s %s' % (repos_path, repos_name))
+ result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
+ result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
+ self.assertEquals(self.run_version(repos_path), 0)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0)
# Setup helper script.
- model_module = 'testmodel:meta'
- self.assertSuccess(self.cmd('manage',script_path,'--repository=%s --url=%s --model=%s' % (repos_path, self.url, model_module)))
+ result = self.env.run('migrate manage %s --repository=%s --url=%s --model=%s'\
+ % (script_path, repos_path, self.url, model_module))
self.assert_(os.path.exists(script_path))
- # Write old and new model to disk - old model is empty!
- script_preamble="""
- from sqlalchemy import *
-
- meta = MetaData()
- """.replace("\n ","\n")
-
- script_text="""
- """.replace("\n ","\n")
- open(old_model_path, 'w').write(script_preamble + script_text)
-
- script_text="""
- tmp_account_rundiffs = Table('tmp_account_rundiffs',meta,
- Column('id',Integer,primary_key=True),
- Column('login',String(40)),
- Column('passwd',String(40)),
- )
- """.replace("\n ","\n")
- open(model_path, 'w').write(script_preamble + script_text)
-
# Model is defined but database is empty.
- output, exitcode = self.output_and_exitcode('%s %s compare_model_to_db' % (sys.executable, script_path))
- assert "tables missing in database: tmp_account_rundiffs" in output, output
+ result = self.env.run('migrate compare_model_to_db %s %s --model=%s' % (self.url, repos_path, model_module))
+ self.assert_("tables missing in database: tmp_account_rundiffs" in result.stdout)
# Test Deprecation
- output, exitcode = self.output_and_exitcode('%s %s compare_model_to_db --model=testmodel.meta' % (sys.executable, script_path))
- assert "tables missing in database: tmp_account_rundiffs" in output, output
+ result = self.env.run('migrate compare_model_to_db %s %s --model=%s' % (self.url, repos_path, model_module.replace(":", ".")), expect_error=True)
+ self.assertEqual(result.returncode, 0)
+ self.assertTrue("DeprecationWarning" in result.stderr)
+ self.assert_("tables missing in database: tmp_account_rundiffs" in result.stdout)
# Update db to latest model.
- output, exitcode = self.output_and_exitcode('%s %s update_db_from_model' % (sys.executable, script_path))
- self.assertEquals(exitcode, None)
- self.assertEquals(self.cmd_version(repos_path),0)
- self.assertEquals(self.cmd_db_version(self.url,repos_path),0) # version did not get bumped yet because new version not yet created
- output, exitcode = self.output_and_exitcode('%s %s compare_model_to_db' % (sys.executable, script_path))
- assert "No schema diffs" in output, output
- output, exitcode = self.output_and_exitcode('%s %s create_model' % (sys.executable, script_path))
- output = output.replace(genmodel.HEADER.strip(), '') # need strip b/c output_and_exitcode called strip
- assert """tmp_account_rundiffs = Table('tmp_account_rundiffs', meta,
+ result = self.env.run('migrate update_db_from_model %s %s %s'\
+ % (self.url, model_module, repos_path))
+ self.assertEquals(self.run_version(repos_path), 0)
+ self.assertEquals(self.run_db_version(self.url, repos_path), 0) # version did not get bumped yet because new version not yet created
+
+ result = self.env.run('migrate compare_model_to_db %s %s %s'\
+ % (self.url, model_module, repos_path))
+ self.assert_("No schema diffs" in result.stdout)
+
+ result = self.env.run('migrate drop_version_control %s %s' % (self.url, repos_path), expect_error=True)
+ result = self.env.run('migrate version_control %s %s' % (self.url, repos_path))
+
+ result = self.env.run('migrate create_model %s %s' % (self.url, repos_path))
+ self.assertTrue("""tmp_account_rundiffs = Table('tmp_account_rundiffs', meta,
Column('id', Integer(), primary_key=True, nullable=False),
Column('login', String(length=None, convert_unicode=False, assert_unicode=None)),
- Column('passwd', String(length=None, convert_unicode=False, assert_unicode=None)),""" in output.strip(), output
+ Column('passwd', String(length=None, convert_unicode=False, assert_unicode=None))""" in result.stdout)
# We're happy with db changes, make first db upgrade script to go from version 0 -> 1.
- output, exitcode = self.output_and_exitcode('%s %s make_update_script_for_model' % (sys.executable, script_path)) # intentionally omit a parameter
- self.assertEquals('Not enough arguments' in output, True)
- output, exitcode = self.output_and_exitcode('%s %s make_update_script_for_model --oldmodel=oldtestmodel:meta' % (sys.executable, script_path))
- self.assertEqualsIgnoreWhitespace(output,
+ result = self.env.run('migrate make_update_script_for_model', expect_error=True)
+ self.assertTrue('Not enough arguments' in result.stderr)
+
+ result_script = self.env.run('migrate make_update_script_for_model %s %s %s %s'\
+ % (self.url, old_model_module, model_module, repos_path))
+ self.assertEqualsIgnoreWhitespace(result_script.stdout,
"""from sqlalchemy import *
-from migrate import *
+ from migrate import *
-meta = MetaData()
-tmp_account_rundiffs = Table('tmp_account_rundiffs', meta,
- Column('id', Integer(), primary_key=True, nullable=False),
- Column('login', String(length=40, convert_unicode=False, assert_unicode=None)),
- Column('passwd', String(length=40, convert_unicode=False, assert_unicode=None)),
-)
-
-def upgrade(migrate_engine):
- # Upgrade operations go here. Don't create your own engine; bind migrate_engine
- # to your metadata
- meta.bind(migrate_engine)
- tmp_account_rundiffs.create()
-
-def downgrade(migrate_engine):
- # Operations to reverse the above upgrade go here.
- meta.bind(migrate_engine)
- tmp_account_rundiffs.drop()""")
+ meta = MetaData()
+ tmp_account_rundiffs = Table('tmp_account_rundiffs', meta,
+ Column('id', Integer(), primary_key=True, nullable=False),
+ Column('login', String(length=40, convert_unicode=False, assert_unicode=None)),
+ Column('passwd', String(length=40, convert_unicode=False, assert_unicode=None)),
+ )
+
+ def upgrade(migrate_engine):
+ # Upgrade operations go here. Don't create your own engine; bind migrate_engine
+ # to your metadata
+ meta.bind = migrate_engine
+ tmp_account_rundiffs.create()
+
+ def downgrade(migrate_engine):
+ # Operations to reverse the above upgrade go here.
+ meta.bind = migrate_engine
+ tmp_account_rundiffs.drop()""")
# Save the upgrade script.
- self.assertSuccess(self.cmd('script', '--repository=%s' % repos_path, 'Desc'))
+ result = self.env.run('migrate script Desc %s' % repos_path)
upgrade_script_path = '%s/versions/001_Desc.py' % repos_path
- open(upgrade_script_path, 'w').write(output)
- #output, exitcode = self.output_and_exitcode('%s %s test %s' % (sys.executable, script_path, upgrade_script_path)) # no, we already upgraded the db above
- #self.assertEquals(output, "")
- output, exitcode = self.output_and_exitcode('%s %s update_db_from_model' % (sys.executable, script_path)) # bump the db_version
- self.assertEquals(exitcode, None)
- self.assertEquals(self.cmd_version(repos_path),1)
- self.assertEquals(self.cmd_db_version(self.url,repos_path),1)
+ open(upgrade_script_path, 'w').write(result_script.stdout)
+
+ result = self.env.run('migrate compare_model_to_db %s %s %s'\
+ % (self.url, model_module, repos_path))
+ self.assert_("No schema diffs" in result.stdout)
+
+ self.meta.drop_all() # in case junk tables are lying around in the test database