summaryrefslogtreecommitdiff
path: root/test/ext/test_horizontal_shard.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/ext/test_horizontal_shard.py')
-rw-r--r--test/ext/test_horizontal_shard.py212
1 files changed, 42 insertions, 170 deletions
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index 7cc6a6f79..667f4bfb0 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -465,7 +465,11 @@ class ShardTest:
t = get_tokyo(sess2)
eq_(t.city, tokyo.city)
- def test_bulk_update_synchronize_evaluate(self):
+ @testing.combinations(
+ "fetch", "evaluate", "auto", argnames="synchronize_session"
+ )
+ @testing.combinations(True, False, argnames="legacy")
+ def test_orm_update_synchronize(self, synchronize_session, legacy):
sess = self._fixture_data()
eq_(
@@ -476,33 +480,25 @@ class ShardTest:
temps = sess.query(Report).all()
eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
- sess.query(Report).filter(Report.temperature >= 80).update(
- {"temperature": Report.temperature + 6},
- synchronize_session="evaluate",
- )
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {86.0, 75.0, 91.0},
- )
-
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
-
- def test_bulk_update_synchronize_fetch(self):
- sess = self._fixture_data()
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {80.0, 75.0, 85.0},
- )
+ if legacy:
+ sess.query(Report).filter(Report.temperature >= 80).update(
+ {"temperature": Report.temperature + 6},
+ synchronize_session=synchronize_session,
+ )
+ else:
+ sess.execute(
+ update(Report)
+ .filter(Report.temperature >= 80)
+ .values(temperature=Report.temperature + 6)
+ .execution_options(synchronize_session=synchronize_session)
+ )
- temps = sess.query(Report).all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
+ # test synchronize session
+ def go():
+ eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
- sess.query(Report).filter(Report.temperature >= 80).update(
- {"temperature": Report.temperature + 6},
- synchronize_session="fetch",
+ self.assert_sql_count(
+ sess._ShardedSession__binds["north_america"], go, 0
)
eq_(
@@ -510,165 +506,41 @@ class ShardTest:
{86.0, 75.0, 91.0},
)
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
-
- def test_bulk_delete_synchronize_evaluate(self):
- sess = self._fixture_data()
-
- temps = sess.query(Report).all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- sess.query(Report).filter(Report.temperature >= 80).delete(
- synchronize_session="evaluate"
- )
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {75.0},
- )
-
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
- def test_bulk_delete_synchronize_fetch(self):
+ @testing.combinations(
+ "fetch", "evaluate", "auto", argnames="synchronize_session"
+ )
+ @testing.combinations(True, False, argnames="legacy")
+ def test_orm_delete_synchronize(self, synchronize_session, legacy):
sess = self._fixture_data()
temps = sess.query(Report).all()
eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
- sess.query(Report).filter(Report.temperature >= 80).delete(
- synchronize_session="fetch"
- )
-
- eq_(
- set(row.temperature for row in sess.query(Report.temperature)),
- {75.0},
- )
-
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
- def test_bulk_update_future_synchronize_evaluate(self):
- sess = self._fixture_data()
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {80.0, 75.0, 85.0},
- )
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- sess.execute(
- update(Report)
- .filter(Report.temperature >= 80)
- .values(
- {"temperature": Report.temperature + 6},
+ if legacy:
+ sess.query(Report).filter(Report.temperature >= 80).delete(
+ synchronize_session=synchronize_session
)
- .execution_options(synchronize_session="evaluate")
- )
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {86.0, 75.0, 91.0},
- )
-
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0})
-
- def test_bulk_update_future_synchronize_fetch(self):
- sess = self._fixture_data()
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {80.0, 75.0, 85.0},
- )
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- # MARKMARK
- # omitting the criteria so that the UPDATE affects three out of
- # four shards
- sess.execute(
- update(Report)
- .values(
- {"temperature": Report.temperature + 6},
+ else:
+ sess.execute(
+ delete(Report)
+ .filter(Report.temperature >= 80)
+ .execution_options(synchronize_session=synchronize_session)
)
- .execution_options(synchronize_session="fetch")
- )
-
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {86.0, 81.0, 91.0},
- )
-
- # test synchronize session as well
- eq_(set(t.temperature for t in temps), {86.0, 81.0, 91.0})
-
- def test_bulk_delete_future_synchronize_evaluate(self):
- sess = self._fixture_data()
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
-
- sess.execute(
- delete(Report)
- .filter(Report.temperature >= 80)
- .execution_options(synchronize_session="evaluate")
- )
- eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
- {75.0},
- )
-
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
- def test_bulk_delete_future_synchronize_fetch(self):
- sess = self._fixture_data()
-
- temps = sess.execute(select(Report)).scalars().all()
- eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0})
+ def go():
+ # test synchronize session
+ for t in temps:
+ assert inspect(t).deleted is (t.temperature >= 80)
- sess.execute(
- delete(Report)
- .filter(Report.temperature >= 80)
- .execution_options(synchronize_session="fetch")
+ self.assert_sql_count(
+ sess._ShardedSession__binds["north_america"], go, 0
)
eq_(
- set(
- row.temperature
- for row in sess.execute(select(Report.temperature))
- ),
+ set(row.temperature for row in sess.query(Report.temperature)),
{75.0},
)
- # test synchronize session as well
- for t in temps:
- assert inspect(t).deleted is (t.temperature >= 80)
-
class DistinctEngineShardTest(ShardTest, fixtures.MappedTest):
def _init_dbs(self):