summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-02-13 22:53:05 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-02-13 22:53:05 +0000
commite7aaeb28527d27814cb192e1ddc4af228a43d816 (patch)
treeed36612431a3f845cb580f5b2d25e4398f44213e
parent238e8620ee2556d86ecd2ca6a430de990951d514 (diff)
downloadsqlalchemy-e7aaeb28527d27814cb192e1ddc4af228a43d816.tar.gz
- fixed argument passing to straight textual execute() on engine, connection.
can handle *args or a list instance for positional, **kwargs or a dict instance for named args, or a list of list or dicts to invoke executemany()
-rw-r--r--CHANGES3
-rw-r--r--lib/sqlalchemy/engine/base.py12
-rw-r--r--test/engine/alltests.py3
-rw-r--r--test/engine/execute.py56
4 files changed, 70 insertions, 4 deletions
diff --git a/CHANGES b/CHANGES
index 61f43d426..7cbf6c85e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,9 @@
- added a "supports_execution()" method to ClauseElement, so that individual
kinds of clauses can express if they are appropriate for executing...such as,
you can execute a "select", but not a "Table" or a "Join".
+ - fixed argument passing to straight textual execute() on engine, connection.
+ can handle *args or a list instance for positional, **kwargs or a dict instance
+ for named args, or a list of list or dicts to invoke executemany()
- orm:
- another refactoring to relationship calculation. Allows more accurate ORM behavior
with relationships from/to/between mappers, particularly polymorphic mappers,
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 3ec3e028f..1985bcec1 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -260,13 +260,19 @@ class Connection(Connectable):
self.__connection.close()
self.__connection = None
del self.__connection
- def scalar(self, object, parameters=None, **kwargs):
- return self.execute(object, parameters, **kwargs).scalar()
+ def scalar(self, object, *multiparams, **params):
+ return self.execute(object, *multiparams, **params).scalar()
def execute(self, object, *multiparams, **params):
return Connection.executors[type(object).__mro__[-2]](self, object, *multiparams, **params)
def execute_default(self, default, **kwargs):
return default.accept_schema_visitor(self.__engine.dialect.defaultrunner(self.__engine, self.proxy, **kwargs))
- def execute_text(self, statement, parameters=None):
+ def execute_text(self, statement, *multiparams, **params):
+ if len(multiparams) == 0:
+ parameters = params
+ elif len(multiparams) == 1 and (isinstance(multiparams[0], list) or isinstance(multiparams[0], dict)):
+ parameters = multiparams[0]
+ else:
+ parameters = list(multiparams)
cursor = self._execute_raw(statement, parameters)
rpargs = self.__engine.dialect.create_result_proxy_args(self, cursor)
return ResultProxy(self.__engine, self, cursor, **rpargs)
diff --git a/test/engine/alltests.py b/test/engine/alltests.py
index c63cb2861..7bb9acb72 100644
--- a/test/engine/alltests.py
+++ b/test/engine/alltests.py
@@ -5,8 +5,9 @@ import unittest
def suite():
modules_to_test = (
# connectivity, execution
- 'engine.parseconnect',
+ 'engine.parseconnect',
'engine.pool',
+ 'engine.execute',
'engine.transaction',
# schema/tables
diff --git a/test/engine/execute.py b/test/engine/execute.py
new file mode 100644
index 000000000..ce60a21c7
--- /dev/null
+++ b/test/engine/execute.py
@@ -0,0 +1,56 @@
+
+import testbase
+import unittest, sys, datetime
+import tables
+db = testbase.db
+from sqlalchemy import *
+
+
+class ExecuteTest(testbase.PersistTest):
+ def setUpAll(self):
+ global users, metadata
+ metadata = BoundMetaData(testbase.db)
+ users = Table('users', metadata,
+ Column('user_id', INT, primary_key = True),
+ Column('user_name', VARCHAR(20)),
+ mysql_engine='InnoDB'
+ )
+ metadata.create_all()
+
+ def tearDown(self):
+ testbase.db.connect().execute(users.delete())
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ @testbase.supported('sqlite')
+ def test_raw_qmark(self):
+ for conn in (testbase.db, testbase.db.connect()):
+ conn.execute("insert into users (user_id, user_name) values (?, ?)", [1,"jack"])
+ conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"ed"], [3,"horse"])
+ conn.execute("insert into users (user_id, user_name) values (?, ?)", 4, 'sally')
+ res = conn.execute("select * from users")
+ assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
+ conn.execute("delete from users")
+
+ @testbase.supported('mysql')
+ def test_raw_sprintf(self):
+ for conn in (testbase.db, testbase.db.connect()):
+ conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
+ conn.execute("insert into users (user_id, user_name) values (%s, %s)", [2,"ed"], [3,"horse"])
+ conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally')
+ res = conn.execute("select * from users")
+ assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
+ conn.execute("delete from users")
+
+ @testbase.supported('postgres')
+ def test_raw_python(self):
+ for conn in (testbase.db, testbase.db.connect()):
+ conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
+ conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
+ conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", id=4, name='sally')
+ res = conn.execute("select * from users")
+ assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
+ conn.execute("delete from users")
+
+if __name__ == "__main__":
+ testbase.main()