summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2015-01-23 18:31:17 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2015-01-23 18:43:44 -0500
commit7891f91e85990062fc7f56107a4d0c6a38a7857d (patch)
tree57279bb94fbef297d02923d5dcc866b6c38ad313
parentf215925bea1e0e170e5d80457cb18202736b926b (diff)
downloadalembic-7891f91e85990062fc7f56107a4d0c6a38a7857d.tar.gz
- fix all flake8
-rw-r--r--tests/test_version_traversal.py303
1 files changed, 116 insertions, 187 deletions
diff --git a/tests/test_version_traversal.py b/tests/test_version_traversal.py
index 72ae03a..d5261b4 100644
--- a/tests/test_version_traversal.py
+++ b/tests/test_version_traversal.py
@@ -7,6 +7,7 @@ from alembic.migration import MigrationStep, HeadMaintainer
class MigrationTest(TestBase):
+
def up_(self, rev):
return MigrationStep.upgrade_from_script(
self.env.revision_map, rev)
@@ -54,149 +55,145 @@ class RevisionPathTest(MigrationTest):
clear_staging_env()
def test_upgrade_path(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
self._assert_upgrade(
- e.revision, c.revision,
+ self.e.revision, self.c.revision,
[
- self.up_(d),
- self.up_(e)
+ self.up_(self.d),
+ self.up_(self.e)
],
- set([e.revision])
+ set([self.e.revision])
)
self._assert_upgrade(
- c.revision, None,
+ self.c.revision, None,
[
- self.up_(a),
- self.up_(b),
- self.up_(c),
+ self.up_(self.a),
+ self.up_(self.b),
+ self.up_(self.c),
],
- set([c.revision])
+ set([self.c.revision])
)
def test_relative_upgrade_path(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
self._assert_upgrade(
- "+2", a.revision,
+ "+2", self.a.revision,
[
- self.up_(b),
- self.up_(c),
+ self.up_(self.b),
+ self.up_(self.c),
],
- set([c.revision])
+ set([self.c.revision])
)
self._assert_upgrade(
- "+1", a.revision,
+ "+1", self.a.revision,
[
- self.up_(b)
+ self.up_(self.b)
],
- set([b.revision])
+ set([self.b.revision])
)
self._assert_upgrade(
- "+3", b.revision,
- [self.up_(c), self.up_(d), self.up_(e)],
- set([e.revision])
+ "+3", self.b.revision,
+ [self.up_(self.c), self.up_(self.d), self.up_(self.e)],
+ set([self.e.revision])
)
self._assert_upgrade(
- "%s+2" % b.revision, a.revision,
- [self.up_(b), self.up_(c), self.up_(d)],
- set([d.revision])
+ "%s+2" % self.b.revision, self.a.revision,
+ [self.up_(self.b), self.up_(self.c), self.up_(self.d)],
+ set([self.d.revision])
)
self._assert_upgrade(
- "%s-2" % d.revision, a.revision,
- [self.up_(b)],
- set([b.revision])
+ "%s-2" % self.d.revision, self.a.revision,
+ [self.up_(self.b)],
+ set([self.b.revision])
)
def test_invalid_relative_upgrade_path(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
+
assert_raises_message(
util.CommandError,
"Relative revision -2 didn't produce 2 migrations",
- self.env._upgrade_revs, "-2", b.revision
+ self.env._upgrade_revs, "-2", self.b.revision
)
assert_raises_message(
util.CommandError,
r"Relative revision \+5 didn't produce 5 migrations",
- self.env._upgrade_revs, "+5", b.revision
+ self.env._upgrade_revs, "+5", self.b.revision
)
def test_downgrade_path(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
self._assert_downgrade(
- c.revision, e.revision,
- [self.down_(e), self.down_(d)],
- set([c.revision])
+ self.c.revision, self.e.revision,
+ [self.down_(self.e), self.down_(self.d)],
+ set([self.c.revision])
)
self._assert_downgrade(
- None, c.revision,
- [self.down_(c), self.down_(b), self.down_(a)],
+ None, self.c.revision,
+ [self.down_(self.c), self.down_(self.b), self.down_(self.a)],
set()
)
def test_relative_downgrade_path(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
+
self._assert_downgrade(
- "-1", c.revision,
- [self.down_(c)],
- set([b.revision])
+ "-1", self.c.revision,
+ [self.down_(self.c)],
+ set([self.b.revision])
)
self._assert_downgrade(
- "-3", e.revision,
- [self.down_(e), self.down_(d), self.down_(c)],
- set([b.revision])
+ "-3", self.e.revision,
+ [self.down_(self.e), self.down_(self.d), self.down_(self.c)],
+ set([self.b.revision])
)
self._assert_downgrade(
- "%s+2" % a.revision, d.revision,
- [self.down_(d)],
- set([c.revision])
+ "%s+2" % self.a.revision, self.d.revision,
+ [self.down_(self.d)],
+ set([self.c.revision])
)
self._assert_downgrade(
- "%s-2" % c.revision, d.revision,
- [self.down_(d), self.down_(c), self.down_(b)],
- set([a.revision])
+ "%s-2" % self.c.revision, self.d.revision,
+ [self.down_(self.d), self.down_(self.c), self.down_(self.b)],
+ set([self.a.revision])
)
def test_invalid_relative_downgrade_path(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
+
assert_raises_message(
util.CommandError,
"Relative revision -5 didn't produce 5 migrations",
- self.env._downgrade_revs, "-5", b.revision
+ self.env._downgrade_revs, "-5", self.b.revision
)
assert_raises_message(
util.CommandError,
r"Relative revision \+2 didn't produce 2 migrations",
- self.env._downgrade_revs, "+2", b.revision
+ self.env._downgrade_revs, "+2", self.b.revision
)
def test_invalid_move_rev_to_none(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
+
assert_raises_message(
util.CommandError,
r"Destination %s is not a valid downgrade "
- "target from current head\(s\)" % b.revision[0:3],
- self.env._downgrade_revs, b.revision[0:3], None
+ "target from current head\(s\)" % self.b.revision[0:3],
+ self.env._downgrade_revs, self.b.revision[0:3], None
)
def test_invalid_move_higher_to_lower(self):
- a, b, c, d, e = self.a, self.b, self.c, self.d, self.e
assert_raises_message(
util.CommandError,
r"Destination %s is not a valid downgrade "
- "target from current head\(s\)" % c.revision[0:4],
- self.env._downgrade_revs, c.revision[0:4], b.revision
+ "target from current head\(s\)" % self.c.revision[0:4],
+ self.env._downgrade_revs, self.c.revision[0:4], self.b.revision
)
@@ -227,9 +224,6 @@ class BranchedPathTest(MigrationTest):
clear_staging_env()
def test_stamp_down_across_multiple_branch_to_branchpoint(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
heads = [self.d1.revision, self.c2.revision]
revs = self.env._stamp_revs(
self.b.revision, heads)
@@ -241,9 +235,6 @@ class BranchedPathTest(MigrationTest):
)
def test_stamp_to_labeled_base_multiple_heads(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
revs = self.env._stamp_revs(
"c1branch@base", [self.d1.revision, self.c2.revision])
eq_(len(revs), 1)
@@ -251,9 +242,6 @@ class BranchedPathTest(MigrationTest):
eq_(revs[0].delete_version_num, self.d1.revision)
def test_stamp_to_labeled_head_multiple_heads(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
heads = [self.d1.revision, self.c2.revision]
revs = self.env._stamp_revs(
"c2branch@head", heads)
@@ -265,64 +253,50 @@ class BranchedPathTest(MigrationTest):
)
def test_upgrade_single_branch(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
self._assert_upgrade(
- d1.revision, b.revision,
- [self.up_(c1), self.up_(d1)],
- set([d1.revision])
+ self.d1.revision, self.b.revision,
+ [self.up_(self.c1), self.up_(self.d1)],
+ set([self.d1.revision])
)
def test_upgrade_multiple_branch(self):
# move from a single head to multiple heads
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
self._assert_upgrade(
- (d1.revision, d2.revision), a.revision,
- [self.up_(b), self.up_(c2), self.up_(d2),
- self.up_(c1), self.up_(d1)],
- set([d1.revision, d2.revision])
+ (self.d1.revision, self.d2.revision), self.a.revision,
+ [self.up_(self.b), self.up_(self.c2), self.up_(self.d2),
+ self.up_(self.c1), self.up_(self.d1)],
+ set([self.d1.revision, self.d2.revision])
)
def test_downgrade_multiple_branch(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
self._assert_downgrade(
- a.revision, (d1.revision, d2.revision),
- [self.down_(d1), self.down_(c1), self.down_(d2),
- self.down_(c2), self.down_(b)],
- set([a.revision])
+ self.a.revision, (self.d1.revision, self.d2.revision),
+ [self.down_(self.d1), self.down_(self.c1), self.down_(self.d2),
+ self.down_(self.c2), self.down_(self.b)],
+ set([self.a.revision])
)
def test_relative_upgrade(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
self._assert_upgrade(
- "c2branch@head-1", b.revision,
- [self.up_(c2)],
- set([c2.revision])
+ "c2branch@head-1", self.b.revision,
+ [self.up_(self.c2)],
+ set([self.c2.revision])
)
def test_relative_downgrade(self):
- a, b, c1, d1, c2, d2 = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2
- )
self._assert_downgrade(
- "c2branch@base+2", [d2.revision, d1.revision],
- [self.down_(d2), self.down_(c2), self.down_(d1)],
- set([c1.revision])
+ "c2branch@base+2", [self.d2.revision, self.d1.revision],
+ [self.down_(self.d2), self.down_(self.c2), self.down_(self.d1)],
+ set([self.c1.revision])
)
class BranchFromMergepointTest(MigrationTest):
+
"""this is a form that will come up frequently in the
"many independent roots with cross-dependencies" case.
@@ -355,31 +329,24 @@ class BranchFromMergepointTest(MigrationTest):
refresh=True, splice=True)
def test_mergepoint_to_only_one_side_upgrade(self):
- a1, b1, c1, a2, b2, c2, d1, d2 = (
- self.a1, self.b1, self.c1, self.a2, self.b2, self.c2,
- self.d1, self.d2
- )
self._assert_upgrade(
- d1.revision, (d2.revision, b1.revision),
- [self.up_(c1), self.up_(d1)],
- set([d2.revision, d1.revision])
+ self.d1.revision, (self.d2.revision, self.b1.revision),
+ [self.up_(self.c1), self.up_(self.d1)],
+ set([self.d2.revision, self.d1.revision])
)
def test_mergepoint_to_only_one_side_downgrade(self):
- a1, b1, c1, a2, b2, c2, d1, d2 = (
- self.a1, self.b1, self.c1, self.a2, self.b2, self.c2,
- self.d1, self.d2
- )
self._assert_downgrade(
- b1.revision, (d2.revision, d1.revision),
- [self.down_(d1), self.down_(c1)],
- set([d2.revision, b1.revision])
+ self.b1.revision, (self.d2.revision, self.d1.revision),
+ [self.down_(self.d1), self.down_(self.c1)],
+ set([self.d2.revision, self.b1.revision])
)
class BranchFrom3WayMergepointTest(MigrationTest):
+
"""this is a form that will come up frequently in the
"many independent roots with cross-dependencies" case.
@@ -426,51 +393,43 @@ class BranchFrom3WayMergepointTest(MigrationTest):
refresh=True, splice=True)
def test_mergepoint_to_only_one_side_upgrade(self):
- a1, b1, c1, a2, b2, c2, d1, d2, a3, b3, c3, d3 = (
- self.a1, self.b1, self.c1, self.a2, self.b2, self.c2,
- self.d1, self.d2, self.a3, self.b3, self.c3, self.d3
- )
self._assert_upgrade(
- d1.revision, (d3.revision, d2.revision, b1.revision),
- [self.up_(c1), self.up_(d1)],
- set([d3.revision, d2.revision, d1.revision])
+ self.d1.revision,
+ (self.d3.revision, self.d2.revision, self.b1.revision),
+ [self.up_(self.c1), self.up_(self.d1)],
+ set([self.d3.revision, self.d2.revision, self.d1.revision])
)
def test_mergepoint_to_only_one_side_downgrade(self):
- a1, b1, c1, a2, b2, c2, d1, d2, a3, b3, c3, d3 = (
- self.a1, self.b1, self.c1, self.a2, self.b2, self.c2,
- self.d1, self.d2, self.a3, self.b3, self.c3, self.d3
- )
self._assert_downgrade(
- b1.revision, (d3.revision, d2.revision, d1.revision),
- [self.down_(d1), self.down_(c1)],
- set([d3.revision, d2.revision, b1.revision])
+ self.b1.revision,
+ (self.d3.revision, self.d2.revision, self.d1.revision),
+ [self.down_(self.d1), self.down_(self.c1)],
+ set([self.d3.revision, self.d2.revision, self.b1.revision])
)
def test_mergepoint_to_two_sides_upgrade(self):
- a1, b1, c1, a2, b2, c2, d1, d2, a3, b3, c3, d3 = (
- self.a1, self.b1, self.c1, self.a2, self.b2, self.c2,
- self.d1, self.d2, self.a3, self.b3, self.c3, self.d3
- )
self._assert_upgrade(
- d1.revision, (d3.revision, b2.revision, b1.revision),
- [self.up_(c2), self.up_(c1), self.up_(d1)],
+ self.d1.revision,
+ (self.d3.revision, self.b2.revision, self.b1.revision),
+ [self.up_(self.c2), self.up_(self.c1), self.up_(self.d1)],
# this will merge b2 and b1 into d1
- set([d3.revision, d1.revision])
+ set([self.d3.revision, self.d1.revision])
)
# but then! b2 will break out again if we keep going with it
self._assert_upgrade(
- d2.revision, (d3.revision, d1.revision),
- [self.up_(d2)],
- set([d3.revision, d2.revision, d1.revision])
+ self.d2.revision, (self.d3.revision, self.d1.revision),
+ [self.up_(self.d2)],
+ set([self.d3.revision, self.d2.revision, self.d1.revision])
)
class DependsOnBranchTestOne(MigrationTest):
+
@classmethod
def setup_class(cls):
cls.env = env = staging_env()
@@ -512,6 +471,7 @@ class DependsOnBranchTestOne(MigrationTest):
class DependsOnBranchTestTwo(MigrationTest):
+
@classmethod
def setup_class(cls):
cls.env = env = staging_env()
@@ -578,6 +538,7 @@ class DependsOnBranchTestTwo(MigrationTest):
class ForestTest(MigrationTest):
+
@classmethod
def setup_class(cls):
cls.env = env = staging_env()
@@ -595,10 +556,10 @@ class ForestTest(MigrationTest):
clear_staging_env()
def test_base_to_heads(self):
- a1, b1, a2, b2 = self.a1, self.b1, self.a2, self.b2
eq_(
self.env._upgrade_revs("heads", "base"),
- [self.up_(a2), self.up_(b2), self.up_(a1), self.up_(b1), ]
+ [self.up_(self.a2), self.up_(self.b2),
+ self.up_(self.a1), self.up_(self.b1)]
)
@@ -633,10 +594,6 @@ class MergedPathTest(MigrationTest):
clear_staging_env()
def test_stamp_down_across_merge_point_branch(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
heads = [self.e.revision]
revs = self.env._stamp_revs(self.c2.revision, heads)
eq_(len(revs), 1)
@@ -647,10 +604,6 @@ class MergedPathTest(MigrationTest):
)
def test_stamp_down_across_merge_prior_branching(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
heads = [self.e.revision]
revs = self.env._stamp_revs(self.a.revision, heads)
eq_(len(revs), 1)
@@ -661,10 +614,6 @@ class MergedPathTest(MigrationTest):
)
def test_stamp_up_across_merge_from_single_branch(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
revs = self.env._stamp_revs(self.e.revision, [self.c2.revision])
eq_(len(revs), 1)
eq_(
@@ -674,10 +623,6 @@ class MergedPathTest(MigrationTest):
)
def test_stamp_labled_head_across_merge_from_multiple_branch(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
# this is testing that filter_for_lineage() checks for
# d1 both in terms of "c2branch" as well as that the "head"
# revision "f" is the head of both d1 and d2
@@ -691,10 +636,6 @@ class MergedPathTest(MigrationTest):
)
def test_stamp_up_across_merge_from_multiple_branch(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
heads = [self.d1.revision, self.c2.revision]
revs = self.env._stamp_revs(
self.e.revision, heads)
@@ -706,10 +647,6 @@ class MergedPathTest(MigrationTest):
)
def test_stamp_up_across_merge_prior_branching(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
heads = [self.b.revision]
revs = self.env._stamp_revs(self.e.revision, heads)
eq_(len(revs), 1)
@@ -720,39 +657,31 @@ class MergedPathTest(MigrationTest):
)
def test_upgrade_across_merge_point(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
eq_(
- self.env._upgrade_revs(f.revision, b.revision),
+ self.env._upgrade_revs(self.f.revision, self.b.revision),
[
- self.up_(c2),
- self.up_(d2),
- self.up_(c1), # b->c1, create new branch
- self.up_(d1),
- self.up_(e), # d1/d2 -> e, merge branches
- # (DELETE d2, UPDATE d1->e)
- self.up_(f)
+ self.up_(self.c2),
+ self.up_(self.d2),
+ self.up_(self.c1), # b->c1, create new branch
+ self.up_(self.d1),
+ self.up_(self.e), # d1/d2 -> e, merge branches
+ # (DELETE d2, UPDATE d1->e)
+ self.up_(self.f)
]
)
def test_downgrade_across_merge_point(self):
- a, b, c1, d1, c2, d2, e, f = (
- self.a, self.b, self.c1, self.d1, self.c2, self.d2,
- self.e, self.f
- )
eq_(
- self.env._downgrade_revs(b.revision, f.revision),
+ self.env._downgrade_revs(self.b.revision, self.f.revision),
[
- self.down_(f),
- self.down_(e), # e -> d1 and d2, unmerge branches
- # (UPDATE e->d1, INSERT d2)
- self.down_(d1),
- self.down_(c1),
- self.down_(d2),
- self.down_(c2), # c2->b, delete branch
+ self.down_(self.f),
+ self.down_(self.e), # e -> d1 and d2, unmerge branches
+ # (UPDATE e->d1, INSERT d2)
+ self.down_(self.d1),
+ self.down_(self.c1),
+ self.down_(self.d2),
+ self.down_(self.c2), # c2->b, delete branch
]
)