diff options
Diffstat (limited to 'test/ext/test_horizontal_shard.py')
| -rw-r--r-- | test/ext/test_horizontal_shard.py | 212 |
1 files changed, 42 insertions, 170 deletions
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py index 7cc6a6f79..667f4bfb0 100644 --- a/test/ext/test_horizontal_shard.py +++ b/test/ext/test_horizontal_shard.py @@ -465,7 +465,11 @@ class ShardTest: t = get_tokyo(sess2) eq_(t.city, tokyo.city) - def test_bulk_update_synchronize_evaluate(self): + @testing.combinations( + "fetch", "evaluate", "auto", argnames="synchronize_session" + ) + @testing.combinations(True, False, argnames="legacy") + def test_orm_update_synchronize(self, synchronize_session, legacy): sess = self._fixture_data() eq_( @@ -476,33 +480,25 @@ class ShardTest: temps = sess.query(Report).all() eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) - sess.query(Report).filter(Report.temperature >= 80).update( - {"temperature": Report.temperature + 6}, - synchronize_session="evaluate", - ) - - eq_( - set(row.temperature for row in sess.query(Report.temperature)), - {86.0, 75.0, 91.0}, - ) - - # test synchronize session as well - eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0}) - - def test_bulk_update_synchronize_fetch(self): - sess = self._fixture_data() - - eq_( - set(row.temperature for row in sess.query(Report.temperature)), - {80.0, 75.0, 85.0}, - ) + if legacy: + sess.query(Report).filter(Report.temperature >= 80).update( + {"temperature": Report.temperature + 6}, + synchronize_session=synchronize_session, + ) + else: + sess.execute( + update(Report) + .filter(Report.temperature >= 80) + .values(temperature=Report.temperature + 6) + .execution_options(synchronize_session=synchronize_session) + ) - temps = sess.query(Report).all() - eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) + # test synchronize session + def go(): + eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0}) - sess.query(Report).filter(Report.temperature >= 80).update( - {"temperature": Report.temperature + 6}, - synchronize_session="fetch", + self.assert_sql_count( + sess._ShardedSession__binds["north_america"], go, 0 ) eq_( @@ -510,165 +506,41 @@ class ShardTest: {86.0, 75.0, 91.0}, ) - # test synchronize session as well - eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0}) - - def test_bulk_delete_synchronize_evaluate(self): - sess = self._fixture_data() - - temps = sess.query(Report).all() - eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) - - sess.query(Report).filter(Report.temperature >= 80).delete( - synchronize_session="evaluate" - ) - - eq_( - set(row.temperature for row in sess.query(Report.temperature)), - {75.0}, - ) - - # test synchronize session as well - for t in temps: - assert inspect(t).deleted is (t.temperature >= 80) - - def test_bulk_delete_synchronize_fetch(self): + @testing.combinations( + "fetch", "evaluate", "auto", argnames="synchronize_session" + ) + @testing.combinations(True, False, argnames="legacy") + def test_orm_delete_synchronize(self, synchronize_session, legacy): sess = self._fixture_data() temps = sess.query(Report).all() eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) - sess.query(Report).filter(Report.temperature >= 80).delete( - synchronize_session="fetch" - ) - - eq_( - set(row.temperature for row in sess.query(Report.temperature)), - {75.0}, - ) - - # test synchronize session as well - for t in temps: - assert inspect(t).deleted is (t.temperature >= 80) - - def test_bulk_update_future_synchronize_evaluate(self): - sess = self._fixture_data() - - eq_( - set( - row.temperature - for row in sess.execute(select(Report.temperature)) - ), - {80.0, 75.0, 85.0}, - ) - - temps = sess.execute(select(Report)).scalars().all() - eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) - - sess.execute( - update(Report) - .filter(Report.temperature >= 80) - .values( - {"temperature": Report.temperature + 6}, + if legacy: + sess.query(Report).filter(Report.temperature >= 80).delete( + synchronize_session=synchronize_session ) - .execution_options(synchronize_session="evaluate") - ) - - eq_( - set( - row.temperature - for row in sess.execute(select(Report.temperature)) - ), - {86.0, 75.0, 91.0}, - ) - - # test synchronize session as well - eq_(set(t.temperature for t in temps), {86.0, 75.0, 91.0}) - - def test_bulk_update_future_synchronize_fetch(self): - sess = self._fixture_data() - - eq_( - set( - row.temperature - for row in sess.execute(select(Report.temperature)) - ), - {80.0, 75.0, 85.0}, - ) - - temps = sess.execute(select(Report)).scalars().all() - eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) - - # MARKMARK - # omitting the criteria so that the UPDATE affects three out of - # four shards - sess.execute( - update(Report) - .values( - {"temperature": Report.temperature + 6}, + else: + sess.execute( + delete(Report) + .filter(Report.temperature >= 80) + .execution_options(synchronize_session=synchronize_session) ) - .execution_options(synchronize_session="fetch") - ) - - eq_( - set( - row.temperature - for row in sess.execute(select(Report.temperature)) - ), - {86.0, 81.0, 91.0}, - ) - - # test synchronize session as well - eq_(set(t.temperature for t in temps), {86.0, 81.0, 91.0}) - - def test_bulk_delete_future_synchronize_evaluate(self): - sess = self._fixture_data() - - temps = sess.execute(select(Report)).scalars().all() - eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) - - sess.execute( - delete(Report) - .filter(Report.temperature >= 80) - .execution_options(synchronize_session="evaluate") - ) - eq_( - set( - row.temperature - for row in sess.execute(select(Report.temperature)) - ), - {75.0}, - ) - - # test synchronize session as well - for t in temps: - assert inspect(t).deleted is (t.temperature >= 80) - - def test_bulk_delete_future_synchronize_fetch(self): - sess = self._fixture_data() - - temps = sess.execute(select(Report)).scalars().all() - eq_(set(t.temperature for t in temps), {80.0, 75.0, 85.0}) + def go(): + # test synchronize session + for t in temps: + assert inspect(t).deleted is (t.temperature >= 80) - sess.execute( - delete(Report) - .filter(Report.temperature >= 80) - .execution_options(synchronize_session="fetch") + self.assert_sql_count( + sess._ShardedSession__binds["north_america"], go, 0 ) eq_( - set( - row.temperature - for row in sess.execute(select(Report.temperature)) - ), + set(row.temperature for row in sess.query(Report.temperature)), {75.0}, ) - # test synchronize session as well - for t in temps: - assert inspect(t).deleted is (t.temperature >= 80) - class DistinctEngineShardTest(ShardTest, fixtures.MappedTest): def _init_dbs(self): |
