summaryrefslogtreecommitdiff
path: root/migrate/tests/changeset/test_constraint.py
blob: 6421206a293cb88f4ef7fe8c8c624385aeb69e45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sqlalchemy import *
from sqlalchemy.util import *
from sqlalchemy.exc import *

from migrate.exceptions import *
from migrate.changeset import *

from migrate.tests import fixture


class CommonTestConstraint(fixture.DB):
    """helper functions to test constraints.

    we just create a fresh new table and make sure everything is
    as required.
    """

    def _setup(self, url):
        super(CommonTestConstraint, self)._setup(url)
        self._create_table()

    def _teardown(self):
        if hasattr(self, 'table') and self.engine.has_table(self.table.name):
            self.table.drop()
        super(CommonTestConstraint, self)._teardown()

    def _create_table(self):
        self._connect(self.url)
        self.meta = MetaData(self.engine)
        self.tablename = 'mytable'
        self.table = Table(self.tablename, self.meta,
            Column(u'id', Integer, nullable=False),
            Column(u'fkey', Integer, nullable=False),
            mysql_engine='InnoDB')
        if self.engine.has_table(self.table.name):
            self.table.drop()
        self.table.create()

        # make sure we start at zero
        self.assertEqual(len(self.table.primary_key), 0)
        self.assertTrue(isinstance(self.table.primary_key,
            schema.PrimaryKeyConstraint), self.table.primary_key.__class__)


class TestConstraint(CommonTestConstraint):
    level = fixture.DB.CONNECT

    def _define_pk(self, *cols):
        # Add a pk by creating a PK constraint
        if (self.engine.name in ('oracle', 'firebird')):
            # Can't drop Oracle PKs without an explicit name
            pk = PrimaryKeyConstraint(table=self.table, name='temp_pk_key', *cols)
        else:
            pk = PrimaryKeyConstraint(table=self.table, *cols)
        self.compare_columns_equal(pk.columns, cols)
        pk.create()
        self.refresh_table()
        if not self.url.startswith('sqlite'):
            self.compare_columns_equal(self.table.primary_key, cols, ['type', 'autoincrement'])

        # Drop the PK constraint
        #if (self.engine.name in ('oracle', 'firebird')):
        #    # Apparently Oracle PK names aren't introspected
        #    pk.name = self.table.primary_key.name
        pk.drop()
        self.refresh_table()
        self.assertEqual(len(self.table.primary_key), 0)
        self.assertTrue(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint))
        return pk

    @fixture.usedb(not_supported='sqlite')
    def test_define_fk(self):
        """FK constraints can be defined, created, and dropped"""
        # FK target must be unique
        pk = PrimaryKeyConstraint(self.table.c.id, table=self.table, name="pkid")
        pk.create()

        # Add a FK by creating a FK constraint
        if SQLA_07:
            self.assertEqual(list(self.table.c.fkey.foreign_keys), [])
        else:
            self.assertEqual(self.table.c.fkey.foreign_keys._list, [])
        fk = ForeignKeyConstraint([self.table.c.fkey],
                                  [self.table.c.id],
                                  name="fk_id_fkey",
                                  ondelete="CASCADE")
        if SQLA_07:
            self.assertTrue(list(self.table.c.fkey.foreign_keys) is not [])
        else:
            self.assertTrue(self.table.c.fkey.foreign_keys._list is not [])
        for key in fk.columns:
            self.assertEqual(key, self.table.c.fkey.name)
        self.assertEqual([e.column for e in fk.elements], [self.table.c.id])
        self.assertEqual(list(fk.referenced), [self.table.c.id])

        if self.url.startswith('mysql'):
            # MySQL FKs need an index
            index = Index('index_name', self.table.c.fkey)
            index.create()
        fk.create()

        # test for ondelete/onupdate
        if SQLA_07:
            fkey = list(self.table.c.fkey.foreign_keys)[0]
        else:
            fkey = self.table.c.fkey.foreign_keys._list[0]
        self.assertEqual(fkey.ondelete, "CASCADE")
        # TODO: test on real db if it was set

        self.refresh_table()
        if SQLA_07:
            self.assertTrue(list(self.table.c.fkey.foreign_keys) is not [])
        else:
            self.assertTrue(self.table.c.fkey.foreign_keys._list is not [])

        fk.drop()
        self.refresh_table()
        if SQLA_07:
            self.assertEqual(list(self.table.c.fkey.foreign_keys), [])
        else:
            self.assertEqual(self.table.c.fkey.foreign_keys._list, [])

    @fixture.usedb()
    def test_define_pk(self):
        """PK constraints can be defined, created, and dropped"""
        self._define_pk(self.table.c.fkey)

    @fixture.usedb()
    def test_define_pk_multi(self):
        """Multicolumn PK constraints can be defined, created, and dropped"""
        self._define_pk(self.table.c.id, self.table.c.fkey)

    @fixture.usedb(not_supported=['firebird'])
    def test_drop_cascade(self):
        """Drop constraint cascaded"""
        pk = PrimaryKeyConstraint('fkey', table=self.table, name="id_pkey")
        pk.create()
        self.refresh_table()

        # Drop the PK constraint forcing cascade
        pk.drop(cascade=True)

        # TODO: add real assertion if it was added

    @fixture.usedb(supported=['mysql'])
    def test_fail_mysql_check_constraints(self):
        """Check constraints raise NotSupported for mysql on drop"""
        cons = CheckConstraint('id > 3', name="id_check", table=self.table)
        cons.create()
        self.refresh_table()

        try:
            cons.drop()
        except NotSupportedError:
            pass
        else:
            self.fail()

    @fixture.usedb(not_supported=['sqlite', 'mysql'])
    def test_named_check_constraints(self):
        """Check constraints can be defined, created, and dropped"""
        self.assertRaises(InvalidConstraintError, CheckConstraint, 'id > 3')
        cons = CheckConstraint('id > 3', name="id_check", table=self.table)
        cons.create()
        self.refresh_table()

        self.table.insert(values={'id': 4, 'fkey': 1}).execute()
        try:
            self.table.insert(values={'id': 1, 'fkey': 1}).execute()
        except (IntegrityError, ProgrammingError):
            pass
        else:
            self.fail()

        # Remove the name, drop the constraint; it should succeed
        cons.drop()
        self.refresh_table()
        self.table.insert(values={'id': 2, 'fkey': 2}).execute()
        self.table.insert(values={'id': 1, 'fkey': 2}).execute()


