diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2007-11-10 03:02:16 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2007-11-10 03:02:16 +0000 |
| commit | ea46e556f9f691735bc14885648a92e8cf7177d5 (patch) | |
| tree | 9ddbdbe33b110b0ae6cdb0d289f48ae6801b402e /test/sql | |
| parent | 681c8fc51c92c5998642fcef0ec9e9b079ccf1f5 (diff) | |
| download | sqlalchemy-ea46e556f9f691735bc14885648a92e8cf7177d5.tar.gz | |
- anonymous column expressions are automatically labeled.
e.g. select([x* 5]) produces "SELECT x * 5 AS anon_1".
This allows the labelname to be present in the cursor.description
which can then be appropriately matched to result-column processing
rules. (we can't reliably use positional tracking for result-column
matches since text() expressions may represent multiple columns).
- operator overloading is now controlled by TypeEngine objects - the
one built-in operator overload so far is String types overloading
'+' to be the string concatenation operator.
User-defined types can also define their own operator overloading
by overriding the adapt_operator(self, op) method.
- untyped bind parameters on the right side of a binary expression
will be assigned the type of the left side of the operation, to better
enable the appropriate bind parameter processing to take effect
[ticket:819]
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/generative.py | 1 | ||||
| -rw-r--r-- | test/sql/select.py | 24 | ||||
| -rw-r--r-- | test/sql/testtypes.py | 70 |
3 files changed, 83 insertions, 12 deletions
diff --git a/test/sql/generative.py b/test/sql/generative.py index 1497ecde3..040d4766b 100644 --- a/test/sql/generative.py +++ b/test/sql/generative.py @@ -281,6 +281,7 @@ class ClauseTest(SQLCompileTest): self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2") self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2") + def test_joins(self): """test that ClauseAdapter can target a Join object, replace it, and not dig into the sub-joins after diff --git a/test/sql/select.py b/test/sql/select.py index 699d05faa..f9aa21f1e 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -230,21 +230,21 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def test_scalar_select(self): s = select([table1.c.myid], scalar=True, correlate=False) - self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") + self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") s = select([table1.c.myid], scalar=True) - self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") + self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") s = select([table1.c.myid]).correlate(None).as_scalar() - self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable") + self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable") s = select([table1.c.myid]).as_scalar() - self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable") + self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable") # test expressions against scalar selects - self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal") - self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal") - self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal") + self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal AS anon_1") + self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal AS anon_1") + self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal AS anon_1") self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo") @@ -294,7 +294,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True) j1 = table1.join(table2, table1.c.myid==table2.c.otherid) s2 = select([table1, s1], from_obj=[j1]) - self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid") def testlabelcomparison(self): x = func.lala(table1.c.myid).label('foo') @@ -640,7 +640,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today def testliteral(self): self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), - "SELECT :literal || :literal_1 FROM mytable") + "SELECT :literal || :literal_1 AS anon_1 FROM mytable") def testcalculatedcolumns(self): value_tbl = table('values', @@ -652,7 +652,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today self.assert_compile( select([value_tbl.c.id, (value_tbl.c.val2 - value_tbl.c.val1)/value_tbl.c.val1]), - "SELECT values.id, (values.val2 - values.val1) / values.val1 FROM values" + "SELECT values.id, (values.val2 - values.val1) / values.val1 AS anon_1 FROM values" ) self.assert_compile( @@ -1110,9 +1110,9 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE # coverage on other dialects. sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect) if isinstance(dialect, type(mysql.dialect())): - self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) \nFROM casttest") + self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) AS anon_1 \nFROM casttest") else: - self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest") + self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest") # first test with Postgres engine check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s') diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index 4af96d57f..630ecb9d5 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -3,6 +3,7 @@ import pickleable import datetime, os from sqlalchemy import * from sqlalchemy import types +from sqlalchemy.sql import operators import sqlalchemy.engine.url as url from sqlalchemy.databases import mssql, oracle, mysql, postgres, firebird from testlib import * @@ -367,7 +368,76 @@ class BinaryTest(AssertMixin): # put a number less than the typical MySQL default BLOB size return file(f).read(len) +class ExpressionTest(AssertMixin): + def setUpAll(self): + global test_table, meta + + class MyCustomType(types.TypeEngine): + def get_col_spec(self): + return "INT" + def bind_processor(self, dialect): + def process(value): + return value * 10 + return process + def result_processor(self, dialect): + def process(value): + return value / 10 + return process + def adapt_operator(self, op): + return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op) + + meta = MetaData(testbase.db) + test_table = Table('test', meta, + Column('id', Integer, primary_key=True), + Column('data', String(30)), + Column('timestamp', Date), + Column('value', MyCustomType)) + + meta.create_all() + + test_table.insert().execute({'id':1, 'data':'somedata', 'timestamp':datetime.date(2007, 10, 15), 'value':25}) + + def tearDownAll(self): + meta.drop_all() + + def test_control(self): + assert testbase.db.execute("select value from test").scalar() == 250 + + assert test_table.select().execute().fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)] + + def test_bind_adapt(self): + expr = test_table.c.timestamp == bindparam("thedate") + assert expr.right.type.__class__ == test_table.c.timestamp.type.__class__ + + assert testbase.db.execute(test_table.select().where(expr), {"thedate":datetime.date(2007, 10, 15)}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)] + + expr = test_table.c.value == bindparam("somevalue") + assert expr.right.type.__class__ == test_table.c.value.type.__class__ + assert testbase.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)] + + + def test_operator_adapt(self): + """test type-based overloading of operators""" + + # test string concatenation + expr = test_table.c.data + "somedata" + assert testbase.db.execute(select([expr])).scalar() == "somedatasomedata" + expr = test_table.c.id + 15 + assert testbase.db.execute(select([expr])).scalar() == 16 + + # test custom operator conversion + expr = test_table.c.value + 40 + assert expr.type.__class__ is test_table.c.value.type.__class__ + + # + operator converted to - + # value is calculated as: (250 - (40 * 10)) / 10 == -15 + assert testbase.db.execute(select([expr.label('foo')])).scalar() == -15 + + # this one relies upon anonymous labeling to assemble result + # processing rules on the column. + assert testbase.db.execute(select([expr])).scalar() == -15 + class DateTest(AssertMixin): def setUpAll(self): global users_with_date, insert_data |
