summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-11-10 03:02:16 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-11-10 03:02:16 +0000
commitea46e556f9f691735bc14885648a92e8cf7177d5 (patch)
tree9ddbdbe33b110b0ae6cdb0d289f48ae6801b402e /test/sql
parent681c8fc51c92c5998642fcef0ec9e9b079ccf1f5 (diff)
downloadsqlalchemy-ea46e556f9f691735bc14885648a92e8cf7177d5.tar.gz
- anonymous column expressions are automatically labeled.
e.g. select([x* 5]) produces "SELECT x * 5 AS anon_1". This allows the labelname to be present in the cursor.description which can then be appropriately matched to result-column processing rules. (we can't reliably use positional tracking for result-column matches since text() expressions may represent multiple columns). - operator overloading is now controlled by TypeEngine objects - the one built-in operator overload so far is String types overloading '+' to be the string concatenation operator. User-defined types can also define their own operator overloading by overriding the adapt_operator(self, op) method. - untyped bind parameters on the right side of a binary expression will be assigned the type of the left side of the operation, to better enable the appropriate bind parameter processing to take effect [ticket:819]
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/generative.py1
-rw-r--r--test/sql/select.py24
-rw-r--r--test/sql/testtypes.py70
3 files changed, 83 insertions, 12 deletions
diff --git a/test/sql/generative.py b/test/sql/generative.py
index 1497ecde3..040d4766b 100644
--- a/test/sql/generative.py
+++ b/test/sql/generative.py
@@ -281,6 +281,7 @@ class ClauseTest(SQLCompileTest):
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]), clone=True), "SELECT * FROM table1 AS t1alias, table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t1), clone=True), "SELECT * FROM table2 AS t2alias WHERE t1alias.col1 = t2alias.col2")
self.assert_compile(vis.traverse(select(['*'], t1.c.col1==t2.c.col2, from_obj=[t1, t2]).correlate(t2), clone=True), "SELECT * FROM table1 AS t1alias WHERE t1alias.col1 = t2alias.col2")
+
def test_joins(self):
"""test that ClauseAdapter can target a Join object, replace it, and not dig into the sub-joins after
diff --git a/test/sql/select.py b/test/sql/select.py
index 699d05faa..f9aa21f1e 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -230,21 +230,21 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
def test_scalar_select(self):
s = select([table1.c.myid], scalar=True, correlate=False)
- self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable")
+ self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
s = select([table1.c.myid], scalar=True)
- self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable")
+ self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
s = select([table1.c.myid]).correlate(None).as_scalar()
- self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) FROM mytable")
+ self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
s = select([table1.c.myid]).as_scalar()
- self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) FROM myothertable")
+ self.assert_compile(select([table2, s]), "SELECT myothertable.otherid, myothertable.othername, (SELECT mytable.myid FROM mytable) AS anon_1 FROM myothertable")
# test expressions against scalar selects
- self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal")
- self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal")
- self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal")
+ self.assert_compile(select([s - literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) - :literal AS anon_1")
+ self.assert_compile(select([select([table1.c.name]).as_scalar() + literal('x')]), "SELECT (SELECT mytable.name FROM mytable) || :literal AS anon_1")
+ self.assert_compile(select([s > literal(8)]), "SELECT (SELECT mytable.myid FROM mytable) > :literal AS anon_1")
self.assert_compile(select([select([table1.c.name]).label('foo')]), "SELECT (SELECT mytable.name FROM mytable) AS foo")
@@ -294,7 +294,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid, scalar=True)
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
s2 = select([table1, s1], from_obj=[j1])
- self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
+ self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
def testlabelcomparison(self):
x = func.lala(table1.c.myid).label('foo')
@@ -640,7 +640,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
def testliteral(self):
self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
- "SELECT :literal || :literal_1 FROM mytable")
+ "SELECT :literal || :literal_1 AS anon_1 FROM mytable")
def testcalculatedcolumns(self):
value_tbl = table('values',
@@ -652,7 +652,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today
self.assert_compile(
select([value_tbl.c.id, (value_tbl.c.val2 -
value_tbl.c.val1)/value_tbl.c.val1]),
- "SELECT values.id, (values.val2 - values.val1) / values.val1 FROM values"
+ "SELECT values.id, (values.val2 - values.val1) / values.val1 AS anon_1 FROM values"
)
self.assert_compile(
@@ -1110,9 +1110,9 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
# coverage on other dialects.
sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect)
if isinstance(dialect, type(mysql.dialect())):
- self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) \nFROM casttest")
+ self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS DECIMAL(10, 2)) AS anon_1 \nFROM casttest")
else:
- self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) \nFROM casttest")
+ self.assertEqual(str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, CAST(casttest.v1 AS NUMERIC(10, 2)) AS anon_1 \nFROM casttest")
# first test with Postgres engine
check_results(postgres.dialect(), ['NUMERIC(10, 2)', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], '%(literal)s')
diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py
index 4af96d57f..630ecb9d5 100644
--- a/test/sql/testtypes.py
+++ b/test/sql/testtypes.py
@@ -3,6 +3,7 @@ import pickleable
import datetime, os
from sqlalchemy import *
from sqlalchemy import types
+from sqlalchemy.sql import operators
import sqlalchemy.engine.url as url
from sqlalchemy.databases import mssql, oracle, mysql, postgres, firebird
from testlib import *
@@ -367,7 +368,76 @@ class BinaryTest(AssertMixin):
# put a number less than the typical MySQL default BLOB size
return file(f).read(len)
+class ExpressionTest(AssertMixin):
+ def setUpAll(self):
+ global test_table, meta
+
+ class MyCustomType(types.TypeEngine):
+ def get_col_spec(self):
+ return "INT"
+ def bind_processor(self, dialect):
+ def process(value):
+ return value * 10
+ return process
+ def result_processor(self, dialect):
+ def process(value):
+ return value / 10
+ return process
+ def adapt_operator(self, op):
+ return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op)
+
+ meta = MetaData(testbase.db)
+ test_table = Table('test', meta,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)),
+ Column('timestamp', Date),
+ Column('value', MyCustomType))
+
+ meta.create_all()
+
+ test_table.insert().execute({'id':1, 'data':'somedata', 'timestamp':datetime.date(2007, 10, 15), 'value':25})
+
+ def tearDownAll(self):
+ meta.drop_all()
+
+ def test_control(self):
+ assert testbase.db.execute("select value from test").scalar() == 250
+
+ assert test_table.select().execute().fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+
+ def test_bind_adapt(self):
+ expr = test_table.c.timestamp == bindparam("thedate")
+ assert expr.right.type.__class__ == test_table.c.timestamp.type.__class__
+
+ assert testbase.db.execute(test_table.select().where(expr), {"thedate":datetime.date(2007, 10, 15)}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+
+ expr = test_table.c.value == bindparam("somevalue")
+ assert expr.right.type.__class__ == test_table.c.value.type.__class__
+ assert testbase.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+
+
+ def test_operator_adapt(self):
+ """test type-based overloading of operators"""
+
+ # test string concatenation
+ expr = test_table.c.data + "somedata"
+ assert testbase.db.execute(select([expr])).scalar() == "somedatasomedata"
+ expr = test_table.c.id + 15
+ assert testbase.db.execute(select([expr])).scalar() == 16
+
+ # test custom operator conversion
+ expr = test_table.c.value + 40
+ assert expr.type.__class__ is test_table.c.value.type.__class__
+
+ # + operator converted to -
+ # value is calculated as: (250 - (40 * 10)) / 10 == -15
+ assert testbase.db.execute(select([expr.label('foo')])).scalar() == -15
+
+ # this one relies upon anonymous labeling to assemble result
+ # processing rules on the column.
+ assert testbase.db.execute(select([expr])).scalar() == -15
+
class DateTest(AssertMixin):
def setUpAll(self):
global users_with_date, insert_data