class TestAutoname(CommonTestConstraint):
    """Every method tests for a type of constraint wether it can autoname
    itself and if you can pass object instance and names to classes.
    """
    level = fixture.DB.CONNECT

    @fixture.usedb(not_supported=['oracle', 'firebird'])
    def test_autoname_pk(self):
        """PrimaryKeyConstraints can guess their name if None is given"""
        # Don't supply a name; it should create one
        cons = PrimaryKeyConstraint(self.table.c.id)
        cons.create()
        self.refresh_table()
        if not self.url.startswith('sqlite'):
            # TODO: test for index for sqlite
            self.compare_columns_equal(cons.columns, self.table.primary_key, ['autoincrement', 'type'])

        # Remove the name, drop the constraint; it should succeed
        cons.name = None
        cons.drop()
        self.refresh_table()
        self.assertEqual(list(), list(self.table.primary_key))

        # test string names
        cons = PrimaryKeyConstraint('id', table=self.table)
        cons.create()
        self.refresh_table()
        if not self.url.startswith('sqlite'):
            # TODO: test for index for sqlite
            self.compare_columns_equal(cons.columns, self.table.primary_key)
        cons.name = None
        cons.drop()

    @fixture.usedb(not_supported=['oracle', 'sqlite', 'firebird'])
    def test_autoname_fk(self):
        """ForeignKeyConstraints can guess their name if None is given"""
        cons = PrimaryKeyConstraint(self.table.c.id)
        cons.create()

        cons = ForeignKeyConstraint([self.table.c.fkey], [self.table.c.id])
        cons.create()
        self.refresh_table()
        if SQLA_07:
            list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id
        else:
            self.table.c.fkey.foreign_keys[0].column is self.table.c.id

        # Remove the name, drop the constraint; it should succeed
        cons.name = None
        cons.drop()
        self.refresh_table()
        if SQLA_07:
            self.assertEqual(list(self.table.c.fkey.foreign_keys), list())
        else:
            self.assertEqual(self.table.c.fkey.foreign_keys._list, list())

        # test string names
        cons = ForeignKeyConstraint(['fkey'], ['%s.id' % self.tablename], table=self.table)
        cons.create()
        self.refresh_table()
        if SQLA_07:
            list(self.table.c.fkey.foreign_keys)[0].column is self.table.c.id
        else:
            self.table.c.fkey.foreign_keys[0].column is self.table.c.id

        # Remove the name, drop the constraint; it should succeed
        cons.name = None
        cons.drop()

    @fixture.usedb(not_supported=['oracle', 'sqlite', 'mysql'])
    def test_autoname_check(self):
        """CheckConstraints can guess their name if None is given"""
        cons = CheckConstraint('id > 3', columns=[self.table.c.id])
        cons.create()
        self.refresh_table()

        if not self.engine.name == 'mysql':
            self.table.insert(values={'id': 4, 'fkey': 1}).execute()
            try:
                self.table.insert(values={'id': 1, 'fkey': 2}).execute()
            except (IntegrityError, ProgrammingError):
                pass
            else:
                self.fail()

        # Remove the name, drop the constraint; it should succeed
        cons.name = None
        cons.drop()
        self.refresh_table()
        self.table.insert(values={'id': 2, 'fkey': 2}).execute()
        self.table.insert(values={'id': 1, 'fkey': 3}).execute()

    @fixture.usedb(not_supported=['oracle'])
    def test_autoname_unique(self):
        """UniqueConstraints can guess their name if None is given"""
        cons = UniqueConstraint(self.table.c.fkey)
        cons.create()
        self.refresh_table()

        self.table.insert(values={'fkey': 4, 'id': 1}).execute()
        try:
            self.table.insert(values={'fkey': 4, 'id': 2}).execute()
        except (sqlalchemy.exc.IntegrityError,
                sqlalchemy.exc.ProgrammingError):
            pass
        else:
            self.fail()

        # Remove the name, drop the constraint; it should succeed
        cons.name = None
        cons.drop()
        self.refresh_table()
        self.table.insert(values={'fkey': 4, 'id': 2}).execute()
        self.table.insert(values={'fkey': 4, 'id': 1}).execute()