summaryrefslogtreecommitdiff
path: root/test/dialect/test_sqlite.py
diff options
context:
space:
mode:
authorRamonWill <ramonwilliams@hotmail.co.uk>2020-09-14 18:22:34 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-11-08 13:34:24 -0500
commit89ddd0b8976ed695d239898a2a8e4ebf531537f2 (patch)
tree31728325dbdea93b96fb80af4895d29d6e7c57b9 /test/dialect/test_sqlite.py
parent75fb71d25e988bcc13629469cb6739ad7eb539e9 (diff)
downloadsqlalchemy-89ddd0b8976ed695d239898a2a8e4ebf531537f2.tar.gz
Implement upsert for SQLite
Implemented INSERT... ON CONFLICT clause for SQLite. Pull request courtesy Ramon Williams. Fixes: #4010 Closes: #5580 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/5580 Pull-request-sha: fb422e0749fac442a455cbce539ef662d9512bc0 Change-Id: Ibeea44f4c2cee8dab5dc22b7ec3ae1ab95c12b65
Diffstat (limited to 'test/dialect/test_sqlite.py')
-rw-r--r--test/dialect/test_sqlite.py538
1 files changed, 538 insertions, 0 deletions
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index d06cd48f5..10e43b221 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -36,6 +36,7 @@ from sqlalchemy import types as sqltypes
from sqlalchemy import UniqueConstraint
from sqlalchemy import util
from sqlalchemy.dialects.sqlite import base as sqlite
+from sqlalchemy.dialects.sqlite import insert
from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect
from sqlalchemy.engine.url import make_url
from sqlalchemy.schema import CreateTable
@@ -2680,3 +2681,540 @@ class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
self.table.c.myid.regexp_replace("pattern", "rep").compile,
dialect=sqlite.dialect(),
)
+
+
+class OnConflictTest(fixtures.TablesTest):
+
+ __only_on__ = ("sqlite >= 3.24.0",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ )
+
+ class SpecialType(sqltypes.TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ return value + " processed"
+
+ Table(
+ "bind_targets",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", SpecialType()),
+ )
+
+ users_xtra = Table(
+ "users_xtra",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ Column("login_email", String(50)),
+ Column("lets_index_this", String(50)),
+ )
+ cls.unique_partial_index = schema.Index(
+ "idx_unique_partial_name",
+ users_xtra.c.name,
+ users_xtra.c.lets_index_this,
+ unique=True,
+ sqlite_where=users_xtra.c.lets_index_this == "unique_name",
+ )
+
+ cls.unique_constraint = schema.UniqueConstraint(
+ users_xtra.c.login_email, name="uq_login_email"
+ )
+ cls.bogus_index = schema.Index(
+ "idx_special_ops",
+ users_xtra.c.lets_index_this,
+ sqlite_where=users_xtra.c.lets_index_this > "m",
+ )
+
+ def test_bad_args(self):
+ assert_raises(
+ ValueError, insert(self.tables.users).on_conflict_do_update
+ )
+
+ def test_on_conflict_do_nothing(self, connection):
+ users = self.tables.users
+
+ conn = connection
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(),
+ dict(id=1, name="name1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(),
+ dict(id=1, name="name2"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_nothing_connectionless(self, connection):
+ users = self.tables.users_xtra
+
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=["login_email"]
+ ),
+ dict(name="name1", login_email="email1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=["login_email"]
+ ),
+ dict(name="name2", login_email="email1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1", "email1", None)],
+ )
+
+ @testing.provide_metadata
+ def test_on_conflict_do_nothing_target(self, connection):
+ users = self.tables.users
+
+ conn = connection
+
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=users.primary_key.columns
+ ),
+ dict(id=1, name="name1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=users.primary_key.columns
+ ),
+ dict(id=1, name="name2"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_update_one(self, connection):
+ users = self.tables.users
+
+ conn = connection
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id], set_=dict(name=i.excluded.name)
+ )
+ result = conn.execute(i, dict(id=1, name="name1"))
+
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_update_two(self, connection):
+ users = self.tables.users
+
+ conn = connection
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id],
+ set_=dict(id=i.excluded.id, name=i.excluded.name),
+ )
+
+ result = conn.execute(i, dict(id=1, name="name2"))
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name2")],
+ )
+
+ def test_on_conflict_do_update_three(self, connection):
+ users = self.tables.users
+
+ conn = connection
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(name=i.excluded.name),
+ )
+ result = conn.execute(i, dict(id=1, name="name3"))
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name3")],
+ )
+
+ def test_on_conflict_do_update_four(self, connection):
+ users = self.tables.users
+
+ conn = connection
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(id=i.excluded.id, name=i.excluded.name),
+ ).values(id=1, name="name4")
+
+ result = conn.execute(i)
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name4")],
+ )
+
+ def test_on_conflict_do_update_five(self, connection):
+ users = self.tables.users
+
+ conn = connection
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(id=10, name="I'm a name"),
+ ).values(id=1, name="name4")
+
+ result = conn.execute(i)
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 10)).fetchall(),
+ [(10, "I'm a name")],
+ )
+
+ def test_on_conflict_do_update_multivalues(self, connection):
+ users = self.tables.users
+
+ conn = connection
+
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+ conn.execute(users.insert(), dict(id=2, name="name2"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(name="updated"),
+ where=(i.excluded.name != "name12"),
+ ).values(
+ [
+ dict(id=1, name="name11"),
+ dict(id=2, name="name12"),
+ dict(id=3, name="name13"),
+ dict(id=4, name="name14"),
+ ]
+ )
+
+ result = conn.execute(i)
+ eq_(result.inserted_primary_key, (None,))
+
+ eq_(
+ conn.execute(users.select().order_by(users.c.id)).fetchall(),
+ [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")],
+ )
+
+ def _exotic_targets_fixture(self, conn):
+ users = self.tables.users_xtra
+
+ conn.execute(
+ insert(users),
+ dict(
+ id=1,
+ name="name1",
+ login_email="name1@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+ conn.execute(
+ users.insert(),
+ dict(
+ id=2,
+ name="name2",
+ login_email="name2@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1", "name1@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_two(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ self._exotic_targets_fixture(conn)
+ # try primary key constraint: cause an upsert on unique id column
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+ result = conn.execute(
+ i,
+ dict(
+ id=1,
+ name="name2",
+ login_email="name1@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name2", "name1@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_three(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ self._exotic_targets_fixture(conn)
+ # try unique constraint: cause an upsert on target
+ # login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=["login_email"],
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+ # note: lets_index_this value totally ignored in SET clause.
+ result = conn.execute(
+ i,
+ dict(
+ id=42,
+ name="nameunique",
+ login_email="name2@gmail.com",
+ lets_index_this="unique",
+ ),
+ )
+ eq_(result.inserted_primary_key, (42,))
+
+ eq_(
+ conn.execute(
+ users.select().where(users.c.login_email == "name2@gmail.com")
+ ).fetchall(),
+ [(42, "nameunique", "name2@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_four(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ self._exotic_targets_fixture(conn)
+ # try unique constraint by name: cause an
+ # upsert on target login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=["login_email"],
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+ # note: lets_index_this value totally ignored in SET clause.
+
+ result = conn.execute(
+ i,
+ dict(
+ id=43,
+ name="nameunique2",
+ login_email="name2@gmail.com",
+ lets_index_this="unique",
+ ),
+ )
+ eq_(result.inserted_primary_key, (43,))
+
+ eq_(
+ conn.execute(
+ users.select().where(users.c.login_email == "name2@gmail.com")
+ ).fetchall(),
+ [(43, "nameunique2", "name2@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_four_no_pk(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ self._exotic_targets_fixture(conn)
+ # try unique constraint by name: cause an
+ # upsert on target login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.login_email],
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+
+ conn.execute(i, dict(name="name3", login_email="name1@gmail.com"))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [],
+ )
+
+ eq_(
+ conn.execute(users.select().order_by(users.c.id)).fetchall(),
+ [
+ (2, "name2", "name2@gmail.com", "not"),
+ (3, "name3", "name1@gmail.com", "not"),
+ ],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_five(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ self._exotic_targets_fixture(conn)
+ # try bogus index
+ i = insert(users)
+
+ i = i.on_conflict_do_update(
+ index_elements=self.bogus_index.columns,
+ index_where=self.bogus_index.dialect_options["sqlite"]["where"],
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+
+ assert_raises(
+ exc.OperationalError,
+ conn.execute,
+ i,
+ dict(
+ id=1,
+ name="namebogus",
+ login_email="bogus@gmail.com",
+ lets_index_this="bogus",
+ ),
+ )
+
+ def test_on_conflict_do_update_exotic_targets_six(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ conn.execute(
+ insert(users),
+ dict(
+ id=1,
+ name="name1",
+ login_email="mail1@gmail.com",
+ lets_index_this="unique_name",
+ ),
+ )
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=self.unique_partial_index.columns,
+ index_where=self.unique_partial_index.dialect_options["sqlite"][
+ "where"
+ ],
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+
+ conn.execute(
+ i,
+ [
+ dict(
+ name="name1",
+ login_email="mail2@gmail.com",
+ lets_index_this="unique_name",
+ )
+ ],
+ )
+
+ eq_(
+ conn.execute(users.select()).fetchall(),
+ [(1, "name1", "mail2@gmail.com", "unique_name")],
+ )
+
+ def test_on_conflict_do_update_no_row_actually_affected(self, connection):
+ users = self.tables.users_xtra
+
+ conn = connection
+ self._exotic_targets_fixture(conn)
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.login_email],
+ set_=dict(name="new_name"),
+ where=(i.excluded.name == "other_name"),
+ )
+ result = conn.execute(
+ i, dict(name="name2", login_email="name1@gmail.com")
+ )
+
+ # The last inserted primary key should be 2 here
+ # it is taking the result from the the exotic fixture
+ eq_(result.inserted_primary_key, (2,))
+
+ eq_(
+ conn.execute(users.select()).fetchall(),
+ [
+ (1, "name1", "name1@gmail.com", "not"),
+ (2, "name2", "name2@gmail.com", "not"),
+ ],
+ )
+
+ def test_on_conflict_do_update_special_types_in_set(self, connection):
+ bind_targets = self.tables.bind_targets
+
+ conn = connection
+ i = insert(bind_targets)
+ conn.execute(i, {"id": 1, "data": "initial data"})
+
+ eq_(
+ conn.scalar(sql.select(bind_targets.c.data)),
+ "initial data processed",
+ )
+
+ i = insert(bind_targets)
+ i = i.on_conflict_do_update(
+ index_elements=[bind_targets.c.id],
+ set_=dict(data="new updated data"),
+ )
+ conn.execute(i, {"id": 1, "data": "new inserted data"})
+
+ eq_(
+ conn.scalar(sql.select(bind_targets.c.data)),
+ "new updated data processed",
+ )