summaryrefslogtreecommitdiff
path: root/test/sql/functions.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
commit45cec095b4904ba71425d2fe18c143982dd08f43 (patch)
treeaf5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/sql/functions.py
parent698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff)
downloadsqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run the tests. [ticket:970]
Diffstat (limited to 'test/sql/functions.py')
-rw-r--r--test/sql/functions.py319
1 files changed, 0 insertions, 319 deletions
diff --git a/test/sql/functions.py b/test/sql/functions.py
deleted file mode 100644
index 17d8a35e9..000000000
--- a/test/sql/functions.py
+++ /dev/null
@@ -1,319 +0,0 @@
-import testenv; testenv.configure_for_tests()
-import datetime
-from sqlalchemy import *
-from sqlalchemy.sql import table, column
-from sqlalchemy import databases, sql, util
-from sqlalchemy.sql.compiler import BIND_TEMPLATES
-from sqlalchemy.engine import default
-from testlib.engines import all_dialects
-from sqlalchemy import types as sqltypes
-from testlib import *
-from sqlalchemy.sql.functions import GenericFunction
-from testlib.testing import eq_
-from decimal import Decimal as _python_Decimal
-
-from sqlalchemy.databases import *
-
-# FIXME!
-dialects = [d for d in all_dialects() if d.name not in ('access', 'informix')]
-
-
-class CompileTest(TestBase, AssertsCompiledSQL):
- def test_compile(self):
- for dialect in dialects:
- bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
- self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect)
- self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect)
- if isinstance(dialect, firebird.dialect):
- self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect)
- else:
- self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect)
-
- # test generic function compile
- class fake_func(GenericFunction):
- __return_type__ = sqltypes.Integer
-
- def __init__(self, arg, **kwargs):
- GenericFunction.__init__(self, args=[arg], **kwargs)
-
- self.assert_compile(fake_func('foo'), "fake_func(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect)
-
- def test_use_labels(self):
- self.assert_compile(select([func.foo()], use_labels=True),
- "SELECT foo() AS foo_1"
- )
- def test_underscores(self):
- self.assert_compile(func.if_(), "if()")
-
- def test_generic_now(self):
- assert isinstance(func.now().type, sqltypes.DateTime)
-
- for ret, dialect in [
- ('CURRENT_TIMESTAMP', sqlite.dialect()),
- ('now()', postgres.dialect()),
- ('now()', mysql.dialect()),
- ('CURRENT_TIMESTAMP', oracle.dialect())
- ]:
- self.assert_compile(func.now(), ret, dialect=dialect)
-
- def test_generic_random(self):
- assert func.random().type == sqltypes.NULLTYPE
- assert isinstance(func.random(type_=Integer).type, Integer)
-
- for ret, dialect in [
- ('random()', sqlite.dialect()),
- ('random()', postgres.dialect()),
- ('rand()', mysql.dialect()),
- ('random()', oracle.dialect())
- ]:
- self.assert_compile(func.random(), ret, dialect=dialect)
-
- def test_generic_count(self):
- assert isinstance(func.count().type, sqltypes.Integer)
-
- self.assert_compile(func.count(), 'count(*)')
- self.assert_compile(func.count(1), 'count(:param_1)')
- c = column('abc')
- self.assert_compile(func.count(c), 'count(abc)')
-
- def test_constructor(self):
- try:
- func.current_timestamp('somearg')
- assert False
- except TypeError:
- assert True
-
- try:
- func.char_length('a', 'b')
- assert False
- except TypeError:
- assert True
-
- try:
- func.char_length()
- assert False
- except TypeError:
- assert True
-
- def test_return_type_detection(self):
-
- for fn in [func.coalesce, func.max, func.min, func.sum]:
- for args, type_ in [
- ((datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)), sqltypes.Date),
- ((3, 5), sqltypes.Integer),
- ((_python_Decimal(3), _python_Decimal(5)), sqltypes.Numeric),
- (("foo", "bar"), sqltypes.String),
- ((datetime.datetime(2007, 10, 5, 8, 3, 34), datetime.datetime(2005, 10, 15, 14, 45, 33)), sqltypes.DateTime)
- ]:
- assert isinstance(fn(*args).type, type_), "%s / %s" % (fn(), type_)
-
- assert isinstance(func.concat("foo", "bar").type, sqltypes.String)
-
-
- def test_assorted(self):
- table1 = table('mytable',
- column('myid', Integer),
- )
-
- table2 = table(
- 'myothertable',
- column('otherid', Integer),
- )
-
- # test an expression with a function
- self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid,
- "lala(:lala_1, :lala_2, :param_1, mytable.myid) * myothertable.otherid")
-
- # test it in a SELECT
- self.assert_compile(select([func.count(table1.c.myid)]),
- "SELECT count(mytable.myid) AS count_1 FROM mytable")
-
- # test a "dotted" function name
- self.assert_compile(select([func.foo.bar.lala(table1.c.myid)]),
- "SELECT foo.bar.lala(mytable.myid) AS lala_1 FROM mytable")
-
- # test the bind parameter name with a "dotted" function name is only the name
- # (limits the length of the bind param name)
- self.assert_compile(select([func.foo.bar.lala(12)]),
- "SELECT foo.bar.lala(:lala_2) AS lala_1")
-
- # test a dotted func off the engine itself
- self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho_1)")
-
- # test None becomes NULL
- self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)")
-
- # test pickling
- self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)")
-
- # assert func raises AttributeError for __bases__ attribute, since its not a class
- # fixes pydoc
- try:
- func.__bases__
- assert False
- except AttributeError:
- assert True
-
- def test_functions_with_cols(self):
- users = table('users', column('id'), column('name'), column('fullname'))
- calculate = select([column('q'), column('z'), column('r')],
- from_obj=[func.calculate(bindparam('x'), bindparam('y'))])
-
- self.assert_compile(select([users], users.c.id > calculate.c.z),
- "SELECT users.id, users.name, users.fullname "
- "FROM users, (SELECT q, z, r "
- "FROM calculate(:x, :y)) "
- "WHERE users.id > z"
- )
-
- s = select([users], users.c.id.between(
- calculate.alias('c1').unique_params(x=17, y=45).c.z,
- calculate.alias('c2').unique_params(x=5, y=12).c.z))
-
- self.assert_compile(s,
- "SELECT users.id, users.name, users.fullname "
- "FROM users, (SELECT q, z, r "
- "FROM calculate(:x_1, :y_1)) AS c1, (SELECT q, z, r "
- "FROM calculate(:x_2, :y_2)) AS c2 "
- "WHERE users.id BETWEEN c1.z AND c2.z"
- , checkparams={'y_1': 45, 'x_1': 17, 'y_2': 12, 'x_2': 5})
-
-
-class ExecuteTest(TestBase):
-
- def test_standalone_execute(self):
- x = testing.db.func.current_date().execute().scalar()
- y = testing.db.func.current_date().select().execute().scalar()
- z = testing.db.func.current_date().scalar()
- assert (x == y == z) is True
-
- # ansi func
- x = testing.db.func.current_date()
- assert isinstance(x.type, Date)
- assert isinstance(x.execute().scalar(), datetime.date)
-
- def test_conn_execute(self):
- conn = testing.db.connect()
- try:
- x = conn.execute(func.current_date()).scalar()
- y = conn.execute(func.current_date().select()).scalar()
- z = conn.scalar(func.current_date())
- finally:
- conn.close()
- assert (x == y == z) is True
-
- def test_update(self):
- """
- Tests sending functions and SQL expressions to the VALUES and SET
- clauses of INSERT/UPDATE instances, and that column-level defaults
- get overridden.
- """
-
- meta = MetaData(testing.db)
- t = Table('t1', meta,
- Column('id', Integer, Sequence('t1idseq', optional=True), primary_key=True),
- Column('value', Integer)
- )
- t2 = Table('t2', meta,
- Column('id', Integer, Sequence('t2idseq', optional=True), primary_key=True),
- Column('value', Integer, default=7),
- Column('stuff', String(20), onupdate="thisisstuff")
- )
- meta.create_all()
- try:
- t.insert(values=dict(value=func.length("one"))).execute()
- assert t.select().execute().fetchone()['value'] == 3
- t.update(values=dict(value=func.length("asfda"))).execute()
- assert t.select().execute().fetchone()['value'] == 5
-
- r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
- id = r.last_inserted_ids()[0]
- assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
- t.update(values={t.c.value:func.length("asdf")}).execute()
- assert t.select().execute().fetchone()['value'] == 4
- print "--------------------------"
- t2.insert().execute()
- t2.insert(values=dict(value=func.length("one"))).execute()
- t2.insert(values=dict(value=func.length("asfda") + -19)).execute(stuff="hi")
-
- res = exec_sorted(select([t2.c.value, t2.c.stuff]))
- self.assertEquals(res, [(-14, 'hi'), (3, None), (7, None)])
-
- t2.update(values=dict(value=func.length("asdsafasd"))).execute(stuff="some stuff")
- assert select([t2.c.value, t2.c.stuff]).execute().fetchall() == [(9,"some stuff"), (9,"some stuff"), (9,"some stuff")]
-
- t2.delete().execute()
-
- t2.insert(values=dict(value=func.length("one") + 8)).execute()
- assert t2.select().execute().fetchone()['value'] == 11
-
- t2.update(values=dict(value=func.length("asfda"))).execute()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
-
- t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
- print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
- finally:
- meta.drop_all()
-
- @testing.fails_on_everything_except('postgres')
- def test_as_from(self):
- # TODO: shouldnt this work on oracle too ?
- x = testing.db.func.current_date().execute().scalar()
- y = testing.db.func.current_date().select().execute().scalar()
- z = testing.db.func.current_date().scalar()
- w = select(['*'], from_obj=[testing.db.func.current_date()]).scalar()
-
- # construct a column-based FROM object out of a function, like in [ticket:172]
- s = select([sql.column('date', type_=DateTime)], from_obj=[testing.db.func.current_date()])
- q = s.execute().fetchone()[s.c.date]
- r = s.alias('datequery').select().scalar()
-
- assert x == y == z == w == q == r
-
- def test_extract_bind(self):
- """Basic common denominator execution tests for extract()"""
-
- date = datetime.date(2010, 5, 1)
-
- def execute(field):
- return testing.db.execute(select([extract(field, date)])).scalar()
-
- assert execute('year') == 2010
- assert execute('month') == 5
- assert execute('day') == 1
-
- date = datetime.datetime(2010, 5, 1, 12, 11, 10)
-
- assert execute('year') == 2010
- assert execute('month') == 5
- assert execute('day') == 1
-
- def test_extract_expression(self):
- meta = MetaData(testing.db)
- table = Table('test', meta,
- Column('dt', DateTime),
- Column('d', Date))
- meta.create_all()
- try:
- table.insert().execute(
- {'dt': datetime.datetime(2010, 5, 1, 12, 11, 10),
- 'd': datetime.date(2010, 5, 1) })
- rs = select([extract('year', table.c.dt),
- extract('month', table.c.d)]).execute()
- row = rs.fetchone()
- assert row[0] == 2010
- assert row[1] == 5
- rs.close()
- finally:
- meta.drop_all()
-
-
-def exec_sorted(statement, *args, **kw):
- """Executes a statement and returns a sorted list plain tuple rows."""
-
- return sorted([tuple(row)
- for row in statement.execute(*args, **kw).fetchall()])
-
-if __name__ == '__main__':
- testenv.main()