diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-08-18 17:44:35 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-08-18 17:44:35 -0400 |
| commit | c65ed17bc1f02260f5d13b8a97fb103d682a091b (patch) | |
| tree | c646412a4090e7f1c9033c0daa5c340dfe0d52d4 /lib/sqlalchemy/orm/persistence.py | |
| parent | 8773307257550e86801217f2b77d47047718807a (diff) | |
| parent | 399c03939768d4c8afb29ca1e091b046ea4fc88f (diff) | |
| download | sqlalchemy-c65ed17bc1f02260f5d13b8a97fb103d682a091b.tar.gz | |
Merge branch 'master' into ticket_3100
Conflicts:
lib/sqlalchemy/orm/mapper.py
lib/sqlalchemy/orm/persistence.py
Diffstat (limited to 'lib/sqlalchemy/orm/persistence.py')
| -rw-r--r-- | lib/sqlalchemy/orm/persistence.py | 422 |
1 file changed, 212 insertions, 210 deletions
diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py index 782d94dc8..8d3e90cf4 100644 --- a/lib/sqlalchemy/orm/persistence.py +++ b/lib/sqlalchemy/orm/persistence.py @@ -132,33 +132,60 @@ def save_obj( save_obj(base_mapper, [state], uowtransaction, single=True) return - states_to_insert, states_to_update = _organize_states_for_save( - base_mapper, - states, - uowtransaction) - + states_to_update = [] + states_to_insert = [] cached_connections = _cached_connection_dict(base_mapper) + for (state, dict_, mapper, connection, + has_identity, row_switch) in _organize_states_for_save( + base_mapper, states, uowtransaction + ): + if has_identity or row_switch: + states_to_update.append( + (state, dict_, mapper, connection, + has_identity, row_switch) + ) + else: + states_to_insert.append( + (state, dict_, mapper, connection, + has_identity, row_switch) + ) + for table, mapper in base_mapper._sorted_tables.items(): - insert = _collect_insert_commands(base_mapper, uowtransaction, - table, states_to_insert) + if table not in mapper._pks_by_table: + continue + insert = ( + (state, state_dict, sub_mapper, connection) + for state, state_dict, sub_mapper, connection, has_identity, + row_switch in states_to_insert + if table in sub_mapper._pks_by_table + ) + insert = _collect_insert_commands(table, insert) - update = _collect_update_commands(base_mapper, uowtransaction, - table, states_to_update) + update = ( + (state, state_dict, sub_mapper, connection, row_switch) + for state, state_dict, sub_mapper, connection, has_identity, + row_switch in states_to_update + if table in sub_mapper._pks_by_table + ) + update = _collect_update_commands(uowtransaction, table, update) - if update: - _emit_update_statements(base_mapper, uowtransaction, - cached_connections, - mapper, table, update) + _emit_update_statements(base_mapper, uowtransaction, + cached_connections, + mapper, table, update) - if insert: - _emit_insert_statements(base_mapper, uowtransaction, - 
cached_connections, - mapper, table, insert) + _emit_insert_statements(base_mapper, uowtransaction, + cached_connections, + mapper, table, insert) _finalize_insert_update_commands( base_mapper, uowtransaction, - states_to_insert, states_to_update) + ( + (state, state_dict, mapper, connection, has_identity) + for state, state_dict, mapper, connection, has_identity, + row_switch in states_to_insert + states_to_update + ) + ) def post_update(base_mapper, states, uowtransaction, post_update_cols): @@ -168,19 +195,28 @@ def post_update(base_mapper, states, uowtransaction, post_update_cols): """ cached_connections = _cached_connection_dict(base_mapper) - states_to_update = _organize_states_for_post_update( + states_to_update = list(_organize_states_for_post_update( base_mapper, - states, uowtransaction) + states, uowtransaction)) for table, mapper in base_mapper._sorted_tables.items(): + if table not in mapper._pks_by_table: + continue + + update = ( + (state, state_dict, sub_mapper, connection) + for + state, state_dict, sub_mapper, connection in states_to_update + if table in sub_mapper._pks_by_table + ) + update = _collect_post_update_commands(base_mapper, uowtransaction, - table, states_to_update, + table, update, post_update_cols) - if update: - _emit_post_update_statements(base_mapper, uowtransaction, - cached_connections, - mapper, table, update) + _emit_post_update_statements(base_mapper, uowtransaction, + cached_connections, + mapper, table, update) def delete_obj(base_mapper, states, uowtransaction): @@ -193,18 +229,27 @@ def delete_obj(base_mapper, states, uowtransaction): cached_connections = _cached_connection_dict(base_mapper) - states_to_delete = _organize_states_for_delete( + states_to_delete = list(_organize_states_for_delete( base_mapper, states, - uowtransaction) + uowtransaction)) table_to_mapper = base_mapper._sorted_tables for table in reversed(list(table_to_mapper.keys())): - delete = _collect_delete_commands(base_mapper, uowtransaction, - table, 
states_to_delete) - mapper = table_to_mapper[table] + if table not in mapper._pks_by_table: + continue + + delete = ( + (state, state_dict, sub_mapper, connection) + for state, state_dict, sub_mapper, has_identity, connection + in states_to_delete if table in sub_mapper._pks_by_table + and has_identity + ) + + delete = _collect_delete_commands(base_mapper, uowtransaction, + table, delete) _emit_delete_statements(base_mapper, uowtransaction, cached_connections, mapper, table, delete) @@ -226,10 +271,6 @@ def _organize_states_for_save(base_mapper, states, uowtransaction): """ - states_to_insert = [] - states_to_update = [] - instance_key = None - for state, dict_, mapper, connection in _connections_for_states( base_mapper, uowtransaction, states): @@ -275,18 +316,8 @@ def _organize_states_for_save(base_mapper, states, uowtransaction): uowtransaction.remove_state_actions(existing) row_switch = existing - if not has_identity and not row_switch: - states_to_insert.append( - (state, dict_, mapper, connection, - has_identity, row_switch) - ) - else: - states_to_update.append( - (state, dict_, mapper, connection, - has_identity, row_switch) - ) - - return states_to_insert, states_to_update + yield (state, dict_, mapper, connection, + has_identity, row_switch) def _organize_states_for_post_update(base_mapper, states, @@ -299,8 +330,7 @@ def _organize_states_for_post_update(base_mapper, states, the execution per state. """ - return list(_connections_for_states(base_mapper, uowtransaction, - states)) + return _connections_for_states(base_mapper, uowtransaction, states) def _organize_states_for_delete(base_mapper, states, uowtransaction): @@ -311,52 +341,46 @@ def _organize_states_for_delete(base_mapper, states, uowtransaction): mapper, the connection to use for the execution per state. 
""" - states_to_delete = [] - for state, dict_, mapper, connection in _connections_for_states( base_mapper, uowtransaction, states): mapper.dispatch.before_delete(mapper, connection, state) - states_to_delete.append((state, dict_, mapper, - bool(state.key), connection)) - return states_to_delete + yield state, dict_, mapper, bool(state.key), connection -def _collect_insert_commands(base_mapper, uowtransaction, table, - states_to_insert): +def _collect_insert_commands(table, states_to_insert): """Identify sets of values to use in INSERT statements for a list of states. """ - insert = [] - for state, state_dict, mapper, connection, has_identity, \ - row_switch in states_to_insert: + for state, state_dict, mapper, connection in states_to_insert: - if table not in mapper._pks_by_table: - continue + # assert table in mapper._pks_by_table params = {} value_params = {} - for col, propkey in mapper._col_to_propkey[table]: - if propkey in state_dict: - value = state_dict[propkey] - if isinstance(value, sql.ClauseElement): - value_params[col.key] = value - elif value is not None or ( - not col.primary_key and - not col.server_default and - not col.default): - params[col.key] = value + + propkey_to_col = mapper._propkey_to_col[table] + + for propkey in set(propkey_to_col).intersection(state_dict): + value = state_dict[propkey] + col = propkey_to_col[propkey] + if value is None: + continue + elif isinstance(value, sql.ClauseElement): + value_params[col.key] = value else: - if not col.server_default \ - and not col.default and not col.primary_key: - params[col.key] = None + params[col.key] = value + + for colkey in mapper._insert_cols_as_none[table].\ + difference(params).difference(value_params): + params[colkey] = None has_all_pks = mapper._pk_keys_by_table[table].issubset(params) - if base_mapper.eager_defaults: + if mapper.base_mapper.eager_defaults: has_all_defaults = mapper._server_default_cols[table].\ issubset(params) else: @@ -368,14 +392,13 @@ def 
_collect_insert_commands(base_mapper, uowtransaction, table, params[mapper.version_id_col.key] = \ mapper.version_id_generator(None) - insert.append((state, state_dict, params, mapper, - connection, value_params, has_all_pks, - has_all_defaults)) - return insert + yield ( + state, state_dict, params, mapper, + connection, value_params, has_all_pks, + has_all_defaults) -def _collect_update_commands(base_mapper, uowtransaction, - table, states_to_update): +def _collect_update_commands(uowtransaction, table, states_to_update): """Identify sets of values to use in UPDATE statements for a list of states. @@ -387,110 +410,82 @@ def _collect_update_commands(base_mapper, uowtransaction, """ - update = [] - for state, state_dict, mapper, connection, has_identity, \ - row_switch in states_to_update: - if table not in mapper._pks_by_table: - continue + for state, state_dict, mapper, connection, row_switch in states_to_update: + + # assert table in mapper._pks_by_table pks = mapper._pks_by_table[table] params = {} value_params = {} - hasdata = hasnull = False + propkey_to_col = mapper._propkey_to_col[table] - for col in mapper._cols_by_table[table]: - if col is mapper.version_id_col: - params[col._label] = \ - mapper._get_committed_state_attr_by_column( - row_switch or state, - row_switch and row_switch.dict - or state_dict, - col) + for propkey in set(propkey_to_col).intersection(state.committed_state): + value = state_dict[propkey] + col = propkey_to_col[propkey] - prop = mapper._columntoproperty[col] - history = state.manager[prop.key].impl.get_history( - state, state_dict, attributes.PASSIVE_NO_INITIALIZE - ) - if history.added: - params[col.key] = history.added[0] - hasdata = True + if not state.manager[propkey].impl.is_equal( + value, state.committed_state[propkey]): + if isinstance(value, sql.ClauseElement): + value_params[col] = value + else: + params[col.key] = value + + if mapper.version_id_col is not None: + col = mapper.version_id_col + params[col._label] = \ + 
mapper._get_committed_state_attr_by_column( + row_switch if row_switch else state, + row_switch.dict if row_switch else state_dict, + col) + + if col.key not in params and \ + mapper.version_id_generator is not False: + val = mapper.version_id_generator(params[col._label]) + params[col.key] = val + + if not (params or value_params): + continue + + pk_params = {} + for col in pks: + propkey = mapper._columntoproperty[col].key + history = state.manager[propkey].impl.get_history( + state, state_dict, attributes.PASSIVE_OFF) + + if row_switch and not history.deleted and history.added: + # row switch present. convert a row that thought + # it would be an INSERT into an UPDATE, by removing + # the PK value from the SET clause and instead putting + # it in the WHERE clause. + del params[col.key] + pk_params[col._label] = history.added[0] + elif history.added: + # we're updating the PK value. + assert history.deleted, ( + "New PK value without an old one not " + "possible for an UPDATE") + # check if an UPDATE of the PK value + # has already occurred as a result of ON UPDATE CASCADE. + # If so, use the new value to locate the row. + if ("pk_cascaded", state, col) in uowtransaction.attributes: + pk_params[col._label] = history.added[0] else: - if mapper.version_id_generator is not False: - val = mapper.version_id_generator(params[col._label]) - params[col.key] = val - - # HACK: check for history, in case the - # history is only - # in a different table than the one - # where the version_id_col is. 
- for prop in mapper._columntoproperty.values(): - history = ( - state.manager[prop.key].impl.get_history( - state, state_dict, - attributes.PASSIVE_NO_INITIALIZE)) - if history.added: - hasdata = True + # else, use the old value to locate the row + pk_params[col._label] = history.deleted[0] else: - prop = mapper._columntoproperty[col] - history = state.manager[prop.key].impl.get_history( - state, state_dict, - attributes.PASSIVE_OFF if col in pks else - attributes.PASSIVE_NO_INITIALIZE) - if history.added: - if isinstance(history.added[0], - sql.ClauseElement): - value_params[col] = history.added[0] - else: - value = history.added[0] - params[col.key] = value - - if col in pks: - if history.deleted and \ - not row_switch: - # if passive_updates and sync detected - # this was a pk->pk sync, use the new - # value to locate the row, since the - # DB would already have set this - if ("pk_cascaded", state, col) in \ - uowtransaction.attributes: - value = history.added[0] - params[col._label] = value - else: - # use the old value to - # locate the row - value = history.deleted[0] - params[col._label] = value - hasdata = True - else: - # row switch logic can reach us here - # remove the pk from the update params - # so the update doesn't - # attempt to include the pk in the - # update statement - del params[col.key] - value = history.added[0] - params[col._label] = value - if value is None: - hasnull = True - else: - hasdata = True - elif col in pks: - value = history.unchanged[0] - if value is None: - hasnull = True - params[col._label] = value + pk_params[col._label] = history.unchanged[0] - if hasdata: - if hasnull: + if params or value_params: + if None in pk_params.values(): raise orm_exc.FlushError( - "Can't update table " - "using NULL for primary " + "Can't update table using NULL for primary " "key value") - update.append((state, state_dict, params, mapper, - connection, value_params)) - return update + params.update(pk_params) + yield ( + state, state_dict, 
params, mapper, + connection, value_params) def _collect_post_update_commands(base_mapper, uowtransaction, table, @@ -500,10 +495,10 @@ def _collect_post_update_commands(base_mapper, uowtransaction, table, """ - update = [] for state, state_dict, mapper, connection in states_to_update: - if table not in mapper._pks_by_table: - continue + + # assert table in mapper._pks_by_table + pks = mapper._pks_by_table[table] params = {} hasdata = False @@ -525,9 +520,7 @@ def _collect_post_update_commands(base_mapper, uowtransaction, table, params[col.key] = value hasdata = True if hasdata: - update.append((state, state_dict, params, mapper, - connection)) - return update + yield params, connection def _collect_delete_commands(base_mapper, uowtransaction, table, @@ -535,15 +528,11 @@ def _collect_delete_commands(base_mapper, uowtransaction, table, """Identify values to use in DELETE statements for a list of states to be deleted.""" - delete = util.defaultdict(list) + for state, state_dict, mapper, connection in states_to_delete: - for state, state_dict, mapper, has_identity, connection \ - in states_to_delete: - if not has_identity or table not in mapper._pks_by_table: - continue + # assert table in mapper._pks_by_table params = {} - delete[connection].append(params) for col in mapper._pks_by_table[table]: params[col.key] = \ value = \ @@ -561,7 +550,7 @@ def _collect_delete_commands(base_mapper, uowtransaction, table, mapper._get_committed_state_attr_by_column( state, state_dict, mapper.version_id_col) - return delete + yield params, connection def _emit_update_statements(base_mapper, uowtransaction, @@ -602,8 +591,7 @@ def _emit_update_statements(base_mapper, uowtransaction, lambda rec: ( rec[4], tuple(sorted(rec[2])), - bool(rec[5])) - ): + bool(rec[5]))): rows = 0 records = list(records) @@ -625,12 +613,29 @@ def _emit_update_statements(base_mapper, uowtransaction, value_params) rows += c.rowcount else: - multiparams = [rec[2] for rec in records] - c = 
cached_connections[connection].\ - execute(statement, multiparams) + if needs_version_id and \ + not connection.dialect.supports_sane_multi_rowcount and \ + connection.dialect.supports_sane_rowcount: + for state, state_dict, params, mapper, \ + connection, value_params in records: + c = cached_connections[connection].\ + execute(statement, params) + _postfetch( + mapper, + uowtransaction, + table, + state, + state_dict, + c, + c.context.compiled_parameters[0], + value_params) + rows += c.rowcount + else: + multiparams = [rec[2] for rec in records] + c = cached_connections[connection].\ + execute(statement, multiparams) - rows += c.rowcount - if bookkeeping: + rows += c.rowcount for state, state_dict, params, mapper, \ connection, value_params in records: _postfetch( @@ -726,13 +731,7 @@ def _emit_insert_statements(base_mapper, uowtransaction, mapper._pks_by_table[table]): prop = mapper_rec._columntoproperty[col] if state_dict.get(prop.key) is None: - # TODO: would rather say: - # state_dict[prop.key] = pk - mapper_rec._set_state_attr_by_column( - state, - state_dict, - col, pk) - + state_dict[prop.key] = pk _postfetch( mapper_rec, uowtransaction, @@ -765,11 +764,10 @@ def _emit_post_update_statements(base_mapper, uowtransaction, # also group them into common (connection, cols) sets # to support executemany(). 
for key, grouper in groupby( - update, lambda rec: (rec[4], list(rec[2].keys())) + update, lambda rec: (rec[1], sorted(rec[0])) ): connection = key[0] - multiparams = [params for state, state_dict, - params, mapper, conn in grouper] + multiparams = [params for params, conn in grouper] cached_connections[connection].\ execute(statement, multiparams) @@ -799,8 +797,15 @@ def _emit_delete_statements(base_mapper, uowtransaction, cached_connections, return table.delete(clause) - for connection, del_objects in delete.items(): - statement = base_mapper._memo(('delete', table), delete_stmt) + statement = base_mapper._memo(('delete', table), delete_stmt) + for connection, recs in groupby( + delete, + lambda rec: rec[1] + ): + del_objects = [ + params + for params, connection in recs + ] connection = cached_connections[connection] @@ -853,15 +858,12 @@ def _emit_delete_statements(base_mapper, uowtransaction, cached_connections, ) -def _finalize_insert_update_commands(base_mapper, uowtransaction, - states_to_insert, states_to_update): +def _finalize_insert_update_commands(base_mapper, uowtransaction, states): """finalize state on states that have been inserted or updated, including calling after_insert/after_update events. """ - for state, state_dict, mapper, connection, has_identity, \ - row_switch in states_to_insert + \ - states_to_update: + for state, state_dict, mapper, connection, has_identity in states: if mapper._readonly_props: readonly = state.unmodified_intersection( @@ -917,11 +919,11 @@ def _postfetch(mapper, uowtransaction, table, for col in returning_cols: if col.primary_key: continue - mapper._set_state_attr_by_column(state, dict_, col, row[col]) + dict_[mapper._columntoproperty[col].key] = row[col] for c in prefetch_cols: if c.key in params and c in mapper._columntoproperty: - mapper._set_state_attr_by_column(state, dict_, c, params[c.key]) + dict_[mapper._columntoproperty[c].key] = params[c.key] if postfetch_cols: state._expire_attributes(state.dict, |
