diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
| commit | 8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch) | |
| tree | ae9e27d12c9fbf8297bb90469509e1cb6a206242 /test/sql | |
| parent | 7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff) | |
| download | sqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz | |
merge 0.6 series to trunk.
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_constraints.py | 287 | ||||
| -rw-r--r-- | test/sql/test_defaults.py | 111 | ||||
| -rw-r--r-- | test/sql/test_functions.py | 38 | ||||
| -rw-r--r-- | test/sql/test_labels.py | 15 | ||||
| -rw-r--r-- | test/sql/test_query.py | 331 | ||||
| -rw-r--r-- | test/sql/test_quote.py | 2 | ||||
| -rw-r--r-- | test/sql/test_returning.py | 159 | ||||
| -rw-r--r-- | test/sql/test_select.py | 64 | ||||
| -rw-r--r-- | test/sql/test_selectable.py | 3 | ||||
| -rw-r--r-- | test/sql/test_types.py | 432 | ||||
| -rw-r--r-- | test/sql/test_unicode.py | 5 |
11 files changed, 827 insertions, 620 deletions
diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py index 8abeb3533..4ad52604d 100644 --- a/test/sql/test_constraints.py +++ b/test/sql/test_constraints.py @@ -1,10 +1,14 @@ -from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message +from sqlalchemy.test.testing import assert_raises, assert_raises_message from sqlalchemy import * -from sqlalchemy import exc +from sqlalchemy import exc, schema from sqlalchemy.test import * from sqlalchemy.test import config, engines +from sqlalchemy.engine import ddl +from sqlalchemy.test.testing import eq_ +from sqlalchemy.test.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL +from sqlalchemy.dialects.postgresql import base as postgresql -class ConstraintTest(TestBase, AssertsExecutionResults): +class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): def setup(self): global metadata @@ -33,11 +37,8 @@ class ConstraintTest(TestBase, AssertsExecutionResults): def test_double_fk_usage_raises(self): f = ForeignKey('b.id') - assert_raises(exc.InvalidRequestError, Table, "a", metadata, - Column('x', Integer, f), - Column('y', Integer, f) - ) - + Column('x', Integer, f) + assert_raises(exc.InvalidRequestError, Column, "y", Integer, f) def test_circular_constraint(self): a = Table("a", metadata, @@ -78,18 +79,9 @@ class ConstraintTest(TestBase, AssertsExecutionResults): metadata.create_all() foo.insert().execute(id=1,x=9,y=5) - try: - foo.insert().execute(id=2,x=5,y=9) - assert False - except exc.SQLError: - assert True - + assert_raises(exc.SQLError, foo.insert().execute, id=2,x=5,y=9) bar.insert().execute(id=1,x=10) - try: - bar.insert().execute(id=2,x=5) - assert False - except exc.SQLError: - assert True + assert_raises(exc.SQLError, bar.insert().execute, id=2,x=5) def test_unique_constraint(self): foo = Table('foo', metadata, @@ -106,16 +98,8 @@ class ConstraintTest(TestBase, AssertsExecutionResults): foo.insert().execute(id=2, value='value2') bar.insert().execute(id=1, value='a', value2='a') bar.insert().execute(id=2, value='a', value2='b') - try: - foo.insert().execute(id=3, value='value1') - assert False - except exc.SQLError: - assert True - try: - bar.insert().execute(id=3, value='a', value2='b') - assert False - except exc.SQLError: - assert True + assert_raises(exc.SQLError, foo.insert().execute, id=3, value='value1') + assert_raises(exc.SQLError, bar.insert().execute, id=3, value='a', value2='b') def test_index_create(self): employees = Table('employees', metadata, @@ -174,35 +158,22 @@ class ConstraintTest(TestBase, AssertsExecutionResults): Index('sport_announcer', events.c.sport, events.c.announcer, unique=True) Index('idx_winners', events.c.winner) - index_names = [ ix.name for ix in events.indexes ] - assert 'ix_events_name' in index_names - assert 'ix_events_location' in index_names - assert 'sport_announcer' in index_names - assert 'idx_winners' in index_names - assert len(index_names) == 4 - - capt = [] - connection = testing.db.connect() - # TODO: hacky, put a real connection proxy in - ex = connection._Connection__execute_context - def proxy(context): - capt.append(context.statement) - capt.append(repr(context.parameters)) - ex(context) - connection._Connection__execute_context = proxy - schemagen = testing.db.dialect.schemagenerator(testing.db.dialect, connection) - schemagen.traverse(events) - - assert capt[0].strip().startswith('CREATE TABLE events') - - s = set([capt[x].strip() for x in [2,4,6,8]]) - - assert s == set([ - 'CREATE UNIQUE INDEX ix_events_name ON events (name)', - 'CREATE INDEX ix_events_location ON events (location)', - 'CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)', - 'CREATE INDEX idx_winners ON events (winner)' - ]) + eq_( + set([ ix.name for ix in events.indexes ]), + set(['ix_events_name', 'ix_events_location', 'sport_announcer', 'idx_winners']) + ) + + self.assert_sql_execution( + testing.db, + lambda: events.create(testing.db), + RegexSQL("^CREATE TABLE events"), + AllOf( + ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events (name)'), + ExactSQL('CREATE INDEX ix_events_location ON events (location)'), + ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'), + ExactSQL('CREATE INDEX idx_winners ON events (winner)') + ) + ) # verify that the table is functional events.insert().execute(id=1, name='hockey finals', location='rink', @@ -214,84 +185,57 @@ class ConstraintTest(TestBase, AssertsExecutionResults): dialect = testing.db.dialect.__class__() dialect.max_identifier_length = 20 - schemagen = dialect.schemagenerator(dialect, None) - schemagen.execute = lambda : None - t1 = Table("sometable", MetaData(), Column("foo", Integer)) - schemagen.visit_index(Index("this_name_is_too_long_for_what_were_doing", t1.c.foo)) - eq_(schemagen.buffer.getvalue(), "CREATE INDEX this_name_is_t_1 ON sometable (foo)") - schemagen.buffer.truncate(0) - schemagen.visit_index(Index("this_other_name_is_too_long_for_what_were_doing", t1.c.foo)) - eq_(schemagen.buffer.getvalue(), "CREATE INDEX this_other_nam_2 ON sometable (foo)") - - schemadrop = dialect.schemadropper(dialect, None) - schemadrop.execute = lambda: None - assert_raises(exc.IdentifierError, schemadrop.visit_index, Index("this_name_is_too_long_for_what_were_doing", t1.c.foo)) + self.assert_compile( + schema.CreateIndex(Index("this_name_is_too_long_for_what_were_doing", t1.c.foo)), + "CREATE INDEX this_name_is_t_1 ON sometable (foo)", + dialect=dialect + ) + + self.assert_compile( + schema.CreateIndex(Index("this_other_name_is_too_long_for_what_were_doing", t1.c.foo)), + "CREATE INDEX this_other_nam_1 ON sometable (foo)", + dialect=dialect + ) -class ConstraintCompilationTest(TestBase, AssertsExecutionResults): - class accum(object): - def __init__(self): - self.statements = [] - def __call__(self, sql, *a, **kw): - self.statements.append(sql) - def __contains__(self, substring): - for s in self.statements: - if substring in s: - return True - return False - def __str__(self): - return '\n'.join([repr(x) for x in self.statements]) - def clear(self): - del self.statements[:] - - def setup(self): - self.sql = self.accum() - opts = config.db_opts.copy() - opts['strategy'] = 'mock' - opts['executor'] = self.sql - self.engine = engines.testing_engine(options=opts) - +class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): def _test_deferrable(self, constraint_factory): - meta = MetaData(self.engine) - t = Table('tbl', meta, + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer), constraint_factory(deferrable=True)) - t.create() - assert 'DEFERRABLE' in self.sql, self.sql - assert 'NOT DEFERRABLE' not in self.sql, self.sql - self.sql.clear() - meta.clear() - - t = Table('tbl', meta, + + sql = str(schema.CreateTable(t).compile(bind=testing.db)) + assert 'DEFERRABLE' in sql, sql + assert 'NOT DEFERRABLE' not in sql, sql + + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer), constraint_factory(deferrable=False)) - t.create() - assert 'NOT DEFERRABLE' in self.sql - self.sql.clear() - meta.clear() - t = Table('tbl', meta, + sql = str(schema.CreateTable(t).compile(bind=testing.db)) + assert 'NOT DEFERRABLE' in sql + + + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer), constraint_factory(deferrable=True, initially='IMMEDIATE')) - t.create() - assert 'NOT DEFERRABLE' not in self.sql - assert 'INITIALLY IMMEDIATE' in self.sql - self.sql.clear() - meta.clear() + sql = str(schema.CreateTable(t).compile(bind=testing.db)) + assert 'NOT DEFERRABLE' not in sql + assert 'INITIALLY IMMEDIATE' in sql - t = Table('tbl', meta, + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer), constraint_factory(deferrable=True, initially='DEFERRED')) - t.create() + sql = str(schema.CreateTable(t).compile(bind=testing.db)) - assert 'NOT DEFERRABLE' not in self.sql - assert 'INITIALLY DEFERRED' in self.sql, self.sql + assert 'NOT DEFERRABLE' not in sql + assert 'INITIALLY DEFERRED' in sql def test_deferrable_pk(self): factory = lambda **kw: PrimaryKeyConstraint('a', **kw) @@ -302,15 +246,16 @@ class ConstraintCompilationTest(TestBase, AssertsExecutionResults): self._test_deferrable(factory) def test_deferrable_column_fk(self): - meta = MetaData(self.engine) - t = Table('tbl', meta, + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer, ForeignKey('tbl.a', deferrable=True, initially='DEFERRED'))) - t.create() - assert 'DEFERRABLE' in self.sql, self.sql - assert 'INITIALLY DEFERRED' in self.sql, self.sql + + self.assert_compile( + schema.CreateTable(t), + "CREATE TABLE tbl (a INTEGER, b INTEGER, FOREIGN KEY(b) REFERENCES tbl (a) DEFERRABLE INITIALLY DEFERRED)", + ) def test_deferrable_unique(self): factory = lambda **kw: UniqueConstraint('b', **kw) @@ -321,15 +266,105 @@ class ConstraintCompilationTest(TestBase, AssertsExecutionResults): self._test_deferrable(factory) def test_deferrable_column_check(self): - meta = MetaData(self.engine) - t = Table('tbl', meta, + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer, CheckConstraint('a < b', deferrable=True, initially='DEFERRED'))) - t.create() - assert 'DEFERRABLE' in self.sql, self.sql - assert 'INITIALLY DEFERRED' in self.sql, self.sql + + self.assert_compile( + schema.CreateTable(t), + "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) DEFERRABLE INITIALLY DEFERRED)" + ) + + def test_use_alter(self): + m = MetaData() + t = Table('t', m, + Column('a', Integer), + ) + + t2 = Table('t2', m, + Column('a', Integer, ForeignKey('t.a', use_alter=True, name='fk_ta')), + Column('b', Integer, ForeignKey('t.a', name='fk_tb')), # to ensure create ordering ... + ) + + e = engines.mock_engine(dialect_name='postgresql') + m.create_all(e) + m.drop_all(e) + + e.assert_sql([ + 'CREATE TABLE t (a INTEGER)', + 'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb FOREIGN KEY(b) REFERENCES t (a))', + 'ALTER TABLE t2 ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)', + 'ALTER TABLE t2 DROP CONSTRAINT fk_ta', + 'DROP TABLE t2', + 'DROP TABLE t' + ]) + + + def test_add_drop_constraint(self): + m = MetaData() + + t = Table('tbl', m, + Column('a', Integer), + Column('b', Integer) + ) + + t2 = Table('t2', m, + Column('a', Integer), + Column('b', Integer) + ) + + constraint = CheckConstraint('a < b',name="my_test_constraint", deferrable=True,initially='DEFERRED', table=t) + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE tbl ADD CONSTRAINT my_test_constraint CHECK (a < b) DEFERRABLE INITIALLY DEFERRED" + ) + + self.assert_compile( + schema.DropConstraint(constraint), + "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint" + ) + + self.assert_compile( + schema.DropConstraint(constraint, cascade=True), + "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint CASCADE" + ) + constraint = ForeignKeyConstraint(["b"], ["t2.a"]) + t.append_constraint(constraint) + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE tbl ADD FOREIGN KEY(b) REFERENCES t2 (a)" + ) + constraint = ForeignKeyConstraint([t.c.a], [t2.c.b]) + t.append_constraint(constraint) + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE tbl ADD FOREIGN KEY(a) REFERENCES t2 (b)" + ) + + constraint = UniqueConstraint("a", "b", name="uq_cst") + t2.append_constraint(constraint) + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)" + ) + + constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2") + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)" + ) + + assert t.c.a.primary_key is False + constraint = PrimaryKeyConstraint(t.c.a) + assert t.c.a.primary_key is True + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE tbl ADD PRIMARY KEY (a)" + ) + + diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index 964157466..5638dad77 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -3,7 +3,7 @@ import datetime from sqlalchemy import Sequence, Column, func from sqlalchemy.sql import select, text import sqlalchemy as sa -from sqlalchemy.test import testing +from sqlalchemy.test import testing, engines from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean from sqlalchemy.test.schema import Table from sqlalchemy.test.testing import eq_ @@ -37,7 +37,7 @@ class DefaultTest(testing.TestBase): # since its a "branched" connection conn.close() - use_function_defaults = testing.against('postgres', 'mssql', 'maxdb') + use_function_defaults = testing.against('postgresql', 'mssql', 'maxdb') is_oracle = testing.against('oracle') # select "count(1)" returns different results on different DBs also @@ -146,7 +146,7 @@ class DefaultTest(testing.TestBase): assert_raises_message(sa.exc.ArgumentError, ex_msg, sa.ColumnDefault, fn) - + def test_arg_signature(self): def fn1(): pass def fn2(): pass @@ -276,7 +276,7 @@ class DefaultTest(testing.TestBase): assert r.lastrow_has_defaults() eq_(set(r.context.postfetch_cols), set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) - + eq_(t.select(t.c.col1==54).execute().fetchall(), [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)]) @@ -284,7 +284,7 @@ class DefaultTest(testing.TestBase): @testing.fails_on('firebird', 'Data type unknown') def test_insertmany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( - if (testing.against('mysql') and + if (testing.against('mysql') and not testing.against('+zxjdbc') and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): return @@ -304,12 +304,12 @@ class DefaultTest(testing.TestBase): def test_insert_values(self): t.insert(values={'col3':50}).execute() l = t.select().execute() - eq_(50, l.fetchone()['col3']) + eq_(50, l.first()['col3']) @testing.fails_on('firebird', 'Data type unknown') def test_updatemany(self): # MySQL-Python 1.2.2 breaks functions in execute_many :( - if (testing.against('mysql') and + if (testing.against('mysql') and not testing.against('+zxjdbc') and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)): return @@ -337,11 +337,11 @@ class DefaultTest(testing.TestBase): @testing.fails_on('firebird', 'Data type unknown') def test_update(self): r = t.insert().execute() - pk = r.last_inserted_ids()[0] + pk = r.inserted_primary_key[0] t.update(t.c.col1==pk).execute(col4=None, col5=None) ctexec = currenttime.scalar() l = t.select(t.c.col1==pk).execute() - l = l.fetchone() + l = l.first() eq_(l, (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today(), 'py')) @@ -350,43 +350,12 @@ class DefaultTest(testing.TestBase): @testing.fails_on('firebird', 'Data type unknown') def test_update_values(self): r = t.insert().execute() - pk = r.last_inserted_ids()[0] + pk = r.inserted_primary_key[0] t.update(t.c.col1==pk, values={'col3': 55}).execute() l = t.select(t.c.col1==pk).execute() - l = l.fetchone() + l = l.first() eq_(55, l['col3']) - @testing.fails_on_everything_except('postgres') - def test_passive_override(self): - """ - Primarily for postgres, tests that when we get a primary key column - back from reflecting a table which has a default value on it, we - pre-execute that DefaultClause upon insert, even though DefaultClause - says "let the database execute this", because in postgres we must have - all the primary key values in memory before insert; otherwise we can't - locate the just inserted row. - - """ - # TODO: move this to dialect/postgres - try: - meta = MetaData(testing.db) - testing.db.execute(""" - CREATE TABLE speedy_users - ( - speedy_user_id SERIAL PRIMARY KEY, - - user_name VARCHAR NOT NULL, - user_password VARCHAR NOT NULL - ); - """, None) - - t = Table("speedy_users", meta, autoload=True) - t.insert().execute(user_name='user', user_password='lala') - l = t.select().execute().fetchall() - eq_(l, [(1, 'user', 'lala')]) - finally: - testing.db.execute("drop table speedy_users", None) - class PKDefaultTest(_base.TablesTest): __requires__ = ('subqueries',) @@ -400,18 +369,27 @@ class PKDefaultTest(_base.TablesTest): Column('id', Integer, primary_key=True, default=sa.select([func.max(t2.c.nextid)]).as_scalar()), Column('data', String(30))) - - @testing.fails_on('mssql', 'FIXME: unknown') + + @testing.requires.returning + def test_with_implicit_returning(self): + self._test(True) + + def test_regular(self): + self._test(False) + @testing.resolve_artifact_names - def test_basic(self): - t2.insert().execute(nextid=1) - r = t1.insert().execute(data='hi') - eq_([1], r.last_inserted_ids()) - - t2.insert().execute(nextid=2) - r = t1.insert().execute(data='there') - eq_([2], r.last_inserted_ids()) + def _test(self, returning): + if not returning and not testing.db.dialect.implicit_returning: + engine = testing.db + else: + engine = engines.testing_engine(options={'implicit_returning':returning}) + engine.execute(t2.insert(), nextid=1) + r = engine.execute(t1.insert(), data='hi') + eq_([1], r.inserted_primary_key) + engine.execute(t2.insert(), nextid=2) + r = engine.execute(t1.insert(), data='there') + eq_([2], r.inserted_primary_key) class PKIncrementTest(_base.TablesTest): run_define_tables = 'each' @@ -430,29 +408,31 @@ class PKIncrementTest(_base.TablesTest): def _test_autoincrement(self, bind): ids = set() rs = bind.execute(aitable.insert(), int1=1) - last = rs.last_inserted_ids()[0] + last = rs.inserted_primary_key[0] self.assert_(last) self.assert_(last not in ids) ids.add(last) rs = bind.execute(aitable.insert(), str1='row 2') - last = rs.last_inserted_ids()[0] + last = rs.inserted_primary_key[0] self.assert_(last) self.assert_(last not in ids) ids.add(last) rs = bind.execute(aitable.insert(), int1=3, str1='row 3') - last = rs.last_inserted_ids()[0] + last = rs.inserted_primary_key[0] self.assert_(last) self.assert_(last not in ids) ids.add(last) rs = bind.execute(aitable.insert(values={'int1':func.length('four')})) - last = rs.last_inserted_ids()[0] + last = rs.inserted_primary_key[0] self.assert_(last) self.assert_(last not in ids) ids.add(last) + eq_(ids, set([1,2,3,4])) + eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))), [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)]) @@ -510,8 +490,8 @@ class AutoIncrementTest(_base.TablesTest): single.create() r = single.insert().execute() - id_ = r.last_inserted_ids()[0] - assert id_ is not None + id_ = r.inserted_primary_key[0] + eq_(id_, 1) eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar()) def test_autoincrement_fk(self): @@ -522,7 +502,7 @@ class AutoIncrementTest(_base.TablesTest): nodes.create() r = nodes.insert().execute(data='foo') - id_ = r.last_inserted_ids()[0] + id_ = r.inserted_primary_key[0] nodes.insert().execute(data='bar', parent_id=id_) @testing.fails_on('sqlite', 'FIXME: unknown') @@ -535,7 +515,7 @@ class AutoIncrementTest(_base.TablesTest): try: - # postgres + mysql strict will fail on first row, + # postgresql + mysql strict will fail on first row, # mysql in legacy mode fails on second row nonai.insert().execute(data='row 1') nonai.insert().execute(data='row 2') @@ -570,16 +550,17 @@ class SequenceTest(testing.TestBase): def testseqnonpk(self): """test sequences fire off as defaults on non-pk columns""" - result = sometable.insert().execute(name="somename") + engine = engines.testing_engine(options={'implicit_returning':False}) + result = engine.execute(sometable.insert(), name="somename") assert 'id' in result.postfetch_cols() - result = sometable.insert().execute(name="someother") + result = engine.execute(sometable.insert(), name="someother") assert 'id' in result.postfetch_cols() sometable.insert().execute( {'name':'name3'}, {'name':'name4'}) - eq_(sometable.select().execute().fetchall(), + eq_(sometable.select().order_by(sometable.c.id).execute().fetchall(), [(1, "somename", 1), (2, "someother", 2), (3, "name3", 3), @@ -590,8 +571,8 @@ class SequenceTest(testing.TestBase): cartitems.insert().execute(description='there') r = cartitems.insert().execute(description='lala') - assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None - id_ = r.last_inserted_ids()[0] + assert r.inserted_primary_key and r.inserted_primary_key[0] is not None + id_ = r.inserted_primary_key[0] eq_(1, sa.select([func.count(cartitems.c.cart_id)], diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index e9bf49ce3..7a0f12cac 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -24,7 +24,7 @@ class CompileTest(TestBase, AssertsCompiledSQL): bindtemplate = BIND_TEMPLATES[dialect.paramstyle] self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect) self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect) - if isinstance(dialect, firebird.dialect): + if isinstance(dialect, (firebird.dialect, maxdb.dialect, oracle.dialect)): self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect) else: self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect) @@ -50,7 +50,7 @@ class CompileTest(TestBase, AssertsCompiledSQL): for ret, dialect in [ ('CURRENT_TIMESTAMP', sqlite.dialect()), - ('now()', postgres.dialect()), + ('now()', postgresql.dialect()), ('now()', mysql.dialect()), ('CURRENT_TIMESTAMP', oracle.dialect()) ]: @@ -62,9 +62,9 @@ class CompileTest(TestBase, AssertsCompiledSQL): for ret, dialect in [ ('random()', sqlite.dialect()), - ('random()', postgres.dialect()), + ('random()', postgresql.dialect()), ('rand()', mysql.dialect()), - ('random()', oracle.dialect()) + ('random', oracle.dialect()) ]: self.assert_compile(func.random(), ret, dialect=dialect) @@ -180,7 +180,10 @@ class CompileTest(TestBase, AssertsCompiledSQL): class ExecuteTest(TestBase): - + @engines.close_first + def tearDown(self): + pass + def test_standalone_execute(self): x = testing.db.func.current_date().execute().scalar() y = testing.db.func.current_date().select().execute().scalar() @@ -202,6 +205,7 @@ class ExecuteTest(TestBase): conn.close() assert (x == y == z) is True + @engines.close_first def test_update(self): """ Tests sending functions and SQL expressions to the VALUES and SET @@ -222,15 +226,15 @@ class ExecuteTest(TestBase): meta.create_all() try: t.insert(values=dict(value=func.length("one"))).execute() - assert t.select().execute().fetchone()['value'] == 3 + assert t.select().execute().first()['value'] == 3 t.update(values=dict(value=func.length("asfda"))).execute() - assert t.select().execute().fetchone()['value'] == 5 + assert t.select().execute().first()['value'] == 5 r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute() - id = r.last_inserted_ids()[0] - assert t.select(t.c.id==id).execute().fetchone()['value'] == 9 + id = r.inserted_primary_key[0] + assert t.select(t.c.id==id).execute().first()['value'] == 9 t.update(values={t.c.value:func.length("asdf")}).execute() - assert t.select().execute().fetchone()['value'] == 4 + assert t.select().execute().first()['value'] == 4 print "--------------------------" t2.insert().execute() t2.insert(values=dict(value=func.length("one"))).execute() @@ -245,18 +249,18 @@ class ExecuteTest(TestBase): t2.delete().execute() t2.insert(values=dict(value=func.length("one") + 8)).execute() - assert t2.select().execute().fetchone()['value'] == 11 + assert t2.select().execute().first()['value'] == 11 t2.update(values=dict(value=func.length("asfda"))).execute() - assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff") + assert select([t2.c.value, t2.c.stuff]).execute().first() == (5, "thisisstuff") t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute() - print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone() - assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo") + print "HI", select([t2.c.value, t2.c.stuff]).execute().first() + assert select([t2.c.value, t2.c.stuff]).execute().first() == (9, "foo") finally: meta.drop_all() - @testing.fails_on_everything_except('postgres') + @testing.fails_on_everything_except('postgresql') def test_as_from(self): # TODO: shouldnt this work on oracle too ? x = testing.db.func.current_date().execute().scalar() @@ -266,7 +270,7 @@ class ExecuteTest(TestBase): # construct a column-based FROM object out of a function, like in [ticket:172] s = select([sql.column('date', type_=DateTime)], from_obj=[testing.db.func.current_date()]) - q = s.execute().fetchone()[s.c.date] + q = s.execute().first()[s.c.date] r = s.alias('datequery').select().scalar() assert x == y == z == w == q == r @@ -301,7 +305,7 @@ class ExecuteTest(TestBase): 'd': datetime.date(2010, 5, 1) }) rs = select([extract('year', table.c.dt), extract('month', table.c.d)]).execute() - row = rs.fetchone() + row = rs.first() assert row[0] == 2010 assert row[1] == 5 rs.close() diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py index b946b0ae9..bcac7c01d 100644 --- a/test/sql/test_labels.py +++ b/test/sql/test_labels.py @@ -35,6 +35,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): maxlen = testing.db.dialect.max_identifier_length testing.db.dialect.max_identifier_length = IDENT_LENGTH + @engines.close_first def teardown(self): table1.delete().execute() @@ -92,10 +93,16 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): ], repr(result) def test_table_alias_names(self): - self.assert_compile( - table2.alias().select(), - "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1" - ) + if testing.against('oracle'): + self.assert_compile( + table2.alias().select(), + "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs table_with_exactly_29_c_1" + ) + else: + self.assert_compile( + table2.alias().select(), + "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1" + ) ta = table2.alias() dialect = default.DefaultDialect() diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 51b933e45..0e3b9dff2 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -1,9 +1,11 @@ +from sqlalchemy.test.testing import eq_ import datetime from sqlalchemy import * from sqlalchemy import exc, sql from sqlalchemy.engine import default from sqlalchemy.test import * -from sqlalchemy.test.testing import eq_ +from sqlalchemy.test.testing import eq_, assert_raises_message +from sqlalchemy.test.schema import Table, Column class QueryTest(TestBase): @@ -12,11 +14,11 @@ class QueryTest(TestBase): global users, users2, addresses, metadata metadata = MetaData(testing.db) users = Table('query_users', metadata, - Column('user_id', INT, primary_key = True), + Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) addresses = Table('query_addresses', metadata, - Column('address_id', Integer, primary_key=True), + Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30))) @@ -26,7 +28,8 @@ class QueryTest(TestBase): ) metadata.create_all() - def tearDown(self): + @engines.close_first + def teardown(self): addresses.delete().execute() users.delete().execute() users2.delete().execute() @@ -52,89 +55,133 @@ class QueryTest(TestBase): assert users.count().scalar() == 1 users.update(users.c.user_id == 7).execute(user_name = 'fred') - assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred' + assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred' def test_lastrow_accessor(self): - """Tests the last_inserted_ids() and lastrow_has_id() functions.""" + """Tests the inserted_primary_key and lastrow_has_id() functions.""" - def insert_values(table, values): + def insert_values(engine, table, values): """ Inserts a row into a table, returns the full list of values INSERTed including defaults that fired off on the DB side and detects rows that had defaults and post-fetches. """ - result = table.insert().execute(**values) + result = engine.execute(table.insert(), **values) ret = values.copy() - for col, id in zip(table.primary_key, result.last_inserted_ids()): + for col, id in zip(table.primary_key, result.inserted_primary_key): ret[col.key] = id if result.lastrow_has_defaults(): - criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())]) - row = table.select(criterion).execute().fetchone() + criterion = and_(*[col==id for col, id in zip(table.primary_key, result.inserted_primary_key)]) + row = engine.execute(table.select(criterion)).first() for c in table.c: ret[c.key] = row[c] return ret - for supported, table, values, assertvalues in [ - ( - {'unsupported':['sqlite']}, - Table("t1", metadata, - Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True)), - {'foo':'hi'}, - {'id':1, 'foo':'hi'} - ), - ( - {'unsupported':['sqlite']}, - Table("t2", metadata, - Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') + if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): + test_engines = [ + engines.testing_engine(options={'implicit_returning':False}), + engines.testing_engine(options={'implicit_returning':True}), + ] + else: + test_engines = [testing.db] + + for engine in test_engines: + metadata = MetaData() + for supported, table, values, assertvalues in [ + ( + {'unsupported':['sqlite']}, + Table("t1", metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('foo', String(30), primary_key=True)), + {'foo':'hi'}, + {'id':1, 'foo':'hi'} ), - {'foo':'hi'}, - {'id':1, 'foo':'hi', 'bar':'hi'} - ), - ( - {'unsupported':[]}, - Table("t3", metadata, - Column("id", String(40), primary_key=True), - Column('foo', String(30), primary_key=True), - Column("bar", String(30)) + ( + {'unsupported':['sqlite']}, + Table("t2", metadata, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('foo', String(30), primary_key=True), + Column('bar', String(30), server_default='hi') ), - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} - ), - ( - {'unsupported':[]}, - Table("t4", metadata, - Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), - Column('foo', String(30), primary_key=True), - Column('bar', String(30), server_default='hi') + {'foo':'hi'}, + {'id':1, 'foo':'hi', 'bar':'hi'} ), - {'foo':'hi', 'id':1}, - {'id':1, 'foo':'hi', 'bar':'hi'} - ), - ( - {'unsupported':[]}, - Table("t5", metadata, - Column('id', String(10), primary_key=True), - Column('bar', String(30), server_default='hi') + ( + {'unsupported':[]}, + Table("t3", metadata, + Column("id", String(40), primary_key=True), + Column('foo', String(30), primary_key=True), + Column("bar", String(30)) + ), + {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, + {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} ), - {'id':'id1'}, - {'id':'id1', 'bar':'hi'}, - ), - ]: - if testing.db.name in supported['unsupported']: - continue - try: - table.create() - i = insert_values(table, values) - assert i == assertvalues, repr(i) + " " + repr(assertvalues) - finally: - table.drop() + ( + {'unsupported':[]}, + Table("t4", metadata, + Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), + Column('foo', String(30), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'foo':'hi', 'id':1}, + {'id':1, 'foo':'hi', 'bar':'hi'} + ), + ( + {'unsupported':[]}, + Table("t5", metadata, + Column('id', String(10), primary_key=True), + Column('bar', String(30), server_default='hi') + ), + {'id':'id1'}, + {'id':'id1', 'bar':'hi'}, + ), + ]: + if testing.db.name in supported['unsupported']: + continue + try: + table.create(bind=engine, checkfirst=True) + i = insert_values(engine, table, values) + assert i == assertvalues, "tablename: %s %r %r" % (table.name, repr(i), repr(assertvalues)) + finally: + table.drop(bind=engine) + + @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks") + def test_misordered_lastrow(self): + related = Table('related', metadata, + Column('id', Integer, primary_key=True) + ) + t6 = Table("t6", metadata, + Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True), + Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True), + ) + metadata.create_all() + r = related.insert().values(id=12).execute() + id = r.inserted_primary_key[0] + assert id==12 + + r = t6.insert().values(manual_id=id).execute() + eq_(r.inserted_primary_key, [12, 1]) + + def test_autoclose_on_insert(self): + if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): + test_engines = [ + engines.testing_engine(options={'implicit_returning':False}), + engines.testing_engine(options={'implicit_returning':True}), + ] + else: + test_engines = [testing.db] + + for engine in test_engines: + + r = engine.execute(users.insert(), + {'user_name':'jack'}, + ) + assert r.closed + def test_row_iteration(self): users.insert().execute( {'user_id':7, 'user_name':'jack'}, @@ -147,7 +194,7 @@ class QueryTest(TestBase): l.append(row) self.assert_(len(l) == 3) - @testing.fails_on('firebird', 'Data type unknown') + @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") @testing.requires.subqueries def test_anonymous_rows(self): users.insert().execute( @@ -161,6 +208,7 @@ class QueryTest(TestBase): assert row['anon_1'] == 8 assert row['anon_2'] == 10 + @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): """test that a label within an ORDER BY works on each backend. @@ -179,6 +227,11 @@ class QueryTest(TestBase): select([concat]).order_by(concat).execute().fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)] ) + + eq_( + select([concat]).order_by(concat).execute().fetchall(), + [("test: ed",), ("test: fred",), ("test: jack",)] + ) concat = ("test: " + users.c.user_name).label('thedata') eq_( @@ -195,7 +248,7 @@ class QueryTest(TestBase): def test_row_comparison(self): users.insert().execute(user_id = 7, user_name = 'jack') - rp = users.select().execute().fetchone() + rp = users.select().execute().first() self.assert_(rp == rp) self.assert_(not(rp != rp)) @@ -207,8 +260,7 @@ class QueryTest(TestBase): self.assert_(not (rp != equal)) self.assert_(not (equal != equal)) - @testing.fails_on('mssql', 'No support for boolean logic in column select.') - @testing.fails_on('oracle', 'FIXME: unknown') + @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -218,11 +270,11 @@ class QueryTest(TestBase): eq_(testing.db.execute(select([or_(false, false)])).scalar(), False) eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True) - row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone() + row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first() assert row.x == False assert row.y == False - row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone() + row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first() assert row.x == True assert row.y == False @@ -253,6 +305,9 @@ class QueryTest(TestBase): eq_(expr.execute().fetchall(), result) + @testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text") + @testing.fails_on("oracle", "neither % nor %% are accepted") + @testing.fails_on("+pg8000", "can't interpret result column from '%%'") @testing.emits_warning('.*now automatically escapes.*') def test_percents_in_text(self): for expr, result in ( @@ -277,7 +332,7 @@ class QueryTest(TestBase): eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )]) - if testing.against('postgres'): + if testing.against('postgresql'): eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )]) eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), []) @@ -373,7 +428,7 @@ class QueryTest(TestBase): s = select([datetable.alias('x').c.today]).as_scalar() s2 = select([datetable.c.id, s.label('somelabel')]) #print s2.c.somelabel.type - assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime) + assert isinstance(s2.execute().first()['somelabel'], datetime.datetime) finally: datetable.drop() @@ -444,45 +499,58 @@ class QueryTest(TestBase): users.insert().execute(user_id=2, user_name='jack') addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') - r = users.select(users.c.user_id==2).execute().fetchone() + r = users.select(users.c.user_id==2).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - - r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone() + + r = text("select * from query_users where user_id=2", bind=testing.db).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - + # test slices - r = text("select * from query_addresses", bind=testing.db).execute().fetchone() + r = text("select * from query_addresses", bind=testing.db).execute().first() self.assert_(r[0:1] == (1,)) self.assert_(r[1:] == (2, 'foo@bar.com')) self.assert_(r[:-1] == (1, 2)) - + # test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone() + "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().first() self.assert_(r['user_id']) == 1 self.assert_(r['user_name']) == "john" # test using literal tablename.colname - r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone() + r = text('select query_users.user_id AS "query_users.user_id", ' + 'query_users.user_name AS "query_users.user_name" from query_users', + bind=testing.db).execute().first() self.assert_(r['query_users.user_id']) == 1 self.assert_(r['query_users.user_name']) == "john" # unary experssions - r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().fetchone() + r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first() eq_(r[users.c.user_name], 'jack') eq_(r.user_name, 'jack') - r.close() + + def test_result_case_sensitivity(self): + """test name normalization for result sets.""" + row = testing.db.execute( + select([ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive") + ]) + ).first() + + assert row.keys() == ["case_insensitive", "CaseSensitive"] + def test_row_as_args(self): users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id==1).execute().fetchone() + r = users.select(users.c.user_id==1).execute().first() users.delete().execute() users.insert().execute(r) - assert users.select().execute().fetchall() == [(1, 'john')] - + eq_(users.select().execute().fetchall(), [(1, 'john')]) + def test_result_as_args(self): users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')]) r = users.select().execute() @@ -496,13 +564,12 @@ class QueryTest(TestBase): def test_ambiguous_column(self): users.insert().execute(user_id=1, user_name='john') - r = users.outerjoin(addresses).select().execute().fetchone() - try: - print r['user_id'] - assert False - except exc.InvalidRequestError, e: - assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \ - str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement." + r = users.outerjoin(addresses).select().execute().first() + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) @testing.requires.subqueries def test_column_label_targeting(self): @@ -512,31 +579,29 @@ class QueryTest(TestBase): users.select().alias('foo'), users.select().alias(users.name), ): - row = s.select(use_labels=True).execute().fetchone() + row = s.select(use_labels=True).execute().first() assert row[s.c.user_id] == 7 assert row[s.c.user_name] == 'ed' def test_keys(self): users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() + r = users.select().execute().first() eq_([x.lower() for x in r.keys()], ['user_id', 'user_name']) def test_items(self): users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() + r = users.select().execute().first() eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')]) def test_len(self): users.insert().execute(user_id=1, user_name='foo') - r = users.select().execute().fetchone() + r = users.select().execute().first() eq_(len(r), 2) - r.close() - r = testing.db.execute('select user_name, user_id from query_users').fetchone() + + r = testing.db.execute('select user_name, user_id from query_users').first() eq_(len(r), 2) - r.close() - r = testing.db.execute('select user_name from query_users').fetchone() + r = testing.db.execute('select user_name from query_users').first() eq_(len(r), 1) - r.close() def test_cant_execute_join(self): try: @@ -549,7 +614,7 @@ class QueryTest(TestBase): def test_column_order_with_simple_query(self): # should return values in column definition order users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id==1).execute().fetchone() + r = users.select(users.c.user_id==1).execute().first() eq_(r[0], 1) eq_(r[1], 'foo') eq_([x.lower() for x in r.keys()], ['user_id', 'user_name']) @@ -558,7 +623,7 @@ class QueryTest(TestBase): def test_column_order_with_text_query(self): # should return values in query order users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users').fetchone() + r = testing.db.execute('select user_name, user_id from query_users').first() eq_(r[0], 'foo') eq_(r[1], 1) eq_([x.lower() for x in r.keys()], ['user_name', 'user_id']) @@ -580,7 +645,7 @@ class QueryTest(TestBase): shadowed.create(checkfirst=True) try: shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone() + r = shadowed.select(shadowed.c.shadow_id==1).execute().first() self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow') self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') @@ -622,13 +687,13 @@ class QueryTest(TestBase): # Null values are not outside any set assert len(r) == 0 - u = bindparam('search_key') + @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") + def test_bind_in(self): + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id = 9, user_name = None) - s = users.select(u.in_([])) - r = s.execute(search_key='john').fetchall() - assert len(r) == 0 - r = s.execute(search_key=None).fetchall() - assert len(r) == 0 + u = bindparam('search_key') s = users.select(not_(u.in_([]))) r = s.execute(search_key='john').fetchall() @@ -660,14 +725,15 @@ class QueryTest(TestBase): class PercentSchemaNamesTest(TestBase): """tests using percent signs, spaces in table and column names. - Doesn't pass for mysql, postgres, but this is really a + Doesn't pass for mysql, postgresql, but this is really a SQLAlchemy bug - we should be escaping out %% signs for this operation the same way we do for text() and column labels. """ + @classmethod @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') + @testing.crashes('postgresql', 'postgresql calls name % (params)') def setup_class(cls): global percent_table, metadata metadata = MetaData(testing.db) @@ -680,12 +746,12 @@ class PercentSchemaNamesTest(TestBase): @classmethod @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') + @testing.crashes('postgresql', 'postgresql calls name % (params)') def teardown_class(cls): metadata.drop_all() @testing.crashes('mysql', 'mysqldb calls name % (params)') - @testing.crashes('postgres', 'postgres calls name % (params)') + @testing.crashes('postgresql', 'postgresql calls name % (params)') def test_roundtrip(self): percent_table.insert().execute( {'percent%':5, '%(oneofthese)s':7, 'spaces % more spaces':12}, @@ -731,7 +797,7 @@ class PercentSchemaNamesTest(TestBase): percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute() eq_( - percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(), + percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(), [ (5, 9, 15), (7, 9, 15), @@ -852,7 +918,11 @@ class CompoundTest(TestBase): dict(col2="t3col2r2", col3="bbb", col4="aaa"), dict(col2="t3col2r3", col3="ccc", col4="bbb"), ]) - + + @engines.close_first + def teardown(self): + pass + @classmethod def teardown_class(cls): metadata.drop_all() @@ -878,6 +948,7 @@ class CompoundTest(TestBase): found2 = self._fetchall_sorted(u.alias('bar').select().execute()) eq_(found2, wanted) + @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs") def test_union_ordered(self): (s1, s2) = ( select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], @@ -891,6 +962,7 @@ class CompoundTest(TestBase): ('ccc', 'aaa')] eq_(u.execute().fetchall(), wanted) + @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs") @testing.fails_on('maxdb', 'FIXME: unknown') @testing.requires.subqueries def test_union_ordered_alias(self): @@ -907,6 +979,7 @@ class CompoundTest(TestBase): eq_(u.alias('bar').select().execute().fetchall(), wanted) @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') + @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery") @testing.fails_on('mysql', 'FIXME: unknown') @testing.fails_on('sqlite', 'FIXME: unknown') def test_union_all(self): @@ -925,6 +998,29 @@ class CompoundTest(TestBase): found2 = self._fetchall_sorted(e.alias('foo').select().execute()) eq_(found2, wanted) + def test_union_all_lightweight(self): + """like test_union_all, but breaks the sub-union into + a subquery with an explicit column reference on the outside, + more palatable to a wider variety of engines. + + """ + u = union( + select([t1.c.col3]), + select([t1.c.col3]), + ).alias() + + e = union_all( + select([t1.c.col3]), + select([u.c.col3]) + ) + + wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] + found1 = self._fetchall_sorted(e.execute()) + eq_(found1, wanted) + + found2 = self._fetchall_sorted(e.alias('foo').select().execute()) + eq_(found2, wanted) + @testing.crashes('firebird', 'Does not support intersect') @testing.crashes('sybase', 'FIXME: unknown, verify not fails_on') @testing.fails_on('mysql', 'FIXME: unknown') @@ -1330,3 +1426,6 @@ class OperatorTest(TestBase): order_by=flds.c.idcol).execute().fetchall(), [(2,),(1,)] ) + + + diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py index 64e097b85..3198a07af 100644 --- a/test/sql/test_quote.py +++ b/test/sql/test_quote.py @@ -129,7 +129,7 @@ class QuoteTest(TestBase, AssertsCompiledSQL): def testlabels(self): """test the quoting of labels. - if labels arent quoted, a query in postgres in particular will fail since it produces: + if labels arent quoted, a query in postgresql in particular will fail since it produces: SELECT LaLa.lowercase, LaLa."UPPERCASE", LaLa."MixedCase", LaLa."ASC" FROM (SELECT DISTINCT "WorstCase1".lowercase AS lowercase, "WorstCase1"."UPPERCASE" AS UPPERCASE, "WorstCase1"."MixedCase" AS MixedCase, "WorstCase1"."ASC" AS ASC \nFROM "WorstCase1") AS LaLa diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py new file mode 100644 index 000000000..e076f3fe7 --- /dev/null +++ b/test/sql/test_returning.py @@ -0,0 +1,159 @@ +from sqlalchemy.test.testing import eq_ +from sqlalchemy import * +from sqlalchemy.test import * +from sqlalchemy.test.schema import Table, Column +from sqlalchemy.types import TypeDecorator + + +class ReturningTest(TestBase, AssertsExecutionResults): + __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access') + + def setup(self): + meta = MetaData(testing.db) + global table, GoofyType + + class GoofyType(TypeDecorator): + impl = String + + def process_bind_param(self, value, dialect): + if value is None: + return None + return "FOO" + value + + def process_result_value(self, value, dialect): + if value is None: + return None + return value + "BAR" + + table = Table('tables', meta, + Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('persons', Integer), + Column('full', Boolean), + Column('goofy', GoofyType(50)) + ) + table.create(checkfirst=True) + + def teardown(self): + table.drop() + + @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + def test_column_targeting(self): + result = table.insert().returning(table.c.id, table.c.full).execute({'persons': 1, 'full': False}) + + row = result.first() + assert row[table.c.id] == row['id'] == 1 + assert row[table.c.full] == row['full'] == False + + result = table.insert().values(persons=5, full=True, goofy="somegoofy").\ + returning(table.c.persons, table.c.full, table.c.goofy).execute() + row = result.first() + assert row[table.c.persons] == row['persons'] == 5 + assert row[table.c.full] == row['full'] == True + assert row[table.c.goofy] == row['goofy'] == "FOOsomegoofyBAR" + + @testing.fails_on('firebird', "fb can't handle returning x AS y") + @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + def test_labeling(self): + result = table.insert().values(persons=6).\ + returning(table.c.persons.label('lala')).execute() + row = result.first() + assert row['lala'] == 6 + + @testing.fails_on('firebird', "fb/kintersbasdb can't handle the bind params") + @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + def test_anon_expressions(self): + result = table.insert().values(goofy="someOTHERgoofy").\ + returning(func.lower(table.c.goofy, type_=GoofyType)).execute() + row = result.first() + assert row[0] == "foosomeothergoofyBAR" + + result = table.insert().values(persons=12).\ + returning(table.c.persons + 18).execute() + row = result.first() + assert row[0] == 30 + + @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + def test_update_returning(self): + table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]) + + result = table.update(table.c.persons > 4, dict(full=True)).returning(table.c.id).execute() + eq_(result.fetchall(), [(1,)]) + + result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute() + eq_(result2.fetchall(), [(1,True),(2,False)]) + + @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + def test_insert_returning(self): + result = table.insert().returning(table.c.id).execute({'persons': 1, 'full': False}) + + eq_(result.fetchall(), [(1,)]) + + @testing.fails_on('postgresql', '') + @testing.fails_on('oracle', '') + def test_executemany(): + # return value is documented as failing with psycopg2/executemany + result2 = table.insert().returning(table).execute( + [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}]) + + if testing.against('firebird', 'mssql'): + # Multiple inserts only return the last row + eq_(result2.fetchall(), [(3,3,True, None)]) + else: + # nobody does this as far as we know (pg8000?) + eq_(result2.fetchall(), [(2, 2, False, None), (3,3,True, None)]) + + test_executemany() + + result3 = table.insert().returning(table.c.id).execute({'persons': 4, 'full': False}) + eq_([dict(row) for row in result3], [{'id': 4}]) + + + @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + @testing.fails_on_everything_except('postgresql', 'firebird') + def test_literal_returning(self): + if testing.against("postgresql"): + literal_true = "true" + else: + literal_true = "1" + + result4 = testing.db.execute('insert into tables (id, persons, "full") ' + 'values (5, 10, %s) returning persons' % literal_true) + eq_([dict(row) for row in result4], [{'persons': 10}]) + + @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') + @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature') + def test_delete_returning(self): + table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}]) + + result = table.delete(table.c.persons > 4).returning(table.c.id).execute() + eq_(result.fetchall(), [(1,)]) + + result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute() + eq_(result2.fetchall(), [(2,False),]) + +class SequenceReturningTest(TestBase): + __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'mssql') + + def setup(self): + meta = MetaData(testing.db) + global table, seq + seq = Sequence('tid_seq') + table = Table('tables', meta, + Column('id', Integer, seq, primary_key=True), + Column('data', String(50)) + ) + table.create(checkfirst=True) + + def teardown(self): + table.drop() + + def test_insert(self): + r = table.insert().values(data='hi').returning(table.c.id).execute() + assert r.first() == (1, ) + assert seq.execute() == 2 diff --git a/test/sql/test_select.py b/test/sql/test_select.py index f70492fb3..9acc94eb2 100644 --- a/test/sql/test_select.py +++ b/test/sql/test_select.py @@ -5,7 +5,7 @@ from sqlalchemy import exc, sql, util from sqlalchemy.sql import table, column, label, compiler from sqlalchemy.sql.expression import ClauseList from sqlalchemy.engine import default -from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql +from sqlalchemy.databases import * from sqlalchemy.test import * table1 = table('mytable', @@ -149,12 +149,10 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) self.assert_compile( - select([cast("data", sqlite.SLInteger)], use_labels=True), # this will work with plain Integer in 0.6 + select([cast("data", Integer)], use_labels=True), # this will work with plain Integer in 0.6 "SELECT CAST(:param_1 AS INTEGER) AS anon_1" ) - - def test_nested_uselabels(self): """test nested anonymous label generation. this essentially tests the ANONYMOUS_LABEL regex. @@ -429,7 +427,12 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def test_operators(self): for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), - (operator.sub, '-'), (operator.div, '/'), + (operator.sub, '-'), + # Py3K + #(operator.truediv, '/'), + # Py2K + (operator.div, '/'), + # end Py2K ): for (lhs, rhs, res) in ( (5, table1.c.myid, ':myid_1 %s mytable.myid'), @@ -519,22 +522,22 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A (~table1.c.myid.like('somstr', escape='\\'), "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'", None), (table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'", None), (~table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'", None), - (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()), - (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()), + (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgresql.PGDialect()), + (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgresql.PGDialect()), (table1.c.name.ilike('%something%'), "lower(mytable.name) LIKE lower(:name_1)", None), - (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgres.PGDialect()), + (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgresql.PGDialect()), (~table1.c.name.ilike('%something%'), "lower(mytable.name) NOT LIKE lower(:name_1)", None), - (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgres.PGDialect()), + (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgresql.PGDialect()), ]: self.assert_compile(expr, check, dialect=dialect) def test_match(self): for expr, check, dialect in [ (table1.c.myid.match('somstr'), "mytable.myid MATCH ?", sqlite.SQLiteDialect()), - (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.MySQLDialect()), - (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", mssql.MSSQLDialect()), - (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgres.PGDialect()), - (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.OracleDialect()), + (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.dialect()), + (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", mssql.dialect()), + (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgresql.dialect()), + (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.dialect()), ]: self.assert_compile(expr, check, dialect=dialect) @@ -635,7 +638,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A select([table1.alias('foo')]) ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo") - for dialect in (firebird.dialect(), oracle.dialect()): + for dialect in (oracle.dialect(),): self.assert_compile( select([table1.alias('foo')]) ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo" @@ -748,7 +751,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = params={}, ) - dialect = postgres.dialect() + dialect = postgresql.dialect() self.assert_compile( text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]), "select * from foo where lala=%(bar)s and hoho=%(whee)s", @@ -1122,10 +1125,10 @@ UNION SELECT mytable.myid FROM mytable" self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect()) nonpositional = stmt.compile() positional = stmt.compile(dialect=sqlite.dialect()) - pp = positional.get_params() + pp = positional.params assert [pp[k] for k in positional.positiontup] == expected_default_params_list - assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict))) - pp = positional.get_params(**test_param_dict) + assert nonpositional.construct_params(test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict))) + pp = positional.construct_params(test_param_dict) assert [pp[k] for k in positional.positiontup] == expected_test_params_list # check that params() doesnt modify original statement @@ -1144,7 +1147,7 @@ UNION SELECT mytable.myid FROM mytable" ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)") positional = s2.compile(dialect=sqlite.dialect()) - pp = positional.get_params() + pp = positional.params assert [pp[k] for k in positional.positiontup] == [12, 12] # check that conflicts with "unique" params are caught @@ -1163,11 +1166,11 @@ UNION SELECT mytable.myid FROM mytable" params = dict(('in%d' % i, i) for i in range(total_params)) sql = 'text clause %s' % ', '.join(in_clause) t = text(sql) - assert len(t.bindparams) == total_params + eq_(len(t.bindparams), total_params) c = t.compile() pp = c.construct_params(params) - assert len(set(pp)) == total_params - assert len(set(pp.values())) == total_params + eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp))) + eq_(len(set(pp.values())), total_params) def test_bind_as_col(self): @@ -1291,28 +1294,28 @@ UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)") eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[0]) eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[1]) eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), 'CAST(casttest.ts AS %s)' %expected_results[2]) - eq_(str(cast(1234, TEXT).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3])) + eq_(str(cast(1234, Text).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3])) eq_(str(cast('test', String(20)).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[4])) # fixme: shoving all of this dialect-specific stuff in one test # is now officialy completely ridiculous AND non-obviously omits # coverage on other dialects. sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect) if isinstance(dialect, type(mysql.dialect())): - eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) AS anon_1 \nFROM casttest") + eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest") else: - eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest") + eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC) AS anon_1 \nFROM casttest") # first test with PostgreSQL engine - check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s') + check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s') # then the Oracle engine - check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1') + check_results(oracle.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1') # then the sqlite engine - check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?') + check_results(sqlite.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?') # then the MySQL engine - check_results(mysql.dialect(), ['DECIMAL(10, 2)', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s') + check_results(mysql.dialect(), ['DECIMAL', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s') self.assert_compile(cast(text('NULL'), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect()) self.assert_compile(cast(null(), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect()) @@ -1360,7 +1363,6 @@ UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)") s1 = select([table1.c.myid, table1.c.myid.label('foobar'), func.hoho(table1.c.name), func.lala(table1.c.name).label('gg')]) assert s1.c.keys() == ['myid', 'foobar', 'hoho(mytable.name)', 'gg'] - from sqlalchemy.databases.sqlite import SLNumeric meta = MetaData() t1 = Table('mytable', meta, Column('col1', Integer)) @@ -1368,7 +1370,7 @@ UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)") (table1.c.name, 'name', 'mytable.name', None), (table1.c.myid==12, 'mytable.myid = :myid_1', 'mytable.myid = :myid_1', 'anon_1'), (func.hoho(table1.c.myid), 'hoho(mytable.myid)', 'hoho(mytable.myid)', 'hoho_1'), - (cast(table1.c.name, SLNumeric), 'CAST(mytable.name AS NUMERIC(10, 2))', 'CAST(mytable.name AS NUMERIC(10, 2))', 'anon_1'), + (cast(table1.c.name, Numeric), 'CAST(mytable.name AS NUMERIC)', 'CAST(mytable.name AS NUMERIC)', 'anon_1'), (t1.c.col1, 'col1', 'mytable.col1', None), (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '') ): diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index b0501c913..95ca0d17b 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -416,7 +416,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True), ) - # this is essentially the union formed by the ORM's polymorphic_union function. + # this is essentially the union formed by the ORM's polymorphic_union function. # we define two versions with different ordering of selects. # the first selectable has the "real" column classified_page.magazine_page_id @@ -432,7 +432,6 @@ class ReduceTest(TestBase, AssertsExecutionResults): magazine_page_table.c.page_id, cast(null(), Integer).label('magazine_page_id') ]).select_from(page_table.join(magazine_page_table)), - ).alias('pjoin') eq_( diff --git a/test/sql/test_types.py b/test/sql/test_types.py index 15799358a..9c90549e2 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -1,101 +1,63 @@ +# coding: utf-8 from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message import decimal import datetime, os, re from sqlalchemy import * -from sqlalchemy import exc, types, util +from sqlalchemy import exc, types, util, schema from sqlalchemy.sql import operators from sqlalchemy.test.testing import eq_ import sqlalchemy.engine.url as url -from sqlalchemy.databases import mssql, oracle, mysql, postgres, firebird +from sqlalchemy.databases import * + from sqlalchemy.test import * class AdaptTest(TestBase): - def testadapt(self): - e1 = url.URL('postgres').get_dialect()() - e2 = url.URL('mysql').get_dialect()() - e3 = url.URL('sqlite').get_dialect()() - e4 = url.URL('firebird').get_dialect()() - - type = String(40) - - t1 = type.dialect_impl(e1) - t2 = type.dialect_impl(e2) - t3 = type.dialect_impl(e3) - t4 = type.dialect_impl(e4) - - impls = [t1, t2, t3, t4] - for i,ta in enumerate(impls): - for j,tb in enumerate(impls): - if i == j: - assert ta == tb # call me paranoid... :) + def test_uppercase_rendering(self): + """Test that uppercase types from types.py always render as their type. + + As of SQLA 0.6, using an uppercase type means you want specifically that + type. If the database in use doesn't support that DDL, it (the DB backend) + should raise an error - it means you should be using a lowercased (genericized) type. + + """ + + for dialect in [ + oracle.dialect(), + mysql.dialect(), + postgresql.dialect(), + sqlite.dialect(), + sybase.dialect(), + informix.dialect(), + maxdb.dialect(), + mssql.dialect()]: # TODO when dialects are complete: engines.all_dialects(): + for type_, expected in ( + (FLOAT, "FLOAT"), + (NUMERIC, "NUMERIC"), + (DECIMAL, "DECIMAL"), + (INTEGER, "INTEGER"), + (SMALLINT, "SMALLINT"), + (TIMESTAMP, "TIMESTAMP"), + (DATETIME, "DATETIME"), + (DATE, "DATE"), + (TIME, "TIME"), + (CLOB, "CLOB"), + (VARCHAR, "VARCHAR"), + (NVARCHAR, ("NVARCHAR", "NATIONAL VARCHAR")), + (CHAR, "CHAR"), + (NCHAR, ("NCHAR", "NATIONAL CHAR")), + (BLOB, "BLOB"), + (BOOLEAN, ("BOOLEAN", "BOOL")) + ): + if isinstance(expected, str): + expected = (expected, ) + for exp in expected: + compiled = type_().compile(dialect=dialect) + if exp in compiled: + break else: - assert ta != tb - - def testmsnvarchar(self): - dialect = mssql.MSSQLDialect() - # run the test twice to ensure the caching step works too - for x in range(0, 1): - col = Column('', Unicode(length=10)) - dialect_type = col.type.dialect_impl(dialect) - assert isinstance(dialect_type, mssql.MSNVarchar) - assert dialect_type.get_col_spec() == 'NVARCHAR(10)' - - - def testoracletimestamp(self): - dialect = oracle.OracleDialect() - t1 = oracle.OracleTimestamp - t2 = oracle.OracleTimestamp() - t3 = types.TIMESTAMP - assert isinstance(dialect.type_descriptor(t1), oracle.OracleTimestamp) - assert isinstance(dialect.type_descriptor(t2), oracle.OracleTimestamp) - assert isinstance(dialect.type_descriptor(t3), oracle.OracleTimestamp) - - def testmysqlbinary(self): - dialect = mysql.MySQLDialect() - t1 = mysql.MSVarBinary - t2 = mysql.MSVarBinary() - assert isinstance(dialect.type_descriptor(t1), mysql.MSVarBinary) - assert isinstance(dialect.type_descriptor(t2), mysql.MSVarBinary) - - def teststringadapt(self): - """test that String with no size becomes TEXT, *all* others stay as varchar/String""" - - oracle_dialect = oracle.OracleDialect() - mysql_dialect = mysql.MySQLDialect() - postgres_dialect = postgres.PGDialect() - firebird_dialect = firebird.FBDialect() - - for dialect, start, test in [ - (oracle_dialect, String(), oracle.OracleString), - (oracle_dialect, VARCHAR(), oracle.OracleString), - (oracle_dialect, String(50), oracle.OracleString), - (oracle_dialect, Unicode(), oracle.OracleString), - (oracle_dialect, UnicodeText(), oracle.OracleText), - (oracle_dialect, NCHAR(), oracle.OracleString), - (oracle_dialect, oracle.OracleRaw(50), oracle.OracleRaw), - (mysql_dialect, String(), mysql.MSString), - (mysql_dialect, VARCHAR(), mysql.MSString), - (mysql_dialect, String(50), mysql.MSString), - (mysql_dialect, Unicode(), mysql.MSString), - (mysql_dialect, UnicodeText(), mysql.MSText), - (mysql_dialect, NCHAR(), mysql.MSNChar), - (postgres_dialect, String(), postgres.PGString), - (postgres_dialect, VARCHAR(), postgres.PGString), - (postgres_dialect, String(50), postgres.PGString), - (postgres_dialect, Unicode(), postgres.PGString), - (postgres_dialect, UnicodeText(), postgres.PGText), - (postgres_dialect, NCHAR(), postgres.PGString), - (firebird_dialect, String(), firebird.FBString), - (firebird_dialect, VARCHAR(), firebird.FBString), - (firebird_dialect, String(50), firebird.FBString), - (firebird_dialect, Unicode(), firebird.FBString), - (firebird_dialect, UnicodeText(), firebird.FBText), - (firebird_dialect, NCHAR(), firebird.FBString), - ]: - assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect)) - - + assert False, "%r matches none of %r for dialect %s" % (compiled, expected, dialect.name) + class UserDefinedTest(TestBase): """tests user-defined types.""" @@ -131,7 +93,7 @@ class UserDefinedTest(TestBase): def setup_class(cls): global users, metadata - class MyType(types.TypeEngine): + class MyType(types.UserDefinedType): def get_col_spec(self): return "VARCHAR(100)" def bind_processor(self, dialect): @@ -267,124 +229,105 @@ class ColumnsTest(TestBase, AssertsExecutionResults): for aCol in testTable.c: eq_( expectedResults[aCol.name], - db.dialect.schemagenerator(db.dialect, db, None, None).\ + db.dialect.ddl_compiler(db.dialect, schema.CreateTable(testTable)).\ get_column_specification(aCol)) class UnicodeTest(TestBase, AssertsExecutionResults): """tests the Unicode type. also tests the TypeDecorator with instances in the types package.""" + @classmethod def setup_class(cls): - global unicode_table + global unicode_table, metadata metadata = MetaData(testing.db) unicode_table = Table('unicode_table', metadata, Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True), Column('unicode_varchar', Unicode(250)), Column('unicode_text', UnicodeText), - Column('plain_varchar', String(250)) ) - unicode_table.create() + metadata.create_all() + @classmethod def teardown_class(cls): - unicode_table.drop() + metadata.drop_all() + @engines.close_first def teardown(self): unicode_table.delete().execute() def test_round_trip(self): - assert unicode_table.c.unicode_varchar.type.length == 250 - rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' - unicodedata = rawdata.decode('utf-8') - if testing.against('sqlite'): - rawdata = "something" - - unicode_table.insert().execute(unicode_varchar=unicodedata, - unicode_text=unicodedata, - plain_varchar=rawdata) - x = unicode_table.select().execute().fetchone() + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" + + unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata) + + x = unicode_table.select().execute().first() self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) - if isinstance(x['plain_varchar'], unicode): - # SQLLite and MSSQL return non-unicode data as unicode - self.assert_(testing.against('sqlite', 'mssql')) - if not testing.against('sqlite'): - self.assert_(x['plain_varchar'] == unicodedata) - else: - self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata) - def test_union(self): - """ensure compiler processing works for UNIONs""" + def test_round_trip_executemany(self): + # cx_oracle was producing different behavior for cursor.executemany() + # vs. cursor.execute() + + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" - rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' - unicodedata = rawdata.decode('utf-8') - if testing.against('sqlite'): - rawdata = "something" - unicode_table.insert().execute(unicode_varchar=unicodedata, - unicode_text=unicodedata, - plain_varchar=rawdata) - - x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().fetchone() + unicode_table.insert().execute( + dict(unicode_varchar=unicodedata,unicode_text=unicodedata), + dict(unicode_varchar=unicodedata,unicode_text=unicodedata) + ) + + x = unicode_table.select().execute().first() self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) + self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) - def test_assertions(self): - try: - unicode_table.insert().execute(unicode_varchar='not unicode') - assert False - except exc.SAWarning, e: - assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e) + def test_union(self): + """ensure compiler processing works for UNIONs""" - unicode_engine = engines.utf8_engine(options={'convert_unicode':True, - 'assert_unicode':True}) - try: - try: - unicode_engine.execute(unicode_table.insert(), plain_varchar='im not unicode') - assert False - except exc.InvalidRequestError, e: - assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'" - - @testing.emits_warning('.*non-unicode bind') - def warns(): - # test that data still goes in if warning is emitted.... - unicode_table.insert().execute(unicode_varchar='not unicode') - assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )]) - warns() + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" - finally: - unicode_engine.dispose() + unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata) + + x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().first() + self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) - @testing.fails_on('oracle', 'FIXME: unknown') + @testing.fails_on('oracle', 'oracle converts empty strings to a blank space') def test_blank_strings(self): unicode_table.insert().execute(unicode_varchar=u'') assert select([unicode_table.c.unicode_varchar]).scalar() == u'' - def test_engine_parameter(self): - """tests engine-wide unicode conversion""" - prev_unicode = testing.db.engine.dialect.convert_unicode - prev_assert = testing.db.engine.dialect.assert_unicode - try: - testing.db.engine.dialect.convert_unicode = True - testing.db.engine.dialect.assert_unicode = False - rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' - unicodedata = rawdata.decode('utf-8') - if testing.against('sqlite', 'mssql'): - rawdata = "something" - unicode_table.insert().execute(unicode_varchar=unicodedata, - unicode_text=unicodedata, - plain_varchar=rawdata) - x = unicode_table.select().execute().fetchone() - self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) - self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) - if not testing.against('sqlite', 'mssql'): - self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata) - finally: - testing.db.engine.dialect.convert_unicode = prev_unicode - testing.db.engine.dialect.convert_unicode = prev_assert - - @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('firebird', 'Data type unknown') - def test_length_function(self): - """checks the database correctly understands the length of a unicode string""" - teststr = u'aaa\x1234' - self.assert_(testing.db.func.length(teststr).scalar() == len(teststr)) + def test_parameters(self): + """test the dialect convert_unicode parameters.""" + + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" + + u = Unicode(assert_unicode=True) + uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect) + # Py3K + #assert_raises(exc.InvalidRequestError, uni, b'x') + # Py2K + assert_raises(exc.InvalidRequestError, uni, 'x') + # end Py2K + + u = Unicode() + uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect) + # Py3K + #assert_raises(exc.SAWarning, uni, b'x') + # Py2K + assert_raises(exc.SAWarning, uni, 'x') + # end Py2K + + unicode_engine = engines.utf8_engine(options={'convert_unicode':True,'assert_unicode':True}) + unicode_engine.dialect.supports_unicode_binds = False + + s = String() + uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect) + # Py3K + #assert_raises(exc.InvalidRequestError, uni, b'x') + #assert isinstance(uni(unicodedata), bytes) + # Py2K + assert_raises(exc.InvalidRequestError, uni, 'x') + assert isinstance(uni(unicodedata), str) + # end Py2K + + assert uni(unicodedata) == unicodedata.encode('utf-8') class BinaryTest(TestBase, AssertsExecutionResults): __excluded_on__ = ( @@ -409,18 +352,19 @@ class BinaryTest(TestBase, AssertsExecutionResults): return value binary_table = Table('binary_table', MetaData(testing.db), - Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True), - Column('data', Binary), - Column('data_slice', Binary(100)), - Column('misc', String(30)), - # construct PickleType with non-native pickle module, since cPickle uses relative module - # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative - # to the 'types' module - Column('pickled', PickleType), - Column('mypickle', MyPickleType) + Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True), + Column('data', Binary), + Column('data_slice', Binary(100)), + Column('misc', String(30)), + # construct PickleType with non-native pickle module, since cPickle uses relative module + # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative + # to the 'types' module + Column('pickled', PickleType), + Column('mypickle', MyPickleType) ) binary_table.create() + @engines.close_first def teardown(self): binary_table.delete().execute() @@ -428,42 +372,65 @@ class BinaryTest(TestBase, AssertsExecutionResults): def teardown_class(cls): binary_table.drop() - @testing.fails_on('mssql', 'MSSQl BINARY type right pads the fixed length with \x00') - def testbinary(self): + def test_round_trip(self): testobj1 = pickleable.Foo('im foo 1') testobj2 = pickleable.Foo('im foo 2') testobj3 = pickleable.Foo('im foo 3') stream1 =self.load_stream('binary_data_one.dat') stream2 =self.load_stream('binary_data_two.dat') - binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3) - binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2) - binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None) + binary_table.insert().execute( + primary_id=1, + misc='binary_data_one.dat', + data=stream1, + data_slice=stream1[0:100], + pickled=testobj1, + mypickle=testobj3) + binary_table.insert().execute( + primary_id=2, + misc='binary_data_two.dat', + data=stream2, + data_slice=stream2[0:99], + pickled=testobj2) + binary_table.insert().execute( + primary_id=3, + misc='binary_data_two.dat', + data=None, + data_slice=stream2[0:99], + pickled=None) for stmt in ( binary_table.select(order_by=binary_table.c.primary_id), text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db) ): + eq_data = lambda x, y: eq_(list(x), list(y)) + if util.jython: + _eq_data = eq_data + def eq_data(x, y): + # Jython currently returns arrays + from array import ArrayType + if isinstance(y, ArrayType): + return eq_(x, y.tostring()) + return _eq_data(x, y) l = stmt.execute().fetchall() - eq_(list(stream1), list(l[0]['data'])) - eq_(list(stream1[0:100]), list(l[0]['data_slice'])) - eq_(list(stream2), list(l[1]['data'])) + eq_data(stream1, l[0]['data']) + eq_data(stream1[0:100], l[0]['data_slice']) + eq_data(stream2, l[1]['data']) eq_(testobj1, l[0]['pickled']) eq_(testobj2, l[1]['pickled']) eq_(testobj3.moredata, l[0]['mypickle'].moredata) eq_(l[0]['mypickle'].stuff, 'this is the right stuff') - def load_stream(self, name, len=12579): + def load_stream(self, name): f = os.path.join(os.path.dirname(__file__), "..", name) - # put a number less than the typical MySQL default BLOB size - return file(f).read(len) + return open(f, mode='rb').read() class ExpressionTest(TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): global test_table, meta - class MyCustomType(types.TypeEngine): + class MyCustomType(types.UserDefinedType): def get_col_spec(self): return "INT" def bind_processor(self, dialect): @@ -547,7 +514,6 @@ class DateTest(TestBase, AssertsExecutionResults): db = testing.db if testing.against('oracle'): - import sqlalchemy.databases.oracle as oracle insert_data = [ (7, 'jack', datetime.datetime(2005, 11, 10, 0, 0), @@ -576,7 +542,7 @@ class DateTest(TestBase, AssertsExecutionResults): time_micro = 999 # Missing or poor microsecond support: - if testing.against('mssql', 'mysql', 'firebird'): + if testing.against('mssql', 'mysql', 'firebird', '+zxjdbc'): datetime_micro, time_micro = 0, 0 # No microseconds for TIME elif testing.against('maxdb'): @@ -608,7 +574,7 @@ class DateTest(TestBase, AssertsExecutionResults): Column('user_date', Date), Column('user_time', Time)] - if testing.against('sqlite', 'postgres'): + if testing.against('sqlite', 'postgresql'): insert_data.append( (11, 'historic', datetime.datetime(1850, 11, 10, 11, 52, 35, datetime_micro), @@ -676,8 +642,8 @@ class DateTest(TestBase, AssertsExecutionResults): t.drop(checkfirst=True) class StringTest(TestBase, AssertsExecutionResults): - @testing.fails_on('mysql', 'FIXME: unknown') - @testing.fails_on('oracle', 'FIXME: unknown') + + @testing.requires.unbounded_varchar def test_nolength_string(self): metadata = MetaData(testing.db) foo = Table('foo', metadata, Column('one', String)) @@ -700,10 +666,10 @@ class NumericTest(TestBase, AssertsExecutionResults): metadata = MetaData(testing.db) numeric_table = Table('numeric_table', metadata, Column('id', Integer, Sequence('numeric_id_seq', optional=True), primary_key=True), - Column('numericcol', Numeric(asdecimal=False)), - Column('floatcol', Float), - Column('ncasdec', Numeric), - Column('fcasdec', Float(asdecimal=True)) + Column('numericcol', Numeric(precision=10, scale=2, asdecimal=False)), + Column('floatcol', Float(precision=10, )), + Column('ncasdec', Numeric(precision=10, scale=2)), + Column('fcasdec', Float(precision=10, asdecimal=True)) ) metadata.create_all() @@ -711,6 +677,7 @@ class NumericTest(TestBase, AssertsExecutionResults): def teardown_class(cls): metadata.drop_all() + @engines.close_first def teardown(self): numeric_table.delete().execute() @@ -719,6 +686,7 @@ class NumericTest(TestBase, AssertsExecutionResults): from decimal import Decimal numeric_table.insert().execute( numericcol=3.5, floatcol=5.6, ncasdec=12.4, fcasdec=15.75) + numeric_table.insert().execute( numericcol=Decimal("3.5"), floatcol=Decimal("5.6"), ncasdec=Decimal("12.4"), fcasdec=Decimal("15.75")) @@ -744,33 +712,6 @@ class NumericTest(TestBase, AssertsExecutionResults): assert isinstance(row['ncasdec'], decimal.Decimal) assert isinstance(row['fcasdec'], decimal.Decimal) - def test_length_deprecation(self): - assert_raises(exc.SADeprecationWarning, Numeric, length=8) - - @testing.uses_deprecated(".*is deprecated for Numeric") - def go(): - n = Numeric(length=12) - assert n.scale == 12 - go() - - n = Numeric(scale=12) - for dialect in engines.all_dialects(): - n2 = dialect.type_descriptor(n) - eq_(n2.scale, 12, dialect.name) - - # test colspec generates successfully using 'scale' - assert n2.get_col_spec() - - # test constructor of the dialect-specific type - n3 = n2.__class__(scale=5) - eq_(n3.scale, 5, dialect.name) - - @testing.uses_deprecated(".*is deprecated for Numeric") - def go(): - n3 = n2.__class__(length=6) - eq_(n3.scale, 6, dialect.name) - go() - class IntervalTest(TestBase, AssertsExecutionResults): @classmethod @@ -783,6 +724,7 @@ class IntervalTest(TestBase, AssertsExecutionResults): ) metadata.create_all() + @engines.close_first def teardown(self): interval_table.delete().execute() @@ -790,14 +732,16 @@ class IntervalTest(TestBase, AssertsExecutionResults): def teardown_class(cls): metadata.drop_all() + @testing.fails_on("+pg8000", "Not yet known how to pass values of the INTERVAL type") + @testing.fails_on("postgresql+zxjdbc", "Not yet known how to pass values of the INTERVAL type") def test_roundtrip(self): delta = datetime.datetime(2006, 10, 5) - datetime.datetime(2005, 8, 17) interval_table.insert().execute(interval=delta) - assert interval_table.select().execute().fetchone()['interval'] == delta + assert interval_table.select().execute().first()['interval'] == delta def test_null(self): interval_table.insert().execute(id=1, inverval=None) - assert interval_table.select().execute().fetchone()['interval'] is None + assert interval_table.select().execute().first()['interval'] is None class BooleanTest(TestBase, AssertsExecutionResults): @classmethod @@ -825,30 +769,6 @@ class BooleanTest(TestBase, AssertsExecutionResults): assert(res2==[(2, False)]) class PickleTest(TestBase): - def test_noeq_deprecation(self): - p1 = PickleType() - - assert_raises(DeprecationWarning, - p1.compare_values, pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2) - ) - - assert_raises(DeprecationWarning, - p1.compare_values, pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2) - ) - - @testing.uses_deprecated() - def go(): - # test actual dumps comparison - assert p1.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)) - assert p1.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)) - go() - - assert p1.compare_values({1:2, 3:4}, {3:4, 1:2}) - - p2 = PickleType(mutable=False) - assert not p2.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)) - assert not p2.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)) - def test_eq_comparison(self): p1 = PickleType() diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py index d75913267..6551594f3 100644 --- a/test/sql/test_unicode.py +++ b/test/sql/test_unicode.py @@ -56,6 +56,7 @@ class UnicodeSchemaTest(TestBase): ) metadata.create_all() + @engines.close_first def teardown(self): if metadata.tables: t3.delete().execute() @@ -125,11 +126,11 @@ class EscapesDefaultsTest(testing.TestBase): # reset the identifier preparer, so that we can force it to cache # a unicode identifier engine.dialect.identifier_preparer = engine.dialect.preparer(engine.dialect) - select([column(u'special_col')]).select_from(t1).execute() + select([column(u'special_col')]).select_from(t1).execute().close() assert isinstance(engine.dialect.identifier_preparer.format_sequence(Sequence('special_col')), unicode) # now execute, run the sequence. it should run in u"Special_col.nextid" or similar as - # a unicode object; cx_oracle asserts that this is None or a String (postgres lets it pass thru). + # a unicode object; cx_oracle asserts that this is None or a String (postgresql lets it pass thru). # ensure that base.DefaultRunner is encoding. t1.insert().execute(data='foo') finally: |
