summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/suite/test_results.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-07-04 12:21:36 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-08-13 18:41:53 -0400
commit5fb0138a3220161703e6ab1087319a669d14e7f4 (patch)
tree25d006b30830ce6bc71f7a69bed9b570e1ae9654 /lib/sqlalchemy/testing/suite/test_results.py
parentcd03b8f0cecbf72ecd6c99c4d3a6338c8278b40d (diff)
downloadsqlalchemy-5fb0138a3220161703e6ab1087319a669d14e7f4.tar.gz
Implement rudimentary asyncio support w/ asyncpg
Using the approach introduced at https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e We can now create asyncio endpoints that are then handled in "implicit IO" form within the majority of the Core internals. Then coroutines are re-exposed at the point at which we call into asyncpg methods. Patch includes: * asyncpg dialect * asyncio package * engine, result, ORM session classes * new test fixtures, tests * some work with pep-484 and a short plugin for the pyannotate package, which seems to have so-so results Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d Fixes: #3414
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_results.py')
-rw-r--r--lib/sqlalchemy/testing/suite/test_results.py81
1 files changed, 64 insertions, 17 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py
index 2eb986c74..e6f6068c8 100644
--- a/lib/sqlalchemy/testing/suite/test_results.py
+++ b/lib/sqlalchemy/testing/suite/test_results.py
@@ -238,6 +238,8 @@ class ServerSideCursorsTest(
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
+ elif self.engine.dialect.driver == "asyncpg":
+ return cursor.server_side
else:
return False
@@ -331,29 +333,74 @@ class ServerSideCursorsTest(
result.close()
@testing.provide_metadata
- def test_roundtrip(self):
+ def test_roundtrip_fetchall(self):
md = self.metadata
- self._fixture(True)
+ engine = self._fixture(True)
test_table = Table(
"test_table",
md,
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
- test_table.create(checkfirst=True)
- test_table.insert().execute(data="data1")
- test_table.insert().execute(data="data2")
- eq_(
- test_table.select().order_by(test_table.c.id).execute().fetchall(),
- [(1, "data1"), (2, "data2")],
- )
- test_table.update().where(test_table.c.id == 2).values(
- data=test_table.c.data + " updated"
- ).execute()
- eq_(
- test_table.select().order_by(test_table.c.id).execute().fetchall(),
- [(1, "data1"), (2, "data2 updated")],
+
+ with engine.connect() as connection:
+ test_table.create(connection, checkfirst=True)
+ connection.execute(test_table.insert(), dict(data="data1"))
+ connection.execute(test_table.insert(), dict(data="data2"))
+ eq_(
+ connection.execute(
+ test_table.select().order_by(test_table.c.id)
+ ).fetchall(),
+ [(1, "data1"), (2, "data2")],
+ )
+ connection.execute(
+ test_table.update()
+ .where(test_table.c.id == 2)
+ .values(data=test_table.c.data + " updated")
+ )
+ eq_(
+ connection.execute(
+ test_table.select().order_by(test_table.c.id)
+ ).fetchall(),
+ [(1, "data1"), (2, "data2 updated")],
+ )
+ connection.execute(test_table.delete())
+ eq_(
+ connection.scalar(
+ select([func.count("*")]).select_from(test_table)
+ ),
+ 0,
+ )
+
+ @testing.provide_metadata
+ def test_roundtrip_fetchmany(self):
+ md = self.metadata
+
+ engine = self._fixture(True)
+ test_table = Table(
+ "test_table",
+ md,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
)
- test_table.delete().execute()
- eq_(select([func.count("*")]).select_from(test_table).scalar(), 0)
+
+ with engine.connect() as connection:
+ test_table.create(connection, checkfirst=True)
+ connection.execute(
+ test_table.insert(),
+ [dict(data="data%d" % i) for i in range(1, 20)],
+ )
+
+ result = connection.execute(
+ test_table.select().order_by(test_table.c.id)
+ )
+
+ eq_(
+ result.fetchmany(5), [(i, "data%d" % i) for i in range(1, 6)],
+ )
+ eq_(
+ result.fetchmany(10),
+ [(i, "data%d" % i) for i in range(6, 16)],
+ )
+ eq_(result.fetchall(), [(i, "data%d" % i) for i in range(16, 20)])