diff options
| author | RamonWill <ramonwilliams@hotmail.co.uk> | 2020-09-14 18:22:34 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-11-08 13:34:24 -0500 |
| commit | 89ddd0b8976ed695d239898a2a8e4ebf531537f2 (patch) | |
| tree | 31728325dbdea93b96fb80af4895d29d6e7c57b9 /test/dialect/test_sqlite.py | |
| parent | 75fb71d25e988bcc13629469cb6739ad7eb539e9 (diff) | |
| download | sqlalchemy-89ddd0b8976ed695d239898a2a8e4ebf531537f2.tar.gz | |
Implement upsert for SQLite
Implemented INSERT... ON CONFLICT clause for SQLite. Pull request courtesy
Ramon Williams.
Fixes: #4010
Closes: #5580
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/5580
Pull-request-sha: fb422e0749fac442a455cbce539ef662d9512bc0
Change-Id: Ibeea44f4c2cee8dab5dc22b7ec3ae1ab95c12b65
Diffstat (limited to 'test/dialect/test_sqlite.py')
| -rw-r--r-- | test/dialect/test_sqlite.py | 538 |
1 files changed, 538 insertions, 0 deletions
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index d06cd48f5..10e43b221 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -36,6 +36,7 @@ from sqlalchemy import types as sqltypes from sqlalchemy import UniqueConstraint from sqlalchemy import util from sqlalchemy.dialects.sqlite import base as sqlite +from sqlalchemy.dialects.sqlite import insert from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect from sqlalchemy.engine.url import make_url from sqlalchemy.schema import CreateTable @@ -2680,3 +2681,540 @@ class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL): self.table.c.myid.regexp_replace("pattern", "rep").compile, dialect=sqlite.dialect(), ) + + +class OnConflictTest(fixtures.TablesTest): + + __only_on__ = ("sqlite >= 3.24.0",) + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + ) + + class SpecialType(sqltypes.TypeDecorator): + impl = String + + def process_bind_param(self, value, dialect): + return value + " processed" + + Table( + "bind_targets", + metadata, + Column("id", Integer, primary_key=True), + Column("data", SpecialType()), + ) + + users_xtra = Table( + "users_xtra", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + Column("login_email", String(50)), + Column("lets_index_this", String(50)), + ) + cls.unique_partial_index = schema.Index( + "idx_unique_partial_name", + users_xtra.c.name, + users_xtra.c.lets_index_this, + unique=True, + sqlite_where=users_xtra.c.lets_index_this == "unique_name", + ) + + cls.unique_constraint = schema.UniqueConstraint( + users_xtra.c.login_email, name="uq_login_email" + ) + cls.bogus_index = schema.Index( + "idx_special_ops", + users_xtra.c.lets_index_this, + sqlite_where=users_xtra.c.lets_index_this > "m", + ) + + def test_bad_args(self): + assert_raises( + ValueError, insert(self.tables.users).on_conflict_do_update + ) + + def test_on_conflict_do_nothing(self, connection): + users = self.tables.users + + conn = connection + result = conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name="name1"), + ) + eq_(result.inserted_primary_key, (1,)) + + result = conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name="name2"), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1")], + ) + + def test_on_conflict_do_nothing_connectionless(self, connection): + users = self.tables.users_xtra + + result = connection.execute( + insert(users).on_conflict_do_nothing( + index_elements=["login_email"] + ), + dict(name="name1", login_email="email1"), + ) + eq_(result.inserted_primary_key, (1,)) + + result = connection.execute( + insert(users).on_conflict_do_nothing( + index_elements=["login_email"] + ), + dict(name="name2", login_email="email1"), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + connection.execute( + users.select().where(users.c.id == 1) + ).fetchall(), + [(1, "name1", "email1", None)], + ) + + @testing.provide_metadata + def test_on_conflict_do_nothing_target(self, connection): + users = self.tables.users + + conn = connection + + result = conn.execute( + insert(users).on_conflict_do_nothing( + index_elements=users.primary_key.columns + ), + dict(id=1, name="name1"), + ) + eq_(result.inserted_primary_key, (1,)) + + result = conn.execute( + insert(users).on_conflict_do_nothing( + index_elements=users.primary_key.columns + ), + dict(id=1, name="name2"), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1")], + ) + + def test_on_conflict_do_update_one(self, connection): + users = self.tables.users + + conn = connection + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], set_=dict(name=i.excluded.name) + ) + result = conn.execute(i, dict(id=1, name="name1")) + + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1")], + ) + + def test_on_conflict_do_update_two(self, connection): + users = self.tables.users + + conn = connection + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], + set_=dict(id=i.excluded.id, name=i.excluded.name), + ) + + result = conn.execute(i, dict(id=1, name="name2")) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name2")], + ) + + def test_on_conflict_do_update_three(self, connection): + users = self.tables.users + + conn = connection + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(name=i.excluded.name), + ) + result = conn.execute(i, dict(id=1, name="name3")) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name3")], + ) + + def test_on_conflict_do_update_four(self, connection): + users = self.tables.users + + conn = connection + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=i.excluded.id, name=i.excluded.name), + ).values(id=1, name="name4") + + result = conn.execute(i) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name4")], + ) + + def test_on_conflict_do_update_five(self, connection): + users = self.tables.users + + conn = connection + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=10, name="I'm a name"), + ).values(id=1, name="name4") + + result = conn.execute(i) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 10)).fetchall(), + [(10, "I'm a name")], + ) + + def test_on_conflict_do_update_multivalues(self, connection): + users = self.tables.users + + conn = connection + + conn.execute(users.insert(), dict(id=1, name="name1")) + conn.execute(users.insert(), dict(id=2, name="name2")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(name="updated"), + where=(i.excluded.name != "name12"), + ).values( + [ + dict(id=1, name="name11"), + dict(id=2, name="name12"), + dict(id=3, name="name13"), + dict(id=4, name="name14"), + ] + ) + + result = conn.execute(i) + eq_(result.inserted_primary_key, (None,)) + + eq_( + conn.execute(users.select().order_by(users.c.id)).fetchall(), + [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")], + ) + + def _exotic_targets_fixture(self, conn): + users = self.tables.users_xtra + + conn.execute( + insert(users), + dict( + id=1, + name="name1", + login_email="name1@gmail.com", + lets_index_this="not", + ), + ) + conn.execute( + users.insert(), + dict( + id=2, + name="name2", + login_email="name2@gmail.com", + lets_index_this="not", + ), + ) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1", "name1@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_two(self, connection): + users = self.tables.users_xtra + + conn = connection + self._exotic_targets_fixture(conn) + # try primary key constraint: cause an upsert on unique id column + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict( + name=i.excluded.name, login_email=i.excluded.login_email + ), + ) + result = conn.execute( + i, + dict( + id=1, + name="name2", + login_email="name1@gmail.com", + lets_index_this="not", + ), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name2", "name1@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_three(self, connection): + users = self.tables.users_xtra + + conn = connection + self._exotic_targets_fixture(conn) + # try unique constraint: cause an upsert on target + # login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + index_elements=["login_email"], + set_=dict( + id=i.excluded.id, + name=i.excluded.name, + login_email=i.excluded.login_email, + ), + ) + # note: lets_index_this value totally ignored in SET clause. + result = conn.execute( + i, + dict( + id=42, + name="nameunique", + login_email="name2@gmail.com", + lets_index_this="unique", + ), + ) + eq_(result.inserted_primary_key, (42,)) + + eq_( + conn.execute( + users.select().where(users.c.login_email == "name2@gmail.com") + ).fetchall(), + [(42, "nameunique", "name2@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_four(self, connection): + users = self.tables.users_xtra + + conn = connection + self._exotic_targets_fixture(conn) + # try unique constraint by name: cause an + # upsert on target login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + index_elements=["login_email"], + set_=dict( + id=i.excluded.id, + name=i.excluded.name, + login_email=i.excluded.login_email, + ), + ) + # note: lets_index_this value totally ignored in SET clause. + + result = conn.execute( + i, + dict( + id=43, + name="nameunique2", + login_email="name2@gmail.com", + lets_index_this="unique", + ), + ) + eq_(result.inserted_primary_key, (43,)) + + eq_( + conn.execute( + users.select().where(users.c.login_email == "name2@gmail.com") + ).fetchall(), + [(43, "nameunique2", "name2@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_four_no_pk(self, connection): + users = self.tables.users_xtra + + conn = connection + self._exotic_targets_fixture(conn) + # try unique constraint by name: cause an + # upsert on target login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.login_email], + set_=dict( + id=i.excluded.id, + name=i.excluded.name, + login_email=i.excluded.login_email, + ), + ) + + conn.execute(i, dict(name="name3", login_email="name1@gmail.com")) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [], + ) + + eq_( + conn.execute(users.select().order_by(users.c.id)).fetchall(), + [ + (2, "name2", "name2@gmail.com", "not"), + (3, "name3", "name1@gmail.com", "not"), + ], + ) + + def test_on_conflict_do_update_exotic_targets_five(self, connection): + users = self.tables.users_xtra + + conn = connection + self._exotic_targets_fixture(conn) + # try bogus index + i = insert(users) + + i = i.on_conflict_do_update( + index_elements=self.bogus_index.columns, + index_where=self.bogus_index.dialect_options["sqlite"]["where"], + set_=dict( + name=i.excluded.name, login_email=i.excluded.login_email + ), + ) + + assert_raises( + exc.OperationalError, + conn.execute, + i, + dict( + id=1, + name="namebogus", + login_email="bogus@gmail.com", + lets_index_this="bogus", + ), + ) + + def test_on_conflict_do_update_exotic_targets_six(self, connection): + users = self.tables.users_xtra + + conn = connection + conn.execute( + insert(users), + dict( + id=1, + name="name1", + login_email="mail1@gmail.com", + lets_index_this="unique_name", + ), + ) + i = insert(users) + i = i.on_conflict_do_update( + index_elements=self.unique_partial_index.columns, + index_where=self.unique_partial_index.dialect_options["sqlite"][ + "where" + ], + set_=dict( + name=i.excluded.name, login_email=i.excluded.login_email + ), + ) + + conn.execute( + i, + [ + dict( + name="name1", + login_email="mail2@gmail.com", + lets_index_this="unique_name", + ) + ], + ) + + eq_( + conn.execute(users.select()).fetchall(), + [(1, "name1", "mail2@gmail.com", "unique_name")], + ) + + def test_on_conflict_do_update_no_row_actually_affected(self, connection): + users = self.tables.users_xtra + + conn = connection + self._exotic_targets_fixture(conn) + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.login_email], + set_=dict(name="new_name"), + where=(i.excluded.name == "other_name"), + ) + result = conn.execute( + i, dict(name="name2", login_email="name1@gmail.com") + ) + + # The last inserted primary key should be 2 here + # it is taking the result from the the exotic fixture + eq_(result.inserted_primary_key, (2,)) + + eq_( + conn.execute(users.select()).fetchall(), + [ + (1, "name1", "name1@gmail.com", "not"), + (2, "name2", "name2@gmail.com", "not"), + ], + ) + + def test_on_conflict_do_update_special_types_in_set(self, connection): + bind_targets = self.tables.bind_targets + + conn = connection + i = insert(bind_targets) + conn.execute(i, {"id": 1, "data": "initial data"}) + + eq_( + conn.scalar(sql.select(bind_targets.c.data)), + "initial data processed", + ) + + i = insert(bind_targets) + i = i.on_conflict_do_update( + index_elements=[bind_targets.c.id], + set_=dict(data="new updated data"), + ) + conn.execute(i, {"id": 1, "data": "new inserted data"}) + + eq_( + conn.scalar(sql.select(bind_targets.c.data)), + "new updated data processed", + ) |
