summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_constraints.py287
-rw-r--r--test/sql/test_defaults.py111
-rw-r--r--test/sql/test_functions.py38
-rw-r--r--test/sql/test_labels.py15
-rw-r--r--test/sql/test_query.py331
-rw-r--r--test/sql/test_quote.py2
-rw-r--r--test/sql/test_returning.py159
-rw-r--r--test/sql/test_select.py64
-rw-r--r--test/sql/test_selectable.py3
-rw-r--r--test/sql/test_types.py432
-rw-r--r--test/sql/test_unicode.py5
11 files changed, 827 insertions, 620 deletions
diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py
index 8abeb3533..4ad52604d 100644
--- a/test/sql/test_constraints.py
+++ b/test/sql/test_constraints.py
@@ -1,10 +1,14 @@
-from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.test.testing import assert_raises, assert_raises_message
from sqlalchemy import *
-from sqlalchemy import exc
+from sqlalchemy import exc, schema
from sqlalchemy.test import *
from sqlalchemy.test import config, engines
+from sqlalchemy.engine import ddl
+from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
+from sqlalchemy.dialects.postgresql import base as postgresql
-class ConstraintTest(TestBase, AssertsExecutionResults):
+class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
def setup(self):
global metadata
@@ -33,11 +37,8 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
def test_double_fk_usage_raises(self):
f = ForeignKey('b.id')
- assert_raises(exc.InvalidRequestError, Table, "a", metadata,
- Column('x', Integer, f),
- Column('y', Integer, f)
- )
-
+ Column('x', Integer, f)
+ assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
def test_circular_constraint(self):
a = Table("a", metadata,
@@ -78,18 +79,9 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
metadata.create_all()
foo.insert().execute(id=1,x=9,y=5)
- try:
- foo.insert().execute(id=2,x=5,y=9)
- assert False
- except exc.SQLError:
- assert True
-
+ assert_raises(exc.SQLError, foo.insert().execute, id=2,x=5,y=9)
bar.insert().execute(id=1,x=10)
- try:
- bar.insert().execute(id=2,x=5)
- assert False
- except exc.SQLError:
- assert True
+ assert_raises(exc.SQLError, bar.insert().execute, id=2,x=5)
def test_unique_constraint(self):
foo = Table('foo', metadata,
@@ -106,16 +98,8 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
foo.insert().execute(id=2, value='value2')
bar.insert().execute(id=1, value='a', value2='a')
bar.insert().execute(id=2, value='a', value2='b')
- try:
- foo.insert().execute(id=3, value='value1')
- assert False
- except exc.SQLError:
- assert True
- try:
- bar.insert().execute(id=3, value='a', value2='b')
- assert False
- except exc.SQLError:
- assert True
+ assert_raises(exc.SQLError, foo.insert().execute, id=3, value='value1')
+ assert_raises(exc.SQLError, bar.insert().execute, id=3, value='a', value2='b')
def test_index_create(self):
employees = Table('employees', metadata,
@@ -174,35 +158,22 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
Index('sport_announcer', events.c.sport, events.c.announcer, unique=True)
Index('idx_winners', events.c.winner)
- index_names = [ ix.name for ix in events.indexes ]
- assert 'ix_events_name' in index_names
- assert 'ix_events_location' in index_names
- assert 'sport_announcer' in index_names
- assert 'idx_winners' in index_names
- assert len(index_names) == 4
-
- capt = []
- connection = testing.db.connect()
- # TODO: hacky, put a real connection proxy in
- ex = connection._Connection__execute_context
- def proxy(context):
- capt.append(context.statement)
- capt.append(repr(context.parameters))
- ex(context)
- connection._Connection__execute_context = proxy
- schemagen = testing.db.dialect.schemagenerator(testing.db.dialect, connection)
- schemagen.traverse(events)
-
- assert capt[0].strip().startswith('CREATE TABLE events')
-
- s = set([capt[x].strip() for x in [2,4,6,8]])
-
- assert s == set([
- 'CREATE UNIQUE INDEX ix_events_name ON events (name)',
- 'CREATE INDEX ix_events_location ON events (location)',
- 'CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)',
- 'CREATE INDEX idx_winners ON events (winner)'
- ])
+ eq_(
+ set([ ix.name for ix in events.indexes ]),
+ set(['ix_events_name', 'ix_events_location', 'sport_announcer', 'idx_winners'])
+ )
+
+ self.assert_sql_execution(
+ testing.db,
+ lambda: events.create(testing.db),
+ RegexSQL("^CREATE TABLE events"),
+ AllOf(
+ ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events (name)'),
+ ExactSQL('CREATE INDEX ix_events_location ON events (location)'),
+ ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'),
+ ExactSQL('CREATE INDEX idx_winners ON events (winner)')
+ )
+ )
# verify that the table is functional
events.insert().execute(id=1, name='hockey finals', location='rink',
@@ -214,84 +185,57 @@ class ConstraintTest(TestBase, AssertsExecutionResults):
dialect = testing.db.dialect.__class__()
dialect.max_identifier_length = 20
- schemagen = dialect.schemagenerator(dialect, None)
- schemagen.execute = lambda : None
-
t1 = Table("sometable", MetaData(), Column("foo", Integer))
- schemagen.visit_index(Index("this_name_is_too_long_for_what_were_doing", t1.c.foo))
- eq_(schemagen.buffer.getvalue(), "CREATE INDEX this_name_is_t_1 ON sometable (foo)")
- schemagen.buffer.truncate(0)
- schemagen.visit_index(Index("this_other_name_is_too_long_for_what_were_doing", t1.c.foo))
- eq_(schemagen.buffer.getvalue(), "CREATE INDEX this_other_nam_2 ON sometable (foo)")
-
- schemadrop = dialect.schemadropper(dialect, None)
- schemadrop.execute = lambda: None
- assert_raises(exc.IdentifierError, schemadrop.visit_index, Index("this_name_is_too_long_for_what_were_doing", t1.c.foo))
+ self.assert_compile(
+ schema.CreateIndex(Index("this_name_is_too_long_for_what_were_doing", t1.c.foo)),
+ "CREATE INDEX this_name_is_t_1 ON sometable (foo)",
+ dialect=dialect
+ )
+
+ self.assert_compile(
+ schema.CreateIndex(Index("this_other_name_is_too_long_for_what_were_doing", t1.c.foo)),
+ "CREATE INDEX this_other_nam_1 ON sometable (foo)",
+ dialect=dialect
+ )
-class ConstraintCompilationTest(TestBase, AssertsExecutionResults):
- class accum(object):
- def __init__(self):
- self.statements = []
- def __call__(self, sql, *a, **kw):
- self.statements.append(sql)
- def __contains__(self, substring):
- for s in self.statements:
- if substring in s:
- return True
- return False
- def __str__(self):
- return '\n'.join([repr(x) for x in self.statements])
- def clear(self):
- del self.statements[:]
-
- def setup(self):
- self.sql = self.accum()
- opts = config.db_opts.copy()
- opts['strategy'] = 'mock'
- opts['executor'] = self.sql
- self.engine = engines.testing_engine(options=opts)
-
+class ConstraintCompilationTest(TestBase, AssertsCompiledSQL):
def _test_deferrable(self, constraint_factory):
- meta = MetaData(self.engine)
- t = Table('tbl', meta,
+ t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer),
constraint_factory(deferrable=True))
- t.create()
- assert 'DEFERRABLE' in self.sql, self.sql
- assert 'NOT DEFERRABLE' not in self.sql, self.sql
- self.sql.clear()
- meta.clear()
-
- t = Table('tbl', meta,
+
+ sql = str(schema.CreateTable(t).compile(bind=testing.db))
+ assert 'DEFERRABLE' in sql, sql
+ assert 'NOT DEFERRABLE' not in sql, sql
+
+ t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer),
constraint_factory(deferrable=False))
- t.create()
- assert 'NOT DEFERRABLE' in self.sql
- self.sql.clear()
- meta.clear()
- t = Table('tbl', meta,
+ sql = str(schema.CreateTable(t).compile(bind=testing.db))
+ assert 'NOT DEFERRABLE' in sql
+
+
+ t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer),
constraint_factory(deferrable=True, initially='IMMEDIATE'))
- t.create()
- assert 'NOT DEFERRABLE' not in self.sql
- assert 'INITIALLY IMMEDIATE' in self.sql
- self.sql.clear()
- meta.clear()
+ sql = str(schema.CreateTable(t).compile(bind=testing.db))
+ assert 'NOT DEFERRABLE' not in sql
+ assert 'INITIALLY IMMEDIATE' in sql
- t = Table('tbl', meta,
+ t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer),
constraint_factory(deferrable=True, initially='DEFERRED'))
- t.create()
+ sql = str(schema.CreateTable(t).compile(bind=testing.db))
- assert 'NOT DEFERRABLE' not in self.sql
- assert 'INITIALLY DEFERRED' in self.sql, self.sql
+ assert 'NOT DEFERRABLE' not in sql
+ assert 'INITIALLY DEFERRED' in sql
def test_deferrable_pk(self):
factory = lambda **kw: PrimaryKeyConstraint('a', **kw)
@@ -302,15 +246,16 @@ class ConstraintCompilationTest(TestBase, AssertsExecutionResults):
self._test_deferrable(factory)
def test_deferrable_column_fk(self):
- meta = MetaData(self.engine)
- t = Table('tbl', meta,
+ t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer,
ForeignKey('tbl.a', deferrable=True,
initially='DEFERRED')))
- t.create()
- assert 'DEFERRABLE' in self.sql, self.sql
- assert 'INITIALLY DEFERRED' in self.sql, self.sql
+
+ self.assert_compile(
+ schema.CreateTable(t),
+ "CREATE TABLE tbl (a INTEGER, b INTEGER, FOREIGN KEY(b) REFERENCES tbl (a) DEFERRABLE INITIALLY DEFERRED)",
+ )
def test_deferrable_unique(self):
factory = lambda **kw: UniqueConstraint('b', **kw)
@@ -321,15 +266,105 @@ class ConstraintCompilationTest(TestBase, AssertsExecutionResults):
self._test_deferrable(factory)
def test_deferrable_column_check(self):
- meta = MetaData(self.engine)
- t = Table('tbl', meta,
+ t = Table('tbl', MetaData(),
Column('a', Integer),
Column('b', Integer,
CheckConstraint('a < b',
deferrable=True,
initially='DEFERRED')))
- t.create()
- assert 'DEFERRABLE' in self.sql, self.sql
- assert 'INITIALLY DEFERRED' in self.sql, self.sql
+
+ self.assert_compile(
+ schema.CreateTable(t),
+ "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) DEFERRABLE INITIALLY DEFERRED)"
+ )
+
+ def test_use_alter(self):
+ m = MetaData()
+ t = Table('t', m,
+ Column('a', Integer),
+ )
+
+ t2 = Table('t2', m,
+ Column('a', Integer, ForeignKey('t.a', use_alter=True, name='fk_ta')),
+ Column('b', Integer, ForeignKey('t.a', name='fk_tb')), # to ensure create ordering ...
+ )
+
+ e = engines.mock_engine(dialect_name='postgresql')
+ m.create_all(e)
+ m.drop_all(e)
+
+ e.assert_sql([
+ 'CREATE TABLE t (a INTEGER)',
+ 'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb FOREIGN KEY(b) REFERENCES t (a))',
+ 'ALTER TABLE t2 ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)',
+ 'ALTER TABLE t2 DROP CONSTRAINT fk_ta',
+ 'DROP TABLE t2',
+ 'DROP TABLE t'
+ ])
+
+
+ def test_add_drop_constraint(self):
+ m = MetaData()
+
+ t = Table('tbl', m,
+ Column('a', Integer),
+ Column('b', Integer)
+ )
+
+ t2 = Table('t2', m,
+ Column('a', Integer),
+ Column('b', Integer)
+ )
+
+ constraint = CheckConstraint('a < b',name="my_test_constraint", deferrable=True,initially='DEFERRED', table=t)
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE tbl ADD CONSTRAINT my_test_constraint CHECK (a < b) DEFERRABLE INITIALLY DEFERRED"
+ )
+
+ self.assert_compile(
+ schema.DropConstraint(constraint),
+ "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint"
+ )
+
+ self.assert_compile(
+ schema.DropConstraint(constraint, cascade=True),
+ "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint CASCADE"
+ )
+ constraint = ForeignKeyConstraint(["b"], ["t2.a"])
+ t.append_constraint(constraint)
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE tbl ADD FOREIGN KEY(b) REFERENCES t2 (a)"
+ )
+ constraint = ForeignKeyConstraint([t.c.a], [t2.c.b])
+ t.append_constraint(constraint)
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE tbl ADD FOREIGN KEY(a) REFERENCES t2 (b)"
+ )
+
+ constraint = UniqueConstraint("a", "b", name="uq_cst")
+ t2.append_constraint(constraint)
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)"
+ )
+
+ constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2")
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)"
+ )
+
+ assert t.c.a.primary_key is False
+ constraint = PrimaryKeyConstraint(t.c.a)
+ assert t.c.a.primary_key is True
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE tbl ADD PRIMARY KEY (a)"
+ )
+
+
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 964157466..5638dad77 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -3,7 +3,7 @@ import datetime
from sqlalchemy import Sequence, Column, func
from sqlalchemy.sql import select, text
import sqlalchemy as sa
-from sqlalchemy.test import testing
+from sqlalchemy.test import testing, engines
from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean
from sqlalchemy.test.schema import Table
from sqlalchemy.test.testing import eq_
@@ -37,7 +37,7 @@ class DefaultTest(testing.TestBase):
# since its a "branched" connection
conn.close()
- use_function_defaults = testing.against('postgres', 'mssql', 'maxdb')
+ use_function_defaults = testing.against('postgresql', 'mssql', 'maxdb')
is_oracle = testing.against('oracle')
# select "count(1)" returns different results on different DBs also
@@ -146,7 +146,7 @@ class DefaultTest(testing.TestBase):
assert_raises_message(sa.exc.ArgumentError,
ex_msg,
sa.ColumnDefault, fn)
-
+
def test_arg_signature(self):
def fn1(): pass
def fn2(): pass
@@ -276,7 +276,7 @@ class DefaultTest(testing.TestBase):
assert r.lastrow_has_defaults()
eq_(set(r.context.postfetch_cols),
set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
-
+
eq_(t.select(t.c.col1==54).execute().fetchall(),
[(54, 'imthedefault', f, ts, ts, ctexec, True, False,
12, today, None)])
@@ -284,7 +284,7 @@ class DefaultTest(testing.TestBase):
@testing.fails_on('firebird', 'Data type unknown')
def test_insertmany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
- if (testing.against('mysql') and
+ if (testing.against('mysql') and not testing.against('+zxjdbc') and
testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
return
@@ -304,12 +304,12 @@ class DefaultTest(testing.TestBase):
def test_insert_values(self):
t.insert(values={'col3':50}).execute()
l = t.select().execute()
- eq_(50, l.fetchone()['col3'])
+ eq_(50, l.first()['col3'])
@testing.fails_on('firebird', 'Data type unknown')
def test_updatemany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
- if (testing.against('mysql') and
+ if (testing.against('mysql') and not testing.against('+zxjdbc') and
testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
return
@@ -337,11 +337,11 @@ class DefaultTest(testing.TestBase):
@testing.fails_on('firebird', 'Data type unknown')
def test_update(self):
r = t.insert().execute()
- pk = r.last_inserted_ids()[0]
+ pk = r.inserted_primary_key[0]
t.update(t.c.col1==pk).execute(col4=None, col5=None)
ctexec = currenttime.scalar()
l = t.select(t.c.col1==pk).execute()
- l = l.fetchone()
+ l = l.first()
eq_(l,
(pk, 'im the update', f2, None, None, ctexec, True, False,
13, datetime.date.today(), 'py'))
@@ -350,43 +350,12 @@ class DefaultTest(testing.TestBase):
@testing.fails_on('firebird', 'Data type unknown')
def test_update_values(self):
r = t.insert().execute()
- pk = r.last_inserted_ids()[0]
+ pk = r.inserted_primary_key[0]
t.update(t.c.col1==pk, values={'col3': 55}).execute()
l = t.select(t.c.col1==pk).execute()
- l = l.fetchone()
+ l = l.first()
eq_(55, l['col3'])
- @testing.fails_on_everything_except('postgres')
- def test_passive_override(self):
- """
- Primarily for postgres, tests that when we get a primary key column
- back from reflecting a table which has a default value on it, we
- pre-execute that DefaultClause upon insert, even though DefaultClause
- says "let the database execute this", because in postgres we must have
- all the primary key values in memory before insert; otherwise we can't
- locate the just inserted row.
-
- """
- # TODO: move this to dialect/postgres
- try:
- meta = MetaData(testing.db)
- testing.db.execute("""
- CREATE TABLE speedy_users
- (
- speedy_user_id SERIAL PRIMARY KEY,
-
- user_name VARCHAR NOT NULL,
- user_password VARCHAR NOT NULL
- );
- """, None)
-
- t = Table("speedy_users", meta, autoload=True)
- t.insert().execute(user_name='user', user_password='lala')
- l = t.select().execute().fetchall()
- eq_(l, [(1, 'user', 'lala')])
- finally:
- testing.db.execute("drop table speedy_users", None)
-
class PKDefaultTest(_base.TablesTest):
__requires__ = ('subqueries',)
@@ -400,18 +369,27 @@ class PKDefaultTest(_base.TablesTest):
Column('id', Integer, primary_key=True,
default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
Column('data', String(30)))
-
- @testing.fails_on('mssql', 'FIXME: unknown')
+
+ @testing.requires.returning
+ def test_with_implicit_returning(self):
+ self._test(True)
+
+ def test_regular(self):
+ self._test(False)
+
@testing.resolve_artifact_names
- def test_basic(self):
- t2.insert().execute(nextid=1)
- r = t1.insert().execute(data='hi')
- eq_([1], r.last_inserted_ids())
-
- t2.insert().execute(nextid=2)
- r = t1.insert().execute(data='there')
- eq_([2], r.last_inserted_ids())
+ def _test(self, returning):
+ if not returning and not testing.db.dialect.implicit_returning:
+ engine = testing.db
+ else:
+ engine = engines.testing_engine(options={'implicit_returning':returning})
+ engine.execute(t2.insert(), nextid=1)
+ r = engine.execute(t1.insert(), data='hi')
+ eq_([1], r.inserted_primary_key)
+ engine.execute(t2.insert(), nextid=2)
+ r = engine.execute(t1.insert(), data='there')
+ eq_([2], r.inserted_primary_key)
class PKIncrementTest(_base.TablesTest):
run_define_tables = 'each'
@@ -430,29 +408,31 @@ class PKIncrementTest(_base.TablesTest):
def _test_autoincrement(self, bind):
ids = set()
rs = bind.execute(aitable.insert(), int1=1)
- last = rs.last_inserted_ids()[0]
+ last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
rs = bind.execute(aitable.insert(), str1='row 2')
- last = rs.last_inserted_ids()[0]
+ last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
rs = bind.execute(aitable.insert(), int1=3, str1='row 3')
- last = rs.last_inserted_ids()[0]
+ last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
rs = bind.execute(aitable.insert(values={'int1':func.length('four')}))
- last = rs.last_inserted_ids()[0]
+ last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
+ eq_(ids, set([1,2,3,4]))
+
eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
[(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
@@ -510,8 +490,8 @@ class AutoIncrementTest(_base.TablesTest):
single.create()
r = single.insert().execute()
- id_ = r.last_inserted_ids()[0]
- assert id_ is not None
+ id_ = r.inserted_primary_key[0]
+ eq_(id_, 1)
eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
def test_autoincrement_fk(self):
@@ -522,7 +502,7 @@ class AutoIncrementTest(_base.TablesTest):
nodes.create()
r = nodes.insert().execute(data='foo')
- id_ = r.last_inserted_ids()[0]
+ id_ = r.inserted_primary_key[0]
nodes.insert().execute(data='bar', parent_id=id_)
@testing.fails_on('sqlite', 'FIXME: unknown')
@@ -535,7 +515,7 @@ class AutoIncrementTest(_base.TablesTest):
try:
- # postgres + mysql strict will fail on first row,
+ # postgresql + mysql strict will fail on first row,
# mysql in legacy mode fails on second row
nonai.insert().execute(data='row 1')
nonai.insert().execute(data='row 2')
@@ -570,16 +550,17 @@ class SequenceTest(testing.TestBase):
def testseqnonpk(self):
"""test sequences fire off as defaults on non-pk columns"""
- result = sometable.insert().execute(name="somename")
+ engine = engines.testing_engine(options={'implicit_returning':False})
+ result = engine.execute(sometable.insert(), name="somename")
assert 'id' in result.postfetch_cols()
- result = sometable.insert().execute(name="someother")
+ result = engine.execute(sometable.insert(), name="someother")
assert 'id' in result.postfetch_cols()
sometable.insert().execute(
{'name':'name3'},
{'name':'name4'})
- eq_(sometable.select().execute().fetchall(),
+ eq_(sometable.select().order_by(sometable.c.id).execute().fetchall(),
[(1, "somename", 1),
(2, "someother", 2),
(3, "name3", 3),
@@ -590,8 +571,8 @@ class SequenceTest(testing.TestBase):
cartitems.insert().execute(description='there')
r = cartitems.insert().execute(description='lala')
- assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None
- id_ = r.last_inserted_ids()[0]
+ assert r.inserted_primary_key and r.inserted_primary_key[0] is not None
+ id_ = r.inserted_primary_key[0]
eq_(1,
sa.select([func.count(cartitems.c.cart_id)],
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index e9bf49ce3..7a0f12cac 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -24,7 +24,7 @@ class CompileTest(TestBase, AssertsCompiledSQL):
bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect)
self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect)
- if isinstance(dialect, firebird.dialect):
+ if isinstance(dialect, (firebird.dialect, maxdb.dialect, oracle.dialect)):
self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect)
else:
self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect)
@@ -50,7 +50,7 @@ class CompileTest(TestBase, AssertsCompiledSQL):
for ret, dialect in [
('CURRENT_TIMESTAMP', sqlite.dialect()),
- ('now()', postgres.dialect()),
+ ('now()', postgresql.dialect()),
('now()', mysql.dialect()),
('CURRENT_TIMESTAMP', oracle.dialect())
]:
@@ -62,9 +62,9 @@ class CompileTest(TestBase, AssertsCompiledSQL):
for ret, dialect in [
('random()', sqlite.dialect()),
- ('random()', postgres.dialect()),
+ ('random()', postgresql.dialect()),
('rand()', mysql.dialect()),
- ('random()', oracle.dialect())
+ ('random', oracle.dialect())
]:
self.assert_compile(func.random(), ret, dialect=dialect)
@@ -180,7 +180,10 @@ class CompileTest(TestBase, AssertsCompiledSQL):
class ExecuteTest(TestBase):
-
+ @engines.close_first
+ def tearDown(self):
+ pass
+
def test_standalone_execute(self):
x = testing.db.func.current_date().execute().scalar()
y = testing.db.func.current_date().select().execute().scalar()
@@ -202,6 +205,7 @@ class ExecuteTest(TestBase):
conn.close()
assert (x == y == z) is True
+ @engines.close_first
def test_update(self):
"""
Tests sending functions and SQL expressions to the VALUES and SET
@@ -222,15 +226,15 @@ class ExecuteTest(TestBase):
meta.create_all()
try:
t.insert(values=dict(value=func.length("one"))).execute()
- assert t.select().execute().fetchone()['value'] == 3
+ assert t.select().execute().first()['value'] == 3
t.update(values=dict(value=func.length("asfda"))).execute()
- assert t.select().execute().fetchone()['value'] == 5
+ assert t.select().execute().first()['value'] == 5
r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
- id = r.last_inserted_ids()[0]
- assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
+ id = r.inserted_primary_key[0]
+ assert t.select(t.c.id==id).execute().first()['value'] == 9
t.update(values={t.c.value:func.length("asdf")}).execute()
- assert t.select().execute().fetchone()['value'] == 4
+ assert t.select().execute().first()['value'] == 4
print "--------------------------"
t2.insert().execute()
t2.insert(values=dict(value=func.length("one"))).execute()
@@ -245,18 +249,18 @@ class ExecuteTest(TestBase):
t2.delete().execute()
t2.insert(values=dict(value=func.length("one") + 8)).execute()
- assert t2.select().execute().fetchone()['value'] == 11
+ assert t2.select().execute().first()['value'] == 11
t2.update(values=dict(value=func.length("asfda"))).execute()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
+ assert select([t2.c.value, t2.c.stuff]).execute().first() == (5, "thisisstuff")
t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
- print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
+ print "HI", select([t2.c.value, t2.c.stuff]).execute().first()
+ assert select([t2.c.value, t2.c.stuff]).execute().first() == (9, "foo")
finally:
meta.drop_all()
- @testing.fails_on_everything_except('postgres')
+ @testing.fails_on_everything_except('postgresql')
def test_as_from(self):
# TODO: shouldnt this work on oracle too ?
x = testing.db.func.current_date().execute().scalar()
@@ -266,7 +270,7 @@ class ExecuteTest(TestBase):
# construct a column-based FROM object out of a function, like in [ticket:172]
s = select([sql.column('date', type_=DateTime)], from_obj=[testing.db.func.current_date()])
- q = s.execute().fetchone()[s.c.date]
+ q = s.execute().first()[s.c.date]
r = s.alias('datequery').select().scalar()
assert x == y == z == w == q == r
@@ -301,7 +305,7 @@ class ExecuteTest(TestBase):
'd': datetime.date(2010, 5, 1) })
rs = select([extract('year', table.c.dt),
extract('month', table.c.d)]).execute()
- row = rs.fetchone()
+ row = rs.first()
assert row[0] == 2010
assert row[1] == 5
rs.close()
diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py
index b946b0ae9..bcac7c01d 100644
--- a/test/sql/test_labels.py
+++ b/test/sql/test_labels.py
@@ -35,6 +35,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
maxlen = testing.db.dialect.max_identifier_length
testing.db.dialect.max_identifier_length = IDENT_LENGTH
+ @engines.close_first
def teardown(self):
table1.delete().execute()
@@ -92,10 +93,16 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL):
], repr(result)
def test_table_alias_names(self):
- self.assert_compile(
- table2.alias().select(),
- "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1"
- )
+ if testing.against('oracle'):
+ self.assert_compile(
+ table2.alias().select(),
+ "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs table_with_exactly_29_c_1"
+ )
+ else:
+ self.assert_compile(
+ table2.alias().select(),
+ "SELECT table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM table_with_exactly_29_characs AS table_with_exactly_29_c_1"
+ )
ta = table2.alias()
dialect = default.DefaultDialect()
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 51b933e45..0e3b9dff2 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -1,9 +1,11 @@
+from sqlalchemy.test.testing import eq_
import datetime
from sqlalchemy import *
from sqlalchemy import exc, sql
from sqlalchemy.engine import default
from sqlalchemy.test import *
-from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.testing import eq_, assert_raises_message
+from sqlalchemy.test.schema import Table, Column
class QueryTest(TestBase):
@@ -12,11 +14,11 @@ class QueryTest(TestBase):
global users, users2, addresses, metadata
metadata = MetaData(testing.db)
users = Table('query_users', metadata,
- Column('user_id', INT, primary_key = True),
+ Column('user_id', INT, primary_key=True, test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
addresses = Table('query_addresses', metadata,
- Column('address_id', Integer, primary_key=True),
+ Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('query_users.user_id')),
Column('address', String(30)))
@@ -26,7 +28,8 @@ class QueryTest(TestBase):
)
metadata.create_all()
- def tearDown(self):
+ @engines.close_first
+ def teardown(self):
addresses.delete().execute()
users.delete().execute()
users2.delete().execute()
@@ -52,89 +55,133 @@ class QueryTest(TestBase):
assert users.count().scalar() == 1
users.update(users.c.user_id == 7).execute(user_name = 'fred')
- assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
+ assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred'
def test_lastrow_accessor(self):
- """Tests the last_inserted_ids() and lastrow_has_id() functions."""
+ """Tests the inserted_primary_key and lastrow_has_id() functions."""
- def insert_values(table, values):
+ def insert_values(engine, table, values):
"""
Inserts a row into a table, returns the full list of values
INSERTed including defaults that fired off on the DB side and
detects rows that had defaults and post-fetches.
"""
- result = table.insert().execute(**values)
+ result = engine.execute(table.insert(), **values)
ret = values.copy()
- for col, id in zip(table.primary_key, result.last_inserted_ids()):
+ for col, id in zip(table.primary_key, result.inserted_primary_key):
ret[col.key] = id
if result.lastrow_has_defaults():
- criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])
- row = table.select(criterion).execute().fetchone()
+ criterion = and_(*[col==id for col, id in zip(table.primary_key, result.inserted_primary_key)])
+ row = engine.execute(table.select(criterion)).first()
for c in table.c:
ret[c.key] = row[c]
return ret
- for supported, table, values, assertvalues in [
- (
- {'unsupported':['sqlite']},
- Table("t1", metadata,
- Column('id', Integer, Sequence('t1_id_seq', optional=True), primary_key=True),
- Column('foo', String(30), primary_key=True)),
- {'foo':'hi'},
- {'id':1, 'foo':'hi'}
- ),
- (
- {'unsupported':['sqlite']},
- Table("t2", metadata,
- Column('id', Integer, Sequence('t2_id_seq', optional=True), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
+ if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ test_engines = [
+ engines.testing_engine(options={'implicit_returning':False}),
+ engines.testing_engine(options={'implicit_returning':True}),
+ ]
+ else:
+ test_engines = [testing.db]
+
+ for engine in test_engines:
+ metadata = MetaData()
+ for supported, table, values, assertvalues in [
+ (
+ {'unsupported':['sqlite']},
+ Table("t1", metadata,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('foo', String(30), primary_key=True)),
+ {'foo':'hi'},
+ {'id':1, 'foo':'hi'}
),
- {'foo':'hi'},
- {'id':1, 'foo':'hi', 'bar':'hi'}
- ),
- (
- {'unsupported':[]},
- Table("t3", metadata,
- Column("id", String(40), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column("bar", String(30))
+ (
+ {'unsupported':['sqlite']},
+ Table("t2", metadata,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('foo', String(30), primary_key=True),
+ Column('bar', String(30), server_default='hi')
),
- {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},
- {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}
- ),
- (
- {'unsupported':[]},
- Table("t4", metadata,
- Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
- Column('foo', String(30), primary_key=True),
- Column('bar', String(30), server_default='hi')
+ {'foo':'hi'},
+ {'id':1, 'foo':'hi', 'bar':'hi'}
),
- {'foo':'hi', 'id':1},
- {'id':1, 'foo':'hi', 'bar':'hi'}
- ),
- (
- {'unsupported':[]},
- Table("t5", metadata,
- Column('id', String(10), primary_key=True),
- Column('bar', String(30), server_default='hi')
+ (
+ {'unsupported':[]},
+ Table("t3", metadata,
+ Column("id", String(40), primary_key=True),
+ Column('foo', String(30), primary_key=True),
+ Column("bar", String(30))
+ ),
+ {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},
+ {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}
),
- {'id':'id1'},
- {'id':'id1', 'bar':'hi'},
- ),
- ]:
- if testing.db.name in supported['unsupported']:
- continue
- try:
- table.create()
- i = insert_values(table, values)
- assert i == assertvalues, repr(i) + " " + repr(assertvalues)
- finally:
- table.drop()
+ (
+ {'unsupported':[]},
+ Table("t4", metadata,
+ Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
+ Column('foo', String(30), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'foo':'hi', 'id':1},
+ {'id':1, 'foo':'hi', 'bar':'hi'}
+ ),
+ (
+ {'unsupported':[]},
+ Table("t5", metadata,
+ Column('id', String(10), primary_key=True),
+ Column('bar', String(30), server_default='hi')
+ ),
+ {'id':'id1'},
+ {'id':'id1', 'bar':'hi'},
+ ),
+ ]:
+ if testing.db.name in supported['unsupported']:
+ continue
+ try:
+ table.create(bind=engine, checkfirst=True)
+ i = insert_values(engine, table, values)
+ assert i == assertvalues, "tablename: %s %r %r" % (table.name, repr(i), repr(assertvalues))
+ finally:
+ table.drop(bind=engine)
+
+ @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks")
+ def test_misordered_lastrow(self):
+ related = Table('related', metadata,
+ Column('id', Integer, primary_key=True)
+ )
+ t6 = Table("t6", metadata,
+ Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True),
+ Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ )
+ metadata.create_all()
+ r = related.insert().values(id=12).execute()
+ id = r.inserted_primary_key[0]
+ assert id==12
+
+ r = t6.insert().values(manual_id=id).execute()
+ eq_(r.inserted_primary_key, [12, 1])
+
+ def test_autoclose_on_insert(self):
+ if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
+ test_engines = [
+ engines.testing_engine(options={'implicit_returning':False}),
+ engines.testing_engine(options={'implicit_returning':True}),
+ ]
+ else:
+ test_engines = [testing.db]
+
+ for engine in test_engines:
+
+ r = engine.execute(users.insert(),
+ {'user_name':'jack'},
+ )
+ assert r.closed
+
def test_row_iteration(self):
users.insert().execute(
{'user_id':7, 'user_name':'jack'},
@@ -147,7 +194,7 @@ class QueryTest(TestBase):
l.append(row)
self.assert_(len(l) == 3)
- @testing.fails_on('firebird', 'Data type unknown')
+ @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
@testing.requires.subqueries
def test_anonymous_rows(self):
users.insert().execute(
@@ -161,6 +208,7 @@ class QueryTest(TestBase):
assert row['anon_1'] == 8
assert row['anon_2'] == 10
+ @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
@@ -179,6 +227,11 @@ class QueryTest(TestBase):
select([concat]).order_by(concat).execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
+
+ eq_(
+ select([concat]).order_by(concat).execute().fetchall(),
+ [("test: ed",), ("test: fred",), ("test: jack",)]
+ )
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
@@ -195,7 +248,7 @@ class QueryTest(TestBase):
def test_row_comparison(self):
users.insert().execute(user_id = 7, user_name = 'jack')
- rp = users.select().execute().fetchone()
+ rp = users.select().execute().first()
self.assert_(rp == rp)
self.assert_(not(rp != rp))
@@ -207,8 +260,7 @@ class QueryTest(TestBase):
self.assert_(not (rp != equal))
self.assert_(not (equal != equal))
- @testing.fails_on('mssql', 'No support for boolean logic in column select.')
- @testing.fails_on('oracle', 'FIXME: unknown')
+ @testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
@@ -218,11 +270,11 @@ class QueryTest(TestBase):
eq_(testing.db.execute(select([or_(false, false)])).scalar(), False)
eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
- row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone()
+ row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == False
assert row.y == False
- row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone()
+ row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == True
assert row.y == False
@@ -253,6 +305,9 @@ class QueryTest(TestBase):
eq_(expr.execute().fetchall(), result)
+ @testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text")
+ @testing.fails_on("oracle", "neither % nor %% are accepted")
+ @testing.fails_on("+pg8000", "can't interpret result column from '%%'")
@testing.emits_warning('.*now automatically escapes.*')
def test_percents_in_text(self):
for expr, result in (
@@ -277,7 +332,7 @@ class QueryTest(TestBase):
eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )])
- if testing.against('postgres'):
+ if testing.against('postgresql'):
eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )])
eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), [])
@@ -373,7 +428,7 @@ class QueryTest(TestBase):
s = select([datetable.alias('x').c.today]).as_scalar()
s2 = select([datetable.c.id, s.label('somelabel')])
#print s2.c.somelabel.type
- assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
+ assert isinstance(s2.execute().first()['somelabel'], datetime.datetime)
finally:
datetable.drop()
@@ -444,45 +499,58 @@ class QueryTest(TestBase):
users.insert().execute(user_id=2, user_name='jack')
addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
- r = users.select(users.c.user_id==2).execute().fetchone()
+ r = users.select(users.c.user_id==2).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone()
+
+ r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+
# test slices
- r = text("select * from query_addresses", bind=testing.db).execute().fetchone()
+ r = text("select * from query_addresses", bind=testing.db).execute().first()
self.assert_(r[0:1] == (1,))
self.assert_(r[1:] == (2, 'foo@bar.com'))
self.assert_(r[:-1] == (1, 2))
-
+
# test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description
r = text("select query_users.user_id, query_users.user_name from query_users "
- "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone()
+ "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().first()
self.assert_(r['user_id']) == 1
self.assert_(r['user_name']) == "john"
# test using literal tablename.colname
- r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone()
+ r = text('select query_users.user_id AS "query_users.user_id", '
+ 'query_users.user_name AS "query_users.user_name" from query_users',
+ bind=testing.db).execute().first()
self.assert_(r['query_users.user_id']) == 1
self.assert_(r['query_users.user_name']) == "john"
# unary experssions
- r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().fetchone()
+ r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first()
eq_(r[users.c.user_name], 'jack')
eq_(r.user_name, 'jack')
- r.close()
+
+ def test_result_case_sensitivity(self):
+ """test name normalization for result sets."""
+ row = testing.db.execute(
+ select([
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive")
+ ])
+ ).first()
+
+ assert row.keys() == ["case_insensitive", "CaseSensitive"]
+
def test_row_as_args(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.select(users.c.user_id==1).execute().fetchone()
+ r = users.select(users.c.user_id==1).execute().first()
users.delete().execute()
users.insert().execute(r)
- assert users.select().execute().fetchall() == [(1, 'john')]
-
+ eq_(users.select().execute().fetchall(), [(1, 'john')])
+
def test_result_as_args(self):
users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')])
r = users.select().execute()
@@ -496,13 +564,12 @@ class QueryTest(TestBase):
def test_ambiguous_column(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.outerjoin(addresses).select().execute().fetchone()
- try:
- print r['user_id']
- assert False
- except exc.InvalidRequestError, e:
- assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
- str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
+ r = users.outerjoin(addresses).select().execute().first()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r['user_id']
+ )
@testing.requires.subqueries
def test_column_label_targeting(self):
@@ -512,31 +579,29 @@ class QueryTest(TestBase):
users.select().alias('foo'),
users.select().alias(users.name),
):
- row = s.select(use_labels=True).execute().fetchone()
+ row = s.select(use_labels=True).execute().first()
assert row[s.c.user_id] == 7
assert row[s.c.user_name] == 'ed'
def test_keys(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
+ r = users.select().execute().first()
eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_items(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
+ r = users.select().execute().first()
eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
def test_len(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
+ r = users.select().execute().first()
eq_(len(r), 2)
- r.close()
- r = testing.db.execute('select user_name, user_id from query_users').fetchone()
+
+ r = testing.db.execute('select user_name, user_id from query_users').first()
eq_(len(r), 2)
- r.close()
- r = testing.db.execute('select user_name from query_users').fetchone()
+ r = testing.db.execute('select user_name from query_users').first()
eq_(len(r), 1)
- r.close()
def test_cant_execute_join(self):
try:
@@ -549,7 +614,7 @@ class QueryTest(TestBase):
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
- r = users.select(users.c.user_id==1).execute().fetchone()
+ r = users.select(users.c.user_id==1).execute().first()
eq_(r[0], 1)
eq_(r[1], 'foo')
eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
@@ -558,7 +623,7 @@ class QueryTest(TestBase):
def test_column_order_with_text_query(self):
# should return values in query order
users.insert().execute(user_id=1, user_name='foo')
- r = testing.db.execute('select user_name, user_id from query_users').fetchone()
+ r = testing.db.execute('select user_name, user_id from query_users').first()
eq_(r[0], 'foo')
eq_(r[1], 1)
eq_([x.lower() for x in r.keys()], ['user_name', 'user_id'])
@@ -580,7 +645,7 @@ class QueryTest(TestBase):
shadowed.create(checkfirst=True)
try:
shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
- r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
+ r = shadowed.select(shadowed.c.shadow_id==1).execute().first()
self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
@@ -622,13 +687,13 @@ class QueryTest(TestBase):
# Null values are not outside any set
assert len(r) == 0
- u = bindparam('search_key')
+ @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
+ def test_bind_in(self):
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 9, user_name = None)
- s = users.select(u.in_([]))
- r = s.execute(search_key='john').fetchall()
- assert len(r) == 0
- r = s.execute(search_key=None).fetchall()
- assert len(r) == 0
+ u = bindparam('search_key')
s = users.select(not_(u.in_([])))
r = s.execute(search_key='john').fetchall()
@@ -660,14 +725,15 @@ class QueryTest(TestBase):
class PercentSchemaNamesTest(TestBase):
"""tests using percent signs, spaces in table and column names.
- Doesn't pass for mysql, postgres, but this is really a
+ Doesn't pass for mysql, postgresql, but this is really a
SQLAlchemy bug - we should be escaping out %% signs for this
operation the same way we do for text() and column labels.
"""
+
@classmethod
@testing.crashes('mysql', 'mysqldb calls name % (params)')
- @testing.crashes('postgres', 'postgres calls name % (params)')
+ @testing.crashes('postgresql', 'postgresql calls name % (params)')
def setup_class(cls):
global percent_table, metadata
metadata = MetaData(testing.db)
@@ -680,12 +746,12 @@ class PercentSchemaNamesTest(TestBase):
@classmethod
@testing.crashes('mysql', 'mysqldb calls name % (params)')
- @testing.crashes('postgres', 'postgres calls name % (params)')
+ @testing.crashes('postgresql', 'postgresql calls name % (params)')
def teardown_class(cls):
metadata.drop_all()
@testing.crashes('mysql', 'mysqldb calls name % (params)')
- @testing.crashes('postgres', 'postgres calls name % (params)')
+ @testing.crashes('postgresql', 'postgresql calls name % (params)')
def test_roundtrip(self):
percent_table.insert().execute(
{'percent%':5, '%(oneofthese)s':7, 'spaces % more spaces':12},
@@ -731,7 +797,7 @@ class PercentSchemaNamesTest(TestBase):
percent_table.update().values({percent_table.c['%(oneofthese)s']:9, percent_table.c['spaces % more spaces']:15}).execute()
eq_(
- percent_table.select().order_by(percent_table.c['%(oneofthese)s']).execute().fetchall(),
+ percent_table.select().order_by(percent_table.c['percent%']).execute().fetchall(),
[
(5, 9, 15),
(7, 9, 15),
@@ -852,7 +918,11 @@ class CompoundTest(TestBase):
dict(col2="t3col2r2", col3="bbb", col4="aaa"),
dict(col2="t3col2r3", col3="ccc", col4="bbb"),
])
-
+
+ @engines.close_first
+ def teardown(self):
+ pass
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -878,6 +948,7 @@ class CompoundTest(TestBase):
found2 = self._fetchall_sorted(u.alias('bar').select().execute())
eq_(found2, wanted)
+ @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs")
def test_union_ordered(self):
(s1, s2) = (
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
@@ -891,6 +962,7 @@ class CompoundTest(TestBase):
('ccc', 'aaa')]
eq_(u.execute().fetchall(), wanted)
+ @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs")
@testing.fails_on('maxdb', 'FIXME: unknown')
@testing.requires.subqueries
def test_union_ordered_alias(self):
@@ -907,6 +979,7 @@ class CompoundTest(TestBase):
eq_(u.alias('bar').select().execute().fetchall(), wanted)
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
+ @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery")
@testing.fails_on('mysql', 'FIXME: unknown')
@testing.fails_on('sqlite', 'FIXME: unknown')
def test_union_all(self):
@@ -925,6 +998,29 @@ class CompoundTest(TestBase):
found2 = self._fetchall_sorted(e.alias('foo').select().execute())
eq_(found2, wanted)
+ def test_union_all_lightweight(self):
+ """like test_union_all, but breaks the sub-union into
+ a subquery with an explicit column reference on the outside,
+ more palatable to a wider variety of engines.
+
+ """
+ u = union(
+ select([t1.c.col3]),
+ select([t1.c.col3]),
+ ).alias()
+
+ e = union_all(
+ select([t1.c.col3]),
+ select([u.c.col3])
+ )
+
+ wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)]
+ found1 = self._fetchall_sorted(e.execute())
+ eq_(found1, wanted)
+
+ found2 = self._fetchall_sorted(e.alias('foo').select().execute())
+ eq_(found2, wanted)
+
@testing.crashes('firebird', 'Does not support intersect')
@testing.crashes('sybase', 'FIXME: unknown, verify not fails_on')
@testing.fails_on('mysql', 'FIXME: unknown')
@@ -1330,3 +1426,6 @@ class OperatorTest(TestBase):
order_by=flds.c.idcol).execute().fetchall(),
[(2,),(1,)]
)
+
+
+
diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py
index 64e097b85..3198a07af 100644
--- a/test/sql/test_quote.py
+++ b/test/sql/test_quote.py
@@ -129,7 +129,7 @@ class QuoteTest(TestBase, AssertsCompiledSQL):
def testlabels(self):
"""test the quoting of labels.
- if labels arent quoted, a query in postgres in particular will fail since it produces:
+ if labels arent quoted, a query in postgresql in particular will fail since it produces:
SELECT LaLa.lowercase, LaLa."UPPERCASE", LaLa."MixedCase", LaLa."ASC"
FROM (SELECT DISTINCT "WorstCase1".lowercase AS lowercase, "WorstCase1"."UPPERCASE" AS UPPERCASE, "WorstCase1"."MixedCase" AS MixedCase, "WorstCase1"."ASC" AS ASC \nFROM "WorstCase1") AS LaLa
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
new file mode 100644
index 000000000..e076f3fe7
--- /dev/null
+++ b/test/sql/test_returning.py
@@ -0,0 +1,159 @@
+from sqlalchemy.test.testing import eq_
+from sqlalchemy import *
+from sqlalchemy.test import *
+from sqlalchemy.test.schema import Table, Column
+from sqlalchemy.types import TypeDecorator
+
+
+class ReturningTest(TestBase, AssertsExecutionResults):
+ __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access')
+
+ def setup(self):
+ meta = MetaData(testing.db)
+ global table, GoofyType
+
+ class GoofyType(TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ if value is None:
+ return None
+ return "FOO" + value
+
+ def process_result_value(self, value, dialect):
+ if value is None:
+ return None
+ return value + "BAR"
+
+ table = Table('tables', meta,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('persons', Integer),
+ Column('full', Boolean),
+ Column('goofy', GoofyType(50))
+ )
+ table.create(checkfirst=True)
+
+ def teardown(self):
+ table.drop()
+
+ @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ def test_column_targeting(self):
+ result = table.insert().returning(table.c.id, table.c.full).execute({'persons': 1, 'full': False})
+
+ row = result.first()
+ assert row[table.c.id] == row['id'] == 1
+ assert row[table.c.full] == row['full'] == False
+
+ result = table.insert().values(persons=5, full=True, goofy="somegoofy").\
+ returning(table.c.persons, table.c.full, table.c.goofy).execute()
+ row = result.first()
+ assert row[table.c.persons] == row['persons'] == 5
+ assert row[table.c.full] == row['full'] == True
+ assert row[table.c.goofy] == row['goofy'] == "FOOsomegoofyBAR"
+
+ @testing.fails_on('firebird', "fb can't handle returning x AS y")
+ @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ def test_labeling(self):
+ result = table.insert().values(persons=6).\
+ returning(table.c.persons.label('lala')).execute()
+ row = result.first()
+ assert row['lala'] == 6
+
+    @testing.fails_on('firebird', "fb/kinterbasdb can't handle the bind params")
+ @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ def test_anon_expressions(self):
+ result = table.insert().values(goofy="someOTHERgoofy").\
+ returning(func.lower(table.c.goofy, type_=GoofyType)).execute()
+ row = result.first()
+ assert row[0] == "foosomeothergoofyBAR"
+
+ result = table.insert().values(persons=12).\
+ returning(table.c.persons + 18).execute()
+ row = result.first()
+ assert row[0] == 30
+
+ @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ def test_update_returning(self):
+ table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
+
+ result = table.update(table.c.persons > 4, dict(full=True)).returning(table.c.id).execute()
+ eq_(result.fetchall(), [(1,)])
+
+ result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
+ eq_(result2.fetchall(), [(1,True),(2,False)])
+
+ @testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ def test_insert_returning(self):
+ result = table.insert().returning(table.c.id).execute({'persons': 1, 'full': False})
+
+ eq_(result.fetchall(), [(1,)])
+
+ @testing.fails_on('postgresql', '')
+ @testing.fails_on('oracle', '')
+ def test_executemany():
+ # return value is documented as failing with psycopg2/executemany
+ result2 = table.insert().returning(table).execute(
+ [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
+
+ if testing.against('firebird', 'mssql'):
+ # Multiple inserts only return the last row
+ eq_(result2.fetchall(), [(3,3,True, None)])
+ else:
+ # nobody does this as far as we know (pg8000?)
+ eq_(result2.fetchall(), [(2, 2, False, None), (3,3,True, None)])
+
+ test_executemany()
+
+ result3 = table.insert().returning(table.c.id).execute({'persons': 4, 'full': False})
+ eq_([dict(row) for row in result3], [{'id': 4}])
+
+
+ @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ @testing.fails_on_everything_except('postgresql', 'firebird')
+ def test_literal_returning(self):
+ if testing.against("postgresql"):
+ literal_true = "true"
+ else:
+ literal_true = "1"
+
+ result4 = testing.db.execute('insert into tables (id, persons, "full") '
+ 'values (5, 10, %s) returning persons' % literal_true)
+ eq_([dict(row) for row in result4], [{'persons': 10}])
+
+ @testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
+ @testing.exclude('postgresql', '<', (8, 2), '8.3+ feature')
+ def test_delete_returning(self):
+ table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
+
+ result = table.delete(table.c.persons > 4).returning(table.c.id).execute()
+ eq_(result.fetchall(), [(1,)])
+
+ result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
+ eq_(result2.fetchall(), [(2,False),])
+
+class SequenceReturningTest(TestBase):
+ __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access', 'mssql')
+
+ def setup(self):
+ meta = MetaData(testing.db)
+ global table, seq
+ seq = Sequence('tid_seq')
+ table = Table('tables', meta,
+ Column('id', Integer, seq, primary_key=True),
+ Column('data', String(50))
+ )
+ table.create(checkfirst=True)
+
+ def teardown(self):
+ table.drop()
+
+ def test_insert(self):
+ r = table.insert().values(data='hi').returning(table.c.id).execute()
+ assert r.first() == (1, )
+ assert seq.execute() == 2
diff --git a/test/sql/test_select.py b/test/sql/test_select.py
index f70492fb3..9acc94eb2 100644
--- a/test/sql/test_select.py
+++ b/test/sql/test_select.py
@@ -5,7 +5,7 @@ from sqlalchemy import exc, sql, util
from sqlalchemy.sql import table, column, label, compiler
from sqlalchemy.sql.expression import ClauseList
from sqlalchemy.engine import default
-from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
+from sqlalchemy.databases import *
from sqlalchemy.test import *
table1 = table('mytable',
@@ -149,12 +149,10 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
)
self.assert_compile(
- select([cast("data", sqlite.SLInteger)], use_labels=True), # this will work with plain Integer in 0.6
+ select([cast("data", Integer)], use_labels=True), # this will work with plain Integer in 0.6
"SELECT CAST(:param_1 AS INTEGER) AS anon_1"
)
-
-
def test_nested_uselabels(self):
"""test nested anonymous label generation. this
essentially tests the ANONYMOUS_LABEL regex.
@@ -429,7 +427,12 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def test_operators(self):
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
- (operator.sub, '-'), (operator.div, '/'),
+ (operator.sub, '-'),
+ # Py3K
+ #(operator.truediv, '/'),
+ # Py2K
+ (operator.div, '/'),
+ # end Py2K
):
for (lhs, rhs, res) in (
(5, table1.c.myid, ':myid_1 %s mytable.myid'),
@@ -519,22 +522,22 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
(~table1.c.myid.like('somstr', escape='\\'), "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'", None),
(table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'", None),
(~table1.c.myid.ilike('somstr', escape='\\'), "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'", None),
- (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()),
- (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgres.PGDialect()),
+ (table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid ILIKE %(myid_1)s ESCAPE '\\'", postgresql.PGDialect()),
+ (~table1.c.myid.ilike('somstr', escape='\\'), "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\'", postgresql.PGDialect()),
(table1.c.name.ilike('%something%'), "lower(mytable.name) LIKE lower(:name_1)", None),
- (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgres.PGDialect()),
+ (table1.c.name.ilike('%something%'), "mytable.name ILIKE %(name_1)s", postgresql.PGDialect()),
(~table1.c.name.ilike('%something%'), "lower(mytable.name) NOT LIKE lower(:name_1)", None),
- (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgres.PGDialect()),
+ (~table1.c.name.ilike('%something%'), "mytable.name NOT ILIKE %(name_1)s", postgresql.PGDialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
def test_match(self):
for expr, check, dialect in [
(table1.c.myid.match('somstr'), "mytable.myid MATCH ?", sqlite.SQLiteDialect()),
- (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.MySQLDialect()),
- (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", mssql.MSSQLDialect()),
- (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgres.PGDialect()),
- (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.OracleDialect()),
+ (table1.c.myid.match('somstr'), "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)", mysql.dialect()),
+ (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", mssql.dialect()),
+ (table1.c.myid.match('somstr'), "mytable.myid @@ to_tsquery(%(myid_1)s)", postgresql.dialect()),
+ (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", oracle.dialect()),
]:
self.assert_compile(expr, check, dialect=dialect)
@@ -635,7 +638,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
select([table1.alias('foo')])
,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
- for dialect in (firebird.dialect(), oracle.dialect()):
+ for dialect in (oracle.dialect(),):
self.assert_compile(
select([table1.alias('foo')])
,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
@@ -748,7 +751,7 @@ WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid =
params={},
)
- dialect = postgres.dialect()
+ dialect = postgresql.dialect()
self.assert_compile(
text("select * from foo where lala=:bar and hoho=:whee", bindparams=[bindparam('bar',4), bindparam('whee',7)]),
"select * from foo where lala=%(bar)s and hoho=%(whee)s",
@@ -1122,10 +1125,10 @@ UNION SELECT mytable.myid FROM mytable"
self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
- pp = positional.get_params()
+ pp = positional.params
assert [pp[k] for k in positional.positiontup] == expected_default_params_list
- assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict)))
- pp = positional.get_params(**test_param_dict)
+ assert nonpositional.construct_params(test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict)))
+ pp = positional.construct_params(test_param_dict)
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
# check that params() doesnt modify original statement
@@ -1144,7 +1147,7 @@ UNION SELECT mytable.myid FROM mytable"
":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)")
positional = s2.compile(dialect=sqlite.dialect())
- pp = positional.get_params()
+ pp = positional.params
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
@@ -1163,11 +1166,11 @@ UNION SELECT mytable.myid FROM mytable"
params = dict(('in%d' % i, i) for i in range(total_params))
sql = 'text clause %s' % ', '.join(in_clause)
t = text(sql)
- assert len(t.bindparams) == total_params
+ eq_(len(t.bindparams), total_params)
c = t.compile()
pp = c.construct_params(params)
- assert len(set(pp)) == total_params
- assert len(set(pp.values())) == total_params
+ eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp)))
+ eq_(len(set(pp.values())), total_params)
def test_bind_as_col(self):
@@ -1291,28 +1294,28 @@ UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[0])
eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), 'CAST(casttest.v1 AS %s)' %expected_results[1])
eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), 'CAST(casttest.ts AS %s)' %expected_results[2])
- eq_(str(cast(1234, TEXT).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3]))
+ eq_(str(cast(1234, Text).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[3]))
eq_(str(cast('test', String(20)).compile(dialect=dialect)), 'CAST(%s AS %s)' %(literal, expected_results[4]))
# fixme: shoving all of this dialect-specific stuff in one test
# is now officialy completely ridiculous AND non-obviously omits
# coverage on other dialects.
sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
if isinstance(dialect, type(mysql.dialect())):
- eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) AS anon_1 \nFROM casttest")
+ eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest")
else:
- eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest")
+ eq_(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC) AS anon_1 \nFROM casttest")
# first test with PostgreSQL engine
- check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
+ check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
# then the Oracle engine
- check_results(oracle.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1')
+ check_results(oracle.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'CLOB', 'VARCHAR(20)'], ':param_1')
# then the sqlite engine
- check_results(sqlite.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
+ check_results(sqlite.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
# then the MySQL engine
- check_results(mysql.dialect(), ['DECIMAL(10, 2)', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s')
+ check_results(mysql.dialect(), ['DECIMAL', 'DECIMAL(12, 9)', 'DATE', 'CHAR', 'CHAR(20)'], '%s')
self.assert_compile(cast(text('NULL'), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect())
self.assert_compile(cast(null(), Integer), "CAST(NULL AS INTEGER)", dialect=sqlite.dialect())
@@ -1360,7 +1363,6 @@ UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
s1 = select([table1.c.myid, table1.c.myid.label('foobar'), func.hoho(table1.c.name), func.lala(table1.c.name).label('gg')])
assert s1.c.keys() == ['myid', 'foobar', 'hoho(mytable.name)', 'gg']
- from sqlalchemy.databases.sqlite import SLNumeric
meta = MetaData()
t1 = Table('mytable', meta, Column('col1', Integer))
@@ -1368,7 +1370,7 @@ UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
(table1.c.name, 'name', 'mytable.name', None),
(table1.c.myid==12, 'mytable.myid = :myid_1', 'mytable.myid = :myid_1', 'anon_1'),
(func.hoho(table1.c.myid), 'hoho(mytable.myid)', 'hoho(mytable.myid)', 'hoho_1'),
- (cast(table1.c.name, SLNumeric), 'CAST(mytable.name AS NUMERIC(10, 2))', 'CAST(mytable.name AS NUMERIC(10, 2))', 'anon_1'),
+ (cast(table1.c.name, Numeric), 'CAST(mytable.name AS NUMERIC)', 'CAST(mytable.name AS NUMERIC)', 'anon_1'),
(t1.c.col1, 'col1', 'mytable.col1', None),
(column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '')
):
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index b0501c913..95ca0d17b 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -416,7 +416,7 @@ class ReduceTest(TestBase, AssertsExecutionResults):
Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True),
)
- # this is essentially the union formed by the ORM's polymorphic_union function.
+ # this is essentially the union formed by the ORM's polymorphic_union function.
# we define two versions with different ordering of selects.
# the first selectable has the "real" column classified_page.magazine_page_id
@@ -432,7 +432,6 @@ class ReduceTest(TestBase, AssertsExecutionResults):
magazine_page_table.c.page_id,
cast(null(), Integer).label('magazine_page_id')
]).select_from(page_table.join(magazine_page_table)),
-
).alias('pjoin')
eq_(
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 15799358a..9c90549e2 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -1,101 +1,63 @@
+# coding: utf-8
from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
import decimal
import datetime, os, re
from sqlalchemy import *
-from sqlalchemy import exc, types, util
+from sqlalchemy import exc, types, util, schema
from sqlalchemy.sql import operators
from sqlalchemy.test.testing import eq_
import sqlalchemy.engine.url as url
-from sqlalchemy.databases import mssql, oracle, mysql, postgres, firebird
+from sqlalchemy.databases import *
+
from sqlalchemy.test import *
class AdaptTest(TestBase):
- def testadapt(self):
- e1 = url.URL('postgres').get_dialect()()
- e2 = url.URL('mysql').get_dialect()()
- e3 = url.URL('sqlite').get_dialect()()
- e4 = url.URL('firebird').get_dialect()()
-
- type = String(40)
-
- t1 = type.dialect_impl(e1)
- t2 = type.dialect_impl(e2)
- t3 = type.dialect_impl(e3)
- t4 = type.dialect_impl(e4)
-
- impls = [t1, t2, t3, t4]
- for i,ta in enumerate(impls):
- for j,tb in enumerate(impls):
- if i == j:
- assert ta == tb # call me paranoid... :)
+ def test_uppercase_rendering(self):
+ """Test that uppercase types from types.py always render as their type.
+
+ As of SQLA 0.6, using an uppercase type means you want specifically that
+ type. If the database in use doesn't support that DDL, it (the DB backend)
+ should raise an error - it means you should be using a lowercased (genericized) type.
+
+ """
+
+ for dialect in [
+ oracle.dialect(),
+ mysql.dialect(),
+ postgresql.dialect(),
+ sqlite.dialect(),
+ sybase.dialect(),
+ informix.dialect(),
+ maxdb.dialect(),
+ mssql.dialect()]: # TODO when dialects are complete: engines.all_dialects():
+ for type_, expected in (
+ (FLOAT, "FLOAT"),
+ (NUMERIC, "NUMERIC"),
+ (DECIMAL, "DECIMAL"),
+ (INTEGER, "INTEGER"),
+ (SMALLINT, "SMALLINT"),
+ (TIMESTAMP, "TIMESTAMP"),
+ (DATETIME, "DATETIME"),
+ (DATE, "DATE"),
+ (TIME, "TIME"),
+ (CLOB, "CLOB"),
+ (VARCHAR, "VARCHAR"),
+ (NVARCHAR, ("NVARCHAR", "NATIONAL VARCHAR")),
+ (CHAR, "CHAR"),
+ (NCHAR, ("NCHAR", "NATIONAL CHAR")),
+ (BLOB, "BLOB"),
+ (BOOLEAN, ("BOOLEAN", "BOOL"))
+ ):
+ if isinstance(expected, str):
+ expected = (expected, )
+ for exp in expected:
+ compiled = type_().compile(dialect=dialect)
+ if exp in compiled:
+ break
else:
- assert ta != tb
-
- def testmsnvarchar(self):
- dialect = mssql.MSSQLDialect()
- # run the test twice to ensure the caching step works too
- for x in range(0, 1):
- col = Column('', Unicode(length=10))
- dialect_type = col.type.dialect_impl(dialect)
- assert isinstance(dialect_type, mssql.MSNVarchar)
- assert dialect_type.get_col_spec() == 'NVARCHAR(10)'
-
-
- def testoracletimestamp(self):
- dialect = oracle.OracleDialect()
- t1 = oracle.OracleTimestamp
- t2 = oracle.OracleTimestamp()
- t3 = types.TIMESTAMP
- assert isinstance(dialect.type_descriptor(t1), oracle.OracleTimestamp)
- assert isinstance(dialect.type_descriptor(t2), oracle.OracleTimestamp)
- assert isinstance(dialect.type_descriptor(t3), oracle.OracleTimestamp)
-
- def testmysqlbinary(self):
- dialect = mysql.MySQLDialect()
- t1 = mysql.MSVarBinary
- t2 = mysql.MSVarBinary()
- assert isinstance(dialect.type_descriptor(t1), mysql.MSVarBinary)
- assert isinstance(dialect.type_descriptor(t2), mysql.MSVarBinary)
-
- def teststringadapt(self):
- """test that String with no size becomes TEXT, *all* others stay as varchar/String"""
-
- oracle_dialect = oracle.OracleDialect()
- mysql_dialect = mysql.MySQLDialect()
- postgres_dialect = postgres.PGDialect()
- firebird_dialect = firebird.FBDialect()
-
- for dialect, start, test in [
- (oracle_dialect, String(), oracle.OracleString),
- (oracle_dialect, VARCHAR(), oracle.OracleString),
- (oracle_dialect, String(50), oracle.OracleString),
- (oracle_dialect, Unicode(), oracle.OracleString),
- (oracle_dialect, UnicodeText(), oracle.OracleText),
- (oracle_dialect, NCHAR(), oracle.OracleString),
- (oracle_dialect, oracle.OracleRaw(50), oracle.OracleRaw),
- (mysql_dialect, String(), mysql.MSString),
- (mysql_dialect, VARCHAR(), mysql.MSString),
- (mysql_dialect, String(50), mysql.MSString),
- (mysql_dialect, Unicode(), mysql.MSString),
- (mysql_dialect, UnicodeText(), mysql.MSText),
- (mysql_dialect, NCHAR(), mysql.MSNChar),
- (postgres_dialect, String(), postgres.PGString),
- (postgres_dialect, VARCHAR(), postgres.PGString),
- (postgres_dialect, String(50), postgres.PGString),
- (postgres_dialect, Unicode(), postgres.PGString),
- (postgres_dialect, UnicodeText(), postgres.PGText),
- (postgres_dialect, NCHAR(), postgres.PGString),
- (firebird_dialect, String(), firebird.FBString),
- (firebird_dialect, VARCHAR(), firebird.FBString),
- (firebird_dialect, String(50), firebird.FBString),
- (firebird_dialect, Unicode(), firebird.FBString),
- (firebird_dialect, UnicodeText(), firebird.FBText),
- (firebird_dialect, NCHAR(), firebird.FBString),
- ]:
- assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))
-
-
+ assert False, "%r matches none of %r for dialect %s" % (compiled, expected, dialect.name)
+
class UserDefinedTest(TestBase):
"""tests user-defined types."""
@@ -131,7 +93,7 @@ class UserDefinedTest(TestBase):
def setup_class(cls):
global users, metadata
- class MyType(types.TypeEngine):
+ class MyType(types.UserDefinedType):
def get_col_spec(self):
return "VARCHAR(100)"
def bind_processor(self, dialect):
@@ -267,124 +229,105 @@ class ColumnsTest(TestBase, AssertsExecutionResults):
for aCol in testTable.c:
eq_(
expectedResults[aCol.name],
- db.dialect.schemagenerator(db.dialect, db, None, None).\
+ db.dialect.ddl_compiler(db.dialect, schema.CreateTable(testTable)).\
get_column_specification(aCol))
class UnicodeTest(TestBase, AssertsExecutionResults):
"""tests the Unicode type. also tests the TypeDecorator with instances in the types package."""
+
@classmethod
def setup_class(cls):
- global unicode_table
+ global unicode_table, metadata
metadata = MetaData(testing.db)
unicode_table = Table('unicode_table', metadata,
Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),
Column('unicode_varchar', Unicode(250)),
Column('unicode_text', UnicodeText),
- Column('plain_varchar', String(250))
)
- unicode_table.create()
+ metadata.create_all()
+
@classmethod
def teardown_class(cls):
- unicode_table.drop()
+ metadata.drop_all()
+ @engines.close_first
def teardown(self):
unicode_table.delete().execute()
def test_round_trip(self):
- assert unicode_table.c.unicode_varchar.type.length == 250
- rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
- unicodedata = rawdata.decode('utf-8')
- if testing.against('sqlite'):
- rawdata = "something"
-
- unicode_table.insert().execute(unicode_varchar=unicodedata,
- unicode_text=unicodedata,
- plain_varchar=rawdata)
- x = unicode_table.select().execute().fetchone()
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
+
+ x = unicode_table.select().execute().first()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
- if isinstance(x['plain_varchar'], unicode):
- # SQLLite and MSSQL return non-unicode data as unicode
- self.assert_(testing.against('sqlite', 'mssql'))
- if not testing.against('sqlite'):
- self.assert_(x['plain_varchar'] == unicodedata)
- else:
- self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
- def test_union(self):
- """ensure compiler processing works for UNIONs"""
+ def test_round_trip_executemany(self):
+ # cx_oracle was producing different behavior for cursor.executemany()
+ # vs. cursor.execute()
+
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
- rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
- unicodedata = rawdata.decode('utf-8')
- if testing.against('sqlite'):
- rawdata = "something"
- unicode_table.insert().execute(unicode_varchar=unicodedata,
- unicode_text=unicodedata,
- plain_varchar=rawdata)
-
- x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().fetchone()
+ unicode_table.insert().execute(
+ dict(unicode_varchar=unicodedata,unicode_text=unicodedata),
+ dict(unicode_varchar=unicodedata,unicode_text=unicodedata)
+ )
+
+ x = unicode_table.select().execute().first()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
+ self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
- def test_assertions(self):
- try:
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert False
- except exc.SAWarning, e:
- assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
+ def test_union(self):
+ """ensure compiler processing works for UNIONs"""
- unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
- 'assert_unicode':True})
- try:
- try:
- unicode_engine.execute(unicode_table.insert(), plain_varchar='im not unicode')
- assert False
- except exc.InvalidRequestError, e:
- assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
-
- @testing.emits_warning('.*non-unicode bind')
- def warns():
- # test that data still goes in if warning is emitted....
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
- warns()
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
- finally:
- unicode_engine.dispose()
+ unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
+
+ x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().first()
+ self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
- @testing.fails_on('oracle', 'FIXME: unknown')
+ @testing.fails_on('oracle', 'oracle converts empty strings to a blank space')
def test_blank_strings(self):
unicode_table.insert().execute(unicode_varchar=u'')
assert select([unicode_table.c.unicode_varchar]).scalar() == u''
- def test_engine_parameter(self):
- """tests engine-wide unicode conversion"""
- prev_unicode = testing.db.engine.dialect.convert_unicode
- prev_assert = testing.db.engine.dialect.assert_unicode
- try:
- testing.db.engine.dialect.convert_unicode = True
- testing.db.engine.dialect.assert_unicode = False
- rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
- unicodedata = rawdata.decode('utf-8')
- if testing.against('sqlite', 'mssql'):
- rawdata = "something"
- unicode_table.insert().execute(unicode_varchar=unicodedata,
- unicode_text=unicodedata,
- plain_varchar=rawdata)
- x = unicode_table.select().execute().fetchone()
- self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
- self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
- if not testing.against('sqlite', 'mssql'):
- self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata)
- finally:
- testing.db.engine.dialect.convert_unicode = prev_unicode
- testing.db.engine.dialect.convert_unicode = prev_assert
-
- @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
- @testing.fails_on('firebird', 'Data type unknown')
- def test_length_function(self):
- """checks the database correctly understands the length of a unicode string"""
- teststr = u'aaa\x1234'
- self.assert_(testing.db.func.length(teststr).scalar() == len(teststr))
+ def test_parameters(self):
+ """test the dialect convert_unicode parameters."""
+
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ u = Unicode(assert_unicode=True)
+ uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
+ # Py3K
+ #assert_raises(exc.InvalidRequestError, uni, b'x')
+ # Py2K
+ assert_raises(exc.InvalidRequestError, uni, 'x')
+ # end Py2K
+
+ u = Unicode()
+ uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
+ # Py3K
+ #assert_raises(exc.SAWarning, uni, b'x')
+ # Py2K
+ assert_raises(exc.SAWarning, uni, 'x')
+ # end Py2K
+
+ unicode_engine = engines.utf8_engine(options={'convert_unicode':True,'assert_unicode':True})
+ unicode_engine.dialect.supports_unicode_binds = False
+
+ s = String()
+ uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect)
+ # Py3K
+ #assert_raises(exc.InvalidRequestError, uni, b'x')
+ #assert isinstance(uni(unicodedata), bytes)
+ # Py2K
+ assert_raises(exc.InvalidRequestError, uni, 'x')
+ assert isinstance(uni(unicodedata), str)
+ # end Py2K
+
+ assert uni(unicodedata) == unicodedata.encode('utf-8')
class BinaryTest(TestBase, AssertsExecutionResults):
__excluded_on__ = (
@@ -409,18 +352,19 @@ class BinaryTest(TestBase, AssertsExecutionResults):
return value
binary_table = Table('binary_table', MetaData(testing.db),
- Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
- Column('data', Binary),
- Column('data_slice', Binary(100)),
- Column('misc', String(30)),
- # construct PickleType with non-native pickle module, since cPickle uses relative module
- # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
- # to the 'types' module
- Column('pickled', PickleType),
- Column('mypickle', MyPickleType)
+ Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
+ Column('data', Binary),
+ Column('data_slice', Binary(100)),
+ Column('misc', String(30)),
+ # construct PickleType with non-native pickle module, since cPickle uses relative module
+ # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
+ # to the 'types' module
+ Column('pickled', PickleType),
+ Column('mypickle', MyPickleType)
)
binary_table.create()
+ @engines.close_first
def teardown(self):
binary_table.delete().execute()
@@ -428,42 +372,65 @@ class BinaryTest(TestBase, AssertsExecutionResults):
def teardown_class(cls):
binary_table.drop()
- @testing.fails_on('mssql', 'MSSQl BINARY type right pads the fixed length with \x00')
- def testbinary(self):
+ def test_round_trip(self):
testobj1 = pickleable.Foo('im foo 1')
testobj2 = pickleable.Foo('im foo 2')
testobj3 = pickleable.Foo('im foo 3')
stream1 =self.load_stream('binary_data_one.dat')
stream2 =self.load_stream('binary_data_two.dat')
- binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
- binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
- binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
+ binary_table.insert().execute(
+ primary_id=1,
+ misc='binary_data_one.dat',
+ data=stream1,
+ data_slice=stream1[0:100],
+ pickled=testobj1,
+ mypickle=testobj3)
+ binary_table.insert().execute(
+ primary_id=2,
+ misc='binary_data_two.dat',
+ data=stream2,
+ data_slice=stream2[0:99],
+ pickled=testobj2)
+ binary_table.insert().execute(
+ primary_id=3,
+ misc='binary_data_two.dat',
+ data=None,
+ data_slice=stream2[0:99],
+ pickled=None)
for stmt in (
binary_table.select(order_by=binary_table.c.primary_id),
text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db)
):
+ eq_data = lambda x, y: eq_(list(x), list(y))
+ if util.jython:
+ _eq_data = eq_data
+ def eq_data(x, y):
+ # Jython currently returns arrays
+ from array import ArrayType
+ if isinstance(y, ArrayType):
+ return eq_(x, y.tostring())
+ return _eq_data(x, y)
l = stmt.execute().fetchall()
- eq_(list(stream1), list(l[0]['data']))
- eq_(list(stream1[0:100]), list(l[0]['data_slice']))
- eq_(list(stream2), list(l[1]['data']))
+ eq_data(stream1, l[0]['data'])
+ eq_data(stream1[0:100], l[0]['data_slice'])
+ eq_data(stream2, l[1]['data'])
eq_(testobj1, l[0]['pickled'])
eq_(testobj2, l[1]['pickled'])
eq_(testobj3.moredata, l[0]['mypickle'].moredata)
eq_(l[0]['mypickle'].stuff, 'this is the right stuff')
- def load_stream(self, name, len=12579):
+ def load_stream(self, name):
f = os.path.join(os.path.dirname(__file__), "..", name)
- # put a number less than the typical MySQL default BLOB size
- return file(f).read(len)
+ return open(f, mode='rb').read()
class ExpressionTest(TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
global test_table, meta
- class MyCustomType(types.TypeEngine):
+ class MyCustomType(types.UserDefinedType):
def get_col_spec(self):
return "INT"
def bind_processor(self, dialect):
@@ -547,7 +514,6 @@ class DateTest(TestBase, AssertsExecutionResults):
db = testing.db
if testing.against('oracle'):
- import sqlalchemy.databases.oracle as oracle
insert_data = [
(7, 'jack',
datetime.datetime(2005, 11, 10, 0, 0),
@@ -576,7 +542,7 @@ class DateTest(TestBase, AssertsExecutionResults):
time_micro = 999
# Missing or poor microsecond support:
- if testing.against('mssql', 'mysql', 'firebird'):
+ if testing.against('mssql', 'mysql', 'firebird', '+zxjdbc'):
datetime_micro, time_micro = 0, 0
# No microseconds for TIME
elif testing.against('maxdb'):
@@ -608,7 +574,7 @@ class DateTest(TestBase, AssertsExecutionResults):
Column('user_date', Date),
Column('user_time', Time)]
- if testing.against('sqlite', 'postgres'):
+ if testing.against('sqlite', 'postgresql'):
insert_data.append(
(11, 'historic',
datetime.datetime(1850, 11, 10, 11, 52, 35, datetime_micro),
@@ -676,8 +642,8 @@ class DateTest(TestBase, AssertsExecutionResults):
t.drop(checkfirst=True)
class StringTest(TestBase, AssertsExecutionResults):
- @testing.fails_on('mysql', 'FIXME: unknown')
- @testing.fails_on('oracle', 'FIXME: unknown')
+
+ @testing.requires.unbounded_varchar
def test_nolength_string(self):
metadata = MetaData(testing.db)
foo = Table('foo', metadata, Column('one', String))
@@ -700,10 +666,10 @@ class NumericTest(TestBase, AssertsExecutionResults):
metadata = MetaData(testing.db)
numeric_table = Table('numeric_table', metadata,
Column('id', Integer, Sequence('numeric_id_seq', optional=True), primary_key=True),
- Column('numericcol', Numeric(asdecimal=False)),
- Column('floatcol', Float),
- Column('ncasdec', Numeric),
- Column('fcasdec', Float(asdecimal=True))
+ Column('numericcol', Numeric(precision=10, scale=2, asdecimal=False)),
+ Column('floatcol', Float(precision=10, )),
+ Column('ncasdec', Numeric(precision=10, scale=2)),
+ Column('fcasdec', Float(precision=10, asdecimal=True))
)
metadata.create_all()
@@ -711,6 +677,7 @@ class NumericTest(TestBase, AssertsExecutionResults):
def teardown_class(cls):
metadata.drop_all()
+ @engines.close_first
def teardown(self):
numeric_table.delete().execute()
@@ -719,6 +686,7 @@ class NumericTest(TestBase, AssertsExecutionResults):
from decimal import Decimal
numeric_table.insert().execute(
numericcol=3.5, floatcol=5.6, ncasdec=12.4, fcasdec=15.75)
+
numeric_table.insert().execute(
numericcol=Decimal("3.5"), floatcol=Decimal("5.6"),
ncasdec=Decimal("12.4"), fcasdec=Decimal("15.75"))
@@ -744,33 +712,6 @@ class NumericTest(TestBase, AssertsExecutionResults):
assert isinstance(row['ncasdec'], decimal.Decimal)
assert isinstance(row['fcasdec'], decimal.Decimal)
- def test_length_deprecation(self):
- assert_raises(exc.SADeprecationWarning, Numeric, length=8)
-
- @testing.uses_deprecated(".*is deprecated for Numeric")
- def go():
- n = Numeric(length=12)
- assert n.scale == 12
- go()
-
- n = Numeric(scale=12)
- for dialect in engines.all_dialects():
- n2 = dialect.type_descriptor(n)
- eq_(n2.scale, 12, dialect.name)
-
- # test colspec generates successfully using 'scale'
- assert n2.get_col_spec()
-
- # test constructor of the dialect-specific type
- n3 = n2.__class__(scale=5)
- eq_(n3.scale, 5, dialect.name)
-
- @testing.uses_deprecated(".*is deprecated for Numeric")
- def go():
- n3 = n2.__class__(length=6)
- eq_(n3.scale, 6, dialect.name)
- go()
-
class IntervalTest(TestBase, AssertsExecutionResults):
@classmethod
@@ -783,6 +724,7 @@ class IntervalTest(TestBase, AssertsExecutionResults):
)
metadata.create_all()
+ @engines.close_first
def teardown(self):
interval_table.delete().execute()
@@ -790,14 +732,16 @@ class IntervalTest(TestBase, AssertsExecutionResults):
def teardown_class(cls):
metadata.drop_all()
+ @testing.fails_on("+pg8000", "Not yet known how to pass values of the INTERVAL type")
+ @testing.fails_on("postgresql+zxjdbc", "Not yet known how to pass values of the INTERVAL type")
def test_roundtrip(self):
delta = datetime.datetime(2006, 10, 5) - datetime.datetime(2005, 8, 17)
interval_table.insert().execute(interval=delta)
- assert interval_table.select().execute().fetchone()['interval'] == delta
+ assert interval_table.select().execute().first()['interval'] == delta
def test_null(self):
interval_table.insert().execute(id=1, inverval=None)
- assert interval_table.select().execute().fetchone()['interval'] is None
+ assert interval_table.select().execute().first()['interval'] is None
class BooleanTest(TestBase, AssertsExecutionResults):
@classmethod
@@ -825,30 +769,6 @@ class BooleanTest(TestBase, AssertsExecutionResults):
assert(res2==[(2, False)])
class PickleTest(TestBase):
- def test_noeq_deprecation(self):
- p1 = PickleType()
-
- assert_raises(DeprecationWarning,
- p1.compare_values, pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)
- )
-
- assert_raises(DeprecationWarning,
- p1.compare_values, pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)
- )
-
- @testing.uses_deprecated()
- def go():
- # test actual dumps comparison
- assert p1.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2))
- assert p1.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2))
- go()
-
- assert p1.compare_values({1:2, 3:4}, {3:4, 1:2})
-
- p2 = PickleType(mutable=False)
- assert not p2.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2))
- assert not p2.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2))
-
def test_eq_comparison(self):
p1 = PickleType()
diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py
index d75913267..6551594f3 100644
--- a/test/sql/test_unicode.py
+++ b/test/sql/test_unicode.py
@@ -56,6 +56,7 @@ class UnicodeSchemaTest(TestBase):
)
metadata.create_all()
+ @engines.close_first
def teardown(self):
if metadata.tables:
t3.delete().execute()
@@ -125,11 +126,11 @@ class EscapesDefaultsTest(testing.TestBase):
# reset the identifier preparer, so that we can force it to cache
# a unicode identifier
engine.dialect.identifier_preparer = engine.dialect.preparer(engine.dialect)
- select([column(u'special_col')]).select_from(t1).execute()
+ select([column(u'special_col')]).select_from(t1).execute().close()
assert isinstance(engine.dialect.identifier_preparer.format_sequence(Sequence('special_col')), unicode)
# now execute, run the sequence. it should run in u"Special_col.nextid" or similar as
- # a unicode object; cx_oracle asserts that this is None or a String (postgres lets it pass thru).
+ # a unicode object; cx_oracle asserts that this is None or a String (postgresql lets it pass thru).
# ensure that base.DefaultRunner is encoding.
t1.insert().execute(data='foo')
finally: