diff options
| author | Robin Thomas <robin.thomas@livestream.com> | 2016-04-14 12:57:15 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-06-14 15:03:14 -0400 |
| commit | 4e9ab7a72f0ad506cf519069fd67127f63e5f2aa (patch) | |
| tree | fe46fca73605597bf8274ad6bf7f24878a33c399 /test | |
| parent | 31a0da32a8af2503c6b94123a0e869816d83c707 (diff) | |
| download | sqlalchemy-4e9ab7a72f0ad506cf519069fd67127f63e5f2aa.tar.gz | |
Add ON CONFLICT support for Postgresql
Fixes: #3529
Co-authored-by: Mike Bayer <mike_mp@zzzcomputing.com>
Change-Id: Ie3bf6ad70d9be9f0e44938830e922db03573991a
Pull-request: https://github.com/zzzeek/sqlalchemy/pull/258
Diffstat (limited to 'test')
| -rw-r--r-- | test/dialect/postgresql/test_compiler.py | 256 | ||||
| -rw-r--r-- | test/dialect/postgresql/test_on_conflict.py | 312 |
2 files changed, 563 insertions, 5 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index f85ff2682..88110ba2d 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -5,7 +5,7 @@ from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \ from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing from sqlalchemy import Sequence, Table, Column, Integer, update, String,\ - insert, func, MetaData, Enum, Index, and_, delete, select, cast, text, \ + func, MetaData, Enum, Index, and_, delete, select, cast, text, \ Text from sqlalchemy.dialects.postgresql import ExcludeConstraint, array from sqlalchemy import exc, schema @@ -14,9 +14,8 @@ from sqlalchemy.dialects.postgresql import TSRANGE from sqlalchemy.orm import mapper, aliased, Session from sqlalchemy.sql import table, column, operators, literal_column from sqlalchemy.sql import util as sql_util -from sqlalchemy.util import u -from sqlalchemy.dialects.postgresql import aggregate_order_by - +from sqlalchemy.util import u, OrderedDict +from sqlalchemy.dialects.postgresql import aggregate_order_by, insert class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): __prefer__ = 'postgresql' @@ -186,7 +185,6 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): schema_translate_map=schema_translate_map ) - def test_create_table_with_tablespace(self): m = MetaData() tbl = Table( @@ -1035,6 +1033,254 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) +class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = postgresql.dialect() + + def setup(self): + self.table1 = table1 = table( + 'mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + md = MetaData() + self.table_with_metadata = Table( + 'mytable', md, + Column('myid', Integer, primary_key=True), + Column('name', String(128)), + Column('description', String(128)) + ) + self.unique_constr = schema.UniqueConstraint( + table1.c.name, name='uq_name') + self.excl_constr = ExcludeConstraint( + (table1.c.name, '='), + (table1.c.description, '&&'), + name='excl_thing' + ) + self.excl_constr_anon = ExcludeConstraint( + (self.table_with_metadata.c.name, '='), + (self.table_with_metadata.c.description, '&&'), + where=self.table_with_metadata.c.description != 'foo' + ) + self.goofy_index = Index( + 'goofy_index', table1.c.name, + postgresql_where=table1.c.name > 'm' + ) + + def test_do_nothing_no_target(self): + + i = insert( + self.table1, values=dict(name='foo'), + ).on_conflict_do_nothing() + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT DO NOTHING') + + def test_do_nothing_index_elements_target(self): + + i = insert( + self.table1, values=dict(name='foo'), + ).on_conflict_do_nothing( + index_elements=['myid'], + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (myid) DO NOTHING" + ) + + def test_do_update_set_clause_literal(self): + i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = i.on_conflict_do_update( + index_elements=['myid'], + set_=OrderedDict([ + ('name', "I'm a name"), + ('description', None)]) + ) + self.assert_compile( + i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = %(param_1)s, ' + 'description = NULL', + {"myid": 1, "name": "foo", "param_1": "I'm a name"} + + ) + + def test_do_update_str_index_elements_target_one(self): + i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = i.on_conflict_do_update( + index_elements=['myid'], + set_=OrderedDict([ + ('name', i.excluded.name), + ('description', i.excluded.description)]) + ) + self.assert_compile(i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name, ' + 'description = excluded.description') + + def test_do_update_str_index_elements_target_two(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + index_elements=['myid'], + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_col_index_elements_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + index_elements=[self.table1.c.myid], + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_unnamed_pk_constraint_target(self): + i = insert( + self.table_with_metadata, values=dict(myid=1, name='foo')) + i = i.on_conflict_do_update( + constraint=self.table_with_metadata.primary_key, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_pk_constraint_index_elements_target(self): + i = insert( + self.table_with_metadata, values=dict(myid=1, name='foo')) + i = i.on_conflict_do_update( + index_elements=self.table_with_metadata.primary_key, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_named_unique_constraint_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.unique_constr, + set_=dict(myid=i.excluded.myid) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name ' + 'DO UPDATE SET myid = excluded.myid') + + def test_do_update_string_constraint_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.unique_constr.name, + set_=dict(myid=i.excluded.myid) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name ' + 'DO UPDATE SET myid = excluded.myid') + + def test_do_update_index_elements_where_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + index_elements=self.goofy_index.expressions, + index_where=self.goofy_index.dialect_options[ + 'postgresql']['where'], + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name) " + "WHERE name > %(name_1)s " + 'DO UPDATE SET name = excluded.name') + + def test_do_update_unnamed_index_target(self): + i = insert( + self.table1, values=dict(name='foo')) + + unnamed_goofy = Index( + None, self.table1.c.name, + postgresql_where=self.table1.c.name > 'm' + ) + + i = i.on_conflict_do_update( + constraint=unnamed_goofy, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name) " + "WHERE name > %(name_1)s " + 'DO UPDATE SET name = excluded.name') + + def test_do_update_unnamed_exclude_constraint_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.excl_constr_anon, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + 'DO UPDATE SET name = excluded.name') + + def test_do_update_add_whereclause(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.excl_constr_anon, + set_=dict(name=i.excluded.name), + where=( + (self.table1.c.name != 'brah') & + (self.table1.c.description != 'brah')) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + 'DO UPDATE SET name = excluded.name ' + "WHERE name != %(name_1)s " + "AND description != %(description_2)s") + + def test_quote_raw_string_col(self): + t = table('t', column("FancyName"), column("other name")) + + stmt = insert(t).values(FancyName='something new').\ + on_conflict_do_update( + index_elements=['FancyName', 'other name'], + set_=OrderedDict([ + ("FancyName", 'something updated'), + ("other name", "something else") + ]) + ) + + self.assert_compile( + stmt, + 'INSERT INTO t ("FancyName") VALUES (%(FancyName)s) ' + 'ON CONFLICT ("FancyName", "other name") ' + 'DO UPDATE SET "FancyName" = %(param_1)s, ' + '"other name" = %(param_2)s', + {'param_1': 'something updated', + 'param_2': 'something else', 'FancyName': 'something new'} + ) + + class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): """Test 'DISTINCT' with SQL expression language and orm.Query with diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py new file mode 100644 index 000000000..201287d62 --- /dev/null +++ b/test/dialect/postgresql/test_on_conflict.py @@ -0,0 +1,312 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import eq_, assert_raises +from sqlalchemy.testing import fixtures +from sqlalchemy import testing +from sqlalchemy import Table, Column, Integer, String +from sqlalchemy import exc, schema +from sqlalchemy.dialects.postgresql import insert + + +class OnConflictTest(fixtures.TablesTest): + + __only_on__ = 'postgresql >= 9.5', + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + 'users', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(50)) + ) + + users_xtra = Table( + 'users_xtra', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(50)), + Column('login_email', String(50)), + Column('lets_index_this', String(50)) + ) + cls.unique_constraint = schema.UniqueConstraint( + users_xtra.c.login_email, name='uq_login_email') + cls.bogus_index = schema.Index( + 'idx_special_ops', + users_xtra.c.lets_index_this, + postgresql_where=users_xtra.c.lets_index_this > 'm') + + def test_bad_args(self): + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_nothing, + constraint='id', index_elements=['id'] + ) + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_update, + constraint='id', index_elements=['id'] + ) + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_update, constraint='id' + ) + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_update + ) + + def test_on_conflict_do_nothing(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name='name1') + ) + conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name='name2') + ) + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1')] + ) + + @testing.provide_metadata + def test_on_conflict_do_nothing_target(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute( + insert(users) + .on_conflict_do_nothing( + index_elements=users.primary_key.columns), + dict(id=1, name='name1') + ) + conn.execute( + insert(users) + .on_conflict_do_nothing( + index_elements=users.primary_key.columns), + dict(id=1, name='name2') + ) + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1')] + ) + + def test_on_conflict_do_update_one(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], + set_=dict(name=i.excluded.name)) + conn.execute(i, dict(id=1, name='name1')) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1')] + ) + + def test_on_conflict_do_update_two(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], + set_=dict(id=i.excluded.id, name=i.excluded.name) + ) + + conn.execute(i, dict(id=1, name='name2')) + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name2')] + ) + + def test_on_conflict_do_update_three(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(name=i.excluded.name) + ) + conn.execute(i, dict(id=1, name='name3')) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name3')] + ) + + def test_on_conflict_do_update_four(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=i.excluded.id, name=i.excluded.name) + ).values(id=1, name='name4') + + conn.execute(i) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name4')] + ) + + def test_on_conflict_do_update_five(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=10, name="I'm a name") + ).values(id=1, name='name4') + + conn.execute(i) + + eq_( + conn.execute( + users.select().where(users.c.id == 10)).fetchall(), + [(10, "I'm a name")] + ) + + def _exotic_targets_fixture(self, conn): + users = self.tables.users_xtra + + conn.execute( + insert(users), + dict( + id=1, name='name1', + login_email='name1@gmail.com', lets_index_this='not' + ) + ) + conn.execute( + users.insert(), + dict( + id=2, name='name2', + login_email='name2@gmail.com', lets_index_this='not' + ) + ) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1', 'name1@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_two(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try primary key constraint: cause an upsert on unique id column + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict( + name=i.excluded.name, + login_email=i.excluded.login_email) + ) + conn.execute(i, dict( + id=1, name='name2', login_email='name1@gmail.com', + lets_index_this='not') + ) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name2', 'name1@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_three(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint: cause an upsert on target + # login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + constraint=self.unique_constraint, + set_=dict(id=i.excluded.id, name=i.excluded.name, + login_email=i.excluded.login_email) + ) + # note: lets_index_this value totally ignored in SET clause. + conn.execute(i, dict( + id=42, name='nameunique', + login_email='name2@gmail.com', lets_index_this='unique') + ) + + eq_( + conn.execute( + users.select(). + where(users.c.login_email == 'name2@gmail.com') + ).fetchall(), + [(42, 'nameunique', 'name2@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_four(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint by name: cause an + # upsert on target login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + constraint=self.unique_constraint.name, + set_=dict( + id=i.excluded.id, name=i.excluded.name, + login_email=i.excluded.login_email) + ) + # note: lets_index_this value totally ignored in SET clause. + + conn.execute(i, dict( + id=43, name='nameunique2', + login_email='name2@gmail.com', lets_index_this='unique') + ) + + eq_( + conn.execute( + users.select(). + where(users.c.login_email == 'name2@gmail.com') + ).fetchall(), + [(43, 'nameunique2', 'name2@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_five(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try bogus index + i = insert(users) + i = i.on_conflict_do_update( + index_elements=self.bogus_index.columns, + index_where=self. + bogus_index.dialect_options['postgresql']['where'], + set_=dict( + name=i.excluded.name, + login_email=i.excluded.login_email) + ) + + assert_raises( + exc.ProgrammingError, conn.execute, i, + dict( + id=1, name='namebogus', login_email='bogus@gmail.com', + lets_index_this='bogus') + ) |
