Diffstat (limited to 'test')
-rw-r--r--  test/base/test_events.py                   |   15
-rw-r--r--  test/dialect/mssql/test_engine.py          |    3
-rw-r--r--  test/dialect/mssql/test_query.py           |   76
-rw-r--r--  test/dialect/mssql/test_reflection.py      |   11
-rw-r--r--  test/dialect/mssql/test_types.py           |  312
-rw-r--r--  test/dialect/mysql/test_query.py           |   39
-rw-r--r--  test/dialect/mysql/test_types.py           |  345
-rw-r--r--  test/dialect/postgresql/test_compiler.py   |   54
-rw-r--r--  test/dialect/postgresql/test_dialect.py    |    3
-rw-r--r--  test/dialect/postgresql/test_query.py      |  327
-rw-r--r--  test/dialect/postgresql/test_reflection.py |   12
-rw-r--r--  test/dialect/postgresql/test_types.py      |   20
-rw-r--r--  test/dialect/test_oracle.py                |  138
-rw-r--r--  test/dialect/test_sqlite.py                |  772
-rw-r--r--  test/dialect/test_suite.py                 |    1
-rw-r--r--  test/engine/test_execute.py                |  318
-rw-r--r--  test/engine/test_parseconnect.py           |  185
-rw-r--r--  test/engine/test_transaction.py            |   79
-rw-r--r--  test/ext/test_extendedattr.py              |    2
-rw-r--r--  test/ext/test_horizontal_shard.py          |    2
-rw-r--r--  test/orm/test_bulk.py                      |  358
-rw-r--r--  test/orm/test_cycles.py                    |   28
-rw-r--r--  test/orm/test_deferred.py                  |  133
-rw-r--r--  test/orm/test_loading.py                   |   33
-rw-r--r--  test/orm/test_mapper.py                    |   13
-rw-r--r--  test/orm/test_naturalpks.py                |   76
-rw-r--r--  test/orm/test_query.py                     |  105
-rw-r--r--  test/orm/test_session.py                   |    7
-rw-r--r--  test/orm/test_transaction.py               |   17
-rw-r--r--  test/orm/test_unitofworkv2.py              |   99
-rw-r--r--  test/orm/test_versioning.py                |   28
-rw-r--r--  test/profiles.txt                          |   89
-rw-r--r--  test/requirements.py                       |   32
-rw-r--r--  test/sql/test_compiler.py                  |    2
-rw-r--r--  test/sql/test_constraints.py               |  219
-rw-r--r--  test/sql/test_cte.py                       |   30
-rw-r--r--  test/sql/test_ddlemit.py                   |   67
-rw-r--r--  test/sql/test_defaults.py                  |   25
-rw-r--r--  test/sql/test_insert.py                    |  136
-rw-r--r--  test/sql/test_join_rewriting.py            |    1
-rw-r--r--  test/sql/test_metadata.py                  |   43
-rw-r--r--  test/sql/test_operators.py                 |   67
-rw-r--r--  test/sql/test_types.py                     |   71
43 files changed, 3430 insertions, 963 deletions
diff --git a/test/base/test_events.py b/test/base/test_events.py
index 89379961e..8cfbd0180 100644
--- a/test/base/test_events.py
+++ b/test/base/test_events.py
@@ -155,23 +155,20 @@ class EventsTest(fixtures.TestBase):
t1.dispatch.event_one(5, 6)
t2.dispatch.event_one(5, 6)
is_(
- t1.dispatch.__dict__['event_one'],
- self.Target.dispatch.event_one.
- _empty_listeners[self.Target]
+ self.Target.dispatch._empty_listener_reg[self.Target]['event_one'],
+ t1.dispatch.event_one
)
@event.listens_for(t1, "event_one")
def listen_two(x, y):
pass
is_not_(
- t1.dispatch.__dict__['event_one'],
- self.Target.dispatch.event_one.
- _empty_listeners[self.Target]
+ self.Target.dispatch._empty_listener_reg[self.Target]['event_one'],
+ t1.dispatch.event_one
)
is_(
- t2.dispatch.__dict__['event_one'],
- self.Target.dispatch.event_one.
- _empty_listeners[self.Target]
+ self.Target.dispatch._empty_listener_reg[self.Target]['event_one'],
+ t2.dispatch.event_one
)
def test_immutable_methods(self):
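The rewritten assertions above consult the per-class `_empty_listener_reg` registry directly rather than reaching through the event descriptor. A minimal sketch of the invariant being pinned down, reusing the `Target` fixture from this module (the registry attribute is internal API and assumed stable here):

    from sqlalchemy import event

    t1, t2 = Target(), Target()
    empty = Target.dispatch._empty_listener_reg[Target]['event_one']

    # before any listener is attached, both instances share the
    # class-level empty-listener collection
    assert t1.dispatch.event_one is empty
    assert t2.dispatch.event_one is empty

    @event.listens_for(t1, "event_one")
    def listen(x, y):
        pass

    # t1 now carries its own listener collection; t2 still shares
    # the empty singleton
    assert t1.dispatch.event_one is not empty
    assert t2.dispatch.event_one is empty
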
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py
index 4b4780d43..a994b1787 100644
--- a/test/dialect/mssql/test_engine.py
+++ b/test/dialect/mssql/test_engine.py
@@ -157,8 +157,7 @@ class ParseConnectTest(fixtures.TestBase):
eq_(dialect.is_disconnect("not an error", None, None), False)
- @testing.only_on(['mssql+pyodbc', 'mssql+pymssql'],
- "FreeTDS specific test")
+ @testing.requires.mssql_freetds
def test_bad_freetds_warning(self):
engine = engines.testing_engine()
diff --git a/test/dialect/mssql/test_query.py b/test/dialect/mssql/test_query.py
index 715eebb84..e0affe831 100644
--- a/test/dialect/mssql/test_query.py
+++ b/test/dialect/mssql/test_query.py
@@ -7,6 +7,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy import testing
from sqlalchemy.util import ue
from sqlalchemy import util
+from sqlalchemy.testing.assertsql import CursorSQL
@@ -163,7 +164,6 @@ class QueryUnicodeTest(fixtures.TestBase):
finally:
meta.drop_all()
-from sqlalchemy.testing.assertsql import ExactSQL
class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
__only_on__ = 'mssql'
@@ -232,27 +232,73 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
con.execute("""drop trigger paj""")
meta.drop_all()
- @testing.fails_on_everything_except('mssql+pyodbc', 'pyodbc-specific feature')
@testing.provide_metadata
def test_disable_scope_identity(self):
engine = engines.testing_engine(options={"use_scope_identity": False})
metadata = self.metadata
- metadata.bind = engine
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
- implicit_returning=False
+ t1 = Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ implicit_returning=False
)
- metadata.create_all()
+ metadata.create_all(engine)
+
+ with self.sql_execution_asserter(engine) as asserter:
+ engine.execute(t1.insert(), {"data": "somedata"})
+
+ asserter.assert_(
+ CursorSQL(
+ "INSERT INTO t1 (data) VALUES (?)",
+ ("somedata", )
+ ),
+ CursorSQL("SELECT @@identity AS lastrowid"),
+ )
+
+ @testing.provide_metadata
+ def test_enable_scope_identity(self):
+ engine = engines.testing_engine(options={"use_scope_identity": True})
+ metadata = self.metadata
+ t1 = Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ implicit_returning=False
+ )
+ metadata.create_all(engine)
+
+ with self.sql_execution_asserter(engine) as asserter:
+ engine.execute(t1.insert())
+
+ # even with pyodbc, we don't embed the scope identity on a
+ # DEFAULT VALUES insert
+ asserter.assert_(
+ CursorSQL("INSERT INTO t1 DEFAULT VALUES"),
+ CursorSQL("SELECT scope_identity() AS lastrowid"),
+ )
+
+ @testing.only_on('mssql+pyodbc')
+ @testing.provide_metadata
+ def test_embedded_scope_identity(self):
+ engine = engines.testing_engine(options={"use_scope_identity": True})
+ metadata = self.metadata
+ t1 = Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ implicit_returning=False
+ )
+ metadata.create_all(engine)
+
+ with self.sql_execution_asserter(engine) as asserter:
+ engine.execute(t1.insert(), {'data': 'somedata'})
- self.assert_sql_execution(
- testing.db,
- lambda: engine.execute(t1.insert()),
- ExactSQL("INSERT INTO t1 DEFAULT VALUES"),
- # we don't have an event for
- # "SELECT @@IDENTITY" part here.
- # this will be in 0.8 with #2459
+ # pyodbc-specific system
+ asserter.assert_(
+ CursorSQL(
+ "INSERT INTO t1 (data) VALUES (?); select scope_identity()",
+ ("somedata", )
+ ),
)
- assert not engine.dialect.use_scope_identity
def test_insertid_schema(self):
meta = MetaData(testing.db)
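For context on what the three tests above pin down: with `use_scope_identity` disabled the dialect falls back to `@@identity`, which can pick up values generated by triggers on other tables, while `scope_identity()` is confined to the current scope; on pyodbc the `select scope_identity()` can additionally be embedded in the same batch as the INSERT. A minimal sketch, with a hypothetical connection URL:

    from sqlalchemy import create_engine

    # use_scope_identity is a dialect-level flag on the mssql dialects
    eng = create_engine(
        "mssql+pyodbc://scott:tiger@mydsn",   # hypothetical DSN
        use_scope_identity=True,
    )
    # False -> emits "SELECT @@identity AS lastrowid" after the INSERT
    # True  -> emits "SELECT scope_identity() AS lastrowid", or embeds
    #          "; select scope_identity()" in the INSERT batch on pyodbc
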
diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py
index 0ef69f656..bee441586 100644
--- a/test/dialect/mssql/test_reflection.py
+++ b/test/dialect/mssql/test_reflection.py
@@ -24,14 +24,14 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
Column('user_name', types.VARCHAR(20), nullable=False),
Column('test1', types.CHAR(5), nullable=False),
Column('test2', types.Float(5), nullable=False),
- Column('test3', types.Text),
+ Column('test3', types.Text('max')),
Column('test4', types.Numeric, nullable=False),
Column('test5', types.DateTime),
Column('parent_user_id', types.Integer,
ForeignKey('engine_users.user_id')),
Column('test6', types.DateTime, nullable=False),
- Column('test7', types.Text),
- Column('test8', types.LargeBinary),
+ Column('test7', types.Text('max')),
+ Column('test8', types.LargeBinary('max')),
Column('test_passivedefault2', types.Integer,
server_default='5'),
Column('test9', types.BINARY(100)),
@@ -204,6 +204,11 @@ class InfoCoerceUnicodeTest(fixtures.TestBase, AssertsCompiledSQL):
class ReflectHugeViewTest(fixtures.TestBase):
__only_on__ = 'mssql'
+ # crashes on freetds 0.91, not worth it
+ __skip_if__ = (
+ lambda: testing.requires.mssql_freetds.enabled,
+ )
+
def setup(self):
self.col_num = 150
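The `mssql_freetds` requirement used in both files above replaces the `only_on` list formerly inlined in `test_bad_freetds_warning`; a sketch of the rule assumed to live in test/requirements.py:

    from sqlalchemy.testing import exclusions

    @property
    def mssql_freetds(self):
        # FreeTDS-based drivers, per the list formerly inlined in
        # test_bad_freetds_warning
        return exclusions.only_on(['mssql+pyodbc', 'mssql+pymssql'])
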
diff --git a/test/dialect/mssql/test_types.py b/test/dialect/mssql/test_types.py
index 9dc1983ae..5c9157379 100644
--- a/test/dialect/mssql/test_types.py
+++ b/test/dialect/mssql/test_types.py
@@ -2,12 +2,15 @@
from sqlalchemy.testing import eq_, engines, pickleable
import datetime
import os
-from sqlalchemy import *
+from sqlalchemy import Table, Column, MetaData, Float, \
+ Integer, String, Boolean, TIMESTAMP, Sequence, Numeric, select, \
+ Date, Time, DateTime, DefaultClause, PickleType, text, Text, \
+ UnicodeText, LargeBinary
from sqlalchemy import types, schema
from sqlalchemy.databases import mssql
from sqlalchemy.dialects.mssql.base import TIME
from sqlalchemy.testing import fixtures, \
- AssertsExecutionResults, ComparesTables
+ AssertsExecutionResults, ComparesTables
from sqlalchemy import testing
from sqlalchemy.testing import emits_warning_on
import decimal
@@ -32,6 +35,7 @@ class TimeTypeTest(fixtures.TestBase):
class TypeDDLTest(fixtures.TestBase):
+
def test_boolean(self):
"Exercise type specification for boolean type."
@@ -39,7 +43,7 @@ class TypeDDLTest(fixtures.TestBase):
# column type, args, kwargs, expected ddl
(Boolean, [], {},
'BIT'),
- ]
+ ]
metadata = MetaData()
table_args = ['test_mssql_boolean', metadata]
@@ -54,11 +58,11 @@ class TypeDDLTest(fixtures.TestBase):
for col in boolean_table.c:
index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
+ testing.eq_(
+ gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
-
def test_numeric(self):
"Exercise type specification and options for numeric types."
@@ -88,7 +92,7 @@ class TypeDDLTest(fixtures.TestBase):
'TINYINT'),
(types.SmallInteger, [], {},
'SMALLINT'),
- ]
+ ]
metadata = MetaData()
table_args = ['test_mssql_numeric', metadata]
@@ -103,11 +107,11 @@ class TypeDDLTest(fixtures.TestBase):
for col in numeric_table.c:
index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
+ testing.eq_(
+ gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
-
def test_char(self):
"""Exercise COLLATE-ish options on string types."""
@@ -149,7 +153,7 @@ class TypeDDLTest(fixtures.TestBase):
'NTEXT'),
(mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'},
'NTEXT COLLATE Latin1_General_CI_AS'),
- ]
+ ]
metadata = MetaData()
table_args = ['test_mssql_charset', metadata]
@@ -164,10 +168,48 @@ class TypeDDLTest(fixtures.TestBase):
for col in charset_table.c:
index = int(col.name[1:])
- testing.eq_(gen.get_column_specification(col),
- "%s %s" % (col.name, columns[index][3]))
+ testing.eq_(
+ gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
self.assert_(repr(col))
+ def test_large_type_deprecation(self):
+ d1 = mssql.dialect(deprecate_large_types=True)
+ d2 = mssql.dialect(deprecate_large_types=False)
+ d3 = mssql.dialect()
+ d3.server_version_info = (11, 0)
+ d3._setup_version_attributes()
+ d4 = mssql.dialect()
+ d4.server_version_info = (10, 0)
+ d4._setup_version_attributes()
+
+ for dialect in (d1, d3):
+ eq_(
+ str(Text().compile(dialect=dialect)),
+ "VARCHAR(max)"
+ )
+ eq_(
+ str(UnicodeText().compile(dialect=dialect)),
+ "NVARCHAR(max)"
+ )
+ eq_(
+ str(LargeBinary().compile(dialect=dialect)),
+ "VARBINARY(max)"
+ )
+
+ for dialect in (d2, d4):
+ eq_(
+ str(Text().compile(dialect=dialect)),
+ "TEXT"
+ )
+ eq_(
+ str(UnicodeText().compile(dialect=dialect)),
+ "NTEXT"
+ )
+ eq_(
+ str(LargeBinary().compile(dialect=dialect)),
+ "IMAGE"
+ )
def test_timestamp(self):
"""Exercise TIMESTAMP column."""
@@ -176,9 +218,10 @@ class TypeDDLTest(fixtures.TestBase):
metadata = MetaData()
spec, expected = (TIMESTAMP, 'TIMESTAMP')
- t = Table('mssql_ts', metadata,
- Column('id', Integer, primary_key=True),
- Column('t', spec, nullable=None))
+ t = Table(
+ 'mssql_ts', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t', spec, nullable=None))
gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
self.assert_(repr(t.c.t))
@@ -255,7 +298,11 @@ class TypeDDLTest(fixtures.TestBase):
% (col.name, columns[index][3]))
self.assert_(repr(col))
-class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTables):
+metadata = None
+
+
+class TypeRoundTripTest(
+ fixtures.TestBase, AssertsExecutionResults, ComparesTables):
__only_on__ = 'mssql'
@classmethod
@@ -266,15 +313,18 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
def teardown(self):
metadata.drop_all()
- @testing.fails_on_everything_except('mssql+pyodbc',
- 'this is some pyodbc-specific feature')
+ @testing.fails_on_everything_except(
+ 'mssql+pyodbc',
+ 'this is some pyodbc-specific feature')
def test_decimal_notation(self):
- numeric_table = Table('numeric_table', metadata, Column('id',
- Integer, Sequence('numeric_id_seq',
- optional=True), primary_key=True),
- Column('numericcol',
- Numeric(precision=38, scale=20,
- asdecimal=True)))
+ numeric_table = Table(
+ 'numeric_table', metadata,
+ Column(
+ 'id', Integer,
+ Sequence('numeric_id_seq', optional=True), primary_key=True),
+ Column(
+ 'numericcol',
+ Numeric(precision=38, scale=20, asdecimal=True)))
metadata.create_all()
test_items = [decimal.Decimal(d) for d in (
'1500000.00000000000000000000',
@@ -323,7 +373,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
'000000000000.32E12',
'00000000000000.1E+12',
'000000000000.2E-32',
- )]
+ )]
for value in test_items:
numeric_table.insert().execute(numericcol=value)
@@ -332,10 +382,13 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
assert value[0] in test_items, "%r not in test_items" % value[0]
def test_float(self):
- float_table = Table('float_table', metadata, Column('id',
- Integer, Sequence('numeric_id_seq',
- optional=True), primary_key=True),
- Column('floatcol', Float()))
+ float_table = Table(
+ 'float_table', metadata,
+ Column(
+ 'id', Integer,
+ Sequence('numeric_id_seq', optional=True), primary_key=True),
+ Column('floatcol', Float()))
+
metadata.create_all()
try:
test_items = [float(d) for d in (
@@ -363,13 +416,12 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
'1E-6',
'1E-7',
'1E-8',
- )]
+ )]
for value in test_items:
float_table.insert().execute(floatcol=value)
except Exception as e:
raise e
-
# todo this should suppress warnings, but it does not
@emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
def test_dates(self):
@@ -417,20 +469,20 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
(mssql.MSDateTime2, [1], {},
'DATETIME2(1)', ['>=', (10,)]),
- ]
+ ]
table_args = ['test_mssql_dates', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res, requires = spec[0:5]
- if requires and testing._is_excluded('mssql', *requires) \
- or not requires:
- c = Column('c%s' % index, type_(*args,
- **kw), nullable=None)
+ if requires and \
+ testing._is_excluded('mssql', *requires) or not requires:
+ c = Column('c%s' % index, type_(*args, **kw), nullable=None)
testing.db.dialect.type_descriptor(c.type)
table_args.append(c)
dates_table = Table(*table_args)
- gen = testing.db.dialect.ddl_compiler(testing.db.dialect,
- schema.CreateTable(dates_table))
+ gen = testing.db.dialect.ddl_compiler(
+ testing.db.dialect,
+ schema.CreateTable(dates_table))
for col in dates_table.c:
index = int(col.name[1:])
testing.eq_(gen.get_column_specification(col), '%s %s'
@@ -443,13 +495,14 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
self.assert_types_base(col, dates_table.c[col.key])
def test_date_roundtrip(self):
- t = Table('test_dates', metadata,
- Column('id', Integer,
- Sequence('datetest_id_seq', optional=True),
- primary_key=True),
- Column('adate', Date),
- Column('atime', Time),
- Column('adatetime', DateTime))
+ t = Table(
+ 'test_dates', metadata,
+ Column('id', Integer,
+ Sequence('datetest_id_seq', optional=True),
+ primary_key=True),
+ Column('adate', Date),
+ Column('atime', Time),
+ Column('adatetime', DateTime))
metadata.create_all()
d1 = datetime.date(2007, 10, 30)
t1 = datetime.time(11, 2, 32)
@@ -471,18 +524,18 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
@emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
@testing.provide_metadata
- def test_binary_reflection(self):
+ def _test_binary_reflection(self, deprecate_large_types):
"Exercise type specification for binary types."
columns = [
- # column type, args, kwargs, expected ddl
+ # column type, args, kwargs, expected ddl from reflected
(mssql.MSBinary, [], {},
- 'BINARY'),
+ 'BINARY(1)'),
(mssql.MSBinary, [10], {},
'BINARY(10)'),
(types.BINARY, [], {},
- 'BINARY'),
+ 'BINARY(1)'),
(types.BINARY, [10], {},
'BINARY(10)'),
@@ -503,10 +556,12 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
'IMAGE'),
(types.LargeBinary, [], {},
- 'IMAGE'),
+ 'IMAGE' if not deprecate_large_types else 'VARBINARY(max)'),
]
metadata = self.metadata
+ metadata.bind = engines.testing_engine(
+ options={"deprecate_large_types": deprecate_large_types})
table_args = ['test_mssql_binary', metadata]
for index, spec in enumerate(columns):
type_, args, kw, res = spec
@@ -516,59 +571,80 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
metadata.create_all()
reflected_binary = Table('test_mssql_binary',
MetaData(testing.db), autoload=True)
- for col in reflected_binary.c:
+ for col, spec in zip(reflected_binary.c, columns):
+ eq_(
+ str(col.type), spec[3],
+ "column %s %s != %s" % (col.key, str(col.type), spec[3])
+ )
c1 = testing.db.dialect.type_descriptor(col.type).__class__
c2 = \
testing.db.dialect.type_descriptor(
binary_table.c[col.name].type).__class__
- assert issubclass(c1, c2), '%r is not a subclass of %r' \
- % (c1, c2)
+ assert issubclass(c1, c2), \
+ 'column %s: %r is not a subclass of %r' \
+ % (col.key, c1, c2)
if binary_table.c[col.name].type.length:
testing.eq_(col.type.length,
binary_table.c[col.name].type.length)
+ def test_binary_reflection_legacy_large_types(self):
+ self._test_binary_reflection(False)
+
+ @testing.only_on('mssql >= 11')
+ def test_binary_reflection_sql2012_large_types(self):
+ self._test_binary_reflection(True)
def test_autoincrement(self):
- Table('ai_1', metadata,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_2', metadata,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_3', metadata,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_y', Integer, primary_key=True))
- Table('ai_4', metadata,
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False),
- Column('int_n2', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_5', metadata,
- Column('int_y', Integer, primary_key=True),
- Column('int_n', Integer, DefaultClause('0'),
- primary_key=True, autoincrement=False))
- Table('ai_6', metadata,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_7', metadata,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True),
- Column('int_y', Integer, primary_key=True))
- Table('ai_8', metadata,
- Column('o1', String(1), DefaultClause('x'),
- primary_key=True),
- Column('o2', String(1), DefaultClause('x'),
- primary_key=True))
+ Table(
+ 'ai_1', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column(
+ 'int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table(
+ 'ai_2', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table(
+ 'ai_3', metadata,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_y', Integer, primary_key=True))
+
+ Table(
+ 'ai_4', metadata,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_n2', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table(
+ 'ai_5', metadata,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table(
+ 'ai_6', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table(
+ 'ai_7', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table(
+ 'ai_8', metadata,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True))
metadata.create_all()
table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
- 'ai_5', 'ai_6', 'ai_7', 'ai_8']
+ 'ai_5', 'ai_6', 'ai_7', 'ai_8']
mr = MetaData(testing.db)
for name in table_names:
@@ -586,27 +662,29 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTabl
if testing.db.driver == 'mxodbc':
eng = \
- [engines.testing_engine(options={'implicit_returning'
- : True})]
+ [engines.testing_engine(options={
+ 'implicit_returning': True})]
else:
eng = \
- [engines.testing_engine(options={'implicit_returning'
- : False}),
- engines.testing_engine(options={'implicit_returning'
- : True})]
+ [engines.testing_engine(options={
+ 'implicit_returning': False}),
+ engines.testing_engine(options={
+ 'implicit_returning': True})]
for counter, engine in enumerate(eng):
engine.execute(tbl.insert())
if 'int_y' in tbl.c:
assert engine.scalar(select([tbl.c.int_y])) \
== counter + 1
- assert list(engine.execute(tbl.select()).first()).\
- count(counter + 1) == 1
+ assert list(
+ engine.execute(tbl.select()).first()).\
+ count(counter + 1) == 1
else:
assert 1 \
not in list(engine.execute(tbl.select()).first())
engine.execute(tbl.delete())
+
class MonkeyPatchedBinaryTest(fixtures.TestBase):
__only_on__ = 'mssql+pymssql'
@@ -622,7 +700,12 @@ class MonkeyPatchedBinaryTest(fixtures.TestBase):
result = module.Binary(input)
eq_(result, expected_result)
+binary_table = None
+MyPickleType = None
+
+
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
+
"""Test the Binary and VarBinary types"""
__only_on__ = 'mssql'
@@ -655,7 +738,7 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
Column('misc', String(30)),
Column('pickled', PickleType),
Column('mypickle', MyPickleType),
- )
+ )
binary_table.create()
def teardown(self):
@@ -679,7 +762,7 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
data_slice=stream1[0:100],
pickled=testobj1,
mypickle=testobj3,
- )
+ )
binary_table.insert().execute(
primary_id=2,
misc='binary_data_two.dat',
@@ -687,7 +770,7 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
data_image=stream2,
data_slice=stream2[0:99],
pickled=testobj2,
- )
+ )
# TODO: pyodbc does not seem to accept "None" for a VARBINARY
# column (data=None). error: [Microsoft][ODBC SQL Server
@@ -697,17 +780,21 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
# misc='binary_data_two.dat', data=None, data_image=None,
# data_slice=stream2[0:99], pickled=None)
- binary_table.insert().execute(primary_id=3,
- misc='binary_data_two.dat', data_image=None,
- data_slice=stream2[0:99], pickled=None)
+ binary_table.insert().execute(
+ primary_id=3,
+ misc='binary_data_two.dat', data_image=None,
+ data_slice=stream2[0:99], pickled=None)
for stmt in \
binary_table.select(order_by=binary_table.c.primary_id), \
- text('select * from binary_table order by '
- 'binary_table.primary_id',
- typemap=dict(data=mssql.MSVarBinary(8000),
- data_image=mssql.MSImage,
- data_slice=types.BINARY(100), pickled=PickleType,
- mypickle=MyPickleType), bind=testing.db):
+ text(
+ 'select * from binary_table order by '
+ 'binary_table.primary_id',
+ typemap=dict(
+ data=mssql.MSVarBinary(8000),
+ data_image=mssql.MSImage,
+ data_slice=types.BINARY(100), pickled=PickleType,
+ mypickle=MyPickleType),
+ bind=testing.db):
l = stmt.execute().fetchall()
eq_(list(stream1), list(l[0]['data']))
paddedstream = list(stream1[0:100])
@@ -721,7 +808,8 @@ class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
eq_(l[0]['mypickle'].stuff, 'this is the right stuff')
def load_stream(self, name, len=3000):
- fp = open(os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb')
+ fp = open(
+ os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb')
stream = fp.read(len)
fp.close()
return stream
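The `test_large_type_deprecation` cases above encode the rule that `deprecate_large_types`, when not set explicitly, is derived from the server version: SQL Server 2012 (major version 11) and up render the `(max)` forms, while older servers keep the legacy types. A compact sketch of the mapping, condensed from the test:

    from sqlalchemy import Text, UnicodeText, LargeBinary
    from sqlalchemy.dialects import mssql

    d = mssql.dialect()
    d.server_version_info = (11, 0)   # SQL Server 2012
    d._setup_version_attributes()     # derives deprecate_large_types=True

    # Text        -> VARCHAR(max)   (TEXT on older servers)
    # UnicodeText -> NVARCHAR(max)  (NTEXT)
    # LargeBinary -> VARBINARY(max) (IMAGE)
    assert str(Text().compile(dialect=d)) == "VARCHAR(max)"
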
diff --git a/test/dialect/mysql/test_query.py b/test/dialect/mysql/test_query.py
index e085d86c1..ccb501651 100644
--- a/test/dialect/mysql/test_query.py
+++ b/test/dialect/mysql/test_query.py
@@ -55,7 +55,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
])
matchtable.insert().execute([
{'id': 1,
- 'title': 'Agile Web Development with Rails',
+ 'title': 'Agile Web Development with Ruby On Rails',
'category_id': 2},
{'id': 2,
'title': 'Dive Into Python',
@@ -76,7 +76,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
metadata.drop_all()
@testing.fails_on('mysql+mysqlconnector', 'uses pyformat')
- def test_expression(self):
+ def test_expression_format(self):
format = testing.db.dialect.paramstyle == 'format' and '%s' or '?'
self.assert_compile(
matchtable.c.title.match('somstr'),
@@ -88,7 +88,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.fails_on('mysql+oursql', 'uses format')
@testing.fails_on('mysql+pyodbc', 'uses format')
@testing.fails_on('mysql+zxjdbc', 'uses format')
- def test_expression(self):
+ def test_expression_pyformat(self):
format = '%(title_1)s'
self.assert_compile(
matchtable.c.title.match('somstr'),
@@ -102,6 +102,14 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
fetchall())
eq_([2, 5], [r.id for r in results])
+ def test_not_match(self):
+ results = (matchtable.select().
+ where(~matchtable.c.title.match('python')).
+ order_by(matchtable.c.id).
+ execute().
+ fetchall())
+ eq_([1, 3, 4], [r.id for r in results])
+
def test_simple_match_with_apostrophe(self):
results = (matchtable.select().
where(matchtable.c.title.match("Matz's")).
@@ -109,6 +117,26 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
fetchall())
eq_([3], [r.id for r in results])
+ def test_return_value(self):
+ # test [ticket:3263]
+ result = testing.db.execute(
+ select([
+ matchtable.c.title.match('Agile Ruby Programming').label('ruby'),
+ matchtable.c.title.match('Dive Python').label('python'),
+ matchtable.c.title
+ ]).order_by(matchtable.c.id)
+ ).fetchall()
+ eq_(
+ result,
+ [
+ (2.0, 0.0, 'Agile Web Development with Ruby On Rails'),
+ (0.0, 2.0, 'Dive Into Python'),
+ (2.0, 0.0, "Programming Matz's Ruby"),
+ (0.0, 0.0, 'The Definitive Guide to Django'),
+ (0.0, 1.0, 'Python in a Nutshell')
+ ]
+ )
+
def test_or_match(self):
results1 = (matchtable.select().
where(or_(matchtable.c.title.match('nutshell'),
@@ -116,14 +144,13 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
order_by(matchtable.c.id).
execute().
fetchall())
- eq_([3, 5], [r.id for r in results1])
+ eq_([1, 3, 5], [r.id for r in results1])
results2 = (matchtable.select().
where(matchtable.c.title.match('nutshell ruby')).
order_by(matchtable.c.id).
execute().
fetchall())
- eq_([3, 5], [r.id for r in results2])
-
+ eq_([1, 3, 5], [r.id for r in results2])
def test_and_match(self):
results1 = (matchtable.select().
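`test_return_value` above relies on MySQL MATCH yielding a relevance score rather than a boolean when placed in a SELECT list ([ticket:3263]). A minimal sketch against the fixture table:

    from sqlalchemy import select

    stmt = select([
        matchtable.c.title.match('Dive Python').label('score'),
        matchtable.c.title,
    ]).order_by(matchtable.c.id)
    # on MySQL this renders roughly:
    #   SELECT MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)
    #   AS score, ...
    # rows matching both words score 2.0, one word 1.0, none 0.0
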
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py
index e65acc6db..13425dc10 100644
--- a/test/dialect/mysql/test_types.py
+++ b/test/dialect/mysql/test_types.py
@@ -1,6 +1,6 @@
# coding: utf-8
-from sqlalchemy.testing import eq_, assert_raises
+from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
from sqlalchemy import *
from sqlalchemy import sql, exc, schema
from sqlalchemy.util import u
@@ -295,9 +295,6 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
self.assert_compile(type_, expected)
@testing.exclude('mysql', '<', (5, 0, 5), 'a 5.0+ feature')
- @testing.fails_if(
- lambda: testing.against("mysql+oursql") and util.py3k,
- 'some round trips fail, oursql bug ?')
@testing.provide_metadata
def test_bit_50_roundtrip(self):
bit_table = Table('mysql_bits', self.metadata,
@@ -550,13 +547,13 @@ class TypesTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
eq_(colspec(table.c.y5), 'y5 YEAR(4)')
-class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+class EnumSetTest(
+ fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
__only_on__ = 'mysql'
__dialect__ = mysql.dialect()
__backend__ = True
-
@testing.provide_metadata
def test_enum(self):
"""Exercise the ENUM type."""
@@ -566,7 +563,8 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
e3 = mysql.ENUM("'a'", "'b'", strict=True)
e4 = mysql.ENUM("'a'", "'b'", strict=True)
- enum_table = Table('mysql_enum', self.metadata,
+ enum_table = Table(
+ 'mysql_enum', self.metadata,
Column('e1', e1),
Column('e2', e2, nullable=False),
Column('e2generic', Enum("a", "b"), nullable=False),
@@ -576,32 +574,43 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
Column('e5', mysql.ENUM("a", "b")),
Column('e5generic', Enum("a", "b")),
Column('e6', mysql.ENUM("'a'", "b")),
- )
+ )
- eq_(colspec(enum_table.c.e1),
- "e1 ENUM('a','b')")
- eq_(colspec(enum_table.c.e2),
- "e2 ENUM('a','b') NOT NULL")
- eq_(colspec(enum_table.c.e2generic),
- "e2generic ENUM('a','b') NOT NULL")
- eq_(colspec(enum_table.c.e3),
- "e3 ENUM('a','b')")
- eq_(colspec(enum_table.c.e4),
- "e4 ENUM('a','b') NOT NULL")
- eq_(colspec(enum_table.c.e5),
- "e5 ENUM('a','b')")
- eq_(colspec(enum_table.c.e5generic),
- "e5generic ENUM('a','b')")
- eq_(colspec(enum_table.c.e6),
- "e6 ENUM('''a''','b')")
+ eq_(
+ colspec(enum_table.c.e1),
+ "e1 ENUM('a','b')")
+ eq_(
+ colspec(enum_table.c.e2),
+ "e2 ENUM('a','b') NOT NULL")
+ eq_(
+ colspec(enum_table.c.e2generic),
+ "e2generic ENUM('a','b') NOT NULL")
+ eq_(
+ colspec(enum_table.c.e3),
+ "e3 ENUM('a','b')")
+ eq_(
+ colspec(enum_table.c.e4),
+ "e4 ENUM('a','b') NOT NULL")
+ eq_(
+ colspec(enum_table.c.e5),
+ "e5 ENUM('a','b')")
+ eq_(
+ colspec(enum_table.c.e5generic),
+ "e5generic ENUM('a','b')")
+ eq_(
+ colspec(enum_table.c.e6),
+ "e6 ENUM('''a''','b')")
enum_table.create()
- assert_raises(exc.DBAPIError, enum_table.insert().execute,
- e1=None, e2=None, e3=None, e4=None)
+ assert_raises(
+ exc.DBAPIError, enum_table.insert().execute,
+ e1=None, e2=None, e3=None, e4=None)
- assert_raises(exc.StatementError, enum_table.insert().execute,
- e1='c', e2='c', e2generic='c', e3='c',
- e4='c', e5='c', e5generic='c', e6='c')
+ assert_raises(
+ exc.StatementError,
+ enum_table.insert().execute,
+ e1='c', e2='c', e2generic='c', e3='c',
+ e4='c', e5='c', e5generic='c', e6='c')
enum_table.insert().execute()
enum_table.insert().execute(e1='a', e2='a', e2generic='a', e3='a',
@@ -617,67 +626,191 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
eq_(res, expected)
- @testing.provide_metadata
- def test_set(self):
-
+ def _set_fixture_one(self):
with testing.expect_deprecated('Manually quoting SET value literals'):
e1, e2 = mysql.SET("'a'", "'b'"), mysql.SET("'a'", "'b'")
e4 = mysql.SET("'a'", "b")
e5 = mysql.SET("'a'", "'b'", quoting="quoted")
- set_table = Table('mysql_set', self.metadata,
+
+ set_table = Table(
+ 'mysql_set', self.metadata,
Column('e1', e1),
Column('e2', e2, nullable=False),
Column('e3', mysql.SET("a", "b")),
Column('e4', e4),
Column('e5', e5)
- )
+ )
+ return set_table
+
+ def test_set_colspec(self):
+ self.metadata = MetaData()
+ set_table = self._set_fixture_one()
+ eq_(
+ colspec(set_table.c.e1),
+ "e1 SET('a','b')")
+ eq_(colspec(
+ set_table.c.e2),
+ "e2 SET('a','b') NOT NULL")
+ eq_(
+ colspec(set_table.c.e3),
+ "e3 SET('a','b')")
+ eq_(
+ colspec(set_table.c.e4),
+ "e4 SET('''a''','b')")
+ eq_(
+ colspec(set_table.c.e5),
+ "e5 SET('a','b')")
- eq_(colspec(set_table.c.e1),
- "e1 SET('a','b')")
- eq_(colspec(set_table.c.e2),
- "e2 SET('a','b') NOT NULL")
- eq_(colspec(set_table.c.e3),
- "e3 SET('a','b')")
- eq_(colspec(set_table.c.e4),
- "e4 SET('''a''','b')")
- eq_(colspec(set_table.c.e5),
- "e5 SET('a','b')")
+ @testing.provide_metadata
+ def test_no_null(self):
+ set_table = self._set_fixture_one()
set_table.create()
+ assert_raises(
+ exc.DBAPIError, set_table.insert().execute,
+ e1=None, e2=None, e3=None, e4=None)
- assert_raises(exc.DBAPIError, set_table.insert().execute,
- e1=None, e2=None, e3=None, e4=None)
+ @testing.only_on('+oursql')
+ @testing.provide_metadata
+ def test_oursql_error_one(self):
+ set_table = self._set_fixture_one()
+ set_table.create()
+ assert_raises(
+ exc.StatementError, set_table.insert().execute,
+ e1='c', e2='c', e3='c', e4='c')
+
+ @testing.fails_on("+oursql", "oursql raises on the truncate warning")
+ @testing.provide_metadata
+ def test_empty_set_no_empty_string(self):
+ t = Table(
+ 't', self.metadata,
+ Column('id', Integer),
+ Column('data', mysql.SET("a", "b"))
+ )
+ t.create()
+ with testing.db.begin() as conn:
+ conn.execute(
+ t.insert(),
+ {'id': 1, 'data': set()},
+ {'id': 2, 'data': set([''])},
+ {'id': 3, 'data': set(['a', ''])},
+ {'id': 4, 'data': set(['b'])},
+ )
+ eq_(
+ conn.execute(t.select().order_by(t.c.id)).fetchall(),
+ [
+ (1, set()),
+ (2, set()),
+ (3, set(['a'])),
+ (4, set(['b'])),
+ ]
+ )
- if testing.against("+oursql"):
- assert_raises(exc.StatementError, set_table.insert().execute,
- e1='c', e2='c', e3='c', e4='c')
+ def test_bitwise_required_for_empty(self):
+ assert_raises_message(
+ exc.ArgumentError,
+ "Can't use the blank value '' in a SET without setting "
+ "retrieve_as_bitwise=True",
+ mysql.SET, "a", "b", ''
+ )
- set_table.insert().execute(e1='a', e2='a', e3='a', e4="'a'", e5="a,b")
- set_table.insert().execute(e1='b', e2='b', e3='b', e4='b', e5="a,b")
+ @testing.provide_metadata
+ def test_empty_set_empty_string(self):
+ t = Table(
+ 't', self.metadata,
+ Column('id', Integer),
+ Column('data', mysql.SET("a", "b", '', retrieve_as_bitwise=True))
+ )
+ t.create()
+ with testing.db.begin() as conn:
+ conn.execute(
+ t.insert(),
+ {'id': 1, 'data': set()},
+ {'id': 2, 'data': set([''])},
+ {'id': 3, 'data': set(['a', ''])},
+ {'id': 4, 'data': set(['b'])},
+ )
+ eq_(
+ conn.execute(t.select().order_by(t.c.id)).fetchall(),
+ [
+ (1, set()),
+ (2, set([''])),
+ (3, set(['a', ''])),
+ (4, set(['b'])),
+ ]
+ )
- res = set_table.select().execute().fetchall()
+ @testing.provide_metadata
+ def test_string_roundtrip(self):
+ set_table = self._set_fixture_one()
+ set_table.create()
+ with testing.db.begin() as conn:
+ conn.execute(
+ set_table.insert(),
+ dict(e1='a', e2='a', e3='a', e4="'a'", e5="a,b"))
+ conn.execute(
+ set_table.insert(),
+ dict(e1='b', e2='b', e3='b', e4='b', e5="a,b"))
+
+ expected = [
+ (set(['a']), set(['a']), set(['a']),
+ set(["'a'"]), set(['a', 'b'])),
+ (set(['b']), set(['b']), set(['b']),
+ set(['b']), set(['a', 'b']))
+ ]
+ res = conn.execute(
+ set_table.select()
+ ).fetchall()
- if not testing.against("+oursql"):
- # oursql receives this for first row:
- # (set(['']), set(['']), set(['']), set(['']), None),
- # but based on ...OS? MySQL version? not clear.
- # not worth testing.
+ eq_(res, expected)
- expected = []
+ @testing.provide_metadata
+ def test_unicode_roundtrip(self):
+ set_table = Table(
+ 't', self.metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', mysql.SET(
+ u('réveillé'), u('drôle'), u('S’il'), convert_unicode=True)),
+ )
- expected.extend([
- (set(['a']), set(['a']), set(['a']), set(["'a'"]), set(['a', 'b'])),
- (set(['b']), set(['b']), set(['b']), set(['b']), set(['a', 'b']))
- ])
+ set_table.create()
+ with testing.db.begin() as conn:
+ conn.execute(
+ set_table.insert(),
+ {"data": set([u('réveillé'), u('drôle')])})
+
+ row = conn.execute(
+ set_table.select()
+ ).first()
+
+ eq_(
+ row,
+ (1, set([u('réveillé'), u('drôle')]))
+ )
- eq_(res, expected)
+ @testing.provide_metadata
+ def test_int_roundtrip(self):
+ set_table = self._set_fixture_one()
+ set_table.create()
+ with testing.db.begin() as conn:
+ conn.execute(
+ set_table.insert(),
+ dict(e1=1, e2=2, e3=3, e4=3, e5=0)
+ )
+ res = conn.execute(set_table.select()).first()
+ eq_(
+ res,
+ (
+ set(['a']), set(['b']), set(['a', 'b']),
+ set(["'a'", 'b']), set([]))
+ )
@testing.provide_metadata
def test_set_roundtrip_plus_reflection(self):
- set_table = Table('mysql_set', self.metadata,
- Column('s1',
- mysql.SET("dq", "sq")),
- Column('s2', mysql.SET("a")),
- Column('s3', mysql.SET("5", "7", "9")))
+ set_table = Table(
+ 'mysql_set', self.metadata,
+ Column('s1', mysql.SET("dq", "sq")),
+ Column('s2', mysql.SET("a")),
+ Column('s3', mysql.SET("5", "7", "9")))
eq_(colspec(set_table.c.s1), "s1 SET('dq','sq')")
eq_(colspec(set_table.c.s2), "s2 SET('a')")
@@ -691,37 +824,34 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
expected = expected or store
table.insert(store).execute()
row = table.select().execute().first()
- self.assert_(list(row) == expected)
+ eq_(row, tuple(expected))
table.delete().execute()
roundtrip([None, None, None], [None] * 3)
- roundtrip(['', '', ''], [set([''])] * 3)
+ roundtrip(['', '', ''], [set([])] * 3)
roundtrip([set(['dq']), set(['a']), set(['5'])])
roundtrip(['dq', 'a', '5'], [set(['dq']), set(['a']),
set(['5'])])
- roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5'
- ])])
- roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'
- ])])
- set_table.insert().execute({'s3': set(['5'])},
- {'s3': set(['5', '7'])}, {'s3': set(['5', '7', '9'])},
- {'s3': set(['7', '9'])})
-
- # NOTE: the string sent to MySQL here is sensitive to ordering.
- # for some reason the set ordering is always "5, 7" when we test on
- # MySQLdb but in Py3K this is not guaranteed. So basically our
- # SET type doesn't do ordering correctly (not sure how it can,
- # as we don't know how the SET was configured in the first place.)
- rows = select([set_table.c.s3],
- set_table.c.s3.in_([set(['5']), ['5', '7']])
- ).execute().fetchall()
+ roundtrip([1, 1, 1], [set(['dq']), set(['a']), set(['5'])])
+ roundtrip([set(['dq', 'sq']), None, set(['9', '5', '7'])])
+ set_table.insert().execute(
+ {'s3': set(['5'])},
+ {'s3': set(['5', '7'])},
+ {'s3': set(['5', '7', '9'])},
+ {'s3': set(['7', '9'])})
+
+ rows = select(
+ [set_table.c.s3],
+ set_table.c.s3.in_([set(['5']), ['5', '7']])
+ ).execute().fetchall()
found = set([frozenset(row[0]) for row in rows])
eq_(found, set([frozenset(['5']), frozenset(['5', '7'])]))
@testing.provide_metadata
def test_unicode_enum(self):
metadata = self.metadata
- t1 = Table('table', metadata,
+ t1 = Table(
+ 'table', metadata,
Column('id', Integer, primary_key=True),
Column('value', Enum(u('réveillé'), u('drôle'), u('S’il'))),
Column('value2', mysql.ENUM(u('réveillé'), u('drôle'), u('S’il')))
@@ -731,9 +861,11 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
t1.insert().execute(value=u('réveillé'), value2=u('réveillé'))
t1.insert().execute(value=u('S’il'), value2=u('S’il'))
eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
- [(1, u('drôle'), u('drôle')), (2, u('réveillé'), u('réveillé')),
- (3, u('S’il'), u('S’il'))]
- )
+ [
+ (1, u('drôle'), u('drôle')),
+ (2, u('réveillé'), u('réveillé')),
+ (3, u('S’il'), u('S’il'))
+ ])
# test reflection of the enum labels
@@ -743,11 +875,15 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
# TODO: what's wrong with the last element ? is there
# latin-1 stuff forcing its way in ?
- assert t2.c.value.type.enums[0:2] == \
- (u('réveillé'), u('drôle')) # u'S’il') # eh ?
+ eq_(
+ t2.c.value.type.enums[0:2],
+ (u('réveillé'), u('drôle')) # u'S’il') # eh ?
+ )
- assert t2.c.value2.type.enums[0:2] == \
- (u('réveillé'), u('drôle')) # u'S’il') # eh ?
+ eq_(
+ t2.c.value2.type.enums[0:2],
+ (u('réveillé'), u('drôle')) # u'S’il') # eh ?
+ )
def test_enum_compile(self):
e1 = Enum('x', 'y', 'z', name='somename')
@@ -767,7 +903,8 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
def test_enum_parse(self):
with testing.expect_deprecated('Manually quoting ENUM value literals'):
- enum_table = Table('mysql_enum', self.metadata,
+ enum_table = Table(
+ 'mysql_enum', self.metadata,
Column('e1', mysql.ENUM("'a'")),
Column('e2', mysql.ENUM("''")),
Column('e3', mysql.ENUM('a')),
@@ -795,14 +932,17 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
@testing.exclude('mysql', '<', (5,))
def test_set_parse(self):
with testing.expect_deprecated('Manually quoting SET value literals'):
- set_table = Table('mysql_set', self.metadata,
+ set_table = Table(
+ 'mysql_set', self.metadata,
Column('e1', mysql.SET("'a'")),
- Column('e2', mysql.SET("''")),
+ Column('e2', mysql.SET("''", retrieve_as_bitwise=True)),
Column('e3', mysql.SET('a')),
- Column('e4', mysql.SET('')),
- Column('e5', mysql.SET("'a'", "''")),
- Column('e6', mysql.SET("''", "'a'")),
- Column('e7', mysql.SET("''", "'''a'''", "'b''b'", "''''")))
+ Column('e4', mysql.SET('', retrieve_as_bitwise=True)),
+ Column('e5', mysql.SET("'a'", "''", retrieve_as_bitwise=True)),
+ Column('e6', mysql.SET("''", "'a'", retrieve_as_bitwise=True)),
+ Column('e7', mysql.SET(
+ "''", "'''a'''", "'b''b'", "''''",
+ retrieve_as_bitwise=True)))
for col in set_table.c:
self.assert_(repr(col))
@@ -821,7 +961,8 @@ class EnumSetTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
eq_(t.c.e6.type.values, ("", "a"))
eq_(t.c.e7.type.values, ("", "'a'", "b'b", "'"))
+
def colspec(c):
return testing.db.dialect.ddl_compiler(
- testing.db.dialect, None).get_column_specification(c)
+ testing.db.dialect, None).get_column_specification(c)
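Several of the rewritten SET tests above hinge on the new `retrieve_as_bitwise` flag: without it a blank `''` member is rejected outright (see `test_bitwise_required_for_empty`), since a blank string inside the retrieved comma-separated value is ambiguous; with it, values are persisted and loaded via their integer bit positions. A sketch, with the bit mapping assumed from member order:

    from sqlalchemy.dialects import mysql

    # members map to bits by position: '' -> 1, 'a' -> 2, 'b' -> 4
    s = mysql.SET('', 'a', 'b', retrieve_as_bitwise=True)

    # without the flag, the blank member raises ArgumentError:
    #   "Can't use the blank value '' in a SET without setting
    #    retrieve_as_bitwise=True"
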
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py
index 6c4f3c8cc..5717df9f7 100644
--- a/test/dialect/postgresql/test_compiler.py
+++ b/test/dialect/postgresql/test_compiler.py
@@ -5,7 +5,7 @@ from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \
from sqlalchemy.testing import engines, fixtures
from sqlalchemy import testing
from sqlalchemy import Sequence, Table, Column, Integer, update, String,\
- insert, func, MetaData, Enum, Index, and_, delete, select, cast
+ insert, func, MetaData, Enum, Index, and_, delete, select, cast, text
from sqlalchemy.dialects.postgresql import ExcludeConstraint, array
from sqlalchemy import exc, schema
from sqlalchemy.dialects.postgresql import base as postgresql
@@ -296,6 +296,58 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
'(data text_pattern_ops, data2 int4_ops)',
dialect=postgresql.dialect())
+ def test_create_index_with_text_or_composite(self):
+ m = MetaData()
+ tbl = Table('testtbl', m,
+ Column('d1', String),
+ Column('d2', Integer))
+
+ idx = Index('test_idx1', text('x'))
+ tbl.append_constraint(idx)
+
+ idx2 = Index('test_idx2', text('y'), tbl.c.d2)
+
+ idx3 = Index(
+ 'test_idx2', tbl.c.d1, text('y'), tbl.c.d2,
+ postgresql_ops={'d1': 'x1', 'd2': 'x2'}
+ )
+
+ idx4 = Index(
+ 'test_idx2', tbl.c.d1, tbl.c.d2 > 5, text('q'),
+ postgresql_ops={'d1': 'x1', 'd2': 'x2'}
+ )
+
+ idx5 = Index(
+ 'test_idx2', tbl.c.d1, (tbl.c.d2 > 5).label('g'), text('q'),
+ postgresql_ops={'d1': 'x1', 'g': 'x2'}
+ )
+
+ self.assert_compile(
+ schema.CreateIndex(idx),
+ "CREATE INDEX test_idx1 ON testtbl (x)"
+ )
+ self.assert_compile(
+ schema.CreateIndex(idx2),
+ "CREATE INDEX test_idx2 ON testtbl (y, d2)"
+ )
+ self.assert_compile(
+ schema.CreateIndex(idx3),
+ "CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)"
+ )
+
+ # note that at the moment we do not expect the 'd2' op to
+ # pick up on the "d2 > 5" expression
+ self.assert_compile(
+ schema.CreateIndex(idx4),
+ "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)"
+ )
+
+ # however it does work if we label!
+ self.assert_compile(
+ schema.CreateIndex(idx5),
+ "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)"
+ )
+
def test_create_index_with_using(self):
m = MetaData()
tbl = Table('testtbl', m, Column('data', String))
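The key takeaway of `test_create_index_with_text_or_composite` above is that `postgresql_ops` is keyed by name, so a raw boolean expression such as `tbl.c.d2 > 5` is skipped unless it is given a `.label()` to key on. A condensed sketch, reusing `tbl` from the test:

    from sqlalchemy import Index, text

    # the 'g' label lets postgresql_ops attach an operator class to
    # the (d2 > 5) expression; unlabeled expressions pass through as-is
    Index(
        'test_idx2', tbl.c.d1, (tbl.c.d2 > 5).label('g'), text('q'),
        postgresql_ops={'d1': 'x1', 'g': 'x2'})
    # -> CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)
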
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index b751bbcdd..9f86aaa7a 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -118,7 +118,8 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
eq_(c.connection.connection.encoding, test_encoding)
@testing.only_on(
- ['postgresql+psycopg2', 'postgresql+pg8000'],
+ ['postgresql+psycopg2', 'postgresql+pg8000',
+ 'postgresql+psycopg2cffi'],
'psycopg2 / pg8000 - specific feature')
@engines.close_open_connections
def test_autocommit_isolation_level(self):
diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py
index a512b56fa..27cb958fd 100644
--- a/test/dialect/postgresql/test_query.py
+++ b/test/dialect/postgresql/test_query.py
@@ -6,6 +6,7 @@ from sqlalchemy import Table, Column, MetaData, Integer, String, bindparam, \
Sequence, ForeignKey, text, select, func, extract, literal_column, \
tuple_, DateTime, Time, literal, and_, Date, or_
from sqlalchemy.testing import engines, fixtures
+from sqlalchemy.testing.assertsql import DialectSQL, CursorSQL
from sqlalchemy import testing
from sqlalchemy import exc
from sqlalchemy.dialects import postgresql
@@ -170,7 +171,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engines.testing_engine(options={'implicit_returning': False})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
# execute with explicit id
@@ -199,32 +200,41 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'data': 'd8'})
- # note that the test framework doesn't capture the "preexecute"
- # of a seqeuence or default. we just see it in the bind params.
+ asserter.assert_(
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 1, 'data': 'd2'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd5'}, {'data': 'd6'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd8'}]),
+ )
+
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 1, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
- [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
table.delete().execute()
# test the same series of events using a reflected version of
@@ -233,7 +243,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [5]
@@ -243,29 +253,39 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 5, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
- [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
+ asserter.assert_(
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 5, 'data': 'd2'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd5'}, {'data': 'd6'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd8'}]),
+ )
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (5, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (6, 'd5'),
+ (7, 'd6'),
+ (33, 'd7'),
+ (8, 'd8'),
+ ]
+ )
table.delete().execute()
def _assert_data_autoincrement_returning(self, table):
@@ -273,7 +293,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engines.testing_engine(options={'implicit_returning': True})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
# execute with explicit id
@@ -302,29 +322,34 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING '
'testtable.id', {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
[{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd8'}]),
+ )
+
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
table.delete().execute()
# test the same series of events using a reflected version of
@@ -333,7 +358,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [5]
@@ -343,29 +368,32 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING '
'testtable.id', {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
[{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
+ )
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (5, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (6, 'd5'),
+ (7, 'd6'),
+ (33, 'd7'),
+ (8, 'd8'),
+ ]
+ )
table.delete().execute()
def _assert_data_with_sequence(self, table, seqname):
@@ -373,7 +401,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engines.testing_engine(options={'implicit_returning': False})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
table.insert().execute({'data': 'd2'})
table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
@@ -382,30 +410,34 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ CursorSQL("select nextval('my_seq')"),
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 1, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
+ )
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
# cant test reflection here since the Sequence must be
# explicitly specified
@@ -415,7 +447,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
engines.testing_engine(options={'implicit_returning': True})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
table.insert().execute({'data': 'd2'})
table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
@@ -424,31 +456,35 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ("INSERT INTO testtable (id, data) VALUES "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES "
"(nextval('my_seq'), :data) RETURNING testtable.id",
{'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
+ )
+
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
    # can't test reflection here since the Sequence must be
# explicitly specified
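
With implicit_returning=True the sequence value skips the separate nextval round trip and rides back on the INSERT itself, as the asserted RETURNING clause shows. A rough usage sketch, reusing the fixture sketched earlier:

    engine = engines.testing_engine(options={'implicit_returning': True})
    result = engine.execute(table.insert(), {'data': 'd2'})
    # the new id arrives via "RETURNING testtable.id"
    print(result.inserted_primary_key)  # e.g. [1]
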
@@ -693,6 +729,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.fails_on('postgresql+psycopg2', 'uses pyformat')
@testing.fails_on('postgresql+pypostgresql', 'uses pyformat')
@testing.fails_on('postgresql+zxjdbc', 'uses qmark')
+ @testing.fails_on('postgresql+psycopg2cffi', 'uses pyformat')
def test_expression_positional(self):
self.assert_compile(matchtable.c.title.match('somstr'),
'matchtable.title @@ to_tsquery(%s)')
@@ -703,6 +740,12 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
matchtable.c.id).execute().fetchall()
eq_([2, 5], [r.id for r in results])
+ def test_not_match(self):
+ results = matchtable.select().where(
+ ~matchtable.c.title.match('python')).order_by(
+ matchtable.c.id).execute().fetchall()
+ eq_([1, 3, 4], [r.id for r in results])
+
def test_simple_match_with_apostrophe(self):
results = matchtable.select().where(
matchtable.c.title.match("Matz's")).execute().fetchall()
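
The new test_not_match case leans on plain boolean inversion of the match() expression; a standalone sketch of what that compiles to under the psycopg2 dialect (output paraphrased, not copied from the suite):

    from sqlalchemy import column
    from sqlalchemy.dialects import postgresql

    expr = ~column('title').match('python')
    print(expr.compile(dialect=postgresql.dialect()))
    # roughly: NOT (title @@ to_tsquery(%(title_1)s))
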
@@ -813,21 +856,23 @@ class ExtractTest(fixtures.TablesTest):
def utcoffset(self, dt):
return datetime.timedelta(hours=4)
- conn = testing.db.connect()
-
- # we aren't resetting this at the moment but we don't have
- # any other tests that are TZ specific
- conn.execute("SET SESSION TIME ZONE 0")
- conn.execute(
- cls.tables.t.insert(),
- {
- 'dtme': datetime.datetime(2012, 5, 10, 12, 15, 25),
- 'dt': datetime.date(2012, 5, 10),
- 'tm': datetime.time(12, 15, 25),
- 'intv': datetime.timedelta(seconds=570),
- 'dttz': datetime.datetime(2012, 5, 10, 12, 15, 25, tzinfo=TZ())
- },
- )
+ with testing.db.connect() as conn:
+
+ # we aren't resetting this at the moment but we don't have
+ # any other tests that are TZ specific
+ conn.execute("SET SESSION TIME ZONE 0")
+ conn.execute(
+ cls.tables.t.insert(),
+ {
+ 'dtme': datetime.datetime(2012, 5, 10, 12, 15, 25),
+ 'dt': datetime.date(2012, 5, 10),
+ 'tm': datetime.time(12, 15, 25),
+ 'intv': datetime.timedelta(seconds=570),
+ 'dttz':
+ datetime.datetime(2012, 5, 10, 12, 15, 25,
+ tzinfo=TZ())
+ },
+ )
def _test(self, expr, field="all", overrides=None):
t = self.tables.t
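
The TZ stub in that setup overrides only utcoffset(); a fuller fixed-offset tzinfo along the same lines (dst() and tzname() are additions for completeness, not part of the test) looks like:

    import datetime

    class TZ(datetime.tzinfo):
        # fixed +04:00 zone, mirroring the test's stub
        def utcoffset(self, dt):
            return datetime.timedelta(hours=4)

        def dst(self, dt):
            return datetime.timedelta(0)

        def tzname(self, dt):
            return "+04:00"
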
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 8de71216e..0dda1fa45 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -323,6 +323,18 @@ class ReflectionTest(fixtures.TestBase):
eq_([c.name for c in t2.primary_key], ['t_id'])
@testing.provide_metadata
+ def test_has_temporary_table(self):
+ assert not testing.db.has_table("some_temp_table")
+ user_tmp = Table(
+ "some_temp_table", self.metadata,
+ Column("id", Integer, primary_key=True),
+ Column('name', String(50)),
+ prefixes=['TEMPORARY']
+ )
+ user_tmp.create(testing.db)
+ assert testing.db.has_table("some_temp_table")
+
+ @testing.provide_metadata
def test_cross_schema_reflection_one(self):
meta1 = self.metadata
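
The prefixes=['TEMPORARY'] argument in test_has_temporary_table splices extra keywords between CREATE and TABLE; assuming the user_tmp table from that test, the effect is quick to see:

    from sqlalchemy.schema import CreateTable
    print(CreateTable(user_tmp))
    # CREATE TEMPORARY TABLE some_temp_table (...)
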
diff --git a/test/dialect/postgresql/test_types.py b/test/dialect/postgresql/test_types.py
index 5c5da59b1..1f572c9a1 100644
--- a/test/dialect/postgresql/test_types.py
+++ b/test/dialect/postgresql/test_types.py
@@ -189,7 +189,7 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
try:
self.assert_sql(
- testing.db, go, [], with_sequences=[
+ testing.db, go, [
("CREATE TABLE foo (\tbar "
"VARCHAR(5), \tCONSTRAINT myenum CHECK "
"(bar IN ('one', 'two', 'three')))", {})])
@@ -259,9 +259,9 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
try:
self.assert_sql(
- engine, go, [], with_sequences=[
- ("CREATE TABLE foo (\tbar "
- "VARCHAR(5), \tCONSTRAINT myenum CHECK "
+ engine, go, [
+ ("CREATE TABLE foo (bar "
+ "VARCHAR(5), CONSTRAINT myenum CHECK "
"(bar IN ('one', 'two', 'three')))", {})])
finally:
metadata.drop_all(engine)
@@ -379,10 +379,12 @@ class NumericInterpretationTest(fixtures.TestBase):
__backend__ = True
def test_numeric_codes(self):
- from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base
-
- for dialect in (pg8000.dialect(), psycopg2.dialect()):
+ from sqlalchemy.dialects.postgresql import psycopg2cffi, pg8000, \
+ psycopg2, base
+ dialects = (pg8000.dialect(), psycopg2.dialect(),
+ psycopg2cffi.dialect())
+ for dialect in dialects:
typ = Numeric().dialect_impl(dialect)
for code in base._INT_TYPES + base._FLOAT_TYPES + \
base._DECIMAL_TYPES:
@@ -1397,7 +1399,7 @@ class HStoreRoundTripTest(fixtures.TablesTest):
use_native_hstore=False))
else:
engine = testing.db
- engine.connect()
+ engine.connect().close()
return engine
def test_reflect(self):
@@ -2029,7 +2031,7 @@ class JSONRoundTripTest(fixtures.TablesTest):
engine = engines.testing_engine(options=options)
else:
engine = testing.db
- engine.connect()
+ engine.connect().close()
return engine
def test_reflect(self):
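
The engine.connect().close() edits above only verify that the URL connects without leaving a connection checked out; the equivalent context-manager spelling (engine standing in for the fixture's engine) is:

    with engine.connect() as conn:
        pass  # checked back in to the pool on exit
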
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index 72decbdf3..3c67f1590 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -180,6 +180,51 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"'
)
+ def test_cte(self):
+ part = table(
+ 'part',
+ column('part'),
+ column('sub_part'),
+ column('quantity')
+ )
+
+ included_parts = select([
+ part.c.sub_part, part.c.part, part.c.quantity
+ ]).where(part.c.part == "p1").\
+ cte(name="included_parts", recursive=True).\
+ suffix_with(
+ "search depth first by part set ord1",
+ "cycle part set y_cycle to 1 default 0", dialect='oracle')
+
+ incl_alias = included_parts.alias("pr1")
+ parts_alias = part.alias("p")
+ included_parts = included_parts.union_all(
+ select([
+ parts_alias.c.sub_part,
+ parts_alias.c.part, parts_alias.c.quantity
+ ]).where(parts_alias.c.part == incl_alias.c.sub_part)
+ )
+
+ q = select([
+ included_parts.c.sub_part,
+ func.sum(included_parts.c.quantity).label('total_quantity')]).\
+ group_by(included_parts.c.sub_part)
+
+ self.assert_compile(
+ q,
+ "WITH included_parts(sub_part, part, quantity) AS "
+ "(SELECT part.sub_part AS sub_part, part.part AS part, "
+ "part.quantity AS quantity FROM part WHERE part.part = :part_1 "
+ "UNION ALL SELECT p.sub_part AS sub_part, p.part AS part, "
+ "p.quantity AS quantity FROM part p, included_parts pr1 "
+ "WHERE p.part = pr1.sub_part) "
+ "search depth first by part set ord1 cycle part set "
+ "y_cycle to 1 default 0 "
+ "SELECT included_parts.sub_part, sum(included_parts.quantity) "
+ "AS total_quantity FROM included_parts "
+ "GROUP BY included_parts.sub_part"
+ )
+
def test_limit(self):
t = table('sometable', column('col1'), column('col2'))
s = select([t])
@@ -687,6 +732,34 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
)
+ def test_create_table_compress(self):
+ m = MetaData()
+ tbl1 = Table('testtbl1', m, Column('data', Integer),
+ oracle_compress=True)
+ tbl2 = Table('testtbl2', m, Column('data', Integer),
+ oracle_compress="OLTP")
+
+ self.assert_compile(schema.CreateTable(tbl1),
+ "CREATE TABLE testtbl1 (data INTEGER) COMPRESS")
+ self.assert_compile(schema.CreateTable(tbl2),
+ "CREATE TABLE testtbl2 (data INTEGER) "
+ "COMPRESS FOR OLTP")
+
+ def test_create_index_bitmap_compress(self):
+ m = MetaData()
+ tbl = Table('testtbl', m, Column('data', Integer))
+ idx1 = Index('idx1', tbl.c.data, oracle_compress=True)
+ idx2 = Index('idx2', tbl.c.data, oracle_compress=1)
+ idx3 = Index('idx3', tbl.c.data, oracle_bitmap=True)
+
+ self.assert_compile(schema.CreateIndex(idx1),
+ "CREATE INDEX idx1 ON testtbl (data) COMPRESS")
+ self.assert_compile(schema.CreateIndex(idx2),
+ "CREATE INDEX idx2 ON testtbl (data) COMPRESS 1")
+ self.assert_compile(schema.CreateIndex(idx3),
+ "CREATE BITMAP INDEX idx3 ON testtbl (data)")
+
+
class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
def _dialect(self, server_version, **kw):
@@ -1727,6 +1800,58 @@ class UnsupportedIndexReflectTest(fixtures.TestBase):
m2 = MetaData(testing.db)
Table('test_index_reflect', m2, autoload=True)
+
+def all_tables_compression_missing():
+ try:
+ testing.db.execute('SELECT compression FROM all_tables')
+ return False
+ except:
+ return True
+
+
+def all_tables_compress_for_missing():
+ try:
+ testing.db.execute('SELECT compress_for FROM all_tables')
+ return False
+ except:
+ return True
+
+
+class TableReflectionTest(fixtures.TestBase):
+ __only_on__ = 'oracle'
+
+ @testing.provide_metadata
+ @testing.fails_if(all_tables_compression_missing)
+ def test_reflect_basic_compression(self):
+ metadata = self.metadata
+
+ tbl = Table('test_compress', metadata,
+ Column('data', Integer, primary_key=True),
+ oracle_compress=True)
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+
+ tbl = Table('test_compress', m2, autoload=True)
+ # Don't hardcode the exact value, but it must be non-empty
+ assert tbl.dialect_options['oracle']['compress']
+
+ @testing.provide_metadata
+ @testing.fails_if(all_tables_compress_for_missing)
+ def test_reflect_oltp_compression(self):
+ metadata = self.metadata
+
+ tbl = Table('test_compress', metadata,
+ Column('data', Integer, primary_key=True),
+ oracle_compress="OLTP")
+ metadata.create_all()
+
+ m2 = MetaData(testing.db)
+
+ tbl = Table('test_compress', m2, autoload=True)
+ assert tbl.dialect_options['oracle']['compress'] == "OLTP"
+
+
class RoundTripIndexTest(fixtures.TestBase):
__only_on__ = 'oracle'
@@ -1744,6 +1869,10 @@ class RoundTripIndexTest(fixtures.TestBase):
# "group" is a keyword, so lower case
normalind = Index('tableind', table.c.id_b, table.c.group)
+ compress1 = Index('compress1', table.c.id_a, table.c.id_b,
+ oracle_compress=True)
+ compress2 = Index('compress2', table.c.id_a, table.c.id_b, table.c.col,
+ oracle_compress=1)
metadata.create_all()
mirror = MetaData(testing.db)
@@ -1792,8 +1921,15 @@ class RoundTripIndexTest(fixtures.TestBase):
)
assert (Index, ('id_b', ), True) in reflected
assert (Index, ('col', 'group'), True) in reflected
+
+ idx = reflected[(Index, ('id_a', 'id_b', ), False)]
+ assert idx.dialect_options['oracle']['compress'] == 2
+
+ idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)]
+ assert idx.dialect_options['oracle']['compress'] == 1
+
eq_(len(reflectedtable.constraints), 1)
- eq_(len(reflectedtable.indexes), 3)
+ eq_(len(reflectedtable.indexes), 5)
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
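
A condensed sketch of how the new oracle_compress flag travels, in at the Index constructor and out through both DDL and dialect_options (the DDL string is taken from the compiler tests above):

    from sqlalchemy import MetaData, Table, Column, Integer, Index
    from sqlalchemy.dialects import oracle
    from sqlalchemy.schema import CreateIndex

    m = MetaData()
    t = Table('t', m, Column('data', Integer))
    idx = Index('idx_data', t.c.data, oracle_compress=True)

    print(CreateIndex(idx).compile(dialect=oracle.dialect()))
    # CREATE INDEX idx_data ON t (data) COMPRESS
    print(idx.dialect_options['oracle']['compress'])  # True
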
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index 124208dbe..44e4eda42 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -7,8 +7,8 @@ import datetime
from sqlalchemy.testing import eq_, assert_raises, \
assert_raises_message, is_
from sqlalchemy import Table, select, bindparam, Column,\
- MetaData, func, extract, ForeignKey, text, DefaultClause, and_, create_engine,\
- UniqueConstraint
+ MetaData, func, extract, ForeignKey, text, DefaultClause, and_, \
+ create_engine, UniqueConstraint
from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time
from sqlalchemy import types as sqltypes
from sqlalchemy import event, inspect
@@ -21,6 +21,9 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
AssertsExecutionResults, engines
from sqlalchemy import testing
from sqlalchemy.schema import CreateTable
+from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import mock
+
class TestTypes(fixtures.TestBase, AssertsExecutionResults):
@@ -32,9 +35,10 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
"""
meta = MetaData(testing.db)
- t = Table('bool_table', meta, Column('id', Integer,
- primary_key=True), Column('boo',
- Boolean(create_constraint=False)))
+ t = Table(
+ 'bool_table', meta,
+ Column('id', Integer, primary_key=True),
+ Column('boo', Boolean(create_constraint=False)))
try:
meta.create_all()
testing.db.execute("INSERT INTO bool_table (id, boo) "
@@ -69,28 +73,31 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
ValueError,
"Couldn't parse %s string." % disp,
lambda: testing.db.execute(
- text("select 'ASDF' as value", typemap={"value":typ})
+ text("select 'ASDF' as value", typemap={"value": typ})
).scalar()
)
def test_native_datetime(self):
dbapi = testing.db.dialect.dbapi
- connect_args = {'detect_types': dbapi.PARSE_DECLTYPES \
- | dbapi.PARSE_COLNAMES}
- engine = engines.testing_engine(options={'connect_args'
- : connect_args, 'native_datetime': True})
- t = Table('datetest', MetaData(),
- Column('id', Integer, primary_key=True),
- Column('d1', Date), Column('d2', sqltypes.TIMESTAMP))
+ connect_args = {
+ 'detect_types': dbapi.PARSE_DECLTYPES | dbapi.PARSE_COLNAMES}
+ engine = engines.testing_engine(
+ options={'connect_args': connect_args, 'native_datetime': True})
+ t = Table(
+ 'datetest', MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('d1', Date), Column('d2', sqltypes.TIMESTAMP))
t.create(engine)
try:
- engine.execute(t.insert(), {'d1': datetime.date(2010, 5,
- 10),
- 'd2': datetime.datetime( 2010, 5, 10, 12, 15, 25,
- )})
+ engine.execute(t.insert(), {
+ 'd1': datetime.date(2010, 5, 10),
+ 'd2': datetime.datetime(2010, 5, 10, 12, 15, 25)
+ })
row = engine.execute(t.select()).first()
- eq_(row, (1, datetime.date(2010, 5, 10),
- datetime.datetime( 2010, 5, 10, 12, 15, 25, )))
+ eq_(
+ row,
+ (1, datetime.date(2010, 5, 10),
+ datetime.datetime(2010, 5, 10, 12, 15, 25)))
r = engine.execute(func.current_date()).scalar()
assert isinstance(r, util.string_types)
finally:
@@ -100,15 +107,16 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
@testing.provide_metadata
def test_custom_datetime(self):
sqlite_date = sqlite.DATETIME(
- # 2004-05-21T00:00:00
- storage_format="%(year)04d-%(month)02d-%(day)02d"
- "T%(hour)02d:%(minute)02d:%(second)02d",
- regexp=r"(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)",
- )
+ # 2004-05-21T00:00:00
+ storage_format="%(year)04d-%(month)02d-%(day)02d"
+ "T%(hour)02d:%(minute)02d:%(second)02d",
+ regexp=r"(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)",
+ )
t = Table('t', self.metadata, Column('d', sqlite_date))
self.metadata.create_all(testing.db)
- testing.db.execute(t.insert().
- values(d=datetime.datetime(2010, 10, 15, 12, 37, 0)))
+ testing.db.execute(
+ t.insert().
+ values(d=datetime.datetime(2010, 10, 15, 12, 37, 0)))
testing.db.execute("insert into t (d) values ('2004-05-21T00:00:00')")
eq_(
testing.db.execute("select * from t order by d").fetchall(),
@@ -116,21 +124,70 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
)
eq_(
testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(),
- [(datetime.datetime(2004, 5, 21, 0, 0),),
- (datetime.datetime(2010, 10, 15, 12, 37),)]
+ [
+ (datetime.datetime(2004, 5, 21, 0, 0),),
+ (datetime.datetime(2010, 10, 15, 12, 37),)]
+ )
+
+ @testing.provide_metadata
+ def test_custom_datetime_text_affinity(self):
+ sqlite_date = sqlite.DATETIME(
+ storage_format="%(year)04d%(month)02d%(day)02d"
+ "%(hour)02d%(minute)02d%(second)02d",
+ regexp=r"(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})",
+ )
+ t = Table('t', self.metadata, Column('d', sqlite_date))
+ self.metadata.create_all(testing.db)
+ testing.db.execute(
+ t.insert().
+ values(d=datetime.datetime(2010, 10, 15, 12, 37, 0)))
+ testing.db.execute("insert into t (d) values ('20040521000000')")
+ eq_(
+ testing.db.execute("select * from t order by d").fetchall(),
+ [('20040521000000',), ('20101015123700',)]
+ )
+ eq_(
+ testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(),
+ [
+ (datetime.datetime(2004, 5, 21, 0, 0),),
+ (datetime.datetime(2010, 10, 15, 12, 37),)]
+ )
+
+ @testing.provide_metadata
+ def test_custom_date_text_affinity(self):
+ sqlite_date = sqlite.DATE(
+ storage_format="%(year)04d%(month)02d%(day)02d",
+ regexp=r"(\d{4})(\d{2})(\d{2})",
+ )
+ t = Table('t', self.metadata, Column('d', sqlite_date))
+ self.metadata.create_all(testing.db)
+ testing.db.execute(
+ t.insert().
+ values(d=datetime.date(2010, 10, 15)))
+ testing.db.execute("insert into t (d) values ('20040521')")
+ eq_(
+ testing.db.execute("select * from t order by d").fetchall(),
+ [('20040521',), ('20101015',)]
+ )
+ eq_(
+ testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(),
+ [
+ (datetime.date(2004, 5, 21),),
+ (datetime.date(2010, 10, 15),)]
)
@testing.provide_metadata
def test_custom_date(self):
sqlite_date = sqlite.DATE(
- # 2004-05-21T00:00:00
- storage_format="%(year)04d|%(month)02d|%(day)02d",
- regexp=r"(\d+)\|(\d+)\|(\d+)",
- )
+ # 2004-05-21T00:00:00
+ storage_format="%(year)04d|%(month)02d|%(day)02d",
+ regexp=r"(\d+)\|(\d+)\|(\d+)",
+ )
t = Table('t', self.metadata, Column('d', sqlite_date))
self.metadata.create_all(testing.db)
- testing.db.execute(t.insert().
- values(d=datetime.date(2010, 10, 15)))
+ testing.db.execute(
+ t.insert().
+ values(d=datetime.date(2010, 10, 15)))
testing.db.execute("insert into t (d) values ('2004|05|21')")
eq_(
testing.db.execute("select * from t order by d").fetchall(),
@@ -138,11 +195,11 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
)
eq_(
testing.db.execute(select([t.c.d]).order_by(t.c.d)).fetchall(),
- [(datetime.date(2004, 5, 21),),
- (datetime.date(2010, 10, 15),)]
+ [
+ (datetime.date(2004, 5, 21),),
+ (datetime.date(2010, 10, 15),)]
)
-
def test_no_convert_unicode(self):
"""test no utf-8 encoding occurs"""
@@ -156,7 +213,7 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
sqltypes.CHAR(convert_unicode=True),
sqltypes.Unicode(),
sqltypes.UnicodeText(),
- ):
+ ):
bindproc = t.dialect_impl(dialect).bind_processor(dialect)
assert not bindproc or \
isinstance(bindproc(util.u('some string')), util.text_type)
@@ -198,6 +255,7 @@ class DateTimeTest(fixtures.TestBase, AssertsCompiledSQL):
rp = sldt.result_processor(None, None)
eq_(rp(bp(dt)), dt)
+
class DateTest(fixtures.TestBase, AssertsCompiledSQL):
def test_default(self):
@@ -221,6 +279,7 @@ class DateTest(fixtures.TestBase, AssertsCompiledSQL):
rp = sldt.result_processor(None, None)
eq_(rp(bp(dt)), dt)
+
class TimeTest(fixtures.TestBase, AssertsCompiledSQL):
def test_default(self):
@@ -333,8 +392,9 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.provide_metadata
def test_boolean_default(self):
- t = Table("t", self.metadata,
- Column("x", Boolean, server_default=sql.false()))
+ t = Table(
+ "t", self.metadata,
+ Column("x", Boolean, server_default=sql.false()))
t.create(testing.db)
testing.db.execute(t.insert())
testing.db.execute(t.insert().values(x=True))
@@ -351,7 +411,6 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
eq_(info['default'], '3')
-
class DialectTest(fixtures.TestBase, AssertsExecutionResults):
__only_on__ = 'sqlite'
@@ -372,7 +431,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
Column('true', Integer),
Column('false', Integer),
Column('column', Integer),
- )
+ )
try:
meta.create_all()
t.insert().execute(safe=1)
@@ -403,8 +462,8 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
table1 = Table('django_admin_log', metadata, autoload=True)
table2 = Table('django_content_type', metadata, autoload=True)
j = table1.join(table2)
- assert j.onclause.compare(table1.c.content_type_id
- == table2.c.id)
+ assert j.onclause.compare(
+ table1.c.content_type_id == table2.c.id)
@testing.provide_metadata
def test_quoted_identifiers_functional_two(self):
@@ -426,8 +485,8 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
# unfortunately, still can't do this; sqlite quadruples
# up the quotes on the table name here for pragma foreign_key_list
- #testing.db.execute(r'''
- #CREATE TABLE """b""" (
+ # testing.db.execute(r'''
+ # CREATE TABLE """b""" (
# """id""" integer NOT NULL PRIMARY KEY,
# """aid""" integer NULL
# REFERENCES """a""" ("""id""")
@@ -439,48 +498,25 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
#table2 = Table(r'"b"', metadata, autoload=True)
#j = table1.join(table2)
- #assert j.onclause.compare(table1.c['"id"']
+ # assert j.onclause.compare(table1.c['"id"']
# == table2.c['"aid"'])
- def test_legacy_quoted_identifiers_unit(self):
- dialect = sqlite.dialect()
- dialect._broken_fk_pragma_quotes = True
-
-
- for row in [
- (0, 'target', 'tid', 'id'),
- (0, '"target"', 'tid', 'id'),
- (0, '[target]', 'tid', 'id'),
- (0, "'target'", 'tid', 'id'),
- (0, '`target`', 'tid', 'id'),
- ]:
- fks = {}
- fkeys = []
- dialect._parse_fk(fks, fkeys, *row)
- eq_(fkeys, [{
- 'referred_table': 'target',
- 'referred_columns': ['id'],
- 'referred_schema': None,
- 'name': None,
- 'constrained_columns': ['tid']
- }])
-
@testing.provide_metadata
def test_description_encoding(self):
# amazingly, pysqlite seems to still deliver cursor.description
# as encoded bytes in py2k
- t = Table('x', self.metadata,
- Column(u('méil'), Integer, primary_key=True),
- Column(ue('\u6e2c\u8a66'), Integer),
- )
+ t = Table(
+ 'x', self.metadata,
+ Column(u('méil'), Integer, primary_key=True),
+ Column(ue('\u6e2c\u8a66'), Integer),
+ )
self.metadata.create_all(testing.db)
result = testing.db.execute(t.select())
assert u('méil') in result.keys()
assert ue('\u6e2c\u8a66') in result.keys()
-
def test_file_path_is_absolute(self):
d = pysqlite_dialect.dialect()
eq_(
@@ -498,48 +534,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
e = create_engine('sqlite+pysqlite:///foo.db')
assert e.pool.__class__ is pool.NullPool
- def test_dont_reflect_autoindex(self):
- meta = MetaData(testing.db)
- t = Table('foo', meta, Column('bar', String, primary_key=True))
- meta.create_all()
- from sqlalchemy.engine.reflection import Inspector
- try:
- inspector = Inspector(testing.db)
- eq_(inspector.get_indexes('foo'), [])
- eq_(inspector.get_indexes('foo',
- include_auto_indexes=True), [{'unique': 1, 'name'
- : 'sqlite_autoindex_foo_1', 'column_names': ['bar']}])
- finally:
- meta.drop_all()
-
- def test_create_index_with_schema(self):
- """Test creation of index with explicit schema"""
-
- meta = MetaData(testing.db)
- t = Table('foo', meta, Column('bar', String, index=True),
- schema='main')
- try:
- meta.create_all()
- finally:
- meta.drop_all()
-
- def test_get_unique_constraints(self):
- meta = MetaData(testing.db)
- t1 = Table('foo', meta, Column('f', Integer),
- UniqueConstraint('f', name='foo_f'))
- t2 = Table('bar', meta, Column('b', Integer),
- UniqueConstraint('b', name='bar_b'),
- prefixes=['TEMPORARY'])
- meta.create_all()
- from sqlalchemy.engine.reflection import Inspector
- try:
- inspector = Inspector(testing.db)
- eq_(inspector.get_unique_constraints('foo'),
- [{'column_names': [u'f'], 'name': u'foo_f'}])
- eq_(inspector.get_unique_constraints('bar'),
- [{'column_names': [u'b'], 'name': u'bar_b'}])
- finally:
- meta.drop_all()
class AttachedMemoryDBTest(fixtures.TestBase):
@@ -662,7 +656,7 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
'epoch': '%s',
'dow': '%w',
'week': '%W',
- }
+ }
for field, subst in mapping.items():
self.assert_compile(select([extract(field, t.c.col1)]),
"SELECT CAST(STRFTIME('%s', t.col1) AS "
@@ -685,53 +679,57 @@ class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
def test_constraints_with_schemas(self):
metadata = MetaData()
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
- schema='master')
- t2 = Table('t2', metadata,
- Column('id', Integer, primary_key=True),
- Column('t1_id', Integer, ForeignKey('master.t1.id')),
- schema='master'
- )
- t3 = Table('t3', metadata,
- Column('id', Integer, primary_key=True),
- Column('t1_id', Integer, ForeignKey('master.t1.id')),
- schema='alternate'
- )
- t4 = Table('t4', metadata,
- Column('id', Integer, primary_key=True),
- Column('t1_id', Integer, ForeignKey('master.t1.id')),
- )
+ Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ schema='master')
+ t2 = Table(
+ 't2', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t1_id', Integer, ForeignKey('master.t1.id')),
+ schema='master'
+ )
+ t3 = Table(
+ 't3', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t1_id', Integer, ForeignKey('master.t1.id')),
+ schema='alternate'
+ )
+ t4 = Table(
+ 't4', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('t1_id', Integer, ForeignKey('master.t1.id')),
+ )
# schema->schema, generate REFERENCES with no schema name
self.assert_compile(
schema.CreateTable(t2),
- "CREATE TABLE master.t2 ("
- "id INTEGER NOT NULL, "
- "t1_id INTEGER, "
- "PRIMARY KEY (id), "
- "FOREIGN KEY(t1_id) REFERENCES t1 (id)"
- ")"
+ "CREATE TABLE master.t2 ("
+ "id INTEGER NOT NULL, "
+ "t1_id INTEGER, "
+ "PRIMARY KEY (id), "
+ "FOREIGN KEY(t1_id) REFERENCES t1 (id)"
+ ")"
)
# schema->different schema, don't generate REFERENCES
self.assert_compile(
schema.CreateTable(t3),
- "CREATE TABLE alternate.t3 ("
- "id INTEGER NOT NULL, "
- "t1_id INTEGER, "
- "PRIMARY KEY (id)"
- ")"
+ "CREATE TABLE alternate.t3 ("
+ "id INTEGER NOT NULL, "
+ "t1_id INTEGER, "
+ "PRIMARY KEY (id)"
+ ")"
)
# same for local schema
self.assert_compile(
schema.CreateTable(t4),
- "CREATE TABLE t4 ("
- "id INTEGER NOT NULL, "
- "t1_id INTEGER, "
- "PRIMARY KEY (id)"
- ")"
+ "CREATE TABLE t4 ("
+ "id INTEGER NOT NULL, "
+ "t1_id INTEGER, "
+ "PRIMARY KEY (id)"
+ ")"
)
@@ -756,30 +754,37 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_pk1(self):
- self._test_empty_insert(Table('a', MetaData(testing.db),
- Column('id', Integer,
- primary_key=True)))
+ self._test_empty_insert(
+ Table(
+ 'a', MetaData(testing.db),
+ Column('id', Integer, primary_key=True)))
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_pk2(self):
- assert_raises(exc.DBAPIError, self._test_empty_insert, Table('b'
- , MetaData(testing.db), Column('x', Integer,
- primary_key=True), Column('y', Integer,
- primary_key=True)))
+ assert_raises(
+ exc.DBAPIError, self._test_empty_insert,
+ Table(
+ 'b', MetaData(testing.db),
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)))
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_pk3(self):
- assert_raises(exc.DBAPIError, self._test_empty_insert, Table('c'
- , MetaData(testing.db), Column('x', Integer,
- primary_key=True), Column('y', Integer,
- DefaultClause('123'), primary_key=True)))
+ assert_raises(
+ exc.DBAPIError, self._test_empty_insert,
+ Table(
+ 'c', MetaData(testing.db),
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, DefaultClause('123'), primary_key=True)))
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_pk4(self):
- self._test_empty_insert(Table('d', MetaData(testing.db),
- Column('x', Integer, primary_key=True),
- Column('y', Integer, DefaultClause('123'
- ))))
+ self._test_empty_insert(
+ Table(
+ 'd', MetaData(testing.db),
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, DefaultClause('123'))
+ ))
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_nopk1(self):
@@ -788,9 +793,10 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
@testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
def test_empty_insert_nopk2(self):
- self._test_empty_insert(Table('f', MetaData(testing.db),
- Column('x', Integer), Column('y',
- Integer)))
+ self._test_empty_insert(
+ Table(
+ 'f', MetaData(testing.db),
+ Column('x', Integer), Column('y', Integer)))
def test_inserts_with_spaces(self):
tbl = Table('tbl', MetaData('sqlite:///'), Column('with space',
@@ -800,8 +806,8 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
tbl.insert().execute({'without': 123})
assert list(tbl.select().execute()) == [(None, 123)]
tbl.insert().execute({'with space': 456})
- assert list(tbl.select().execute()) == [(None, 123), (456,
- None)]
+ assert list(tbl.select().execute()) == [
+ (None, 123), (456, None)]
finally:
tbl.drop()
@@ -817,6 +823,8 @@ def full_text_search_missing():
except:
return True
+metadata = cattable = matchtable = None
+
class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -845,19 +853,20 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
""")
matchtable = Table('matchtable', metadata, autoload=True)
metadata.create_all()
- cattable.insert().execute([{'id': 1, 'description': 'Python'},
- {'id': 2, 'description': 'Ruby'}])
- matchtable.insert().execute([{'id': 1, 'title'
- : 'Agile Web Development with Rails'
- , 'category_id': 2}, {'id': 2,
- 'title': 'Dive Into Python',
- 'category_id': 1}, {'id': 3, 'title'
- : "Programming Matz's Ruby",
- 'category_id': 2}, {'id': 4, 'title'
- : 'The Definitive Guide to Django',
- 'category_id': 1}, {'id': 5, 'title'
- : 'Python in a Nutshell',
- 'category_id': 1}])
+ cattable.insert().execute(
+ [{'id': 1, 'description': 'Python'},
+ {'id': 2, 'description': 'Ruby'}])
+ matchtable.insert().execute(
+ [
+ {'id': 1, 'title': 'Agile Web Development with Rails',
+ 'category_id': 2},
+ {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
+ {'id': 3, 'title': "Programming Matz's Ruby",
+ 'category_id': 2},
+ {'id': 4, 'title': 'The Definitive Guide to Django',
+ 'category_id': 1},
+ {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
+ ])
@classmethod
def teardown_class(cls):
@@ -869,35 +878,38 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
def test_simple_match(self):
results = \
- matchtable.select().where(matchtable.c.title.match('python'
- )).order_by(matchtable.c.id).execute().fetchall()
+ matchtable.select().where(
+ matchtable.c.title.match('python')).\
+ order_by(matchtable.c.id).execute().fetchall()
eq_([2, 5], [r.id for r in results])
def test_simple_prefix_match(self):
results = \
- matchtable.select().where(matchtable.c.title.match('nut*'
- )).execute().fetchall()
+ matchtable.select().where(
+ matchtable.c.title.match('nut*')).execute().fetchall()
eq_([5], [r.id for r in results])
def test_or_match(self):
results2 = \
matchtable.select().where(
- matchtable.c.title.match('nutshell OR ruby'
- )).order_by(matchtable.c.id).execute().fetchall()
+ matchtable.c.title.match('nutshell OR ruby')).\
+ order_by(matchtable.c.id).execute().fetchall()
eq_([3, 5], [r.id for r in results2])
def test_and_match(self):
results2 = \
matchtable.select().where(
- matchtable.c.title.match('python nutshell'
- )).execute().fetchall()
+ matchtable.c.title.match('python nutshell')
+ ).execute().fetchall()
eq_([5], [r.id for r in results2])
def test_match_across_joins(self):
- results = matchtable.select().where(and_(cattable.c.id
- == matchtable.c.category_id,
- cattable.c.description.match('Ruby'
- ))).order_by(matchtable.c.id).execute().fetchall()
+ results = matchtable.select().where(
+ and_(
+ cattable.c.id == matchtable.c.category_id,
+ cattable.c.description.match('Ruby')
+ )
+ ).order_by(matchtable.c.id).execute().fetchall()
eq_([1, 3], [r.id for r in results])
@@ -907,10 +919,11 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL):
table = Table('autoinctable', MetaData(), Column('id', Integer,
primary_key=True), Column('x', Integer,
default=None), sqlite_autoincrement=True)
- self.assert_compile(schema.CreateTable(table),
- 'CREATE TABLE autoinctable (id INTEGER NOT '
- 'NULL PRIMARY KEY AUTOINCREMENT, x INTEGER)'
- , dialect=sqlite.dialect())
+ self.assert_compile(
+ schema.CreateTable(table),
+ 'CREATE TABLE autoinctable (id INTEGER NOT '
+ 'NULL PRIMARY KEY AUTOINCREMENT, x INTEGER)',
+ dialect=sqlite.dialect())
def test_sqlite_autoincrement_constraint(self):
table = Table(
@@ -920,7 +933,7 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL):
Column('x', Integer, default=None),
UniqueConstraint('x'),
sqlite_autoincrement=True,
- )
+ )
self.assert_compile(schema.CreateTable(table),
'CREATE TABLE autoinctable (id INTEGER NOT '
'NULL PRIMARY KEY AUTOINCREMENT, x '
@@ -944,7 +957,7 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL):
MetaData(),
Column('id', MyInteger, primary_key=True),
sqlite_autoincrement=True,
- )
+ )
self.assert_compile(schema.CreateTable(table),
'CREATE TABLE autoinctable (id INTEGER NOT '
'NULL PRIMARY KEY AUTOINCREMENT)',
@@ -958,7 +971,8 @@ class ReflectHeadlessFKsTest(fixtures.TestBase):
testing.db.execute("CREATE TABLE a (id INTEGER PRIMARY KEY)")
        # this syntax actually works on other DBs; perhaps we'd want to add
# tests to test_reflection
- testing.db.execute("CREATE TABLE b (id INTEGER PRIMARY KEY REFERENCES a)")
+ testing.db.execute(
+ "CREATE TABLE b (id INTEGER PRIMARY KEY REFERENCES a)")
def teardown(self):
testing.db.execute("drop table b")
@@ -971,53 +985,312 @@ class ReflectHeadlessFKsTest(fixtures.TestBase):
assert b.c.id.references(a.c.id)
-class ReflectFKConstraintTest(fixtures.TestBase):
+
+class ConstraintReflectionTest(fixtures.TestBase):
__only_on__ = 'sqlite'
- def setup(self):
- testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
- testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
- testing.db.execute("CREATE TABLE b (id INTEGER PRIMARY KEY, "
- "FOREIGN KEY(id) REFERENCES a1(id),"
- "FOREIGN KEY(id) REFERENCES a2(id)"
- ")")
- testing.db.execute("CREATE TABLE c (id INTEGER, "
- "CONSTRAINT bar PRIMARY KEY(id),"
- "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
- "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
- ")")
+ @classmethod
+ def setup_class(cls):
+ with testing.db.begin() as conn:
+
+ conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
+ conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
+ conn.execute(
+ "CREATE TABLE b (id INTEGER PRIMARY KEY, "
+ "FOREIGN KEY(id) REFERENCES a1(id),"
+ "FOREIGN KEY(id) REFERENCES a2(id)"
+ ")")
+ conn.execute(
+ "CREATE TABLE c (id INTEGER, "
+ "CONSTRAINT bar PRIMARY KEY(id),"
+ "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
+ "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
+ ")")
+ conn.execute(
+ # the lower casing + inline is intentional here
+ "CREATE TABLE d (id INTEGER, x INTEGER unique)")
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d1 '
+ '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)')
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)')
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)')
+
+ conn.execute(
+ # lower casing + inline is intentional
+ "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))")
+ conn.execute(
+ 'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER '
+ 'references a2 ("some ( STUPID n,ame"))')
+ conn.execute(
+ 'CREATE TABLE e2 (id INTEGER, '
+ '"some ( STUPID n,ame" INTEGER NOT NULL '
+ 'references a2 ("some ( STUPID n,ame"))')
+
+ conn.execute(
+ "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))"
+ )
+ conn.execute(
+ "CREATE TEMPORARY TABLE g "
+ "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))"
+ )
+ conn.execute(
+ # intentional broken casing
+ "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))"
+ )
+ conn.execute(
+ "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))"
+ )
+ conn.execute(
+ "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, "
+ "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))"
+ )
+ conn.execute(
+ "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, "
+ "PRIMARY KEY(id), "
+ "conSTRAINT my_fk FOreiGN KEY ( q , p ) "
+ "REFERENCes i ( x , y ))"
+ )
- def teardown(self):
- testing.db.execute("drop table c")
- testing.db.execute("drop table b")
- testing.db.execute("drop table a1")
- testing.db.execute("drop table a2")
+ meta = MetaData()
+ Table(
+ 'l', meta, Column('bar', String, index=True),
+ schema='main')
+
+ Table(
+ 'm', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', String(30)),
+ UniqueConstraint('x')
+ )
+
+ Table(
+ 'n', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', String(30)),
+ UniqueConstraint('x'),
+ prefixes=['TEMPORARY']
+ )
- def test_name_is_none(self):
+ meta.create_all(conn)
+
+ # will contain an "autoindex"
+ conn.execute("create table o (foo varchar(20) primary key)")
+
+ @classmethod
+ def teardown_class(cls):
+ with testing.db.begin() as conn:
+ for name in [
+ "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1",
+ "d", "d1", "d2", "c", "b", "a1", "a2"]:
+ conn.execute("drop table %s" % name)
+
+ def test_legacy_quoted_identifiers_unit(self):
+ dialect = sqlite.dialect()
+ dialect._broken_fk_pragma_quotes = True
+
+ for row in [
+ (0, None, 'target', 'tid', 'id', None),
+ (0, None, '"target"', 'tid', 'id', None),
+ (0, None, '[target]', 'tid', 'id', None),
+ (0, None, "'target'", 'tid', 'id', None),
+ (0, None, '`target`', 'tid', 'id', None),
+ ]:
+ def _get_table_pragma(*arg, **kw):
+ return [row]
+
+ def _get_table_sql(*arg, **kw):
+ return "CREATE TABLE foo "\
+ "(tid INTEGER, "\
+ "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2]
+ with mock.patch.object(
+ dialect, "_get_table_pragma", _get_table_pragma):
+ with mock.patch.object(
+ dialect, '_get_table_sql', _get_table_sql):
+
+ fkeys = dialect.get_foreign_keys(None, 'foo')
+ eq_(
+ fkeys,
+ [{
+ 'referred_table': 'target',
+ 'referred_columns': ['id'],
+ 'referred_schema': None,
+ 'name': None,
+ 'constrained_columns': ['tid']
+ }])
+
+ def test_foreign_key_name_is_none(self):
# and not "0"
- meta = MetaData()
- b = Table('b', meta, autoload=True, autoload_with=testing.db)
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('b')
eq_(
- [con.name for con in b.constraints],
- [None, None, None]
+ fks,
+ [
+ {'referred_table': 'a1', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['id']},
+ {'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['id']},
+ ]
)
- def test_name_not_none(self):
- # we don't have names for PK constraints,
- # it appears we get back None in the pragma for
- # FKs also (also it doesn't even appear to be documented on sqlite's docs
- # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list
- # how did we ever know that's the "name" field ??)
+ def test_foreign_key_name_is_not_none(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('c')
+ eq_(
+ fks,
+ [
+ {
+ 'referred_table': 'a1', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': 'foo1',
+ 'constrained_columns': ['id']},
+ {
+ 'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': 'foo2',
+ 'constrained_columns': ['id']},
+ ]
+ )
- meta = MetaData()
- c = Table('c', meta, autoload=True, autoload_with=testing.db)
+ def test_unnamed_inline_foreign_key(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('e')
eq_(
- set([con.name for con in c.constraints]),
- set([None, None])
+ fks,
+ [{
+ 'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['x']
+ }]
+ )
+
+ def test_unnamed_inline_foreign_key_quoted(self):
+        inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('e1')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2',
+ 'referred_columns': ['some ( STUPID n,ame'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+ }]
+ )
+ fks = inspector.get_foreign_keys('e2')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2',
+ 'referred_columns': ['some ( STUPID n,ame'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+ }]
+ )
+
+ def test_foreign_key_composite_broken_casing(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('j')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'i',
+ 'referred_columns': ['x', 'y'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['q', 'p']}]
+ )
+ fks = inspector.get_foreign_keys('k')
+ eq_(
+ fks,
+ [{'referred_table': 'i', 'referred_columns': ['x', 'y'],
+ 'referred_schema': None, 'name': 'my_fk',
+ 'constrained_columns': ['q', 'p']}]
+ )
+
+ def test_dont_reflect_autoindex(self):
+ inspector = Inspector(testing.db)
+ eq_(inspector.get_indexes('o'), [])
+ eq_(
+ inspector.get_indexes('o', include_auto_indexes=True),
+ [{
+ 'unique': 1,
+ 'name': 'sqlite_autoindex_o_1',
+ 'column_names': ['foo']}])
+
+ def test_create_index_with_schema(self):
+ """Test creation of index with explicit schema"""
+
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_indexes('l', schema='main'),
+ [{'unique': 0, 'name': u'ix_main_l_bar',
+ 'column_names': [u'bar']}])
+
+ def test_unique_constraint_named(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("f"),
+ [{'column_names': ['x'], 'name': 'foo_fx'}]
+ )
+
+ def test_unique_constraint_named_broken_casing(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("h"),
+ [{'column_names': ['x'], 'name': 'foo_hx'}]
+ )
+
+ def test_unique_constraint_named_broken_temp(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("g"),
+ [{'column_names': ['x'], 'name': 'foo_gx'}]
+ )
+
+ def test_unique_constraint_unnamed_inline(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("d"),
+ [{'column_names': ['x'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_inline_quoted(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("d1"),
+ [{'column_names': ['some ( STUPID n,ame'], 'name': None}]
+ )
+ eq_(
+ inspector.get_unique_constraints("d2"),
+ [{'column_names': ['some STUPID n,ame'], 'name': None}]
+ )
+ eq_(
+ inspector.get_unique_constraints("d3"),
+ [{'column_names': ['some STUPID n,ame'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_normal(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("m"),
+ [{'column_names': ['x'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_normal_temporary(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("n"),
+ [{'column_names': ['x'], 'name': None}]
)
class SavepointTest(fixtures.TablesTest):
+
"""test that savepoints work when we use the correct event setup"""
__only_on__ = 'sqlite'
@@ -1081,7 +1354,7 @@ class SavepointTest(fixtures.TablesTest):
connection = self.bind.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
- trans2 = connection.begin_nested()
+ connection.begin_nested()
connection.execute(users.insert(), user_id=2, user_name='user2')
trans3 = connection.begin()
connection.execute(users.insert(), user_id=3, user_name='user3')
@@ -1127,6 +1400,16 @@ class TypeReflectionTest(fixtures.TestBase):
(sqltypes.Time, sqltypes.TIME()),
(sqltypes.BOOLEAN, sqltypes.BOOLEAN()),
(sqltypes.Boolean, sqltypes.BOOLEAN()),
+ (sqlite.DATE(
+ storage_format="%(year)04d%(month)02d%(day)02d",
+ ), sqltypes.DATE()),
+ (sqlite.TIME(
+ storage_format="%(hour)02d%(minute)02d%(second)02d",
+ ), sqltypes.TIME()),
+ (sqlite.DATETIME(
+ storage_format="%(year)04d%(month)02d%(day)02d"
+ "%(hour)02d%(minute)02d%(second)02d",
+ ), sqltypes.DATETIME()),
]
def _unsupported_args_fixture(self):
@@ -1169,8 +1452,8 @@ class TypeReflectionTest(fixtures.TestBase):
if warnings:
def go():
return dialect._resolve_type_affinity(from_)
- final_type = testing.assert_warnings(go,
- ["Could not instantiate"], regex=True)
+ final_type = testing.assert_warnings(
+ go, ["Could not instantiate"], regex=True)
else:
final_type = dialect._resolve_type_affinity(from_)
expected_type = type(to_)
@@ -1186,8 +1469,8 @@ class TypeReflectionTest(fixtures.TestBase):
if warnings:
def go():
return inspector.get_columns("foo")[0]
- col_info = testing.assert_warnings(go,
- ["Could not instantiate"], regex=True)
+ col_info = testing.assert_warnings(
+ go, ["Could not instantiate"], regex=True)
else:
col_info = inspector.get_columns("foo")[0]
expected_type = type(to_)
@@ -1207,7 +1490,8 @@ class TypeReflectionTest(fixtures.TestBase):
self._test_lookup_direct(self._fixed_lookup_fixture())
def test_lookup_direct_unsupported_args(self):
- self._test_lookup_direct(self._unsupported_args_fixture(), warnings=True)
+ self._test_lookup_direct(
+ self._unsupported_args_fixture(), warnings=True)
def test_lookup_direct_type_affinity(self):
self._test_lookup_direct(self._type_affinity_fixture())
@@ -1216,8 +1500,8 @@ class TypeReflectionTest(fixtures.TestBase):
self._test_round_trip(self._fixed_lookup_fixture())
def test_round_trip_direct_unsupported_args(self):
- self._test_round_trip(self._unsupported_args_fixture(), warnings=True)
+ self._test_round_trip(
+ self._unsupported_args_fixture(), warnings=True)
def test_round_trip_direct_type_affinity(self):
self._test_round_trip(self._type_affinity_fixture())
-
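
Most of the new ConstraintReflectionTest cases reduce to Inspector calls against hand-written DDL; a self-contained miniature (table and constraint names here are illustrative, not from the suite):

    from sqlalchemy import create_engine, inspect

    e = create_engine('sqlite://')
    e.execute("CREATE TABLE t (x INTEGER, CONSTRAINT uq_x UNIQUE (x))")
    insp = inspect(e)
    print(insp.get_unique_constraints('t'))
    # [{'column_names': ['x'], 'name': 'uq_x'}]
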
diff --git a/test/dialect/test_suite.py b/test/dialect/test_suite.py
index e6d642ced..3820a7721 100644
--- a/test/dialect/test_suite.py
+++ b/test/dialect/test_suite.py
@@ -1,2 +1,3 @@
from sqlalchemy.testing.suite import *
+
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 5c3279ba9..730ef4446 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -174,7 +174,7 @@ class ExecuteTest(fixtures.TestBase):
@testing.skip_if(
lambda: testing.against('mysql+mysqldb'), 'db-api flaky')
@testing.fails_on_everything_except(
- 'postgresql+psycopg2',
+ 'postgresql+psycopg2', 'postgresql+psycopg2cffi',
'postgresql+pypostgresql', 'mysql+mysqlconnector',
'mysql+pymysql', 'mysql+cymysql')
def test_raw_python(self):
@@ -639,21 +639,21 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_connection_ctx_commit(self):
fn = self._trans_fn(True)
- conn = testing.db.connect()
- ctx = conn.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
+ with testing.db.connect() as conn:
+ ctx = conn.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
def test_transaction_connection_ctx_rollback(self):
fn = self._trans_rollback_fn(True)
- conn = testing.db.connect()
- ctx = conn.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager, ctx, fn, 5, value=8
- )
- self._assert_no_data()
+ with testing.db.connect() as conn:
+ ctx = conn.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager, ctx, fn, 5, value=8
+ )
+ self._assert_no_data()
def test_connection_as_ctx(self):
fn = self._trans_fn()
@@ -666,10 +666,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_connect_as_ctx_noautocommit(self):
fn = self._trans_fn()
self._assert_no_data()
- ctx = testing.db.connect().execution_options(autocommit=False)
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is off
- self._assert_no_data()
+
+ with testing.db.connect() as conn:
+ ctx = conn.execution_options(autocommit=False)
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ # autocommit is off
+ self._assert_no_data()
def test_transaction_engine_fn_commit(self):
fn = self._trans_fn()
@@ -687,17 +689,17 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_connection_fn_commit(self):
fn = self._trans_fn()
- conn = testing.db.connect()
- conn.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
+ with testing.db.connect() as conn:
+ conn.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
def test_transaction_connection_fn_rollback(self):
fn = self._trans_rollback_fn()
- conn = testing.db.connect()
- assert_raises(
- Exception,
- conn.transaction, fn, 5, value=8
- )
+ with testing.db.connect() as conn:
+ assert_raises(
+ Exception,
+ conn.transaction, fn, 5, value=8
+ )
self._assert_no_data()
@@ -1900,6 +1902,272 @@ class HandleErrorTest(fixtures.TestBase):
self._test_alter_disconnect(True, False)
self._test_alter_disconnect(False, False)
+ def test_handle_error_event_connect_isolation_level(self):
+ engine = engines.testing_engine()
+
+ class MySpecialException(Exception):
+ pass
+
+ @event.listens_for(engine, "handle_error")
+ def handle_error(ctx):
+ raise MySpecialException("failed operation")
+
+ ProgrammingError = engine.dialect.dbapi.ProgrammingError
+ with engine.connect() as conn:
+ with patch.object(
+ conn.dialect, "get_isolation_level",
+ Mock(side_effect=ProgrammingError("random error"))
+ ):
+ assert_raises(
+ MySpecialException,
+ conn.get_isolation_level
+ )
+
+
+class HandleInvalidatedOnConnectTest(fixtures.TestBase):
+ __requires__ = ('sqlite', )
+
+ def setUp(self):
+ e = create_engine('sqlite://')
+
+ connection = Mock(
+ get_server_version_info=Mock(return_value='5.0'))
+
+ def connect(*args, **kwargs):
+ return connection
+ dbapi = Mock(
+ sqlite_version_info=(99, 9, 9,),
+ version_info=(99, 9, 9,),
+ sqlite_version='99.9.9',
+ paramstyle='named',
+ connect=Mock(side_effect=connect)
+ )
+
+ sqlite3 = e.dialect.dbapi
+        dbapi.Error = sqlite3.Error
+ dbapi.ProgrammingError = sqlite3.ProgrammingError
+
+ self.dbapi = dbapi
+ self.ProgrammingError = sqlite3.ProgrammingError
+
+ def test_wraps_connect_in_dbapi(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+ try:
+ create_engine('sqlite://', module=dbapi).connect()
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert not de.connection_invalidated
+
+ def test_handle_error_event_connect(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is None
+ raise MySpecialException("failed operation")
+
+ assert_raises(
+ MySpecialException,
+ eng.connect
+ )
+
+ def test_handle_error_event_revalidate(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi, _initialize=False)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is conn
+            assert isinstance(
+                ctx.sqlalchemy_exception, tsa.exc.ProgrammingError)
+ raise MySpecialException("failed operation")
+
+ conn = eng.connect()
+ conn.invalidate()
+
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ assert_raises(
+ MySpecialException,
+ getattr, conn, 'connection'
+ )
+
+ def test_handle_error_event_implicit_revalidate(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi, _initialize=False)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is conn
+ assert isinstance(
+ ctx.sqlalchemy_exception, tsa.exc.ProgrammingError)
+ raise MySpecialException("failed operation")
+
+ conn = eng.connect()
+ conn.invalidate()
+
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ assert_raises(
+ MySpecialException,
+ conn.execute, select([1])
+ )
+
+ def test_handle_error_custom_connect(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ def custom_connect():
+ raise self.ProgrammingError("random error")
+
+ eng = create_engine('sqlite://', module=dbapi, creator=custom_connect)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is None
+ raise MySpecialException("failed operation")
+
+ assert_raises(
+ MySpecialException,
+ eng.connect
+ )
+
+ def test_handle_error_event_connect_invalidate_flag(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.is_disconnect
+ ctx.is_disconnect = False
+
+ try:
+ eng.connect()
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert not de.connection_invalidated
+
+ def test_cant_connect_stay_invalidated(self):
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://')
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.is_disconnect
+
+ conn = eng.connect()
+
+ conn.invalidate()
+
+ eng.pool._creator = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+
+ try:
+ conn.connection
+ assert False
+ except tsa.exc.DBAPIError:
+ assert conn.invalidated
+
+ def _test_dont_touch_non_dbapi_exception_on_connect(self, connect_fn):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
+
+ e = create_engine('sqlite://', module=dbapi)
+ e.dialect.is_disconnect = is_disconnect = Mock()
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ connect_fn, e
+ )
+ eq_(is_disconnect.call_count, 0)
+
+ def test_dont_touch_non_dbapi_exception_on_connect(self):
+ self._test_dont_touch_non_dbapi_exception_on_connect(
+ lambda engine: engine.connect())
+
+ def test_dont_touch_non_dbapi_exception_on_contextual_connect(self):
+ self._test_dont_touch_non_dbapi_exception_on_connect(
+ lambda engine: engine.contextual_connect())
+
+ def test_ensure_dialect_does_is_disconnect_no_conn(self):
+ """test that is_disconnect() doesn't choke if no connection,
+ cursor given."""
+ dialect = testing.db.dialect
+ dbapi = dialect.dbapi
+ assert not dialect.is_disconnect(
+ dbapi.OperationalError("test"), None, None)
+
+ def _test_invalidate_on_connect(self, connect_fn):
+ """test that is_disconnect() is called during connect.
+
+        interpretation of connection failures is not supported by
+ every backend.
+
+ """
+
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+ try:
+ connect_fn(create_engine('sqlite://', module=dbapi))
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert de.connection_invalidated
+
+ def test_invalidate_on_connect(self):
+ """test that is_disconnect() is called during connect.
+
+        interpretation of connection failures is not supported by
+ every backend.
+
+ """
+ self._test_invalidate_on_connect(lambda engine: engine.connect())
+
+ def test_invalidate_on_contextual_connect(self):
+ """test that is_disconnect() is called during connect.
+
+        interpretation of connection failures is not supported by
+ every backend.
+
+ """
+ self._test_invalidate_on_connect(
+ lambda engine: engine.contextual_connect())
+
class ProxyConnectionTest(fixtures.TestBase):
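
All of the handle_error tests above hinge on the same hook shape; a minimal sketch of a listener translating any DBAPI failure (the exception class and message are illustrative):

    from sqlalchemy import create_engine, event

    engine = create_engine('sqlite://')

    class MyAppError(Exception):
        pass

    @event.listens_for(engine, "handle_error")
    def translate(ctx):
        # ctx also exposes .sqlalchemy_exception and .is_disconnect
        raise MyAppError("db failure: %s" % ctx.original_exception)
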
diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py
index 391b92144..e53a99e15 100644
--- a/test/engine/test_parseconnect.py
+++ b/test/engine/test_parseconnect.py
@@ -1,12 +1,15 @@
from sqlalchemy.testing import assert_raises, eq_, assert_raises_message
-from sqlalchemy.util.compat import configparser, StringIO
import sqlalchemy.engine.url as url
from sqlalchemy import create_engine, engine_from_config, exc, pool
from sqlalchemy.engine.default import DefaultDialect
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
-from sqlalchemy.testing.mock import Mock, MagicMock, patch
+from sqlalchemy.testing.mock import Mock, MagicMock
+from sqlalchemy import event
+from sqlalchemy import select
+
+dialect = None
class ParseConnectTest(fixtures.TestBase):
@@ -31,21 +34,25 @@ class ParseConnectTest(fixtures.TestBase):
'dbtype://username:password@/database',
'dbtype:////usr/local/_xtest@example.com/members.db',
'dbtype://username:apples%2Foranges@hostspec/database',
- 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]/database?foo=bar',
- 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]:80/database?foo=bar'
- ):
+ 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]'
+ '/database?foo=bar',
+ 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]:80'
+ '/database?foo=bar'
+ ):
u = url.make_url(text)
assert u.drivername in ('dbtype', 'dbtype+apitype')
assert u.username in ('username', None)
assert u.password in ('password', 'apples/oranges', None)
- assert u.host in ('hostspec', '127.0.0.1',
- '2001:da8:2004:1000:202:116:160:90', '', None), u.host
- assert u.database in ('database',
- '/usr/local/_xtest@example.com/members.db',
- '/usr/db_file.db', ':memory:', '',
- 'foo/bar/im/a/file',
- 'E:/work/src/LEM/db/hello.db', None), u.database
+ assert u.host in (
+ 'hostspec', '127.0.0.1',
+ '2001:da8:2004:1000:202:116:160:90', '', None), u.host
+ assert u.database in (
+ 'database',
+ '/usr/local/_xtest@example.com/members.db',
+ '/usr/db_file.db', ':memory:', '',
+ 'foo/bar/im/a/file',
+ 'E:/work/src/LEM/db/hello.db', None), u.database
eq_(str(u), text)
def test_rfc1738_password(self):
@@ -53,13 +60,17 @@ class ParseConnectTest(fixtures.TestBase):
eq_(u.password, "pass word + other:words")
eq_(str(u), "dbtype://user:pass word + other%3Awords@host/dbname")
- u = url.make_url('dbtype://username:apples%2Foranges@hostspec/database')
+ u = url.make_url(
+ 'dbtype://username:apples%2Foranges@hostspec/database')
eq_(u.password, "apples/oranges")
eq_(str(u), 'dbtype://username:apples%2Foranges@hostspec/database')
- u = url.make_url('dbtype://username:apples%40oranges%40%40@hostspec/database')
+ u = url.make_url(
+ 'dbtype://username:apples%40oranges%40%40@hostspec/database')
eq_(u.password, "apples@oranges@@")
- eq_(str(u), 'dbtype://username:apples%40oranges%40%40@hostspec/database')
+ eq_(
+ str(u),
+ 'dbtype://username:apples%40oranges%40%40@hostspec/database')
u = url.make_url('dbtype://username%40:@hostspec/database')
eq_(u.password, '')
@@ -70,23 +81,23 @@ class ParseConnectTest(fixtures.TestBase):
eq_(u.password, 'pass/word')
eq_(str(u), 'dbtype://username:pass%2Fword@hostspec/database')
+
class DialectImportTest(fixtures.TestBase):
def test_import_base_dialects(self):
-
         # globals() is somehow needed for exec() to work under nose on py3.
for name in (
- 'mysql',
- 'firebird',
- 'postgresql',
- 'sqlite',
- 'oracle',
- 'mssql',
- ):
+ 'mysql',
+ 'firebird',
+ 'postgresql',
+ 'sqlite',
+ 'oracle',
+ 'mssql'):
exec ('from sqlalchemy.dialects import %s\ndialect = '
'%s.dialect()' % (name, name), globals())
eq_(dialect.name, name)
+
class CreateEngineTest(fixtures.TestBase):
"""test that create_engine arguments of different types get
propagated properly"""
@@ -97,26 +108,28 @@ class CreateEngineTest(fixtures.TestBase):
create_engine('postgresql://scott:tiger@somehost/test?foobe'
'r=12&lala=18&fooz=somevalue', module=dbapi,
_initialize=False)
- c = e.connect()
+ e.connect()
def test_kwargs(self):
dbapi = MockDBAPI(foober=12, lala=18, hoho={'this': 'dict'},
fooz='somevalue')
e = \
- create_engine('postgresql://scott:tiger@somehost/test?fooz='
- 'somevalue', connect_args={'foober': 12,
- 'lala': 18, 'hoho': {'this': 'dict'}},
- module=dbapi, _initialize=False)
- c = e.connect()
-
+ create_engine(
+ 'postgresql://scott:tiger@somehost/test?fooz='
+ 'somevalue', connect_args={
+ 'foober': 12,
+ 'lala': 18, 'hoho': {'this': 'dict'}},
+ module=dbapi, _initialize=False)
+ e.connect()
def test_engine_from_config(self):
dbapi = mock_dbapi
- config = \
- {'sqlalchemy.url': 'postgresql://scott:tiger@somehost/test'\
- '?fooz=somevalue', 'sqlalchemy.pool_recycle': '50',
- 'sqlalchemy.echo': 'true'}
+ config = {
+ 'sqlalchemy.url': 'postgresql://scott:tiger@somehost/test'
+ '?fooz=somevalue',
+ 'sqlalchemy.pool_recycle': '50',
+ 'sqlalchemy.echo': 'true'}
e = engine_from_config(config, module=dbapi, _initialize=False)
assert e.pool._recycle == 50
@@ -125,7 +138,6 @@ class CreateEngineTest(fixtures.TestBase):
'z=somevalue')
assert e.echo is True
-
def test_engine_from_config_custom(self):
from sqlalchemy import util
from sqlalchemy.dialects import registry
@@ -143,8 +155,9 @@ class CreateEngineTest(fixtures.TestBase):
global dialect
dialect = MyDialect
- registry.register("mockdialect.barb",
- ".".join(tokens[0:-1]), tokens[-1])
+ registry.register(
+ "mockdialect.barb",
+ ".".join(tokens[0:-1]), tokens[-1])
config = {
"sqlalchemy.url": "mockdialect+barb://",
@@ -155,7 +168,6 @@ class CreateEngineTest(fixtures.TestBase):
eq_(e.dialect.foobar, 5)
eq_(e.dialect.bathoho, False)
-
def test_custom(self):
dbapi = MockDBAPI(foober=12, lala=18, hoho={'this': 'dict'},
fooz='somevalue')
@@ -169,7 +181,7 @@ class CreateEngineTest(fixtures.TestBase):
e = create_engine('postgresql://', creator=connect,
module=dbapi, _initialize=False)
- c = e.connect()
+ e.connect()
def test_recycle(self):
dbapi = MockDBAPI(foober=12, lala=18, hoho={'this': 'dict'},
@@ -188,8 +200,9 @@ class CreateEngineTest(fixtures.TestBase):
(True, pool.reset_rollback),
(False, pool.reset_none),
]:
- e = create_engine('postgresql://', pool_reset_on_return=value,
- module=dbapi, _initialize=False)
+ e = create_engine(
+ 'postgresql://', pool_reset_on_return=value,
+ module=dbapi, _initialize=False)
assert e.pool._reset_on_return is expected
assert_raises(
@@ -217,7 +230,7 @@ class CreateEngineTest(fixtures.TestBase):
lala=5,
use_ansi=True,
module=mock_dbapi,
- )
+ )
assert_raises(TypeError, create_engine, 'postgresql://',
lala=5, module=mock_dbapi)
assert_raises(TypeError, create_engine, 'sqlite://', lala=5,
@@ -225,69 +238,6 @@ class CreateEngineTest(fixtures.TestBase):
assert_raises(TypeError, create_engine, 'mysql+mysqldb://',
use_unicode=True, module=mock_dbapi)
- @testing.requires.sqlite
- def test_wraps_connect_in_dbapi(self):
- e = create_engine('sqlite://')
- sqlite3 = e.dialect.dbapi
-
- dbapi = MockDBAPI()
- dbapi.Error = sqlite3.Error,
- dbapi.ProgrammingError = sqlite3.ProgrammingError
- dbapi.connect = Mock(side_effect=sqlite3.ProgrammingError("random error"))
- try:
- create_engine('sqlite://', module=dbapi).connect()
- assert False
- except tsa.exc.DBAPIError as de:
- assert not de.connection_invalidated
-
-
- @testing.requires.sqlite
- def test_dont_touch_non_dbapi_exception_on_connect(self):
- e = create_engine('sqlite://')
- sqlite3 = e.dialect.dbapi
-
- dbapi = MockDBAPI()
- dbapi.Error = sqlite3.Error,
- dbapi.ProgrammingError = sqlite3.ProgrammingError
- dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
- e = create_engine('sqlite://', module=dbapi)
- e.dialect.is_disconnect = is_disconnect = Mock()
- assert_raises_message(
- TypeError,
- "I'm not a DBAPI error",
- e.connect
- )
- eq_(is_disconnect.call_count, 0)
-
- def test_ensure_dialect_does_is_disconnect_no_conn(self):
- """test that is_disconnect() doesn't choke if no connection, cursor given."""
- dialect = testing.db.dialect
- dbapi = dialect.dbapi
- assert not dialect.is_disconnect(dbapi.OperationalError("test"), None, None)
-
- @testing.requires.sqlite
- def test_invalidate_on_connect(self):
- """test that is_disconnect() is called during connect.
-
- interpretation of connection failures are not supported by
- every backend.
-
- """
-
- e = create_engine('sqlite://')
- sqlite3 = e.dialect.dbapi
-
- dbapi = MockDBAPI()
- dbapi.Error = sqlite3.Error,
- dbapi.ProgrammingError = sqlite3.ProgrammingError
- dbapi.connect = Mock(side_effect=sqlite3.ProgrammingError(
- "Cannot operate on a closed database."))
- try:
- create_engine('sqlite://', module=dbapi).connect()
- assert False
- except tsa.exc.DBAPIError as de:
- assert de.connection_invalidated
-
def test_urlattr(self):
"""test the url attribute on ``Engine``."""
@@ -313,7 +263,7 @@ class CreateEngineTest(fixtures.TestBase):
echo_pool=None,
module=mock_dbapi,
_initialize=False,
- )
+ )
assert e.pool._recycle == 50
# these args work for QueuePool
@@ -325,7 +275,7 @@ class CreateEngineTest(fixtures.TestBase):
poolclass=tsa.pool.QueuePool,
module=mock_dbapi,
_initialize=False,
- )
+ )
# but not SingletonThreadPool
@@ -338,7 +288,8 @@ class CreateEngineTest(fixtures.TestBase):
poolclass=tsa.pool.SingletonThreadPool,
module=mock_sqlite_dbapi,
_initialize=False,
- )
+ )
+
class TestRegNewDBAPI(fixtures.TestBase):
def test_register_base(self):
@@ -361,7 +312,8 @@ class TestRegNewDBAPI(fixtures.TestBase):
global dialect
dialect = MockDialect
- registry.register("mockdialect.foob", ".".join(tokens[0:-1]), tokens[-1])
+ registry.register(
+ "mockdialect.foob", ".".join(tokens[0:-1]), tokens[-1])
e = create_engine("mockdialect+foob://")
assert isinstance(e.dialect, MockDialect)
@@ -373,13 +325,16 @@ class TestRegNewDBAPI(fixtures.TestBase):
e = create_engine("mysql+my_mock_dialect://")
assert isinstance(e.dialect, MockDialect)
+
class MockDialect(DefaultDialect):
@classmethod
def dbapi(cls, **kw):
return MockDBAPI()
+
def MockDBAPI(**assert_kwargs):
connection = Mock(get_server_version_info=Mock(return_value='5.0'))
+
def connect(*args, **kwargs):
for k in assert_kwargs:
assert k in kwargs, 'key %s not present in dictionary' % k
@@ -389,12 +344,12 @@ def MockDBAPI(**assert_kwargs):
return connection
return MagicMock(
- sqlite_version_info=(99, 9, 9,),
- version_info=(99, 9, 9,),
- sqlite_version='99.9.9',
- paramstyle='named',
- connect=Mock(side_effect=connect)
- )
+ sqlite_version_info=(99, 9, 9,),
+ version_info=(99, 9, 9,),
+ sqlite_version='99.9.9',
+ paramstyle='named',
+ connect=Mock(side_effect=connect)
+ )
mock_dbapi = MockDBAPI()
mock_sqlite_dbapi = msd = MockDBAPI()
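For reference, the RFC 1738 escaping round trip asserted above, using the
same URL as test_rfc1738_password:

    from sqlalchemy.engine import url

    u = url.make_url('dbtype://username:apples%2Foranges@hostspec/database')
    assert u.password == 'apples/oranges'   # %2F is decoded on parse
    assert str(u) == 'dbtype://username:apples%2Foranges@hostspec/database'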
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index b3b17e75a..0f5bb4cb5 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -1240,7 +1240,7 @@ class IsolationLevelTest(fixtures.TestBase):
eng = testing_engine()
isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection)
+ eng.connect().connection)
level = self._non_default_isolation_level()
ne_(isolation_level, level)
@@ -1248,7 +1248,7 @@ class IsolationLevelTest(fixtures.TestBase):
eng = testing_engine(options=dict(isolation_level=level))
eq_(
eng.dialect.get_isolation_level(
- eng.connect().connection),
+ eng.connect().connection),
level
)
@@ -1270,7 +1270,7 @@ class IsolationLevelTest(fixtures.TestBase):
def test_default_level(self):
eng = testing_engine(options=dict())
isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection)
+ eng.connect().connection)
eq_(isolation_level, self._default_isolation_level())
def test_reset_level(self):
@@ -1282,8 +1282,8 @@ class IsolationLevelTest(fixtures.TestBase):
)
eng.dialect.set_isolation_level(
- conn.connection, self._non_default_isolation_level()
- )
+ conn.connection, self._non_default_isolation_level()
+ )
eq_(
eng.dialect.get_isolation_level(conn.connection),
self._non_default_isolation_level()
@@ -1298,14 +1298,15 @@ class IsolationLevelTest(fixtures.TestBase):
conn.close()
def test_reset_level_with_setting(self):
- eng = testing_engine(options=dict(
- isolation_level=
- self._non_default_isolation_level()))
+ eng = testing_engine(
+ options=dict(
+ isolation_level=self._non_default_isolation_level()))
conn = eng.connect()
eq_(eng.dialect.get_isolation_level(conn.connection),
self._non_default_isolation_level())
- eng.dialect.set_isolation_level(conn.connection,
- self._default_isolation_level())
+ eng.dialect.set_isolation_level(
+ conn.connection,
+ self._default_isolation_level())
eq_(eng.dialect.get_isolation_level(conn.connection),
self._default_isolation_level())
eng.dialect.reset_isolation_level(conn.connection)
@@ -1317,22 +1318,24 @@ class IsolationLevelTest(fixtures.TestBase):
eng = testing_engine(options=dict(isolation_level='FOO'))
assert_raises_message(
exc.ArgumentError,
- "Invalid value '%s' for isolation_level. "
- "Valid isolation levels for %s are %s" %
- ("FOO", eng.dialect.name,
- ", ".join(eng.dialect._isolation_lookup)),
- eng.connect)
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s" %
+ ("FOO",
+ eng.dialect.name, ", ".join(eng.dialect._isolation_lookup)),
+ eng.connect
+ )
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
- eng = testing_engine(options=dict(
- poolclass=QueuePool,
- pool_size=2, max_overflow=0))
+ eng = testing_engine(
+ options=dict(
+ poolclass=QueuePool,
+ pool_size=2, max_overflow=0))
c1 = eng.connect()
c1 = c1.execution_options(
- isolation_level=self._non_default_isolation_level()
- )
+ isolation_level=self._non_default_isolation_level()
+ )
c2 = eng.connect()
eq_(
eng.dialect.get_isolation_level(c1.connection),
@@ -1366,19 +1369,41 @@ class IsolationLevelTest(fixtures.TestBase):
r"per-engine using the isolation_level "
r"argument to create_engine\(\).",
select([1]).execution_options,
- isolation_level=self._non_default_isolation_level()
+ isolation_level=self._non_default_isolation_level()
)
-
def test_per_engine(self):
# new in 0.9
- eng = create_engine(testing.db.url,
- execution_options={
- 'isolation_level':
- self._non_default_isolation_level()}
- )
+ eng = create_engine(
+ testing.db.url,
+ execution_options={
+ 'isolation_level':
+ self._non_default_isolation_level()}
+ )
conn = eng.connect()
eq_(
eng.dialect.get_isolation_level(conn.connection),
self._non_default_isolation_level()
)
+
+ def test_isolation_level_accessors_connection_default(self):
+ eng = create_engine(
+ testing.db.url
+ )
+ with eng.connect() as conn:
+ eq_(conn.default_isolation_level, self._default_isolation_level())
+ with eng.connect() as conn:
+ eq_(conn.get_isolation_level(), self._default_isolation_level())
+
+ def test_isolation_level_accessors_connection_option_modified(self):
+ eng = create_engine(
+ testing.db.url
+ )
+ with eng.connect() as conn:
+ c2 = conn.execution_options(
+ isolation_level=self._non_default_isolation_level())
+ eq_(conn.default_isolation_level, self._default_isolation_level())
+ eq_(conn.get_isolation_level(),
+ self._non_default_isolation_level())
+ eq_(c2.get_isolation_level(), self._non_default_isolation_level())
+
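A sketch of the two accessors the new tests above exercise; the URL and
level name here are hypothetical, and available levels vary per backend:

    from sqlalchemy import create_engine

    eng = create_engine('postgresql://scott:tiger@localhost/test')
    with eng.connect() as conn:
        conn.default_isolation_level    # level detected at first connect
        c2 = conn.execution_options(isolation_level='SERIALIZABLE')
        c2.get_isolation_level()        # 'SERIALIZABLE', read live
        conn.default_isolation_level    # engine-wide default, unchanged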
diff --git a/test/ext/test_extendedattr.py b/test/ext/test_extendedattr.py
index 352b6b241..c7627c8b2 100644
--- a/test/ext/test_extendedattr.py
+++ b/test/ext/test_extendedattr.py
@@ -485,5 +485,5 @@ class ExtendedEventsTest(fixtures.ORMTest):
register_class(A)
manager = instrumentation.manager_of_class(A)
- assert issubclass(manager.dispatch._parent_cls.__dict__['dispatch'].events, MyEvents)
+ assert issubclass(manager.dispatch._events, MyEvents)
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index 99879a74d..0af33ecde 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -235,8 +235,6 @@ class AttachedFileShardTest(ShardTest, fixtures.TestBase):
def _init_dbs(self):
db1 = testing_engine('sqlite://', options={"execution_options":
{"shard_id": "shard1"}})
- assert db1._has_events
-
db2 = db1.execution_options(shard_id="shard2")
db3 = db1.execution_options(shard_id="shard3")
db4 = db1.execution_options(shard_id="shard4")
diff --git a/test/orm/test_bulk.py b/test/orm/test_bulk.py
new file mode 100644
index 000000000..e27d3b73c
--- /dev/null
+++ b/test/orm/test_bulk.py
@@ -0,0 +1,358 @@
+from sqlalchemy import testing
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing.schema import Table, Column
+from sqlalchemy.testing import fixtures
+from sqlalchemy import Integer, String, ForeignKey
+from sqlalchemy.orm import mapper, Session
+from sqlalchemy.testing.assertsql import CompiledSQL
+from test.orm import _fixtures
+
+
+class BulkTest(testing.AssertsExecutionResults):
+ run_inserts = None
+ run_define_tables = 'each'
+
+
+class BulkInsertUpdateTest(BulkTest, _fixtures.FixtureTest):
+
+ @classmethod
+ def setup_mappers(cls):
+ User, Address = cls.classes("User", "Address")
+ u, a = cls.tables("users", "addresses")
+
+ mapper(User, u)
+ mapper(Address, a)
+
+ def test_bulk_save_return_defaults(self):
+ User, = self.classes("User",)
+
+ s = Session()
+ objects = [
+ User(name="u1"),
+ User(name="u2"),
+ User(name="u3")
+ ]
+ assert 'id' not in objects[0].__dict__
+
+ with self.sql_execution_asserter() as asserter:
+ s.bulk_save_objects(objects, return_defaults=True)
+
+ asserter.assert_(
+ CompiledSQL(
+ "INSERT INTO users (name) VALUES (:name)",
+ [{'name': 'u1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO users (name) VALUES (:name)",
+ [{'name': 'u2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO users (name) VALUES (:name)",
+ [{'name': 'u3'}]
+ ),
+ )
+ eq_(objects[0].__dict__['id'], 1)
+
+ def test_bulk_save_no_defaults(self):
+ User, = self.classes("User",)
+
+ s = Session()
+ objects = [
+ User(name="u1"),
+ User(name="u2"),
+ User(name="u3")
+ ]
+ assert 'id' not in objects[0].__dict__
+
+ with self.sql_execution_asserter() as asserter:
+ s.bulk_save_objects(objects)
+
+ asserter.assert_(
+ CompiledSQL(
+ "INSERT INTO users (name) VALUES (:name)",
+ [{'name': 'u1'}, {'name': 'u2'}, {'name': 'u3'}]
+ ),
+ )
+ assert 'id' not in objects[0].__dict__
+
+ def test_bulk_save_updated_include_unchanged(self):
+ User, = self.classes("User",)
+
+ s = Session(expire_on_commit=False)
+ objects = [
+ User(name="u1"),
+ User(name="u2"),
+ User(name="u3")
+ ]
+ s.add_all(objects)
+ s.commit()
+
+ objects[0].name = 'u1new'
+ objects[2].name = 'u3new'
+
+ s = Session()
+ with self.sql_execution_asserter() as asserter:
+ s.bulk_save_objects(objects, update_changed_only=False)
+
+ asserter.assert_(
+ CompiledSQL(
+ "UPDATE users SET id=:id, name=:name WHERE "
+ "users.id = :users_id",
+ [{'users_id': 1, 'id': 1, 'name': 'u1new'},
+ {'users_id': 2, 'id': 2, 'name': 'u2'},
+ {'users_id': 3, 'id': 3, 'name': 'u3new'}]
+ )
+ )
+
+
+class BulkInheritanceTest(BulkTest, fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'people', metadata,
+ Column(
+ 'person_id', Integer,
+ primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(50)),
+ Column('type', String(30)))
+
+ Table(
+ 'engineers', metadata,
+ Column(
+ 'person_id', Integer,
+ ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('status', String(30)),
+ Column('primary_language', String(50)))
+
+ Table(
+ 'managers', metadata,
+ Column(
+ 'person_id', Integer,
+ ForeignKey('people.person_id'),
+ primary_key=True),
+ Column('status', String(30)),
+ Column('manager_name', String(50)))
+
+ Table(
+ 'boss', metadata,
+ Column(
+ 'boss_id', Integer,
+ ForeignKey('managers.person_id'),
+ primary_key=True),
+ Column('golf_swing', String(30)))
+
+ @classmethod
+ def setup_classes(cls):
+ class Base(cls.Comparable):
+ pass
+
+ class Person(Base):
+ pass
+
+ class Engineer(Person):
+ pass
+
+ class Manager(Person):
+ pass
+
+ class Boss(Manager):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ Person, Engineer, Manager, Boss = \
+ cls.classes('Person', 'Engineer', 'Manager', 'Boss')
+ p, e, m, b = cls.tables('people', 'engineers', 'managers', 'boss')
+
+ mapper(
+ Person, p, polymorphic_on=p.c.type,
+ polymorphic_identity='person')
+ mapper(Engineer, e, inherits=Person, polymorphic_identity='engineer')
+ mapper(Manager, m, inherits=Person, polymorphic_identity='manager')
+ mapper(Boss, b, inherits=Manager, polymorphic_identity='boss')
+
+ def test_bulk_save_joined_inh_return_defaults(self):
+ Person, Engineer, Manager, Boss = \
+ self.classes('Person', 'Engineer', 'Manager', 'Boss')
+
+ s = Session()
+ objects = [
+ Manager(name='m1', status='s1', manager_name='mn1'),
+ Engineer(name='e1', status='s2', primary_language='l1'),
+ Engineer(name='e2', status='s3', primary_language='l2'),
+ Boss(
+ name='b1', status='s3', manager_name='mn2',
+ golf_swing='g1')
+ ]
+ assert 'person_id' not in objects[0].__dict__
+
+ with self.sql_execution_asserter() as asserter:
+ s.bulk_save_objects(objects, return_defaults=True)
+
+ asserter.assert_(
+ CompiledSQL(
+ "INSERT INTO people (name, type) VALUES (:name, :type)",
+ [{'type': 'manager', 'name': 'm1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO managers (person_id, status, manager_name) "
+ "VALUES (:person_id, :status, :manager_name)",
+ [{'person_id': 1, 'status': 's1', 'manager_name': 'mn1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (name, type) VALUES (:name, :type)",
+ [{'type': 'engineer', 'name': 'e1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (name, type) VALUES (:name, :type)",
+ [{'type': 'engineer', 'name': 'e2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO engineers (person_id, status, primary_language) "
+ "VALUES (:person_id, :status, :primary_language)",
+ [{'person_id': 2, 'status': 's2', 'primary_language': 'l1'},
+ {'person_id': 3, 'status': 's3', 'primary_language': 'l2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (name, type) VALUES (:name, :type)",
+ [{'type': 'boss', 'name': 'b1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO managers (person_id, status, manager_name) "
+ "VALUES (:person_id, :status, :manager_name)",
+ [{'person_id': 4, 'status': 's3', 'manager_name': 'mn2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO boss (boss_id, golf_swing) VALUES "
+ "(:boss_id, :golf_swing)",
+ [{'boss_id': 4, 'golf_swing': 'g1'}]
+ )
+ )
+ eq_(objects[0].__dict__['person_id'], 1)
+ eq_(objects[3].__dict__['person_id'], 4)
+ eq_(objects[3].__dict__['boss_id'], 4)
+
+ def test_bulk_save_joined_inh_no_defaults(self):
+ Person, Engineer, Manager, Boss = \
+ self.classes('Person', 'Engineer', 'Manager', 'Boss')
+
+ s = Session()
+ with self.sql_execution_asserter() as asserter:
+ s.bulk_save_objects([
+ Manager(
+ person_id=1,
+ name='m1', status='s1', manager_name='mn1'),
+ Engineer(
+ person_id=2,
+ name='e1', status='s2', primary_language='l1'),
+ Engineer(
+ person_id=3,
+ name='e2', status='s3', primary_language='l2'),
+ Boss(
+ person_id=4, boss_id=4,
+ name='b1', status='s3', manager_name='mn2',
+ golf_swing='g1')
+            ])
+
+        # The only difference from the return_defaults case is that rows
+        # for a common class are batched into one executemany; at the
+        # moment the flush doesn't lump the "people" inserts from
+        # different classes together.
+ asserter.assert_(
+ CompiledSQL(
+ "INSERT INTO people (person_id, name, type) VALUES "
+ "(:person_id, :name, :type)",
+ [{'person_id': 1, 'type': 'manager', 'name': 'm1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO managers (person_id, status, manager_name) "
+ "VALUES (:person_id, :status, :manager_name)",
+ [{'status': 's1', 'person_id': 1, 'manager_name': 'mn1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (person_id, name, type) VALUES "
+ "(:person_id, :name, :type)",
+ [{'person_id': 2, 'type': 'engineer', 'name': 'e1'},
+ {'person_id': 3, 'type': 'engineer', 'name': 'e2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO engineers (person_id, status, primary_language) "
+ "VALUES (:person_id, :status, :primary_language)",
+ [{'person_id': 2, 'status': 's2', 'primary_language': 'l1'},
+ {'person_id': 3, 'status': 's3', 'primary_language': 'l2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (person_id, name, type) VALUES "
+ "(:person_id, :name, :type)",
+ [{'person_id': 4, 'type': 'boss', 'name': 'b1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO managers (person_id, status, manager_name) "
+ "VALUES (:person_id, :status, :manager_name)",
+ [{'status': 's3', 'person_id': 4, 'manager_name': 'mn2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO boss (boss_id, golf_swing) VALUES "
+ "(:boss_id, :golf_swing)",
+ [{'boss_id': 4, 'golf_swing': 'g1'}]
+ )
+ )
+
+ def test_bulk_insert_joined_inh_return_defaults(self):
+ Person, Engineer, Manager, Boss = \
+ self.classes('Person', 'Engineer', 'Manager', 'Boss')
+
+ s = Session()
+ with self.sql_execution_asserter() as asserter:
+ s.bulk_insert_mappings(
+ Boss,
+ [
+ dict(
+ name='b1', status='s1', manager_name='mn1',
+ golf_swing='g1'
+ ),
+ dict(
+ name='b2', status='s2', manager_name='mn2',
+ golf_swing='g2'
+ ),
+ dict(
+ name='b3', status='s3', manager_name='mn3',
+ golf_swing='g3'
+ ),
+ ], return_defaults=True
+ )
+
+ asserter.assert_(
+ CompiledSQL(
+ "INSERT INTO people (name) VALUES (:name)",
+ [{'name': 'b1'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (name) VALUES (:name)",
+ [{'name': 'b2'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO people (name) VALUES (:name)",
+ [{'name': 'b3'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO managers (person_id, status, manager_name) "
+ "VALUES (:person_id, :status, :manager_name)",
+ [{'person_id': 1, 'status': 's1', 'manager_name': 'mn1'},
+ {'person_id': 2, 'status': 's2', 'manager_name': 'mn2'},
+ {'person_id': 3, 'status': 's3', 'manager_name': 'mn3'}]
+ ),
+ CompiledSQL(
+ "INSERT INTO boss (boss_id, golf_swing) VALUES "
+ "(:boss_id, :golf_swing)",
+ [{'golf_swing': 'g1', 'boss_id': 1},
+ {'golf_swing': 'g2', 'boss_id': 2},
+ {'golf_swing': 'g3', 'boss_id': 3}]
+ )
+ )
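As orientation for the new module above, a sketch (assuming the User
mapping from the fixtures) of the trade-off that return_defaults controls:

    from sqlalchemy.orm import Session

    s = Session()
    users = [User(name='u1'), User(name='u2'), User(name='u3')]

    # one batched INSERT via executemany; 'id' stays unset on the objects
    s.bulk_save_objects(users)

    # with return_defaults=True the rows are inserted one at a time so
    # that each surrogate 'id' can be fetched back onto its object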
diff --git a/test/orm/test_cycles.py b/test/orm/test_cycles.py
index 8e086ff88..c95b8d152 100644
--- a/test/orm/test_cycles.py
+++ b/test/orm/test_cycles.py
@@ -11,7 +11,7 @@ from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, backref, \
create_session, sessionmaker
from sqlalchemy.testing import eq_
-from sqlalchemy.testing.assertsql import RegexSQL, ExactSQL, CompiledSQL, AllOf
+from sqlalchemy.testing.assertsql import RegexSQL, CompiledSQL, AllOf
from sqlalchemy.testing import fixtures
@@ -284,7 +284,7 @@ class InheritTestTwo(fixtures.MappedTest):
Table('c', metadata,
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('aid', Integer,
- ForeignKey('a.id', use_alter=True, name="foo")))
+ ForeignKey('a.id', name="foo")))
@classmethod
def setup_classes(cls):
@@ -334,7 +334,7 @@ class BiDirectionalManyToOneTest(fixtures.MappedTest):
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
Column('t1id', Integer,
- ForeignKey('t1.id', use_alter=True, name="foo_fk")))
+ ForeignKey('t1.id', name="foo_fk")))
Table('t3', metadata,
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
@@ -436,7 +436,7 @@ class BiDirectionalOneToManyTest(fixtures.MappedTest):
Table('t2', metadata,
Column('c1', Integer, primary_key=True, test_needs_autoincrement=True),
Column('c2', Integer,
- ForeignKey('t1.c1', use_alter=True, name='t1c1_fk')))
+ ForeignKey('t1.c1', name='t1c1_fk')))
@classmethod
def setup_classes(cls):
@@ -491,7 +491,7 @@ class BiDirectionalOneToManyTest2(fixtures.MappedTest):
Table('t2', metadata,
Column('c1', Integer, primary_key=True, test_needs_autoincrement=True),
Column('c2', Integer,
- ForeignKey('t1.c1', use_alter=True, name='t1c1_fq')),
+ ForeignKey('t1.c1', name='t1c1_fq')),
test_needs_autoincrement=True)
Table('t1_data', metadata,
@@ -572,7 +572,7 @@ class OneToManyManyToOneTest(fixtures.MappedTest):
Table('ball', metadata,
Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('person_id', Integer,
- ForeignKey('person.id', use_alter=True, name='fk_person_id')),
+ ForeignKey('person.id', name='fk_person_id')),
Column('data', String(30)))
Table('person', metadata,
@@ -656,7 +656,7 @@ class OneToManyManyToOneTest(fixtures.MappedTest):
RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
- ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
+ CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
"WHERE person.id = :person_id",
lambda ctx:{'favorite_ball_id':p.favorite.id, 'person_id':p.id}
),
@@ -667,11 +667,11 @@ class OneToManyManyToOneTest(fixtures.MappedTest):
self.assert_sql_execution(
testing.db,
sess.flush,
- ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
+ CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
"WHERE person.id = :person_id",
lambda ctx: {'person_id': p.id, 'favorite_ball_id': None}),
- ExactSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}])
- ExactSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}])
+ CompiledSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}])
+ CompiledSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}])
)
def test_post_update_backref(self):
@@ -1024,7 +1024,7 @@ class SelfReferentialPostUpdateTest3(fixtures.MappedTest):
test_needs_autoincrement=True),
Column('name', String(50), nullable=False),
Column('child_id', Integer,
- ForeignKey('child.id', use_alter=True, name='c1'), nullable=True))
+ ForeignKey('child.id', name='c1'), nullable=True))
Table('child', metadata,
Column('id', Integer, primary_key=True,
@@ -1094,11 +1094,11 @@ class PostUpdateBatchingTest(fixtures.MappedTest):
test_needs_autoincrement=True),
Column('name', String(50), nullable=False),
Column('c1_id', Integer,
- ForeignKey('child1.id', use_alter=True, name='c1'), nullable=True),
+ ForeignKey('child1.id', name='c1'), nullable=True),
Column('c2_id', Integer,
- ForeignKey('child2.id', use_alter=True, name='c2'), nullable=True),
+ ForeignKey('child2.id', name='c2'), nullable=True),
Column('c3_id', Integer,
- ForeignKey('child3.id', use_alter=True, name='c3'), nullable=True)
+ ForeignKey('child3.id', name='c3'), nullable=True)
)
Table('child1', metadata,
diff --git a/test/orm/test_deferred.py b/test/orm/test_deferred.py
index 1457852d8..1b777b527 100644
--- a/test/orm/test_deferred.py
+++ b/test/orm/test_deferred.py
@@ -2,10 +2,14 @@ import sqlalchemy as sa
from sqlalchemy import testing, util
from sqlalchemy.orm import mapper, deferred, defer, undefer, Load, \
load_only, undefer_group, create_session, synonym, relationship, Session,\
- joinedload, defaultload
+ joinedload, defaultload, aliased, contains_eager, with_polymorphic
from sqlalchemy.testing import eq_, AssertsCompiledSQL, assert_raises_message
from test.orm import _fixtures
-from sqlalchemy.orm import strategies
+
+
+from .inheritance._poly_fixtures import Company, Person, Engineer, Manager, \
+ Boss, Machine, Paperwork, _Polymorphic
+
class DeferredTest(AssertsCompiledSQL, _fixtures.FixtureTest):
@@ -595,3 +599,128 @@ class DeferredOptionsTest(AssertsCompiledSQL, _fixtures.FixtureTest):
)
+class InheritanceTest(_Polymorphic):
+ __dialect__ = 'default'
+
+ def test_load_only_subclass(self):
+ s = Session()
+ q = s.query(Manager).options(load_only("status", "manager_name"))
+ self.assert_compile(
+ q,
+ "SELECT managers.person_id AS managers_person_id, "
+ "people.person_id AS people_person_id, "
+ "people.type AS people_type, "
+ "managers.status AS managers_status, "
+ "managers.manager_name AS managers_manager_name "
+ "FROM people JOIN managers "
+ "ON people.person_id = managers.person_id "
+ "ORDER BY people.person_id"
+ )
+
+ def test_load_only_subclass_and_superclass(self):
+ s = Session()
+ q = s.query(Boss).options(load_only("status", "manager_name"))
+ self.assert_compile(
+ q,
+ "SELECT managers.person_id AS managers_person_id, "
+ "people.person_id AS people_person_id, "
+ "people.type AS people_type, "
+ "managers.status AS managers_status, "
+ "managers.manager_name AS managers_manager_name "
+ "FROM people JOIN managers "
+ "ON people.person_id = managers.person_id JOIN boss "
+ "ON managers.person_id = boss.boss_id ORDER BY people.person_id"
+ )
+
+ def test_load_only_alias_subclass(self):
+ s = Session()
+ m1 = aliased(Manager, flat=True)
+ q = s.query(m1).options(load_only("status", "manager_name"))
+ self.assert_compile(
+ q,
+ "SELECT managers_1.person_id AS managers_1_person_id, "
+ "people_1.person_id AS people_1_person_id, "
+ "people_1.type AS people_1_type, "
+ "managers_1.status AS managers_1_status, "
+ "managers_1.manager_name AS managers_1_manager_name "
+ "FROM people AS people_1 JOIN managers AS "
+ "managers_1 ON people_1.person_id = managers_1.person_id "
+ "ORDER BY people_1.person_id"
+ )
+
+ def test_load_only_subclass_from_relationship_polymorphic(self):
+ s = Session()
+ wp = with_polymorphic(Person, [Manager], flat=True)
+ q = s.query(Company).join(Company.employees.of_type(wp)).options(
+ contains_eager(Company.employees.of_type(wp)).
+ load_only(wp.Manager.status, wp.Manager.manager_name)
+ )
+ self.assert_compile(
+ q,
+ "SELECT people_1.person_id AS people_1_person_id, "
+ "people_1.type AS people_1_type, "
+ "managers_1.person_id AS managers_1_person_id, "
+ "managers_1.status AS managers_1_status, "
+ "managers_1.manager_name AS managers_1_manager_name, "
+ "companies.company_id AS companies_company_id, "
+ "companies.name AS companies_name "
+ "FROM companies JOIN (people AS people_1 LEFT OUTER JOIN "
+ "managers AS managers_1 ON people_1.person_id = "
+ "managers_1.person_id) ON companies.company_id = "
+ "people_1.company_id"
+ )
+
+ def test_load_only_subclass_from_relationship(self):
+ s = Session()
+ from sqlalchemy import inspect
+ inspect(Company).add_property("managers", relationship(Manager))
+ q = s.query(Company).join(Company.managers).options(
+ contains_eager(Company.managers).
+ load_only("status", "manager_name")
+ )
+ self.assert_compile(
+ q,
+ "SELECT companies.company_id AS companies_company_id, "
+ "companies.name AS companies_name, "
+ "managers.person_id AS managers_person_id, "
+ "people.person_id AS people_person_id, "
+ "people.type AS people_type, "
+ "managers.status AS managers_status, "
+ "managers.manager_name AS managers_manager_name "
+ "FROM companies JOIN (people JOIN managers ON people.person_id = "
+ "managers.person_id) ON companies.company_id = people.company_id"
+ )
+
+
+ def test_defer_on_wildcard_subclass(self):
+ # pretty much the same as load_only except doesn't
+ # exclude the primary key
+
+ s = Session()
+ q = s.query(Manager).options(
+ defer(".*"), undefer("status"))
+ self.assert_compile(
+ q,
+ "SELECT managers.status AS managers_status "
+ "FROM people JOIN managers ON "
+ "people.person_id = managers.person_id ORDER BY people.person_id"
+ )
+
+ def test_defer_super_name_on_subclass(self):
+ s = Session()
+ q = s.query(Manager).options(defer("name"))
+ self.assert_compile(
+ q,
+ "SELECT managers.person_id AS managers_person_id, "
+ "people.person_id AS people_person_id, "
+ "people.company_id AS people_company_id, "
+ "people.type AS people_type, managers.status AS managers_status, "
+ "managers.manager_name AS managers_manager_name "
+ "FROM people JOIN managers "
+ "ON people.person_id = managers.person_id "
+ "ORDER BY people.person_id"
+ )
+
+
+
+
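For orientation, the option InheritanceTest compiles above, sketched
against the standard polymorphic fixtures:

    from sqlalchemy.orm import Session, load_only

    s = Session()
    q = s.query(Manager).options(load_only("status", "manager_name"))
    # renders the primary key columns of both people and managers plus
    # the two named attributes; every other column is deferred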
diff --git a/test/orm/test_loading.py b/test/orm/test_loading.py
index 97c08ea29..f86477ec2 100644
--- a/test/orm/test_loading.py
+++ b/test/orm/test_loading.py
@@ -1,13 +1,40 @@
from . import _fixtures
from sqlalchemy.orm import loading, Session, aliased
-from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.assertions import eq_, assert_raises
from sqlalchemy.util import KeyedTuple
-
-# class InstancesTest(_fixtures.FixtureTest):
+from sqlalchemy.testing import mock
# class GetFromIdentityTest(_fixtures.FixtureTest):
# class LoadOnIdentTest(_fixtures.FixtureTest):
# class InstanceProcessorTest(_fixture.FixtureTest):
+
+class InstancesTest(_fixtures.FixtureTest):
+ run_setup_mappers = 'once'
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+ def test_cursor_close_w_failed_rowproc(self):
+ User = self.classes.User
+ s = Session()
+
+ q = s.query(User)
+
+ ctx = q._compile_context()
+ cursor = mock.Mock()
+ q._entities = [
+ mock.Mock(row_processor=mock.Mock(side_effect=Exception("boom")))
+ ]
+ assert_raises(
+ Exception,
+ list, loading.instances(q, cursor, ctx)
+ )
+ assert cursor.close.called, "Cursor wasn't closed"
+
+
class MergeResultTest(_fixtures.FixtureTest):
run_setup_mappers = 'once'
run_inserts = 'once'
diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py
index 63ba1a207..264b386d4 100644
--- a/test/orm/test_mapper.py
+++ b/test/orm/test_mapper.py
@@ -716,6 +716,19 @@ class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL):
m3.identity_key_from_instance(AddressUser())
)
+ def test_reassign_polymorphic_identity_warns(self):
+ User = self.classes.User
+ users = self.tables.users
+ class MyUser(User):
+ pass
+ m1 = mapper(User, users, polymorphic_on=users.c.name,
+ polymorphic_identity='user')
+ assert_raises_message(
+ sa.exc.SAWarning,
+ "Reassigning polymorphic association for identity 'user'",
+ mapper,
+ MyUser, users, inherits=User, polymorphic_identity='user'
+ )
def test_illegal_non_primary(self):
diff --git a/test/orm/test_naturalpks.py b/test/orm/test_naturalpks.py
index a4e982f84..60387ddce 100644
--- a/test/orm/test_naturalpks.py
+++ b/test/orm/test_naturalpks.py
@@ -1205,3 +1205,79 @@ class JoinedInheritanceTest(fixtures.MappedTest):
eq_(e1.boss_name, 'pointy haired')
eq_(e2.boss_name, 'pointy haired')
+
+
+class JoinedInheritancePKOnFKTest(fixtures.MappedTest):
+ """Test cascades of pk->non-pk/fk on joined table inh."""
+
+ # mssql doesn't allow ON UPDATE on self-referential keys
+ __unsupported_on__ = ('mssql',)
+
+ __requires__ = 'skip_mysql_on_windows',
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ fk_args = _backend_specific_fk_args()
+
+ Table(
+ 'person', metadata,
+ Column('name', String(50), primary_key=True),
+ Column('type', String(50), nullable=False),
+ test_needs_fk=True)
+
+ Table(
+ 'engineer', metadata,
+ Column(
+ 'id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column(
+ 'person_name', String(50),
+ ForeignKey('person.name', **fk_args)),
+ Column('primary_language', String(50)),
+ test_needs_fk=True
+ )
+
+ @classmethod
+ def setup_classes(cls):
+
+ class Person(cls.Comparable):
+ pass
+
+ class Engineer(Person):
+ pass
+
+ def _test_pk(self, passive_updates):
+ Person, person, Engineer, engineer = (
+ self.classes.Person, self.tables.person,
+ self.classes.Engineer, self.tables.engineer)
+
+ mapper(
+ Person, person, polymorphic_on=person.c.type,
+ polymorphic_identity='person', passive_updates=passive_updates)
+ mapper(
+ Engineer, engineer, inherits=Person,
+ polymorphic_identity='engineer')
+
+ sess = sa.orm.sessionmaker()()
+
+ e1 = Engineer(name='dilbert', primary_language='java')
+ sess.add(e1)
+ sess.commit()
+ e1.name = 'wally'
+ e1.primary_language = 'c++'
+
+ sess.flush()
+
+ eq_(e1.person_name, 'wally')
+
+ sess.expire_all()
+ eq_(e1.primary_language, "c++")
+
+ @testing.requires.on_update_cascade
+ def test_pk_passive(self):
+ self._test_pk(True)
+
+ #@testing.requires.non_updating_cascade
+ def test_pk_nonpassive(self):
+ self._test_pk(False)
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index 354bbe5b1..a2a1ee096 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -17,6 +17,9 @@ from sqlalchemy.testing.assertions import (
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assert_warnings
from test.orm import _fixtures
from sqlalchemy.orm.util import join, with_parent
+import contextlib
+from sqlalchemy.testing import mock, is_, is_not_
+from sqlalchemy import inspect
class QueryTest(_fixtures.FixtureTest):
@@ -1484,7 +1487,6 @@ class SliceTest(QueryTest):
assert create_session().query(User).filter(User.id == 27). \
first() is None
- @testing.only_on('sqlite', 'testing execution but db-specific syntax')
def test_limit_offset_applies(self):
"""Test that the expected LIMIT/OFFSET is applied for slices.
@@ -1510,15 +1512,15 @@ class SliceTest(QueryTest):
testing.db, lambda: q[:20], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
- {'param_1': 20, 'param_2': 0})])
+ "AS users_name FROM users LIMIT :param_1",
+ {'param_1': 20})])
self.assert_sql(
testing.db, lambda: q[5:], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
- {'param_1': -1, 'param_2': 5})])
+ "AS users_name FROM users LIMIT -1 OFFSET :param_1",
+ {'param_1': 5})])
self.assert_sql(testing.db, lambda: q[2:2], [])
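The revised expectations spell out the slice-to-LIMIT/OFFSET mapping on
SQLite (previously hidden behind an only_on('sqlite') guard); schematically:

    q = session.query(User).order_by(User.id)   # sketch, per the fixtures
    q[:20]   # LIMIT :param_1 -- no redundant OFFSET 0
    q[5:]    # LIMIT -1 OFFSET :param_1 -- SQLite needs a LIMIT with OFFSET
    q[2:2]   # empty slice: no SQL is emitted at all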
@@ -3213,3 +3215,96 @@ class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"SELECT x HAVING x = 1",
dialect=self._dialect(False)
)
+
+
+class SessionBindTest(QueryTest):
+
+ @contextlib.contextmanager
+ def _assert_bind_args(self, session):
+ get_bind = mock.Mock(side_effect=session.get_bind)
+ with mock.patch.object(session, "get_bind", get_bind):
+ yield
+ for call_ in get_bind.mock_calls:
+ is_(call_[1][0], inspect(self.classes.User))
+ is_not_(call_[2]['clause'], None)
+
+ def test_single_entity_q(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).all()
+
+ def test_sql_expr_entity_q(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User.id).all()
+
+ def test_count(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).count()
+
+ def test_aggregate_fn(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(func.max(User.name)).all()
+
+ def test_bulk_update_no_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).update(
+ {"name": "foob"}, synchronize_session=False)
+
+ def test_bulk_delete_no_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).delete(
+ synchronize_session=False)
+
+ def test_bulk_update_fetch_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).update(
+ {"name": "foob"}, synchronize_session='fetch')
+
+ def test_bulk_delete_fetch_sync(self):
+ User = self.classes.User
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(User).filter(User.id == 15).delete(
+ synchronize_session='fetch')
+
+ def test_column_property(self):
+ User = self.classes.User
+
+ mapper = inspect(User)
+ mapper.add_property(
+ "score",
+ column_property(func.coalesce(self.tables.users.c.name, None)))
+ session = Session()
+ with self._assert_bind_args(session):
+ session.query(func.max(User.score)).scalar()
+
+ def test_column_property_select(self):
+ User = self.classes.User
+ Address = self.classes.Address
+
+ mapper = inspect(User)
+ mapper.add_property(
+ "score",
+ column_property(
+ select([func.sum(Address.id)]).
+ where(Address.user_id == User.id).as_scalar()
+ )
+ )
+ session = Session()
+
+ with self._assert_bind_args(session):
+ session.query(func.max(User.score)).scalar()
+
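A sketch of why SessionBindTest checks both call arguments: routing
sessions override get_bind(), and the contract verified above is that
every query flavor (entity, column expression, count, bulk update/delete)
passes the mapper together with a non-None clause:

    from sqlalchemy.orm import Session

    class RoutingSession(Session):
        def get_bind(self, mapper=None, clause=None):
            # both arguments are populated for the flavors tested above;
            # dispatch on either, falling back to the default lookup
            return super(RoutingSession, self).get_bind(
                mapper=mapper, clause=clause)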
diff --git a/test/orm/test_session.py b/test/orm/test_session.py
index 96728612d..2aa0cd3eb 100644
--- a/test/orm/test_session.py
+++ b/test/orm/test_session.py
@@ -1364,6 +1364,9 @@ class DisposedStates(fixtures.MappedTest):
def test_close(self):
self._test_session().close()
+ def test_invalidate(self):
+ self._test_session().invalidate()
+
def test_expunge_all(self):
self._test_session().expunge_all()
@@ -1446,7 +1449,9 @@ class SessionInterface(fixtures.TestBase):
raises_('refresh', user_arg)
instance_methods = self._public_session_methods() \
- - self._class_methods
+ - self._class_methods - set([
+ 'bulk_update_mappings', 'bulk_insert_mappings',
+ 'bulk_save_objects'])
eq_(watchdog, instance_methods,
watchdog.symmetric_difference(instance_methods))
diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py
index ba31e4c7d..1d7e8e693 100644
--- a/test/orm/test_transaction.py
+++ b/test/orm/test_transaction.py
@@ -184,6 +184,23 @@ class SessionTransactionTest(FixtureTest):
assert users.count().scalar() == 1
assert addresses.count().scalar() == 1
+ @testing.requires.independent_connections
+ def test_invalidate(self):
+ User, users = self.classes.User, self.tables.users
+ mapper(User, users)
+ sess = Session()
+ u = User(name='u1')
+ sess.add(u)
+ sess.flush()
+ c1 = sess.connection(User)
+
+ sess.invalidate()
+ assert c1.invalidated
+
+ eq_(sess.query(User).all(), [])
+ c2 = sess.connection(User)
+ assert not c2.invalidated
+
def test_subtransaction_on_noautocommit(self):
User, users = self.classes.User, self.tables.users
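A sketch of the new Session.invalidate() covered by test_invalidate above:
it acts like close(), except that connections owned by the session are
invalidated in the pool rather than returned to it:

    sess = Session()
    sess.add(User(name='u1'))   # User per the fixtures
    sess.flush()
    sess.invalidate()           # checked-out DBAPI connection is discarded
    sess.query(User).all()      # resumes on a freshly acquired connection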
diff --git a/test/orm/test_unitofworkv2.py b/test/orm/test_unitofworkv2.py
index 374a77237..cef71370d 100644
--- a/test/orm/test_unitofworkv2.py
+++ b/test/orm/test_unitofworkv2.py
@@ -3,13 +3,13 @@ from sqlalchemy import testing
from sqlalchemy.testing import engines
from sqlalchemy.testing.schema import Table, Column
from test.orm import _fixtures
-from sqlalchemy import exc
-from sqlalchemy.testing import fixtures
-from sqlalchemy import Integer, String, ForeignKey, func
+from sqlalchemy import exc, util
+from sqlalchemy.testing import fixtures, config
+from sqlalchemy import Integer, String, ForeignKey, func, literal
from sqlalchemy.orm import mapper, relationship, backref, \
create_session, unitofwork, attributes,\
Session, exc as orm_exc
-from sqlalchemy.testing.mock import Mock
+from sqlalchemy.testing.mock import Mock, patch
from sqlalchemy.testing.assertsql import AllOf, CompiledSQL
from sqlalchemy import event
@@ -1473,6 +1473,96 @@ class BasicStaleChecksTest(fixtures.MappedTest):
sess.flush
)
+ def test_update_single_missing_broken_multi_rowcount(self):
+ @util.memoized_property
+ def rowcount(self):
+ if len(self.context.compiled_parameters) > 1:
+ return -1
+ else:
+ return self.context.rowcount
+
+ with patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False):
+ with patch(
+ "sqlalchemy.engine.result.ResultProxy.rowcount",
+ rowcount):
+ Parent, Child = self._fixture()
+ sess = Session()
+ p1 = Parent(id=1, data=2)
+ sess.add(p1)
+ sess.flush()
+
+ sess.execute(self.tables.parent.delete())
+
+ p1.data = 3
+ assert_raises_message(
+ orm_exc.StaleDataError,
+ "UPDATE statement on table 'parent' expected to "
+                r"update 1 row\(s\); 0 were matched.",
+ sess.flush
+ )
+
+ def test_update_multi_missing_broken_multi_rowcount(self):
+ @util.memoized_property
+ def rowcount(self):
+ if len(self.context.compiled_parameters) > 1:
+ return -1
+ else:
+ return self.context.rowcount
+
+ with patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False):
+ with patch(
+ "sqlalchemy.engine.result.ResultProxy.rowcount",
+ rowcount):
+ Parent, Child = self._fixture()
+ sess = Session()
+ p1 = Parent(id=1, data=2)
+ p2 = Parent(id=2, data=3)
+ sess.add_all([p1, p2])
+ sess.flush()
+
+ sess.execute(self.tables.parent.delete().where(Parent.id == 1))
+
+ p1.data = 3
+ p2.data = 4
+ sess.flush() # no exception
+
+ # update occurred for remaining row
+ eq_(
+ sess.query(Parent.id, Parent.data).all(),
+ [(2, 4)]
+ )
+
+ def test_update_value_missing_broken_multi_rowcount(self):
+ @util.memoized_property
+ def rowcount(self):
+ if len(self.context.compiled_parameters) > 1:
+ return -1
+ else:
+ return self.context.rowcount
+
+ with patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False):
+ with patch(
+ "sqlalchemy.engine.result.ResultProxy.rowcount",
+ rowcount):
+ Parent, Child = self._fixture()
+ sess = Session()
+ p1 = Parent(id=1, data=1)
+ sess.add(p1)
+ sess.flush()
+
+ sess.execute(self.tables.parent.delete())
+
+ p1.data = literal(1)
+ assert_raises_message(
+ orm_exc.StaleDataError,
+ "UPDATE statement on table 'parent' expected to "
+                r"update 1 row\(s\); 0 were matched.",
+ sess.flush
+ )
+
@testing.requires.sane_multi_rowcount
def test_delete_multi_missing_warning(self):
Parent, Child = self._fixture()
@@ -1544,6 +1634,7 @@ class BatchInsertsTest(fixtures.MappedTest, testing.AssertsExecutionResults):
T(id=10, data='t10', def_='def3'),
T(id=11, data='t11'),
])
+
self.assert_sql_execution(
testing.db,
sess.flush,
diff --git a/test/orm/test_versioning.py b/test/orm/test_versioning.py
index 55ce586b5..8348cb588 100644
--- a/test/orm/test_versioning.py
+++ b/test/orm/test_versioning.py
@@ -1,7 +1,8 @@
import datetime
import sqlalchemy as sa
-from sqlalchemy.testing import engines
+from sqlalchemy.testing import engines, config
from sqlalchemy import testing
+from sqlalchemy.testing.mock import patch
from sqlalchemy import (
Integer, String, Date, ForeignKey, orm, exc, select, TypeDecorator)
from sqlalchemy.testing.schema import Table, Column
@@ -12,6 +13,7 @@ from sqlalchemy.testing import (
eq_, assert_raises, assert_raises_message, fixtures)
from sqlalchemy.testing.assertsql import CompiledSQL
import uuid
+from sqlalchemy import util
def make_uuid():
@@ -223,6 +225,30 @@ class VersioningTest(fixtures.MappedTest):
s1.refresh(f1s1, lockmode='update_nowait')
assert f1s1.version_id == f1s2.version_id
+ def test_update_multi_missing_broken_multi_rowcount(self):
+ @util.memoized_property
+ def rowcount(self):
+ if len(self.context.compiled_parameters) > 1:
+ return -1
+ else:
+ return self.context.rowcount
+
+ with patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False):
+ with patch(
+ "sqlalchemy.engine.result.ResultProxy.rowcount",
+ rowcount):
+
+ Foo = self.classes.Foo
+ s1 = self._fixture()
+ f1s1 = Foo(value='f1 value')
+ s1.add(f1s1)
+ s1.commit()
+
+ f1s1.value = 'f2 value'
+ s1.flush()
+ eq_(f1s1.version_id, 2)
+
@testing.emits_warning(r'.*does not support updated rowcount')
@engines.close_open_connections
def test_noversioncheck(self):
diff --git a/test/profiles.txt b/test/profiles.txt
index 97ef13873..0eb2add93 100644
--- a/test/profiles.txt
+++ b/test/profiles.txt
@@ -16,9 +16,9 @@
test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_mysql_mysqldb_cextensions 74
test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_mysql_mysqldb_nocextensions 74
test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_postgresql_psycopg2_cextensions 74
-test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_postgresql_psycopg2_nocextensions 74
+test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_postgresql_psycopg2_nocextensions 76
test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_sqlite_pysqlite_cextensions 74
-test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_sqlite_pysqlite_nocextensions 74
+test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_sqlite_pysqlite_nocextensions 76
test.aaa_profiling.test_compiler.CompileTest.test_insert 3.3_postgresql_psycopg2_cextensions 77
test.aaa_profiling.test_compiler.CompileTest.test_insert 3.3_postgresql_psycopg2_nocextensions 77
test.aaa_profiling.test_compiler.CompileTest.test_insert 3.3_sqlite_pysqlite_cextensions 77
@@ -33,9 +33,9 @@ test.aaa_profiling.test_compiler.CompileTest.test_insert 3.4_sqlite_pysqlite_noc
test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_mysql_mysqldb_cextensions 152
test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_mysql_mysqldb_nocextensions 152
test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_postgresql_psycopg2_cextensions 152
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_postgresql_psycopg2_nocextensions 152
+test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_postgresql_psycopg2_nocextensions 154
test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_cextensions 152
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_nocextensions 152
+test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_nocextensions 154
test.aaa_profiling.test_compiler.CompileTest.test_select 3.3_postgresql_psycopg2_cextensions 165
test.aaa_profiling.test_compiler.CompileTest.test_select 3.3_postgresql_psycopg2_nocextensions 165
test.aaa_profiling.test_compiler.CompileTest.test_select 3.3_sqlite_pysqlite_cextensions 165
@@ -50,9 +50,9 @@ test.aaa_profiling.test_compiler.CompileTest.test_select 3.4_sqlite_pysqlite_noc
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_mysql_mysqldb_cextensions 186
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_mysql_mysqldb_nocextensions 186
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_postgresql_psycopg2_cextensions 186
-test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_postgresql_psycopg2_nocextensions 186
+test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_postgresql_psycopg2_nocextensions 189
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_sqlite_pysqlite_cextensions 186
-test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_sqlite_pysqlite_nocextensions 186
+test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_sqlite_pysqlite_nocextensions 189
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 3.3_postgresql_psycopg2_cextensions 199
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 3.3_postgresql_psycopg2_nocextensions 199
test.aaa_profiling.test_compiler.CompileTest.test_select_labels 3.3_sqlite_pysqlite_cextensions 199
@@ -67,7 +67,7 @@ test.aaa_profiling.test_compiler.CompileTest.test_select_labels 3.4_sqlite_pysql
test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_mysql_mysqldb_cextensions 79
test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_mysql_mysqldb_nocextensions 79
test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_cextensions 77
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_nocextensions 77
+test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_nocextensions 79
test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_cextensions 77
test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_nocextensions 77
test.aaa_profiling.test_compiler.CompileTest.test_update 3.3_postgresql_psycopg2_cextensions 78
@@ -103,7 +103,8 @@ test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_mysql_m
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_postgresql_psycopg2_cextensions 4265
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_postgresql_psycopg2_nocextensions 4265
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_sqlite_pysqlite_cextensions 4265
-test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_sqlite_pysqlite_nocextensions 4260
+test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_sqlite_pysqlite_nocextensions 4262
+test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 3.3_postgresql_psycopg2_cextensions 4263
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 3.3_postgresql_psycopg2_nocextensions 4266
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 3.3_sqlite_pysqlite_cextensions 4266
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 3.3_sqlite_pysqlite_nocextensions 4266
@@ -118,6 +119,7 @@ test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove
test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 2.7_postgresql_psycopg2_nocextensions 6426
test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 2.7_sqlite_pysqlite_cextensions 6426
test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 2.7_sqlite_pysqlite_nocextensions 6426
+test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 3.3_postgresql_psycopg2_cextensions 6428
test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 3.3_postgresql_psycopg2_nocextensions 6428
test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 3.3_sqlite_pysqlite_cextensions 6428
test.aaa_profiling.test_orm.AttributeOverheadTest.test_collection_append_remove 3.3_sqlite_pysqlite_nocextensions 6428
@@ -131,10 +133,11 @@ test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 2.7_mysql_mysqldb_noc
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 2.7_postgresql_psycopg2_cextensions 31132
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 2.7_postgresql_psycopg2_nocextensions 40149
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 2.7_sqlite_pysqlite_cextensions 19280
-test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 2.7_sqlite_pysqlite_nocextensions 28297
+test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 2.7_sqlite_pysqlite_nocextensions 28347
+test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.3_postgresql_psycopg2_cextensions 20163
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.3_postgresql_psycopg2_nocextensions 29138
-test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.3_sqlite_pysqlite_cextensions 32398
-test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.3_sqlite_pysqlite_nocextensions 37327
+test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.3_sqlite_pysqlite_cextensions 20352
+test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.3_sqlite_pysqlite_nocextensions 29355
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.4_postgresql_psycopg2_cextensions 20135
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline 3.4_postgresql_psycopg2_nocextensions 29138
@@ -145,9 +148,10 @@ test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 2.7_mysql_mysq
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 2.7_postgresql_psycopg2_cextensions 27049
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 2.7_postgresql_psycopg2_nocextensions 30054
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 2.7_sqlite_pysqlite_cextensions 27144
-test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 2.7_sqlite_pysqlite_nocextensions 30149
+test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 2.7_sqlite_pysqlite_nocextensions 28183
+test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.3_postgresql_psycopg2_cextensions 26097
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.3_postgresql_psycopg2_nocextensions 29068
-test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.3_sqlite_pysqlite_cextensions 32197
+test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.3_sqlite_pysqlite_cextensions 26208
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.3_sqlite_pysqlite_nocextensions 31179
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.4_postgresql_psycopg2_cextensions 26065
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols 3.4_postgresql_psycopg2_nocextensions 29068
@@ -160,6 +164,7 @@ test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 2.7_postgresql_psycopg2_nocextensions 17988
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 2.7_sqlite_pysqlite_cextensions 17988
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 2.7_sqlite_pysqlite_nocextensions 17988
+test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 3.3_postgresql_psycopg2_cextensions 18988
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 3.3_postgresql_psycopg2_nocextensions 18988
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 3.3_sqlite_pysqlite_cextensions 18988
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity 3.3_sqlite_pysqlite_nocextensions 18988
@@ -173,7 +178,8 @@ test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 2.7_postgresql_psycopg2_cextensions 119849
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 2.7_postgresql_psycopg2_nocextensions 122553
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 2.7_sqlite_pysqlite_cextensions 162315
-test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 2.7_sqlite_pysqlite_nocextensions 165111
+test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 2.7_sqlite_pysqlite_nocextensions 164551
+test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 3.3_postgresql_psycopg2_cextensions 126351
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 3.3_postgresql_psycopg2_nocextensions 125352
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 3.3_sqlite_pysqlite_cextensions 169566
test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_no_identity 3.3_sqlite_pysqlite_nocextensions 171364
@@ -187,7 +193,8 @@ test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 2.
test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 2.7_postgresql_psycopg2_cextensions 18959
test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 2.7_postgresql_psycopg2_nocextensions 19219
test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 2.7_sqlite_pysqlite_cextensions 22288
-test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 2.7_sqlite_pysqlite_nocextensions 22530
+test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 2.7_sqlite_pysqlite_nocextensions 21852
+test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 3.3_postgresql_psycopg2_cextensions 19423
test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 3.3_postgresql_psycopg2_nocextensions 19492
test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 3.3_sqlite_pysqlite_cextensions 23067
test.aaa_profiling.test_orm.MergeBackrefsTest.test_merge_pending_with_all_pks 3.3_sqlite_pysqlite_nocextensions 23271
@@ -201,7 +208,8 @@ test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_mysql_mysqldb_nocexten
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_postgresql_psycopg2_cextensions 1323
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_postgresql_psycopg2_nocextensions 1348
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_sqlite_pysqlite_cextensions 1601
-test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_sqlite_pysqlite_nocextensions 1626
+test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_sqlite_pysqlite_nocextensions 1603
+test.aaa_profiling.test_orm.MergeTest.test_merge_load 3.3_postgresql_psycopg2_cextensions 1354
test.aaa_profiling.test_orm.MergeTest.test_merge_load 3.3_postgresql_psycopg2_nocextensions 1355
test.aaa_profiling.test_orm.MergeTest.test_merge_load 3.3_sqlite_pysqlite_cextensions 1656
test.aaa_profiling.test_orm.MergeTest.test_merge_load 3.3_sqlite_pysqlite_nocextensions 1671
@@ -210,17 +218,18 @@ test.aaa_profiling.test_orm.MergeTest.test_merge_load 3.4_postgresql_psycopg2_no
# TEST: test.aaa_profiling.test_orm.MergeTest.test_merge_no_load
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_mysql_mysqldb_cextensions 117,18
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_mysql_mysqldb_nocextensions 117,18
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_postgresql_psycopg2_cextensions 117,18
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_postgresql_psycopg2_nocextensions 117,18
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_sqlite_pysqlite_cextensions 117,18
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_sqlite_pysqlite_nocextensions 117,18
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_postgresql_psycopg2_nocextensions 122,19
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_sqlite_pysqlite_cextensions 122,19
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_sqlite_pysqlite_nocextensions 122,19
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.4_postgresql_psycopg2_cextensions 122,19
-test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.4_postgresql_psycopg2_nocextensions 122,19
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_mysql_mysqldb_cextensions 91,18
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_mysql_mysqldb_nocextensions 91,18
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_postgresql_psycopg2_cextensions 91,18
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_postgresql_psycopg2_nocextensions 91,18
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_sqlite_pysqlite_cextensions 91,18
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 2.7_sqlite_pysqlite_nocextensions 91,18
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_postgresql_psycopg2_cextensions 94,19
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_postgresql_psycopg2_nocextensions 94,19
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_sqlite_pysqlite_cextensions 94,19
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_sqlite_pysqlite_nocextensions 94,19
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.4_postgresql_psycopg2_cextensions 94,19
+test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.4_postgresql_psycopg2_nocextensions 94,19
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect
@@ -286,9 +295,9 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_cextensions 78
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_nocextensions 80
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_cextensions 78
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_nocextensions 80
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_nocextensions 84
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_cextensions 78
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_nocextensions 80
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_nocextensions 84
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_postgresql_psycopg2_cextensions 78
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_postgresql_psycopg2_nocextensions 78
test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_sqlite_pysqlite_cextensions 78
@@ -320,9 +329,9 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 3.4
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_cextensions 514
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_nocextensions 15534
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_cextensions 20501
-test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_nocextensions 35521
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_nocextensions 35528
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_sqlite_pysqlite_cextensions 457
-test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_sqlite_pysqlite_nocextensions 15477
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_sqlite_pysqlite_nocextensions 15481
test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.3_postgresql_psycopg2_cextensions 489
test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.3_postgresql_psycopg2_nocextensions 14489
test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.3_sqlite_pysqlite_cextensions 462
@@ -337,9 +346,9 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_string 3.4_sqlite_pysqlite_
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_cextensions 514
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_nocextensions 45534
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_cextensions 20501
-test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_nocextensions 35521
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_nocextensions 35528
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_sqlite_pysqlite_cextensions 457
-test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_sqlite_pysqlite_nocextensions 15477
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_sqlite_pysqlite_nocextensions 15481
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.3_postgresql_psycopg2_cextensions 489
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.3_postgresql_psycopg2_nocextensions 14489
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.3_sqlite_pysqlite_cextensions 462
@@ -351,17 +360,17 @@ test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 3.4_sqlite_pysqlite
# TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_cextensions 5562,277,3697,11893,1106,1968,2433
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_nocextensions 5606,277,3929,13595,1223,2011,2692
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.3_postgresql_psycopg2_cextensions 5238,273,3577,11529,1077,1886,2439
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.3_postgresql_psycopg2_nocextensions 5260,273,3673,12701,1171,1893,2631
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.4_postgresql_psycopg2_cextensions 5221,273,3577,11529,1077,1883,2439
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.4_postgresql_psycopg2_nocextensions 5243,273,3697,12796,1187,1923,2653
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_cextensions 5892,292,3697,11893,1106,1968,2433
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_nocextensions 5936,295,3985,13782,1255,2064,2759
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.3_postgresql_psycopg2_cextensions 5497,274,3609,11647,1097,1921,2486
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.3_postgresql_psycopg2_nocextensions 5519,274,3705,12819,1191,1928,2678
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.4_postgresql_psycopg2_cextensions 5497,273,3577,11529,1077,1883,2439
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_invocation 3.4_postgresql_psycopg2_nocextensions 5519,273,3697,12796,1187,1923,2653
# TEST: test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation
test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_cextensions 6389,407,6826,18499,1134,2661
-test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_nocextensions 6480,412,7058,19930,1242,2726
+test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation 2.7_postgresql_psycopg2_nocextensions 6379,412,7054,19930,1258,2718
test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation 3.3_postgresql_psycopg2_cextensions 6268,394,6860,18613,1107,2679
test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation 3.3_postgresql_psycopg2_nocextensions 6361,399,6964,19640,1193,2708
test.aaa_profiling.test_zoomark_orm.ZooMarkTest.test_invocation 3.4_postgresql_psycopg2_cextensions 6275,394,6860,18613,1107,2679
diff --git a/test/requirements.py b/test/requirements.py
index d1b7913f0..89fc108b9 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -127,9 +127,15 @@ class DefaultRequirements(SuiteRequirements):
)
@property
- def temporary_table(self):
- """Target database must support CREATE TEMPORARY TABLE"""
- return exclusions.open()
+ def temporary_tables(self):
+ """target database supports temporary tables"""
+ return skip_if(
+ ["mssql"], "sql server has some other syntax?"
+ )
+
+ @property
+ def temp_table_reflection(self):
+ return self.temporary_tables
@property
def reflectable_autoincrement(self):
@@ -454,6 +460,7 @@ class DefaultRequirements(SuiteRequirements):
)
+
@property
def emulated_lastrowid(self):
""""target dialect retrieves cursor.lastrowid or an equivalent
@@ -649,6 +656,10 @@ class DefaultRequirements(SuiteRequirements):
'postgresql+pg8000', None, None,
'postgresql+pg8000 has FP inaccuracy even with '
'only four decimal places '),
+ (
+ 'postgresql+psycopg2cffi', None, None,
+ 'postgresql+psycopg2cffi has FP inaccuracy even with '
+ 'only four decimal places '),
])
@property
@@ -749,6 +760,10 @@ class DefaultRequirements(SuiteRequirements):
"+psycopg2", None, None,
"psycopg2 2.4 no longer accepts percent "
"sign in bind placeholders"),
+ (
+ "+psycopg2cffi", None, None,
+ "psycopg2cffi does not accept percent signs in "
+ "bind placeholders"),
("mysql", None, None, "executemany() doesn't work here")
]
)
@@ -771,6 +786,17 @@ class DefaultRequirements(SuiteRequirements):
"Not supported on MySQL + Windows"
)
+ @property
+ def mssql_freetds(self):
+ return only_on(
+ LambdaPredicate(
+ lambda config: (
+ (against(config, 'mssql+pyodbc') and
+ config.db.dialect.freetds)
+ or against(config, 'mssql+pymssql')
+ )
+ )
+ )
@property
def selectone(self):
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index 9e99a947b..428fc8986 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -2440,7 +2440,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"""SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """
"""FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""),
(s7, oracle_d,
- """SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """
+ """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """
""""QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""),
]:
self.assert_compile(
diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py
index c0b5806ac..2603f67a3 100644
--- a/test/sql/test_constraints.py
+++ b/test/sql/test_constraints.py
@@ -9,7 +9,7 @@ from sqlalchemy import testing
from sqlalchemy.engine import default
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
-from sqlalchemy.testing.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
+from sqlalchemy.testing.assertsql import AllOf, RegexSQL, CompiledSQL
from sqlalchemy.sql import table, column
@@ -58,8 +58,77 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
)
)
+ @testing.force_drop_names('a', 'b')
+ def test_fk_cant_drop_cycled_unnamed(self):
+ metadata = MetaData()
+
+ Table("a", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('bid', Integer),
+ ForeignKeyConstraint(["bid"], ["b.id"])
+ )
+ Table(
+ "b", metadata,
+ Column('id', Integer, primary_key=True),
+ Column("aid", Integer),
+ ForeignKeyConstraint(["aid"], ["a.id"]))
+ metadata.create_all(testing.db)
+ if testing.db.dialect.supports_alter:
+ assert_raises_message(
+ exc.CircularDependencyError,
+ "Can't sort tables for DROP; an unresolvable foreign key "
+ "dependency exists between tables: a, b. Please ensure "
+ "that the ForeignKey and ForeignKeyConstraint objects "
+ "involved in the cycle have names so that they can be "
+ "dropped using DROP CONSTRAINT.",
+ metadata.drop_all, testing.db
+ )
+ else:
+
+ with self.sql_execution_asserter() as asserter:
+ metadata.drop_all(testing.db, checkfirst=False)
+
+ asserter.assert_(
+ AllOf(
+ CompiledSQL("DROP TABLE a"),
+ CompiledSQL("DROP TABLE b")
+ )
+ )
+
+ @testing.provide_metadata
+ def test_fk_table_auto_alter_constraint_create(self):
+ metadata = self.metadata
+
+ Table("a", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('bid', Integer),
+ ForeignKeyConstraint(["bid"], ["b.id"])
+ )
+ Table(
+ "b", metadata,
+ Column('id', Integer, primary_key=True),
+ Column("aid", Integer),
+ ForeignKeyConstraint(["aid"], ["a.id"], name="bfk"))
+ self._assert_cyclic_constraint(metadata, auto=True)
+
+ @testing.provide_metadata
+ def test_fk_column_auto_alter_constraint_create(self):
+ metadata = self.metadata
+
+ Table("a", metadata,
+ Column('id', Integer, primary_key=True),
+ Column('bid', Integer, ForeignKey("b.id")),
+ )
+ Table("b", metadata,
+ Column('id', Integer, primary_key=True),
+ Column("aid", Integer,
+ ForeignKey("a.id", name="bfk")
+ ),
+ )
+ self._assert_cyclic_constraint(metadata, auto=True)
+
@testing.provide_metadata
- def test_cyclic_fk_table_constraint_create(self):
+ def test_fk_table_use_alter_constraint_create(self):
metadata = self.metadata
Table("a", metadata,
@@ -75,7 +144,7 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
self._assert_cyclic_constraint(metadata)
@testing.provide_metadata
- def test_cyclic_fk_column_constraint_create(self):
+ def test_fk_column_use_alter_constraint_create(self):
metadata = self.metadata
Table("a", metadata,
@@ -90,45 +159,104 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
)
self._assert_cyclic_constraint(metadata)
- def _assert_cyclic_constraint(self, metadata):
- assertions = [
- CompiledSQL('CREATE TABLE b ('
+ def _assert_cyclic_constraint(self, metadata, auto=False):
+ table_assertions = []
+ if auto:
+ if testing.db.dialect.supports_alter:
+ table_assertions.append(
+ CompiledSQL('CREATE TABLE b ('
+ 'id INTEGER NOT NULL, '
+ 'aid INTEGER, '
+ 'PRIMARY KEY (id)'
+ ')'
+ )
+ )
+ else:
+ table_assertions.append(
+ CompiledSQL(
+ 'CREATE TABLE b ('
'id INTEGER NOT NULL, '
'aid INTEGER, '
+ 'PRIMARY KEY (id), '
+ 'CONSTRAINT bfk FOREIGN KEY(aid) REFERENCES a (id)'
+ ')'
+ )
+ )
+
+ if testing.db.dialect.supports_alter:
+ table_assertions.append(
+ CompiledSQL(
+ 'CREATE TABLE a ('
+ 'id INTEGER NOT NULL, '
+ 'bid INTEGER, '
'PRIMARY KEY (id)'
')'
- ),
- CompiledSQL('CREATE TABLE a ('
+ )
+ )
+ else:
+ table_assertions.append(
+ CompiledSQL(
+ 'CREATE TABLE a ('
'id INTEGER NOT NULL, '
'bid INTEGER, '
'PRIMARY KEY (id), '
'FOREIGN KEY(bid) REFERENCES b (id)'
')'
- ),
- ]
+ )
+ )
+ else:
+ table_assertions.append(
+ CompiledSQL('CREATE TABLE b ('
+ 'id INTEGER NOT NULL, '
+ 'aid INTEGER, '
+ 'PRIMARY KEY (id)'
+ ')'
+ )
+ )
+ table_assertions.append(
+ CompiledSQL(
+ 'CREATE TABLE a ('
+ 'id INTEGER NOT NULL, '
+ 'bid INTEGER, '
+ 'PRIMARY KEY (id), '
+ 'FOREIGN KEY(bid) REFERENCES b (id)'
+ ')'
+ )
+ )
+
+ assertions = [AllOf(*table_assertions)]
if testing.db.dialect.supports_alter:
- assertions.append(
+ fk_assertions = []
+ fk_assertions.append(
CompiledSQL('ALTER TABLE b ADD CONSTRAINT bfk '
'FOREIGN KEY(aid) REFERENCES a (id)')
)
- self.assert_sql_execution(
- testing.db,
- lambda: metadata.create_all(checkfirst=False),
- *assertions
- )
+ if auto:
+ fk_assertions.append(
+ CompiledSQL('ALTER TABLE a ADD '
+ 'FOREIGN KEY(bid) REFERENCES b (id)')
+ )
+ assertions.append(AllOf(*fk_assertions))
+
+ with self.sql_execution_asserter() as asserter:
+ metadata.create_all(checkfirst=False)
+ asserter.assert_(*assertions)
- assertions = []
if testing.db.dialect.supports_alter:
- assertions.append(CompiledSQL('ALTER TABLE b DROP CONSTRAINT bfk'))
- assertions.extend([
- CompiledSQL("DROP TABLE a"),
- CompiledSQL("DROP TABLE b"),
- ])
- self.assert_sql_execution(
- testing.db,
- lambda: metadata.drop_all(checkfirst=False),
- *assertions
- )
+ assertions = [
+ CompiledSQL('ALTER TABLE b DROP CONSTRAINT bfk'),
+ CompiledSQL("DROP TABLE a"),
+ CompiledSQL("DROP TABLE b")
+ ]
+ else:
+ assertions = [AllOf(
+ CompiledSQL("DROP TABLE a"),
+ CompiledSQL("DROP TABLE b")
+ )]
+
+ with self.sql_execution_asserter() as asserter:
+            metadata.drop_all(checkfirst=False)
+ asserter.assert_(*assertions)
@testing.requires.check_constraints
@testing.provide_metadata
@@ -289,13 +417,13 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
lambda: events.create(testing.db),
RegexSQL("^CREATE TABLE events"),
AllOf(
- ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events '
+ CompiledSQL('CREATE UNIQUE INDEX ix_events_name ON events '
'(name)'),
- ExactSQL('CREATE INDEX ix_events_location ON events '
+ CompiledSQL('CREATE INDEX ix_events_location ON events '
'(location)'),
- ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events '
+ CompiledSQL('CREATE UNIQUE INDEX sport_announcer ON events '
'(sport, announcer)'),
- ExactSQL('CREATE INDEX idx_winners ON events (winner)')
+ CompiledSQL('CREATE INDEX idx_winners ON events (winner)'),
)
)
@@ -313,7 +441,7 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
lambda: t.create(testing.db),
CompiledSQL('CREATE TABLE sometable (id INTEGER NOT NULL, '
'data VARCHAR(50), PRIMARY KEY (id))'),
- ExactSQL('CREATE INDEX myindex ON sometable (data DESC)')
+ CompiledSQL('CREATE INDEX myindex ON sometable (data DESC)')
)
@@ -542,6 +670,33 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
"REFERENCES tbl (a) MATCH SIMPLE"
)
+ def test_create_table_omit_fks(self):
+ fkcs = [
+ ForeignKeyConstraint(['a'], ['remote.id'], name='foo'),
+ ForeignKeyConstraint(['b'], ['remote.id'], name='bar'),
+ ForeignKeyConstraint(['c'], ['remote.id'], name='bat'),
+ ]
+ m = MetaData()
+ t = Table(
+ 't', m,
+ Column('a', Integer),
+ Column('b', Integer),
+ Column('c', Integer),
+ *fkcs
+ )
+ Table('remote', m, Column('id', Integer, primary_key=True))
+
+ self.assert_compile(
+ schema.CreateTable(t, include_foreign_key_constraints=[]),
+ "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER)"
+ )
+ self.assert_compile(
+ schema.CreateTable(t, include_foreign_key_constraints=fkcs[0:2]),
+ "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER, "
+ "CONSTRAINT foo FOREIGN KEY(a) REFERENCES remote (id), "
+ "CONSTRAINT bar FOREIGN KEY(b) REFERENCES remote (id))"
+ )
+
def test_deferrable_unique(self):
factory = lambda **kw: UniqueConstraint('b', **kw)
self._test_deferrable(factory)
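
A hedged, user-level sketch of the cyclic-FK behavior the new tests above
exercise: when foreign keys form a cycle, create_all() can emit the
constraints via ALTER after both CREATE TABLE statements, and drop_all()
can drop enough named constraints first to break the cycle; an unnamed
cycle raises CircularDependencyError on drop, as asserted in
test_fk_cant_drop_cycled_unnamed. The table layout mirrors the tests; the
engine URL is an assumption.

    from sqlalchemy import (MetaData, Table, Column, Integer,
                            ForeignKeyConstraint, create_engine)

    m = MetaData()
    Table('a', m,
          Column('id', Integer, primary_key=True),
          Column('bid', Integer),
          ForeignKeyConstraint(['bid'], ['b.id'], name='afk'))
    Table('b', m,
          Column('id', Integer, primary_key=True),
          Column('aid', Integer),
          ForeignKeyConstraint(['aid'], ['a.id'], name='bfk'))

    engine = create_engine('postgresql://scott:tiger@localhost/test')
    # CREATE TABLE a, b first; then, where the dialect supports ALTER,
    # ALTER TABLE ... ADD CONSTRAINT for the cycled foreign keys
    m.create_all(engine)
    # ALTER TABLE ... DROP CONSTRAINT to break the cycle, then DROP TABLEs
    m.drop_all(engine)
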
diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py
index b907fe649..c7906dcb7 100644
--- a/test/sql/test_cte.py
+++ b/test/sql/test_cte.py
@@ -462,3 +462,33 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL):
'FROM "order" JOIN regional_sales AS anon_1 '
'ON anon_1."order" = "order"."order"'
)
+
+ def test_suffixes(self):
+ orders = table('order', column('order'))
+ s = select([orders.c.order]).cte("regional_sales")
+ s = s.suffix_with("pg suffix", dialect='postgresql')
+ s = s.suffix_with('oracle suffix', dialect='oracle')
+ stmt = select([orders]).where(orders.c.order > s.c.order)
+
+ self.assert_compile(
+ stmt,
+ 'WITH regional_sales AS (SELECT "order"."order" AS "order" '
+ 'FROM "order") SELECT "order"."order" FROM "order", '
+ 'regional_sales WHERE "order"."order" > regional_sales."order"'
+ )
+
+ self.assert_compile(
+ stmt,
+ 'WITH regional_sales AS (SELECT "order"."order" AS "order" '
+ 'FROM "order") oracle suffix SELECT "order"."order" FROM "order", '
+ 'regional_sales WHERE "order"."order" > regional_sales."order"',
+ dialect='oracle'
+ )
+
+ self.assert_compile(
+ stmt,
+ 'WITH regional_sales AS (SELECT "order"."order" AS "order" '
+ 'FROM "order") pg suffix SELECT "order"."order" FROM "order", '
+ 'regional_sales WHERE "order"."order" > regional_sales."order"',
+ dialect='postgresql'
+    )
\ No newline at end of file
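
A hedged sketch of the suffix_with() behavior asserted above: a suffix
registered with a dialect name renders only when compiling for that
dialect, and it appears between the WITH clause and the main SELECT. The
table name and suffix text below are placeholders, not validated syntax.

    from sqlalchemy.sql import table, column, select
    from sqlalchemy.dialects import oracle

    orders = table('orders', column('amount'))
    cte = select([orders.c.amount]).cte('totals')
    cte = cte.suffix_with('some oracle-only text', dialect='oracle')

    stmt = select([cte.c.amount])
    print(stmt)  # default dialect: the suffix is omitted
    print(stmt.compile(dialect=oracle.dialect()))  # suffix renders
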
diff --git a/test/sql/test_ddlemit.py b/test/sql/test_ddlemit.py
index 825f8228b..e191beed3 100644
--- a/test/sql/test_ddlemit.py
+++ b/test/sql/test_ddlemit.py
@@ -1,6 +1,6 @@
from sqlalchemy.testing import fixtures
from sqlalchemy.sql.ddl import SchemaGenerator, SchemaDropper
-from sqlalchemy import MetaData, Table, Column, Integer, Sequence
+from sqlalchemy import MetaData, Table, Column, Integer, Sequence, ForeignKey
from sqlalchemy import schema
from sqlalchemy.testing.mock import Mock
@@ -42,6 +42,31 @@ class EmitDDLTest(fixtures.TestBase):
for i in range(1, 6)
)
+ def _use_alter_fixture_one(self):
+ m = MetaData()
+
+ t1 = Table(
+ 't1', m, Column('id', Integer, primary_key=True),
+ Column('t2id', Integer, ForeignKey('t2.id'))
+ )
+ t2 = Table(
+ 't2', m, Column('id', Integer, primary_key=True),
+ Column('t1id', Integer, ForeignKey('t1.id'))
+ )
+ return m, t1, t2
+
+ def _fk_fixture_one(self):
+ m = MetaData()
+
+ t1 = Table(
+ 't1', m, Column('id', Integer, primary_key=True),
+ Column('t2id', Integer, ForeignKey('t2.id'))
+ )
+ t2 = Table(
+ 't2', m, Column('id', Integer, primary_key=True),
+ )
+ return m, t1, t2
+
def _table_seq_fixture(self):
m = MetaData()
@@ -172,6 +197,32 @@ class EmitDDLTest(fixtures.TestBase):
self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m)
+ def test_create_metadata_auto_alter_fk(self):
+ m, t1, t2 = self._use_alter_fixture_one()
+ generator = self._mock_create_fixture(
+ False, [t1, t2]
+ )
+ self._assert_create_w_alter(
+ [t1, t2] +
+ list(t1.foreign_key_constraints) +
+ list(t2.foreign_key_constraints),
+ generator,
+ m
+ )
+
+ def test_create_metadata_inline_fk(self):
+ m, t1, t2 = self._fk_fixture_one()
+ generator = self._mock_create_fixture(
+ False, [t1, t2]
+ )
+ self._assert_create_w_alter(
+ [t1, t2] +
+ list(t1.foreign_key_constraints) +
+ list(t2.foreign_key_constraints),
+ generator,
+ m
+ )
+
def _assert_create_tables(self, elements, generator, argument):
self._assert_ddl(schema.CreateTable, elements, generator, argument)
@@ -188,6 +239,16 @@ class EmitDDLTest(fixtures.TestBase):
(schema.DropTable, schema.DropSequence),
elements, generator, argument)
+ def _assert_create_w_alter(self, elements, generator, argument):
+ self._assert_ddl(
+ (schema.CreateTable, schema.CreateSequence, schema.AddConstraint),
+ elements, generator, argument)
+
+ def _assert_drop_w_alter(self, elements, generator, argument):
+ self._assert_ddl(
+ (schema.DropTable, schema.DropSequence, schema.DropConstraint),
+ elements, generator, argument)
+
def _assert_ddl(self, ddl_cls, elements, generator, argument):
generator.traverse_single(argument)
for call_ in generator.connection.execute.mock_calls:
@@ -196,4 +257,8 @@ class EmitDDLTest(fixtures.TestBase):
assert c.element in elements, "element %r was not expected"\
% c.element
elements.remove(c.element)
+ if getattr(c, 'include_foreign_key_constraints', None) is not None:
+ elements[:] = [
+ e for e in elements
+ if e not in set(c.include_foreign_key_constraints)]
assert not elements, "elements remain in list: %r" % elements
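
The new ddlemit fixtures feed SchemaGenerator two tables whose foreign
keys form a cycle, and expect CreateTable to be emitted with some
constraints omitted, followed by AddConstraint. A hedged sketch of the
CreateTable option involved (table and constraint names are illustrative):

    from sqlalchemy import MetaData, Table, Column, Integer, ForeignKey
    from sqlalchemy.schema import CreateTable, AddConstraint

    m = MetaData()
    t1 = Table('t1', m, Column('id', Integer, primary_key=True),
               Column('t2id', Integer, ForeignKey('t2.id', name='fk12')))
    Table('t2', m, Column('id', Integer, primary_key=True),
          Column('t1id', Integer, ForeignKey('t1.id', name='fk21')))

    # render t1 without its foreign key, as the generator would do to
    # break the cycle, then add the constraint separately
    print(CreateTable(t1, include_foreign_key_constraints=[]))
    for fkc in t1.foreign_key_constraints:
        print(AddConstraint(fkc))
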
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 10e557b76..48505dd8c 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -336,13 +336,7 @@ class DefaultTest(fixtures.TestBase):
[(54, 'imthedefault', f, ts, ts, ctexec, True, False,
12, today, None, 'hi')])
- @testing.fails_on('firebird', 'Data type unknown')
def test_insertmany(self):
- # MySQL-Python 1.2.2 breaks functions in execute_many :(
- if (testing.against('mysql+mysqldb') and
- testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
- return
-
t.insert().execute({}, {}, {})
ctexec = currenttime.scalar()
@@ -356,6 +350,22 @@ class DefaultTest(fixtures.TestBase):
(53, 'imthedefault', f, ts, ts, ctexec, True, False,
12, today, 'py', 'hi')])
+ @testing.requires.multivalues_inserts
+ def test_insert_multivalues(self):
+
+ t.insert().values([{}, {}, {}]).execute()
+
+ ctexec = currenttime.scalar()
+ l = t.select().execute()
+ today = datetime.date.today()
+ eq_(l.fetchall(),
+ [(51, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py', 'hi'),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py', 'hi'),
+ (53, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py', 'hi')])
+
def test_no_embed_in_sql(self):
"""Using a DefaultGenerator, Sequence, DefaultClause
in the columns, where clause of a select, or in the values
@@ -368,7 +378,8 @@ class DefaultTest(fixtures.TestBase):
):
assert_raises_message(
sa.exc.ArgumentError,
- "SQL expression object or string expected.",
+ "SQL expression object or string expected, got object of type "
+ "<.* 'list'> instead",
t.select, [const]
)
assert_raises_message(
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index bd4eaa3e2..8a41d4be7 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -1,12 +1,12 @@
#! coding:utf-8
from sqlalchemy import Column, Integer, MetaData, String, Table,\
- bindparam, exc, func, insert, select, column
+ bindparam, exc, func, insert, select, column, text
from sqlalchemy.dialects import mysql, postgresql
from sqlalchemy.engine import default
from sqlalchemy.testing import AssertsCompiledSQL,\
assert_raises_message, fixtures
-
+from sqlalchemy.sql import crud
class _InsertTestBase(object):
@@ -19,6 +19,12 @@ class _InsertTestBase(object):
Table('myothertable', metadata,
Column('otherid', Integer, primary_key=True),
Column('othername', String(30)))
+ Table('table_w_defaults', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer, default=10),
+ Column('y', Integer, server_default=text('5')),
+ Column('z', Integer, default=lambda: 10)
+ )
class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
@@ -565,6 +571,36 @@ class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkpositional=checkpositional,
dialect=dialect)
+ def test_positional_w_defaults(self):
+ table1 = self.tables.table_w_defaults
+
+ values = [
+ {'id': 1},
+ {'id': 2},
+ {'id': 3}
+ ]
+
+ checkpositional = (1, None, None, 2, None, None, 3, None, None)
+
+ dialect = default.DefaultDialect()
+ dialect.supports_multivalues_insert = True
+ dialect.paramstyle = 'format'
+ dialect.positional = True
+
+ self.assert_compile(
+ table1.insert().values(values),
+ "INSERT INTO table_w_defaults (id, x, z) VALUES "
+ "(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)",
+ checkpositional=checkpositional,
+ check_prefetch=[
+ table1.c.x, table1.c.z,
+ crud._multiparam_column(table1.c.x, 0),
+ crud._multiparam_column(table1.c.z, 0),
+ crud._multiparam_column(table1.c.x, 1),
+ crud._multiparam_column(table1.c.z, 1)
+ ],
+ dialect=dialect)
+
def test_inline_default(self):
metadata = MetaData()
table = Table('sometable', metadata,
@@ -597,6 +633,74 @@ class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams=checkparams,
dialect=postgresql.dialect())
+ def test_python_scalar_default(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String),
+ Column('foo', Integer, default=10))
+
+ values = [
+ {'id': 1, 'data': 'data1'},
+ {'id': 2, 'data': 'data2', 'foo': 15},
+ {'id': 3, 'data': 'data3'},
+ ]
+
+ checkparams = {
+ 'id_0': 1,
+ 'id_1': 2,
+ 'id_2': 3,
+ 'data_0': 'data1',
+ 'data_1': 'data2',
+ 'data_2': 'data3',
+ 'foo': None, # evaluated later
+ 'foo_1': 15,
+ 'foo_2': None # evaluated later
+ }
+
+ self.assert_compile(
+ table.insert().values(values),
+ 'INSERT INTO sometable (id, data, foo) VALUES '
+ '(%(id_0)s, %(data_0)s, %(foo)s), '
+ '(%(id_1)s, %(data_1)s, %(foo_1)s), '
+ '(%(id_2)s, %(data_2)s, %(foo_2)s)',
+ checkparams=checkparams,
+ dialect=postgresql.dialect())
+
+ def test_python_fn_default(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String),
+ Column('foo', Integer, default=lambda: 10))
+
+ values = [
+ {'id': 1, 'data': 'data1'},
+ {'id': 2, 'data': 'data2', 'foo': 15},
+ {'id': 3, 'data': 'data3'},
+ ]
+
+ checkparams = {
+ 'id_0': 1,
+ 'id_1': 2,
+ 'id_2': 3,
+ 'data_0': 'data1',
+ 'data_1': 'data2',
+ 'data_2': 'data3',
+ 'foo': None, # evaluated later
+ 'foo_1': 15,
+ 'foo_2': None, # evaluated later
+ }
+
+ self.assert_compile(
+ table.insert().values(values),
+ "INSERT INTO sometable (id, data, foo) VALUES "
+ "(%(id_0)s, %(data_0)s, %(foo)s), "
+ "(%(id_1)s, %(data_1)s, %(foo_1)s), "
+ "(%(id_2)s, %(data_2)s, %(foo_2)s)",
+ checkparams=checkparams,
+ dialect=postgresql.dialect())
+
def test_sql_functions(self):
metadata = MetaData()
table = Table('sometable', metadata,
@@ -684,24 +788,10 @@ class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
{'id': 3, 'data': 'data3', 'foo': 'otherfoo'},
]
- checkparams = {
- 'id_0': 1,
- 'id_1': 2,
- 'id_2': 3,
- 'data_0': 'data1',
- 'data_1': 'data2',
- 'data_2': 'data3',
- 'foo_0': 'plainfoo',
- 'foo_2': 'otherfoo',
- }
-
- # note the effect here is that the first set of params
- # takes effect for the rest of them, when one is absent
- self.assert_compile(
- table.insert().values(values),
- 'INSERT INTO sometable (id, data, foo) VALUES '
- '(%(id_0)s, %(data_0)s, %(foo_0)s), '
- '(%(id_1)s, %(data_1)s, %(foo_0)s), '
- '(%(id_2)s, %(data_2)s, %(foo_2)s)',
- checkparams=checkparams,
- dialect=postgresql.dialect())
+ assert_raises_message(
+ exc.CompileError,
+ "INSERT value for column sometable.foo is explicitly rendered "
+ "as a boundparameter in the VALUES clause; a Python-side value or "
+ "SQL expression is required",
+ table.insert().values(values).compile
+ )
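
A hedged recap of the multirow behaviors pinned down above, using
hypothetical tables: Python-side defaults are applied per row, while a
missing value for a defaultless column in a later row now raises
CompileError instead of silently reusing the first row's parameter.

    from sqlalchemy import MetaData, Table, Column, Integer, String, exc
    from sqlalchemy.dialects import postgresql

    t = Table('t', MetaData(),
              Column('id', Integer, primary_key=True),
              Column('data', String(20)),
              Column('foo', Integer, default=10))

    # 'foo' is filled in for each row from its Python-side default
    stmt = t.insert().values([{'id': 1, 'data': 'd1'},
                              {'id': 2, 'data': 'd2'}])
    print(stmt.compile(dialect=postgresql.dialect()))

    # with no default available, a missing later-row value is an error
    u = Table('u', MetaData(), Column('id', Integer),
              Column('x', String(20)))
    bad = u.insert().values([{'id': 1, 'x': 'x1'}, {'id': 2}])
    try:
        bad.compile(dialect=postgresql.dialect())
    except exc.CompileError as err:
        print(err)
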
diff --git a/test/sql/test_join_rewriting.py b/test/sql/test_join_rewriting.py
index ced65d7f1..f99dfda4e 100644
--- a/test/sql/test_join_rewriting.py
+++ b/test/sql/test_join_rewriting.py
@@ -650,6 +650,7 @@ class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase):
def _test(self, selectable, assert_):
result = testing.db.execute(selectable)
+ result.close()
for col in selectable.inner_columns:
assert col in result._metadata._keymap
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index 0aa5d7305..cc7d0eb4f 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -1160,9 +1160,10 @@ class InfoTest(fixtures.TestBase):
t = Table('x', MetaData(), info={'foo': 'bar'})
eq_(t.info, {'foo': 'bar'})
+
class TableTest(fixtures.TestBase, AssertsCompiledSQL):
- @testing.requires.temporary_table
+ @testing.requires.temporary_tables
@testing.skip_if('mssql', 'different col format')
def test_prefixes(self):
from sqlalchemy import Table
@@ -1195,6 +1196,30 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
t.info['bar'] = 'zip'
assert t.info['bar'] == 'zip'
+ def test_foreign_key_constraints_collection(self):
+ metadata = MetaData()
+ t1 = Table('foo', metadata, Column('a', Integer))
+ eq_(t1.foreign_key_constraints, set())
+
+ fk1 = ForeignKey('q.id')
+ fk2 = ForeignKey('j.id')
+ fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y'])
+
+ t1.append_column(Column('b', Integer, fk1))
+ eq_(
+ t1.foreign_key_constraints,
+ set([fk1.constraint]))
+
+ t1.append_column(Column('c', Integer, fk2))
+ eq_(
+ t1.foreign_key_constraints,
+ set([fk1.constraint, fk2.constraint]))
+
+ t1.append_constraint(fk3)
+ eq_(
+ t1.foreign_key_constraints,
+ set([fk1.constraint, fk2.constraint, fk3]))
+
def test_c_immutable(self):
m = MetaData()
t1 = Table('t', m, Column('x', Integer), Column('y', Integer))
@@ -1946,6 +1971,22 @@ class ConstraintTest(fixtures.TestBase):
assert s1.c.a.references(t1.c.a)
assert not s1.c.a.references(t1.c.b)
+ def test_referred_table_accessor(self):
+ t1, t2, t3 = self._single_fixture()
+ fkc = list(t2.foreign_key_constraints)[0]
+ is_(fkc.referred_table, t1)
+
+ def test_referred_table_accessor_not_available(self):
+ t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id')))
+ fkc = list(t1.foreign_key_constraints)[0]
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Foreign key associated with column 't.x' could not find "
+ "table 'q' with which to generate a foreign key to target "
+ "column 'id'",
+ getattr, fkc, "referred_table"
+ )
+
def test_related_column_not_present_atfirst_ok(self):
m = MetaData()
base_table = Table("base", m,
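
A hedged sketch of the two accessors tested above, with hypothetical
parent/child tables: Table.foreign_key_constraints collects constraints
from inline ForeignKey objects and table-level ForeignKeyConstraints
alike, and each constraint resolves referred_table once the target table
exists in the MetaData.

    from sqlalchemy import MetaData, Table, Column, Integer, ForeignKey

    m = MetaData()
    parent = Table('parent', m, Column('id', Integer, primary_key=True))
    child = Table('child', m,
                  Column('id', Integer, primary_key=True),
                  Column('pid', Integer, ForeignKey('parent.id')))

    fkc, = child.foreign_key_constraints  # a set of ForeignKeyConstraint
    assert fkc.referred_table is parent
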
diff --git a/test/sql/test_operators.py b/test/sql/test_operators.py
index e8ad88511..0985020d1 100644
--- a/test/sql/test_operators.py
+++ b/test/sql/test_operators.py
@@ -12,7 +12,8 @@ from sqlalchemy import exc
from sqlalchemy.engine import default
from sqlalchemy.sql.elements import _literal_as_text
from sqlalchemy.schema import Column, Table, MetaData
-from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, Boolean
+from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, \
+ Boolean, NullType, MatchType
from sqlalchemy.dialects import mysql, firebird, postgresql, oracle, \
sqlite, mssql
from sqlalchemy import util
@@ -360,7 +361,7 @@ class CustomComparatorTest(_CustomComparatorTests, fixtures.TestBase):
class comparator_factory(TypeEngine.Comparator):
def __init__(self, expr):
- self.expr = expr
+ super(MyInteger.comparator_factory, self).__init__(expr)
def __add__(self, other):
return self.expr.op("goofy")(other)
@@ -381,7 +382,7 @@ class TypeDecoratorComparatorTest(_CustomComparatorTests, fixtures.TestBase):
class comparator_factory(TypeDecorator.Comparator):
def __init__(self, expr):
- self.expr = expr
+ super(MyInteger.comparator_factory, self).__init__(expr)
def __add__(self, other):
return self.expr.op("goofy")(other)
@@ -392,6 +393,31 @@ class TypeDecoratorComparatorTest(_CustomComparatorTests, fixtures.TestBase):
return MyInteger
+class TypeDecoratorTypeDecoratorComparatorTest(
+ _CustomComparatorTests, fixtures.TestBase):
+
+ def _add_override_factory(self):
+
+ class MyIntegerOne(TypeDecorator):
+ impl = Integer
+
+ class comparator_factory(TypeDecorator.Comparator):
+
+ def __init__(self, expr):
+ super(MyIntegerOne.comparator_factory, self).__init__(expr)
+
+ def __add__(self, other):
+ return self.expr.op("goofy")(other)
+
+ def __and__(self, other):
+ return self.expr.op("goofy_and")(other)
+
+ class MyIntegerTwo(TypeDecorator):
+ impl = MyIntegerOne
+
+ return MyIntegerTwo
+
+
class TypeDecoratorWVariantComparatorTest(
_CustomComparatorTests,
fixtures.TestBase):
@@ -403,7 +429,9 @@ class TypeDecoratorWVariantComparatorTest(
class comparator_factory(TypeEngine.Comparator):
def __init__(self, expr):
- self.expr = expr
+ super(
+ SomeOtherInteger.comparator_factory,
+ self).__init__(expr)
def __add__(self, other):
return self.expr.op("not goofy")(other)
@@ -417,7 +445,7 @@ class TypeDecoratorWVariantComparatorTest(
class comparator_factory(TypeDecorator.Comparator):
def __init__(self, expr):
- self.expr = expr
+ super(MyInteger.comparator_factory, self).__init__(expr)
def __add__(self, other):
return self.expr.op("goofy")(other)
@@ -438,7 +466,7 @@ class CustomEmbeddedinTypeDecoratorTest(
class comparator_factory(TypeEngine.Comparator):
def __init__(self, expr):
- self.expr = expr
+ super(MyInteger.comparator_factory, self).__init__(expr)
def __add__(self, other):
return self.expr.op("goofy")(other)
@@ -460,7 +488,7 @@ class NewOperatorTest(_CustomComparatorTests, fixtures.TestBase):
class comparator_factory(TypeEngine.Comparator):
def __init__(self, expr):
- self.expr = expr
+ super(MyInteger.comparator_factory, self).__init__(expr)
def foob(self, other):
return self.expr.op("foob")(other)
@@ -1619,6 +1647,31 @@ class MatchTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"CONTAINS (mytable.myid, :myid_1)",
dialect=oracle.dialect())
+ def test_match_is_now_matchtype(self):
+ expr = self.table1.c.myid.match('somstr')
+ assert expr.type._type_affinity is MatchType()._type_affinity
+ assert isinstance(expr.type, MatchType)
+
+ def test_boolean_inversion_postgresql(self):
+ self.assert_compile(
+ ~self.table1.c.myid.match('somstr'),
+ "NOT mytable.myid @@ to_tsquery(%(myid_1)s)",
+ dialect=postgresql.dialect())
+
+ def test_boolean_inversion_mysql(self):
+        # because MySQL doesn't have a native boolean type
+ self.assert_compile(
+ ~self.table1.c.myid.match('somstr'),
+ "NOT MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)",
+ dialect=mysql.dialect())
+
+ def test_boolean_inversion_mssql(self):
+        # because SQL Server doesn't have a native boolean type
+ self.assert_compile(
+ ~self.table1.c.myid.match('somstr'),
+ "NOT CONTAINS (mytable.myid, :myid_1)",
+ dialect=mssql.dialect())
+
class ComposedLikeOperatorsTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
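
A hedged sketch of what the MatchType additions above assert, with a
hypothetical docs table: match() expressions now carry a real type, so
boolean inversion with ~ compiles to each dialect's own negation form.

    from sqlalchemy import MetaData, Table, Column, String
    from sqlalchemy.dialects import postgresql, mysql

    docs = Table('docs', MetaData(), Column('body', String))
    expr = ~docs.c.body.match('needle')

    print(expr.compile(dialect=postgresql.dialect()))
    # NOT docs.body @@ to_tsquery(%(body_1)s)
    print(expr.compile(dialect=mysql.dialect()))
    # NOT MATCH (docs.body) AGAINST (%s IN BOOLEAN MODE)
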
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 26dc6c842..5e1542853 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -10,6 +10,8 @@ from sqlalchemy import (
type_coerce, VARCHAR, Time, DateTime, BigInteger, SmallInteger, BOOLEAN,
BLOB, NCHAR, NVARCHAR, CLOB, TIME, DATE, DATETIME, TIMESTAMP, SMALLINT,
INTEGER, DECIMAL, NUMERIC, FLOAT, REAL)
+from sqlalchemy.sql import ddl
+
from sqlalchemy import exc, types, util, dialects
for name in dialects.__all__:
__import__("sqlalchemy.dialects.%s" % name)
@@ -309,6 +311,24 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL):
literal_binds=True
)
+ def test_kw_colspec(self):
+ class MyType(types.UserDefinedType):
+ def get_col_spec(self, **kw):
+ return "FOOB %s" % kw['type_expression'].name
+
+ class MyOtherType(types.UserDefinedType):
+ def get_col_spec(self):
+ return "BAR"
+
+ self.assert_compile(
+ ddl.CreateColumn(Column('bar', MyType)),
+ "bar FOOB bar"
+ )
+ self.assert_compile(
+ ddl.CreateColumn(Column('bar', MyOtherType)),
+ "bar BAR"
+ )
+
def test_typedecorator_literal_render_fallback_bound(self):
# fall back to process_bind_param for literal
# value rendering.
@@ -932,6 +952,7 @@ class UnicodeTest(fixtures.TestBase):
expected = (testing.db.name, testing.db.driver) in \
(
('postgresql', 'psycopg2'),
+ ('postgresql', 'psycopg2cffi'),
('postgresql', 'pypostgresql'),
('postgresql', 'pg8000'),
('postgresql', 'zxjdbc'),
@@ -1157,8 +1178,11 @@ class EnumTest(AssertsCompiledSQL, fixtures.TestBase):
def test_repr(self):
e = Enum(
"x", "y", name="somename", convert_unicode=True, quote=True,
- inherit_schema=True)
- eq_(repr(e), "Enum('x', 'y', name='somename', inherit_schema=True)")
+ inherit_schema=True, native_enum=False)
+ eq_(
+ repr(e),
+ "Enum('x', 'y', name='somename', "
+ "inherit_schema=True, native_enum=False)")
binary_table = MyPickleType = metadata = None
@@ -1639,6 +1663,49 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
def test_decimal_scale(self):
self.assert_compile(types.DECIMAL(2, 4), 'DECIMAL(2, 4)')
+ def test_kwarg_legacy_typecompiler(self):
+ from sqlalchemy.sql import compiler
+
+ class SomeTypeCompiler(compiler.GenericTypeCompiler):
+ # transparently decorated w/ kw decorator
+ def visit_VARCHAR(self, type_):
+ return "MYVARCHAR"
+
+ # not affected
+ def visit_INTEGER(self, type_, **kw):
+ return "MYINTEGER %s" % kw['type_expression'].name
+
+ dialect = default.DefaultDialect()
+ dialect.type_compiler = SomeTypeCompiler(dialect)
+ self.assert_compile(
+ ddl.CreateColumn(Column('bar', VARCHAR(50))),
+ "bar MYVARCHAR",
+ dialect=dialect
+ )
+ self.assert_compile(
+ ddl.CreateColumn(Column('bar', INTEGER)),
+ "bar MYINTEGER bar",
+ dialect=dialect
+ )
+
+
+class TestKWArgPassThru(AssertsCompiledSQL, fixtures.TestBase):
+ __backend__ = True
+
+ def test_user_defined(self):
+ """test that dialects pass the column through on DDL."""
+
+ class MyType(types.UserDefinedType):
+ def get_col_spec(self, **kw):
+ return "FOOB %s" % kw['type_expression'].name
+
+ m = MetaData()
+ t = Table('t', m, Column('bar', MyType))
+ self.assert_compile(
+ ddl.CreateColumn(t.c.bar),
+ "bar FOOB bar"
+ )
+
class NumericRawSQLTest(fixtures.TestBase):
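
A hedged sketch of the keyword pass-through tested above: get_col_spec()
may now accept **kw and receives the owning Column as type_expression.
The type and table names below are illustrative.

    from sqlalchemy import MetaData, Table, Column, types
    from sqlalchemy.schema import CreateTable

    class AnnotatedString(types.UserDefinedType):
        def get_col_spec(self, **kw):
            # the Column being rendered arrives as 'type_expression'
            return "VARCHAR(40) /* %s */" % kw['type_expression'].name

    t = Table('t', MetaData(), Column('nickname', AnnotatedString()))
    print(CreateTable(t))
    # CREATE TABLE t (nickname VARCHAR(40) /* nickname */)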