author     unknown <knielsen@knielsen-hq.org>    2011-04-08 09:39:33 +0200
committer  unknown <knielsen@knielsen-hq.org>    2011-04-08 09:39:33 +0200
commit     64e43e1cc88c66bd89f95a5307020bcf60ba4b96 (patch)
tree       1ae6299d5a75f92fa97b240b0670c81c9a301afe /sql
parent     8b427b7d5548aeb214c70b004cc9056b9f7a6f7c (diff)
parent     86008e0ca2b864f1ab7a30e496d7c67c8fce06c2 (diff)
Merge various replication-related patches into MariaDB 5.3:
- MWL#116 Group commit
- MWL#136 Enhancements for START TRANSACTION WITH CONSISTENT SNAPSHOT
- MWL#47 Annotate_rows_log_event
- MWL#163 innodb_release_locks_early
- Percona patch enhancing row-based replication for tables with no primary key
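For orientation before the diff: the user-visible effect of MWL#136 is that START TRANSACTION WITH CONSISTENT SNAPSHOT now also pins a binlog position, exposed through new binlog status variables (registered in sql/log.cc below). The following is a minimal sketch, not code from this patch, of how a backup tool might use it; the variable names Binlog_snapshot_file / Binlog_snapshot_position are inferred from the "snapshot_file" / "snapshot_position" entries registered under the "binlog" prefix, and the connection parameters are placeholders.

/* Hedged sketch: consistent snapshot plus matching binlog position. */
#include <stdio.h>
#include <mysql.h>

int main(void)
{
  MYSQL *con= mysql_init(NULL);
  if (!mysql_real_connect(con, "localhost", "backup_user", "backup_pwd",
                          NULL, 0, NULL, 0))
    return 1;

  /* Open a snapshot that is consistent across engines and the binlog. */
  mysql_query(con, "START TRANSACTION WITH CONSISTENT SNAPSHOT");

  /* Read the binlog position corresponding to that snapshot. */
  if (!mysql_query(con, "SHOW STATUS LIKE 'Binlog_snapshot_%'"))
  {
    MYSQL_RES *res= mysql_store_result(con);
    MYSQL_ROW row;
    while ((row= mysql_fetch_row(res)))
      printf("%s = %s\n", row[0], row[1]);
    mysql_free_result(res);
  }

  /* ... dump table data here, inside the open snapshot ... */

  mysql_query(con, "COMMIT");
  mysql_close(con);
  return 0;
}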
Diffstat (limited to 'sql')
-rw-r--r--  sql/handler.cc     |  209
-rw-r--r--  sql/handler.h      |   90
-rw-r--r--  sql/log.cc         | 1278
-rw-r--r--  sql/log.h          |  140
-rw-r--r--  sql/log_event.cc   |  293
-rw-r--r--  sql/log_event.h    |   65
-rw-r--r--  sql/mysql_priv.h   |    4
-rw-r--r--  sql/mysqld.cc      |   18
-rw-r--r--  sql/rpl_rli.cc     |    4
-rw-r--r--  sql/rpl_rli.h      |   38
-rw-r--r--  sql/set_var.cc     |    3
-rw-r--r--  sql/slave.cc       |   63
-rw-r--r--  sql/slave.h        |    1
-rw-r--r--  sql/sql_binlog.cc  |    2
-rw-r--r--  sql/sql_class.cc   |   23
-rw-r--r--  sql/sql_class.h    |   22
-rw-r--r--  sql/sql_insert.cc  |    5
-rw-r--r--  sql/sql_parse.cc   |    4
-rw-r--r--  sql/sql_repl.cc    |   75
19 files changed, 1891 insertions(+), 446 deletions(-)
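The new handlerton hooks documented in sql/handler.h below (prepare_ordered() and commit_ordered(), the core of MWL#116) could be wired up by a storage engine roughly as follows. This is a minimal sketch, not code from the patch: the example_* names are hypothetical, while the hook signatures and the mysql_bin_log_commit_pos() helper are taken from the diff itself.

/* Hypothetical engine skeleton using the new ordered-commit hooks. */
#include "mysql_priv.h"                       /* handlerton, THD */

/* Provided by the server after this patch (see sql/log.cc below). */
extern void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos,
                                     const char **out_file);

static int example_prepare(handlerton *hton, THD *thd, bool all)
{
  /* Slow phase: make the transaction durable enough; may block or fail. */
  return 0;
}

static void example_commit_ordered(handlerton *hton, THD *thd, bool all)
{
  /*
    Fast phase, serialised in binlog commit order and possibly run from a
    different thread: make the transaction visible, remember the matching
    binlog position, and never call my_error() from here.
  */
  const char *binlog_file;
  ulonglong binlog_pos;
  mysql_bin_log_commit_pos(thd, &binlog_pos, &binlog_file);
  /* ... record (binlog_file, binlog_pos) with the commit record ... */
}

static int example_commit(handlerton *hton, THD *thd, bool all)
{
  /* Remaining, possibly slow, commit work; report any saved error here. */
  return 0;
}

static int example_init(void *p)
{
  handlerton *hton= (handlerton *)p;
  hton->prepare=         example_prepare;
  hton->prepare_ordered= NULL;                /* optional hook, left unset */
  hton->commit=          example_commit;
  hton->commit_ordered=  example_commit_ordered;
  return 0;
}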
diff --git a/sql/handler.cc b/sql/handler.cc index c5a870e77ad..df5acb98efe 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -79,6 +79,8 @@ TYPELIB tx_isolation_typelib= {array_elements(tx_isolation_names)-1,"", static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; uint known_extensions_id= 0; +static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, + bool is_real_trans); static plugin_ref ha_default_plugin(THD *thd) @@ -1077,7 +1079,7 @@ ha_check_and_coalesce_trx_read_only(THD *thd, Ha_trx_info *ha_list, */ int ha_commit_trans(THD *thd, bool all) { - int error= 0, cookie= 0; + int error= 0, cookie; /* 'all' means that this is either an explicit commit issued by user, or an implicit commit issued by a DDL. @@ -1092,7 +1094,8 @@ int ha_commit_trans(THD *thd, bool all) */ bool is_real_trans= all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list; - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + bool need_prepare_ordered, need_commit_ordered; + my_xid xid; DBUG_ENTER("ha_commit_trans"); /* Just a random warning to test warnings pushed during autocommit. */ @@ -1131,89 +1134,114 @@ int ha_commit_trans(THD *thd, bool all) DBUG_RETURN(2); } #ifdef USING_TRANSACTIONS - if (ha_info) + if (!ha_info) { - uint rw_ha_count; - bool rw_trans; + /* Free resources and perform other cleanup even for 'empty' transactions. */ + if (is_real_trans) + thd->transaction.cleanup(); + DBUG_RETURN(0); + } - DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE();); + DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE();); - /* Close all cursors that can not survive COMMIT */ - if (is_real_trans) /* not a statement commit */ - thd->stmt_map.close_transient_cursors(); + /* Close all cursors that can not survive COMMIT */ + if (is_real_trans) /* not a statement commit */ + thd->stmt_map.close_transient_cursors(); - rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); - /* rw_trans is TRUE when we in a transaction changing data */ - rw_trans= is_real_trans && (rw_ha_count > 0); + uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all); + /* rw_trans is TRUE when we in a transaction changing data */ + bool rw_trans= is_real_trans && (rw_ha_count > 0); - if (rw_trans && - wait_if_global_read_lock(thd, 0, 0)) - { - ha_rollback_trans(thd, all); - DBUG_RETURN(1); - } + if (rw_trans && + wait_if_global_read_lock(thd, 0, 0)) + { + ha_rollback_trans(thd, all); + DBUG_RETURN(1); + } - if (rw_trans && - opt_readonly && - !(thd->security_ctx->master_access & SUPER_ACL) && - !thd->slave_thread) - { - my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); - ha_rollback_trans(thd, all); - error= 1; - goto end; - } + if (rw_trans && + opt_readonly && + !(thd->security_ctx->master_access & SUPER_ACL) && + !thd->slave_thread) + { + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); + goto err; + } - if (!trans->no_2pc && (rw_ha_count > 1)) - { - for (; ha_info && !error; ha_info= ha_info->next()) - { - int err; - handlerton *ht= ha_info->ht(); - /* - Do not call two-phase commit if this particular - transaction is read-only. This allows for simpler - implementation in engines that are always read-only. - */ - if (! ha_info->is_trx_read_write()) - continue; - /* - Sic: we know that prepare() is not NULL since otherwise - trans->no_2pc would have been set. 
- */ - if ((err= ht->prepare(ht, thd, all))) - { - my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); - error= 1; - } - status_var_increment(thd->status_var.ha_prepare_count); - } - DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); - if (error || (is_real_trans && xid && - (error= !(cookie= tc_log->log_xid(thd, xid))))) - { - ha_rollback_trans(thd, all); - error= 1; - goto end; - } - DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); - } - error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0; - DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); - if (cookie) - if(tc_log->unlog(cookie, xid)) - { - error= 2; - goto end; - } + if (trans->no_2pc || (rw_ha_count <= 1)) + { + error= ha_commit_one_phase(thd, all); DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); -end: - if (rw_trans) - start_waiting_global_read_lock(thd); + goto end; } - /* Free resources and perform other cleanup even for 'empty' transactions. */ - else if (is_real_trans) - thd->transaction.cleanup(); + + need_prepare_ordered= FALSE; + need_commit_ordered= FALSE; + xid= thd->transaction.xid_state.xid.get_my_xid(); + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) + { + int err; + handlerton *ht= hi->ht(); + /* + Do not call two-phase commit if this particular + transaction is read-only. This allows for simpler + implementation in engines that are always read-only. + */ + if (! hi->is_trx_read_write()) + continue; + /* + Sic: we know that prepare() is not NULL since otherwise + trans->no_2pc would have been set. + */ + err= ht->prepare(ht, thd, all); + status_var_increment(thd->status_var.ha_prepare_count); + if (err) + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + + if (err) + goto err; + + need_prepare_ordered|= (ht->prepare_ordered != NULL); + need_commit_ordered|= (ht->commit_ordered != NULL); + } + DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); + + if (!is_real_trans) + { + error= commit_one_phase_2(thd, all, trans, is_real_trans); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); + goto end; + } + + cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered, + need_commit_ordered); + if (!cookie) + goto err; + + DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); + + error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; + DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); + + DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); + if (tc_log->unlog(cookie, xid)) + { + error= 2; /* Error during commit */ + goto end; + } + + DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); + goto end; + + /* Come here if error and we need to rollback. */ +err: + error= 1; /* Transaction was rolled back */ + ha_rollback_trans(thd, all); + +end: + if (rw_trans) + start_waiting_global_read_lock(thd); #endif /* USING_TRANSACTIONS */ DBUG_RETURN(error); } @@ -1224,7 +1252,6 @@ end: */ int ha_commit_one_phase(THD *thd, bool all) { - int error=0; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; /* "real" is a nick name for a transaction for which a commit will @@ -1234,8 +1261,16 @@ int ha_commit_one_phase(THD *thd, bool all) enclosing 'all' transaction is rolled back. 
*/ bool is_real_trans=all || thd->transaction.all.ha_list == 0; - Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; DBUG_ENTER("ha_commit_one_phase"); + DBUG_RETURN(commit_one_phase_2(thd, all, trans, is_real_trans)); +} + +static int +commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) +{ + int error= 0; + Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; + DBUG_ENTER("commit_one_phase_2"); #ifdef USING_TRANSACTIONS if (ha_info) { @@ -1852,7 +1887,16 @@ int ha_start_consistent_snapshot(THD *thd) { bool warn= true; + /* + Holding the LOCK_commit_ordered mutex ensures that we get the same + snapshot for all engines (including the binary log). This allows us + among other things to do backups with + START TRANSACTION WITH CONSISTENT SNAPSHOT and + have a consistent binlog position. + */ + pthread_mutex_lock(&LOCK_commit_ordered); plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn); + pthread_mutex_unlock(&LOCK_commit_ordered); /* Same idea as when one wants to CREATE TABLE in one engine which does not @@ -4622,7 +4666,8 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table) /** @brief Write table maps for all (manually or automatically) locked tables - to the binary log. + to the binary log. Also, if binlog_annotate_rows_events is ON, + write Annotate_rows event before the first table map. SYNOPSIS write_locked_table_maps() @@ -4659,6 +4704,9 @@ static int write_locked_table_maps(THD *thd) locks[0]= thd->extra_lock; locks[1]= thd->lock; locks[2]= thd->locked_tables; + my_bool with_annotate= thd->variables.binlog_annotate_rows_events && + thd->query() && thd->query_length(); + for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i ) { MYSQL_LOCK const *const lock= locks[i]; @@ -4676,7 +4724,8 @@ static int write_locked_table_maps(THD *thd) check_table_binlog_row_based(thd, table)) { int const has_trans= table->file->has_transactions(); - int const error= thd->binlog_write_table_map(table, has_trans); + int const error= thd->binlog_write_table_map(table, has_trans, + &with_annotate); /* If an error occurs, it is the responsibility of the caller to roll back the transaction. diff --git a/sql/handler.h b/sql/handler.h index 12413d6238a..02b96ebefb8 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -771,9 +771,97 @@ struct handlerton NOTE 'all' is also false in auto-commit mode where 'end of statement' and 'real commit' mean the same event. */ - int (*commit)(handlerton *hton, THD *thd, bool all); + int (*commit)(handlerton *hton, THD *thd, bool all); + /* + The commit_ordered() method is called prior to the commit() method, after + the transaction manager has decided to commit (not rollback) the + transaction. Unlike commit(), commit_ordered() is called only when the + full transaction is committed, not for each commit of statement + transaction in a multi-statement transaction. + + Not that like prepare(), commit_ordered() is only called when 2-phase + commit takes place. Ie. when no binary log and only a single engine + participates in a transaction, one commit() is called, no + commit_ordered(). So engines must be prepared for this. + + The calls to commit_ordered() in multiple parallel transactions is + guaranteed to happen in the same order in every participating + handler. This can be used to ensure the same commit order among multiple + handlers (eg. in table handler and binlog). So if transaction T1 calls + into commit_ordered() of handler A before T2, then T1 will also call + commit_ordered() of handler B before T2. 
+ + Engines that implement this method should during this call make the + transaction visible to other transactions, thereby making the order of + transaction commits be defined by the order of commit_ordered() calls. + + The intention is that commit_ordered() should do the minimal amount of + work that needs to happen in consistent commit order among handlers. To + preserve ordering, calls need to be serialised on a global mutex, so + doing any time-consuming or blocking operations in commit_ordered() will + limit scalability. + + Handlers can rely on commit_ordered() calls to be serialised (no two + calls can run in parallel, so no extra locking on the handler part is + required to ensure this). + + Note that commit_ordered() can be called from a different thread than the + one handling the transaction! So it can not do anything that depends on + thread local storage, in particular it can not call my_error() and + friends (instead it can store the error code and delay the call of + my_error() to the commit() method). + + Similarly, since commit_ordered() returns void, any return error code + must be saved and returned from the commit() method instead. + + The commit_ordered method is optional, and can be left unset if not + needed in a particular handler (then there will be no ordering guarantees + wrt. other engines and binary log). + */ + void (*commit_ordered)(handlerton *hton, THD *thd, bool all); int (*rollback)(handlerton *hton, THD *thd, bool all); int (*prepare)(handlerton *hton, THD *thd, bool all); + /* + The prepare_ordered method is optional. If set, it will be called after + successful prepare() in all handlers participating in 2-phase + commit. Like commit_ordered(), it is called only when the full + transaction is committed, not for each commit of statement transaction. + + The calls to prepare_ordered() among multiple parallel transactions are + ordered consistently with calls to commit_ordered(). This means that + calls to prepare_ordered() effectively define the commit order, and that + each handler will see the same sequence of transactions calling into + prepare_ordered() and commit_ordered(). + + Thus, prepare_ordered() can be used to define commit order for handlers + that need to do this in the prepare step (like binlog). It can also be + used to release transaction's locks early in an order consistent with the + order transactions will be eventually committed. + + Like commit_ordered(), prepare_ordered() calls are serialised to maintain + ordering, so the intention is that they should execute fast, with only + the minimal amount of work needed to define commit order. Handlers can + rely on this serialisation, and do not need to do any extra locking to + avoid two prepare_ordered() calls running in parallel. + + Like commit_ordered(), prepare_ordered() is not guaranteed to be called + in the context of the thread handling the rest of the transaction. So it + cannot invoke code that relies on thread local storage, in particular it + cannot call my_error(). + + prepare_ordered() cannot cause a rollback by returning an error, all + possible errors must be handled in prepare() (the prepare_ordered() + method returns void). In case of some fatal error, a record of the error + must be made internally by the engine and returned from commit() later. + + Note that for user-level XA SQL commands, no consistent ordering among + prepare_ordered() and commit_ordered() is guaranteed (as that would + require blocking all other commits for an indefinite time). 
+ + When 2-phase commit is not used (eg. only one engine (and no binlog) in + transaction), neither prepare() nor prepare_ordered() is called. + */ + void (*prepare_ordered)(handlerton *hton, THD *thd, bool all); int (*recover)(handlerton *hton, XID *xid_list, uint len); int (*commit_by_xid)(handlerton *hton, XID *xid); int (*rollback_by_xid)(handlerton *hton, XID *xid); diff --git a/sql/log.cc b/sql/log.cc index 5213bdcc937..53c9843ebbc 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -38,6 +38,7 @@ #endif #include <mysql/plugin.h> +#include "debug_sync.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -61,6 +62,38 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv); static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); + +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + +static my_bool opt_optimize_thread_scheduling= TRUE; +#ifndef DBUG_OFF +static ulong opt_binlog_dbug_fsync_sleep= 0; +#endif + +static my_bool mutexes_inited; +pthread_mutex_t LOCK_prepare_ordered; +pthread_mutex_t LOCK_commit_ordered; + +static ulonglong binlog_status_var_num_commits; +static ulonglong binlog_status_var_num_group_commits; +static char binlog_snapshot_file[FN_REFLEN]; +static ulonglong binlog_snapshot_position; + +static SHOW_VAR binlog_status_vars_detail[]= +{ + {"commits", + (char *)&binlog_status_var_num_commits, SHOW_LONGLONG}, + {"group_commits", + (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG}, + {"snapshot_file", + (char *)&binlog_snapshot_file, SHOW_CHAR}, + {"snapshot_position", + (char *)&binlog_snapshot_position, SHOW_LONGLONG}, + {NullS, NullS, SHOW_LONG} +}; + /** Silence all errors and warnings reported when performing a write @@ -134,50 +167,17 @@ char *make_once_alloced_filename(const char *basename, const char *ext) /* - Helper class to hold a mutex for the duration of the - block. - - Eliminates the need for explicit unlocking of mutexes on, e.g., - error returns. On passing a null pointer, the sentry will not do - anything. - */ -class Mutex_sentry -{ -public: - Mutex_sentry(pthread_mutex_t *mutex) - : m_mutex(mutex) - { - if (m_mutex) - pthread_mutex_lock(mutex); - } - - ~Mutex_sentry() - { - if (m_mutex) - pthread_mutex_unlock(m_mutex); -#ifndef DBUG_OFF - m_mutex= 0; -#endif - } - -private: - pthread_mutex_t *m_mutex; - - // It's not allowed to copy this object in any way - Mutex_sentry(Mutex_sentry const&); - void operator=(Mutex_sentry const&); -}; - -/* Helper class to store binary log transaction data. */ class binlog_trx_data { public: binlog_trx_data() : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0), - before_stmt_pos(MY_OFF_T_UNDEF) + before_stmt_pos(MY_OFF_T_UNDEF), last_commit_pos_offset(0), + using_xa(FALSE), xa_xid(0) { trans_log.end_of_file= max_binlog_cache_size; + last_commit_pos_file[0]= 0; } ~binlog_trx_data() @@ -229,11 +229,14 @@ public: completely. */ void reset() { - if (!empty()) + if (trans_log.type != WRITE_CACHE || !empty()) truncate(0); before_stmt_pos= MY_OFF_T_UNDEF; incident= FALSE; trans_log.end_of_file= max_binlog_cache_size; + using_xa= FALSE; + last_commit_pos_file[0]= 0; + last_commit_pos_offset= 0; DBUG_ASSERT(empty()); } @@ -278,6 +281,22 @@ public: Binlog position before the start of the current statement. 
*/ my_off_t before_stmt_pos; + /* + Binlog position for current transaction. + For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog + position corresponding to the snapshot taken. During (and after) commit, + this is set to the binlog position corresponding to just after the + commit (so storage engines can store it in their transaction log). + */ + char last_commit_pos_file[FN_REFLEN]; + my_off_t last_commit_pos_offset; + + /* + Flag set true if this transaction is committed with log_xid() as part of + XA, false if not. + */ + bool using_xa; + my_xid xa_xid; }; handlerton *binlog_hton; @@ -1421,6 +1440,7 @@ int binlog_init(void *p) binlog_hton->commit= binlog_commit; binlog_hton->rollback= binlog_rollback; binlog_hton->prepare= binlog_prepare; + binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; return 0; } @@ -1437,91 +1457,118 @@ static int binlog_close_connection(handlerton *hton, THD *thd) } /* - End a transaction. + End a transaction, writing events to the binary log. SYNOPSIS - binlog_end_trans() + binlog_flush_trx_cache() thd The thread whose transaction should be ended trx_data Pointer to the transaction data to use - end_ev The end event to use, or NULL all True if the entire transaction should be ended, false if only the statement transaction should be ended. + end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) DESCRIPTION End the currently open transaction. The transaction can be either - a real transaction (if 'all' is true) or a statement transaction - (if 'all' is false). + a real transaction or a statement transaction. - If 'end_ev' is NULL, the transaction is a rollback of only - transactional tables, so the transaction cache will be truncated - to either just before the last opened statement transaction (if - 'all' is false), or reset completely (if 'all' is true). + This can be to commit a transaction, with a COMMIT query event or an XA + commit XID event. But it can also be to rollback a transaction with a + ROLLBACK query event, used for rolling back transactions which also + contain updates to non-transactional tables. */ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev, bool all) +binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev, bool all) { - DBUG_ENTER("binlog_end_trans"); - int error=0; + DBUG_ENTER("binlog_flush_trx_cache"); IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx", - all ? "all" : "stmt", (long) end_ev)); DBUG_PRINT("info", ("thd->options={ %s%s}", FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->options, OPTION_BEGIN))); + if (thd->binlog_flush_pending_rows_event(TRUE)) + DBUG_RETURN(1); + /* - NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of - only transactional tables. If the transaction contain changes to - any non-transactiona tables, we need write the transaction and log - a ROLLBACK last. - */ - if (end_ev != NULL) - { - if (thd->binlog_flush_pending_rows_event(TRUE)) - DBUG_RETURN(1); - /* - Doing a commit or a rollback including non-transactional tables, - i.e., ending a transaction where we might write the transaction - cache to the binary log. - - We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they - were, we would have to ensure that we're not ending a statement - inside a stored function. 
- */ - error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev, - trx_data->has_incident()); - trx_data->reset(); + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + + We can always end the statement when ending a transaction since + transactions are not allowed inside stored functions. If they + were, we would have to ensure that we're not ending a statement + inside a stored function. + */ + int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, + end_ev, all); - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } - } - else + trx_data->reset(); + + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) { - /* - If rolling back an entire transaction or a single statement not - inside a transaction, we reset the transaction cache. + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } - If rolling back a statement in a transaction, we truncate the - transaction cache to remove the statement. - */ - thd->binlog_remove_pending_rows_event(TRUE); - if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) - { - if (trx_data->has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); - trx_data->reset(); - } - else // ...statement - trx_data->truncate(trx_data->before_stmt_pos); + DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); + DBUG_RETURN(error); +} + +/* + Discard a transaction, ie. ROLLBACK with only transactional table updates. + + SYNOPSIS + binlog_truncate_trx_cache() + + thd The thread whose transaction should be ended + trx_data Pointer to the transaction data to use + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + + DESCRIPTION + + Rollback (and end) a transaction that only modifies transactional + tables. The transaction can be either a real transaction (if 'all' is + true) or a statement transaction (if 'all' is false). + + The transaction cache will be truncated to either just before the last + opened statement transaction (if 'all' is false), or reset completely (if + 'all' is true). + */ +static int +binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all) +{ + DBUG_ENTER("binlog_truncate_trx_cache"); + int error= 0; + DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt")); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); + + /* + ROLLBACK with nothing to replicate: i.e., rollback of only transactional + tables. + */ + + /* + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. + + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + */ + thd->binlog_remove_pending_rows_event(TRUE); + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + { + if (trx_data->has_incident()) + error= mysql_bin_log.write_incident(thd); + trx_data->reset(); } + else // ...statement + trx_data->truncate(trx_data->before_stmt_pos); DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL); DBUG_RETURN(error); @@ -1533,7 +1580,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) do nothing. 
just pretend we can do 2pc, so that MySQL won't switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_xid() + real work will be done in MYSQL_BIN_LOG::log_and_order() */ return 0; } @@ -1584,8 +1631,8 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) (trans_has_no_stmt_committed(thd, all) && !stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd))) { - Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); } trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0; @@ -1649,7 +1696,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) (thd->options & OPTION_KEEP_LOG)) && mysql_bin_log.check_write_error(thd)) trx_data->set_incident(); - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, trx_data, all); } else { @@ -1668,8 +1715,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) stmt_has_updated_non_trans_table(thd) && thd->current_stmt_binlog_row_based)) { - Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); - error= binlog_end_trans(thd, trx_data, &qev, all); + Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0); + error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all); } /* Otherwise, we simply truncate the cache as there is no change on @@ -1677,7 +1724,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ else if (ending_trans(thd, all) || (!(thd->options & OPTION_KEEP_LOG) && !stmt_has_updated_non_trans_table(thd))) - error= binlog_end_trans(thd, trx_data, 0, all); + error= binlog_truncate_trx_cache(thd, trx_data, all); } if (!all) trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback @@ -2518,6 +2565,8 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG() :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), + group_commit_queue(0), group_commit_queue_busy(FALSE), + num_commits(0), num_group_commits(0), is_relay_log(0), description_event_for_exec(0), description_event_for_queue(0) { @@ -2574,6 +2623,7 @@ void MYSQL_BIN_LOG::init_pthread_objects() (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index", MYF_NO_DEADLOCK_DETECTION); (void) pthread_cond_init(&update_cond, 0); + (void) pthread_cond_init(&COND_queue_busy, 0); } @@ -2795,6 +2845,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (flush_io_cache(&log_file) || my_sync(log_file.file, MYF(MY_WME))) goto err; + pthread_mutex_lock(&LOCK_commit_ordered); + strmake(last_commit_pos_file, log_file_name, + sizeof(last_commit_pos_file)-1); + last_commit_pos_offset= my_b_tell(&log_file); + pthread_mutex_unlock(&LOCK_commit_ordered); if (write_file_name_to_index_file) { @@ -4078,6 +4133,10 @@ bool MYSQL_BIN_LOG::flush_and_sync() { sync_binlog_counter= 0; err=my_sync(fd, MYF(MY_WME)); +#ifndef DBUG_OFF + if (opt_binlog_dbug_fsync_sleep > 0) + my_sleep(opt_binlog_dbug_fsync_sleep); +#endif } return err; } @@ -4269,12 +4328,33 @@ void THD::binlog_set_stmt_begin() { trx_data->before_stmt_pos= pos; } +static int +binlog_start_consistent_snapshot(handlerton *hton, THD *thd) +{ + int err= 0; + binlog_trx_data *trx_data; + DBUG_ENTER("binlog_start_consistent_snapshot"); + + thd->binlog_setup_trx_data(); + trx_data= (binlog_trx_data*) 
thd_get_ha_data(thd, binlog_hton); + + /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */ + strmake(trx_data->last_commit_pos_file, mysql_bin_log.last_commit_pos_file, + sizeof(trx_data->last_commit_pos_file)-1); + trx_data->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; + + trans_register_ha(thd, TRUE, hton); + + DBUG_RETURN(err); +} /* - Write a table map to the binary log. + Write a table map to the binary log. If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the table map. */ -int THD::binlog_write_table_map(TABLE *table, bool is_trans) +int THD::binlog_write_table_map(TABLE *table, bool is_trans, + my_bool *with_annotate) { int error; DBUG_ENTER("THD::binlog_write_table_map"); @@ -4292,7 +4372,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) if (is_trans && binlog_table_maps == 0) binlog_start_trans_and_stmt(); - if ((error= mysql_bin_log.write(&the_event))) + if ((error= mysql_bin_log.write(&the_event, with_annotate))) DBUG_RETURN(error); binlog_table_maps++; @@ -4376,44 +4456,48 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (Rows_log_event* pending= trx_data->pending()) { - IO_CACHE *file= &log_file; - /* Decide if we should write to the log file directly or to the transaction log. */ if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log)) - file= &trx_data->trans_log; - - /* - If we are not writing to the log file directly, we could avoid - locking the log. - */ - pthread_mutex_lock(&LOCK_log); - - /* - Write pending event to log file or transaction cache - */ - if (pending->write(file)) { - pthread_mutex_unlock(&LOCK_log); - set_write_error(thd); - DBUG_RETURN(1); + /* Write to transaction log/cache. */ + if (pending->write(&trx_data->trans_log)) + { + set_write_error(thd); + DBUG_RETURN(1); + } } - - delete pending; - - if (file == &log_file) + else { + /* Write directly to log file. */ + pthread_mutex_lock(&LOCK_log); + if (pending->write(&log_file)) + { + pthread_mutex_unlock(&LOCK_log); + set_write_error(thd); + DBUG_RETURN(1); + } + error= flush_and_sync(); if (!error) { signal_update(); error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. + */ + pthread_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= my_b_tell(&log_file); + pthread_mutex_unlock(&LOCK_commit_ordered); + pthread_mutex_unlock(&LOCK_log); } - pthread_mutex_unlock(&LOCK_log); + delete pending; } thd->binlog_set_pending_rows_event(event); @@ -4422,10 +4506,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, } /** - Write an event to the binary log. + Write an event to the binary log. If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the event + (this should happen only if the event is a Table_map). */ -bool MYSQL_BIN_LOG::write(Log_event *event_info) +bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { THD *thd= event_info->thd; bool error= 1; @@ -4443,11 +4529,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } /* - Flush the pending rows event to the transaction cache or to the - log file. Since this function potentially aquire the LOCK_log - mutex, we do this before aquiring the LOCK_log mutex in this - function. - We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since this will close all tables on the slave. 
@@ -4457,8 +4538,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) if (thd->binlog_flush_pending_rows_event(end_stmt)) DBUG_RETURN(error); - pthread_mutex_lock(&LOCK_log); - /* In most cases this is only called if 'is_open()' is true; in fact this is mostly called if is_open() *was* true a few instructions before, but it @@ -4480,7 +4559,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->lex->sql_command != SQLCOM_SAVEPOINT && !binlog_filter->db_ok(local_db))) { - VOID(pthread_mutex_unlock(&LOCK_log)); DBUG_RETURN(0); } #endif /* HAVE_REPLICATION */ @@ -4524,15 +4602,23 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->binlog_start_trans_and_stmt(); file= trans_log; } - /* - TODO as Mats suggested, for all the cases above where we write to - trans_log, it sounds unnecessary to lock LOCK_log. We should rather - test first if we want to write to trans_log, and if not, lock - LOCK_log. - */ } #endif /* USING_TRANSACTIONS */ DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); + if (file == &log_file) + { + pthread_mutex_lock(&LOCK_log); + /* + We did not want to take LOCK_log unless really necessary. + However, now that we hold LOCK_log, we must check is_open() again, lest + the log was closed just before. + */ + if (unlikely(!is_open())) + { + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); + } + } /* No check for auto events flag here - this write method should @@ -4544,6 +4630,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) of the SQL command */ + if (with_annotate && *with_annotate) + { + DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT); + Annotate_rows_log_event anno(thd); + /* Annotate event should be written not more than once */ + *with_annotate= 0; + if (anno.write(file)) + goto err; + } + /* If row-based binlogging, Insert_id, Rand and other kind of "setting context" events are not needed. @@ -4556,7 +4652,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) { @@ -4567,13 +4663,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) thd->auto_inc_intervals_in_cur_stmt_for_binlog. minimum()); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->rand_used) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2); if (e.write(file)) - goto err; + goto err_unlock; } if (thd->user_var_events.elements) { @@ -4588,7 +4684,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) user_var_event->type, user_var_event->charset_number); if (e.write(file)) - goto err; + goto err_unlock; } } } @@ -4597,7 +4693,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) /* Write the SQL command */ if (event_info->write(file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) - goto err; + goto err_unlock; if (file == &log_file) // we are writing to the real log (disk) { @@ -4605,20 +4701,33 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) status_var_add(thd->status_var.binlog_bytes_written, data_written); if (flush_and_sync()) - goto err; + goto err_unlock; signal_update(); if ((error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))) - goto err; + goto err_unlock; } error=0; +err_unlock: + if (file == &log_file) + { + my_off_t offset= my_b_tell(&log_file); + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. 
+ */ + pthread_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= offset; + pthread_mutex_unlock(&LOCK_commit_ordered); + pthread_mutex_unlock(&LOCK_log); + } + err: if (error) set_write_error(thd); } - pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); } @@ -4723,7 +4832,7 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags) We give it a shot and try to write an incident event anyway to the current log. */ - if (!write_incident(current_thd, FALSE)) + if (!write_incident_already_locked(current_thd)) flush_and_sync(); #ifdef HAVE_REPLICATION @@ -4764,19 +4873,15 @@ uint MYSQL_BIN_LOG::next_file_id() write_cache() thd Current_thread cache Cache to write to the binary log - lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and sync:ed DESCRIPTION Write the contents of the cache to the binary log. The cache will be reset as a READ_CACHE to be able to read the contents from it. */ -int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, - bool sync_log) +int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { - Mutex_sentry sentry(lock_log ? &LOCK_log : NULL); - + safe_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) return ER_ERROR_ON_WRITE; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; @@ -4888,6 +4993,8 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, } /* Write data to the binary log file */ + DBUG_EXECUTE_IF("fail_binlog_write_1", + errno= 28; return ER_ERROR_ON_WRITE;); if (my_b_write(&log_file, cache->read_pos, length)) return ER_ERROR_ON_WRITE; status_var_add(thd->status_var.binlog_bytes_written, length); @@ -4897,9 +5004,6 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, DBUG_ASSERT(carry == 0); - if (sync_log) - flush_and_sync(); - return 0; // All OK } @@ -4932,31 +5036,50 @@ int query_error_code(THD *thd, bool not_killed) return error; } -bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) + +bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd) { uint error= 0; - DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); - - if (!is_open()) - DBUG_RETURN(error); - - LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; + DBUG_ENTER("MYSQL_BIN_LOG::write_incident_already_locked"); Incident incident= INCIDENT_LOST_EVENTS; Incident_log_event ev(thd, incident, write_error_msg); - if (lock) - pthread_mutex_lock(&LOCK_log); - error= ev.write(&log_file); - status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); - if (lock) + + if (likely(is_open())) + { + error= ev.write(&log_file); + status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); + } + + DBUG_RETURN(error); +} + + +bool MYSQL_BIN_LOG::write_incident(THD *thd) +{ + uint error= 0; + my_off_t offset; + DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); + + pthread_mutex_lock(&LOCK_log); + if (likely(is_open())) { - if (!error && !(error= flush_and_sync())) + if (!(error= write_incident_already_locked(thd)) && + !(error= flush_and_sync())) { signal_update(); error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - pthread_mutex_unlock(&LOCK_log); + offset= my_b_tell(&log_file); + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. 
+ */ + pthread_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= offset; + pthread_mutex_unlock(&LOCK_commit_ordered); } + pthread_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); } @@ -4984,111 +5107,367 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) 'cache' needs to be reinitialized after this functions returns. */ -bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, - bool incident) +bool +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev, bool all) +{ + group_commit_entry entry; + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + + entry.thd= thd; + entry.trx_data= trx_data; + entry.error= 0; + entry.all= all; + + /* + Log "BEGIN" at the beginning of every transaction. Here, a transaction is + either a BEGIN..COMMIT block or a single statement in autocommit mode. + + Create the necessary events here, where we have the correct THD (and + thread context). + + Due to group commit the actual writing to binlog may happen in a different + thread. + */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + entry.begin_event= &qinfo; + entry.end_event= end_ev; + if (trx_data->has_incident()) + { + Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); + entry.incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); + } + else + { + entry.incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); + } +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { - DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); + /* + To facilitate group commit for the binlog, we first queue up ourselves in + the group commit queue. Then the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. + */ + + entry->thd->clear_wakeup_ready(); + pthread_mutex_lock(&LOCK_prepare_ordered); + group_commit_entry *orig_queue= group_commit_queue; + entry->next= orig_queue; + group_commit_queue= entry; + + if (entry->trx_data->using_xa) + { + DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); + run_prepare_ordered(entry->thd, entry->all); + DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); + } + pthread_mutex_unlock(&LOCK_prepare_ordered); + DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); + + /* + The first in the queue handle group commit for all; the others just wait + to be signalled when group commit is done. + */ + if (orig_queue != NULL) + entry->thd->wait_for_wakeup_ready(); + else + trx_group_commit_leader(entry); + + if (!opt_optimize_thread_scheduling) + { + /* For the leader, trx_group_commit_leader() already took the lock. 
*/ + if (orig_queue != NULL) + pthread_mutex_lock(&LOCK_commit_ordered); + + DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (entry->trx_data->using_xa && !entry->error) + run_commit_ordered(entry->thd, entry->all); + + group_commit_entry *next= entry->next; + if (!next) + { + group_commit_queue_busy= FALSE; + pthread_cond_signal(&COND_queue_busy); + DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered"); + } + pthread_mutex_unlock(&LOCK_commit_ordered); + + if (next) + { + next->thd->signal_wakeup_ready(); + } + } + + if (likely(!entry->error)) + return 0; + + switch (entry->error) + { + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + entry->trx_data->trans_log.file_name, entry->commit_errno); + break; + default: + /* + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(entry->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), entry->error); + } + + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (entry->trx_data->using_xa && entry->trx_data->xa_xid) + mark_xid_done(); + + return 1; +} + +/* + Do binlog group commit as the lead thread. + + This must be called when this thread/transaction is queued at the start of + the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group + commit all the transactions in the queue (more may have entered while waiting + for LOCK_log). After commit is done, all other threads in the queue will be + signalled. + + */ +void +MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) +{ + uint xid_count= 0; + uint write_count= 0; + my_off_t commit_offset; + group_commit_entry *current; + group_commit_entry *last_in_queue; + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); + LINT_INIT(commit_offset); + + /* + Lock the LOCK_log(), and once we get it, collect any additional writes + that queued up while we were waiting. + */ VOID(pthread_mutex_lock(&LOCK_log)); + DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); - /* NULL would represent nothing to replicate after ROLLBACK */ - DBUG_ASSERT(commit_event != NULL); + pthread_mutex_lock(&LOCK_prepare_ordered); + current= group_commit_queue; + group_commit_queue= NULL; + pthread_mutex_unlock(&LOCK_prepare_ordered); + /* As the queue is in reverse order of entering, reverse it. */ + group_commit_entry *queue= NULL; + last_in_queue= current; + while (current) + { + group_commit_entry *next= current->next; + current->next= queue; + queue= current; + current= next; + } + DBUG_ASSERT(leader == queue /* the leader should be first in queue */); + + /* Now we have in queue the list of transactions to be committed in order. */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { /* - We only bother to write to the binary log if there is anything - to write. - */ - if (my_b_tell(cache) > 0) + Commit every transaction in the queue. + + Note that we are doing this in a different thread than the one running + the transaction! So we are limited in the operations we can do. 
In + particular, we cannot call my_error() on behalf of a transaction, as + that obtains the THD from thread local storage. Instead, we must set + current->error and let the thread do the error reporting itself once + we wake it up. + */ + for (current= queue; current != NULL; current= current->next) { - /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0); + binlog_trx_data *trx_data= current->trx_data; + IO_CACHE *cache= &trx_data->trans_log; /* - Now this Query_log_event has artificial log_pos 0. It must be - adjusted to reflect the real position in the log. Not doing it - would confuse the slave: it would prevent this one from - knowing where he is in the master's binlog, which would result - in wrong positions being shown to the user, MASTER_POS_WAIT - undue waiting etc. + We only bother to write to the binary log if there is anything + to write. */ - if (qinfo.write(&log_file)) - goto err; - status_var_add(thd->status_var.binlog_bytes_written, qinfo.data_written); - - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if ((write_error= write_cache(thd, cache, FALSE, - TRUE))) - DBUG_PRINT("info", ("error writing binlog cache: %d", - write_error)); - DBUG_PRINT("info", ("crashing before writing xid")); - DBUG_SUICIDE(); - }); - - if ((write_error= write_cache(thd, cache, FALSE, FALSE))) - goto err; - - if (commit_event) + if (my_b_tell(cache) > 0) { - if (commit_event->write(&log_file)) - goto err; - status_var_add(thd->status_var.binlog_bytes_written, - commit_event->data_written); + if ((current->error= write_transaction(current))) + current->commit_errno= errno; + + write_count++; } - if (incident && write_incident(thd, FALSE)) - goto err; + strmake(trx_data->last_commit_pos_file, log_file_name, + sizeof(trx_data->last_commit_pos_file)-1); + commit_offset= my_b_write_tell(&log_file); + trx_data->last_commit_pos_offset= commit_offset; + if (trx_data->using_xa && trx_data->xa_xid) + xid_count++; + } + if (write_count > 0) + { if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE();); - if (cache->error) // Error on read { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + for (current= queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } + } + else + { + signal_update(); } - signal_update(); } /* - if commit_event is Xid_log_event, increase the number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated + if any commit_events are Xid_log_event, increase the number of + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. - If the commit_event is not Xid_log_event (then it's a Query_log_event) - rotate binlog, if necessary. + If no Xid_log_events (then it's all Query_log_event) rotate binlog, + if necessary. */ - if (commit_event && commit_event->get_type_code() == XID_EVENT) + if (xid_count > 0) { - pthread_mutex_lock(&LOCK_prep_xids); - prepared_xids++; - pthread_mutex_unlock(&LOCK_prep_xids); + mark_xids_active(xid_count); } else + { if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED)) - goto err; + { + /* + If we fail to rotate, which thread should get the error? 
+ We give the error to the *last* transaction thread; that seems to + make the most sense, as it was the last to write to the log. + */ + last_in_queue->error= ER_ERROR_ON_WRITE; + last_in_queue->commit_errno= errno; + } + } } - VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(0); + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); + pthread_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= commit_offset; + /* + We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; + otherwise scheduling could allow the next group commit to run ahead of us, + messing up the order of commit_ordered() calls. But as soon as + LOCK_commit_ordered is obtained, we can let the next group commit start. + */ + pthread_mutex_unlock(&LOCK_log); + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); + ++num_group_commits; -err: - if (!write_error) + if (!opt_optimize_thread_scheduling) { - write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); + /* + If we want to run commit_ordered() each in the transaction's own thread + context, then we need to mark the queue reserved; we need to finish all + threads in one group commit before the next group commit can be allowed + to proceed, and we cannot unlock a simple pthreads mutex in a different + thread from the one that locked it. + */ + + while (group_commit_queue_busy) + pthread_cond_wait(&COND_queue_busy, &LOCK_commit_ordered); + group_commit_queue_busy= TRUE; + + /* Note that we return with LOCK_commit_ordered locked! */ + DBUG_VOID_RETURN; } - VOID(pthread_mutex_unlock(&LOCK_log)); - DBUG_RETURN(1); + + /* + Wakeup each participant waiting for our group commit, first calling the + commit_ordered() methods for any transactions doing 2-phase commit. + */ + current= queue; + while (current != NULL) + { + group_commit_entry *next; + + DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (current->trx_data->using_xa && !current->error) + run_commit_ordered(current->thd, current->all); + + /* + Careful not to access current->next after waking up the other thread! As + it may change immediately after wakeup. + */ + next= current->next; + if (current != leader) // Don't wake up ourself + current->thd->signal_wakeup_ready(); + current= next; + } + DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); + pthread_mutex_unlock(&LOCK_commit_ordered); + + DBUG_VOID_RETURN; } +int +MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) +{ + binlog_trx_data *trx_data= entry->trx_data; + IO_CACHE *cache= &trx_data->trans_log; + + if (entry->begin_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + status_var_add(entry->thd->status_var.binlog_bytes_written, + entry->begin_event->data_written); + + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(entry->thd, cache))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(); + + DBUG_PRINT("info", ("crashing before writing xid")); + DBUG_SUICIDE(); + }); + + if (write_cache(entry->thd, cache)) + return ER_ERROR_ON_WRITE; + + if (entry->end_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + status_var_add(entry->thd->status_var.binlog_bytes_written, + entry->end_event->data_written); + + if (entry->incident_event && entry->incident_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (cache->error) // Error on read + return ER_ERROR_ON_READ; + + return 0; +} /** Wait until we get a signal that the binary log has been updated. 
@@ -5467,6 +5846,171 @@ void sql_print_information(const char *format, ...) } +void +TC_init() +{ + my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_prepare_ordered", MYF(0)); + my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW, + "LOCK_commit_ordered", MYF(0)); + mutexes_inited= TRUE; +} + + +void +TC_destroy() +{ + if (mutexes_inited) + { + pthread_mutex_destroy(&LOCK_prepare_ordered); + pthread_mutex_destroy(&LOCK_commit_ordered); + mutexes_inited= FALSE; + } +} + + +void +TC_LOG::run_prepare_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + safe_mutex_assert_owner(&LOCK_prepare_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->prepare_ordered) + continue; + ht->prepare_ordered(ht, thd, all); + } +} + + +void +TC_LOG::run_commit_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + safe_mutex_assert_owner(&LOCK_commit_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + ht->commit_ordered(ht, thd, all); + DEBUG_SYNC(thd, "commit_after_run_commit_ordered"); + } +} + + +int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) +{ + int cookie; + struct commit_entry entry; + bool is_group_commit_leader; + LINT_INIT(is_group_commit_leader); + + if (need_prepare_ordered) + { + pthread_mutex_lock(&LOCK_prepare_ordered); + run_prepare_ordered(thd, all); + if (need_commit_ordered) + { + /* + Must put us in queue so we can run_commit_ordered() in same sequence + as we did run_prepare_ordered(). + */ + thd->clear_wakeup_ready(); + entry.thd= thd; + commit_entry *previous_queue= commit_ordered_queue; + entry.next= previous_queue; + commit_ordered_queue= &entry; + is_group_commit_leader= (previous_queue == NULL); + } + pthread_mutex_unlock(&LOCK_prepare_ordered); + } + + cookie= 0; + if (xid) + cookie= log_one_transaction(xid); + + if (need_commit_ordered) + { + if (need_prepare_ordered) + { + /* + We did the run_prepare_ordered() serialised, then ran the log_xid() in + parallel. Now we have to do run_commit_ordered() serialised in the + same sequence as run_prepare_ordered(). + + We do this starting from the head of the queue, each thread doing + run_commit_ordered() and signalling the next in queue. + */ + if (is_group_commit_leader) + { + /* The first in queue starts the ball rolling. */ + pthread_mutex_lock(&LOCK_prepare_ordered); + while (commit_ordered_queue_busy) + pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); + commit_entry *queue= commit_ordered_queue; + commit_ordered_queue= NULL; + /* + Mark the queue busy while we bounce it from one thread to the + next. + */ + commit_ordered_queue_busy= true; + pthread_mutex_unlock(&LOCK_prepare_ordered); + + /* Reverse the queue list so we get correct order. */ + commit_entry *prev= NULL; + while (queue) + { + commit_entry *next= queue->next; + queue->next= prev; + prev= queue; + queue= next; + } + DBUG_ASSERT(prev == &entry && prev->thd == thd); + } + else + { + /* Not first in queue; just wait until previous thread wakes us up. */ + thd->wait_for_wakeup_ready(); + } + } + + /* Only run commit_ordered() if log_xid was successful. 
*/ + if (cookie) + { + pthread_mutex_lock(&LOCK_commit_ordered); + run_commit_ordered(thd, all); + pthread_mutex_unlock(&LOCK_commit_ordered); + } + + if (need_prepare_ordered) + { + commit_entry *next= entry.next; + if (next) + { + next->thd->signal_wakeup_ready(); + } + else + { + pthread_mutex_lock(&LOCK_prepare_ordered); + commit_ordered_queue_busy= false; + pthread_cond_signal(&COND_queue_busy); + pthread_mutex_unlock(&LOCK_prepare_ordered); + } + } + } + + return cookie; +} + + /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -5603,6 +6147,7 @@ int TC_LOG_MMAP::open(const char *opt_name) pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST); pthread_cond_init(&COND_active, 0); pthread_cond_init(&COND_pool, 0); + pthread_cond_init(&COND_queue_busy, 0); inited=6; @@ -5610,6 +6155,8 @@ int TC_LOG_MMAP::open(const char *opt_name) active=pages; pool=pages+1; pool_last=pages+npages-1; + commit_ordered_queue= NULL; + commit_ordered_queue_busy= false; return 0; @@ -5715,7 +6262,7 @@ int TC_LOG_MMAP::overflow() to the position in memory where xid was logged to. */ -int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid) +int TC_LOG_MMAP::log_one_transaction(my_xid xid) { int err; PAGE *p; @@ -5885,6 +6432,8 @@ void TC_LOG_MMAP::close() pthread_mutex_destroy(&LOCK_active); pthread_mutex_destroy(&LOCK_pool); pthread_cond_destroy(&COND_pool); + pthread_cond_destroy(&COND_active); + pthread_cond_destroy(&COND_queue_busy); case 5: data[0]='A'; // garble the first (signature) byte, in case my_delete fails case 4: @@ -6093,39 +6642,98 @@ void TC_LOG_BINLOG::close() pthread_cond_destroy (&COND_prep_xids); } -/** - @todo - group commit - - @retval - 0 error - @retval - 1 success +/* + Do a binlog log_xid() for a group of transactions, linked through + thd->next_commit_ordered. */ -int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) +int +TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered __attribute__((unused)), + bool need_commit_ordered __attribute__((unused))) { - DBUG_ENTER("TC_LOG_BINLOG::log"); - Xid_log_event xle(thd, xid); - binlog_trx_data *trx_data= + int err; + DBUG_ENTER("TC_LOG_BINLOG::log_and_order"); + + binlog_trx_data *const trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); - /* - We always commit the entire transaction when writing an XID. Also - note that the return value is inverted. - */ - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); + + trx_data->using_xa= TRUE; + trx_data->xa_xid= xid; + if (xid) + { + Xid_log_event xid_event(thd, xid); + err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all); + } + else + { + /* + Empty xid occurs in XA COMMIT ... ONE PHASE. + In this case, we do not have a MySQL xid for the transaction, and the + external XA transaction coordinator will have to handle recovery if + needed. So we end the transaction with a plain COMMIT query event. + */ + Query_log_event end_event(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0); + err= binlog_flush_trx_cache(thd, trx_data, &end_event, all); + } + + DEBUG_SYNC(thd, "binlog_after_log_and_order"); + + DBUG_RETURN(!err); } -int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +/* + After an XID is logged, we need to hold on to the current binlog file until + it is fully committed in the storage engine. The reason is that crash + recovery only looks at the latest binlog, so we must make sure there are no + outstanding prepared (but not committed) transactions before rotating the + binlog. 
+ + To handle this, we keep a count of outstanding XIDs. This function is used + to increase this count when committing one or more transactions to the + binary log. +*/ +void +TC_LOG_BINLOG::mark_xids_active(uint xid_count) { - DBUG_ENTER("TC_LOG_BINLOG::unlog"); + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); + DBUG_PRINT("info", ("xid_count=%u", xid_count)); + pthread_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + pthread_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; +} + +/* + Once an XID is committed, it is safe to rotate the binary log, as it can no + longer be needed during crash recovery. + + This function is called to mark an XID this way. It needs to decrease the + count of pending XIDs, and signal the log rotator thread when it reaches zero. +*/ +void +TC_LOG_BINLOG::mark_xid_done() +{ + my_bool send_signal; + + DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); pthread_mutex_lock(&LOCK_prep_xids); DBUG_ASSERT(prepared_xids > 0); - if (--prepared_xids == 0) { + send_signal= !--prepared_xids; + pthread_mutex_unlock(&LOCK_prep_xids); + if (send_signal) { DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); pthread_cond_signal(&COND_prep_xids); } - pthread_mutex_unlock(&LOCK_prep_xids); - DBUG_RETURN(rotate_and_purge(0)); // as ::write() did not rotate + DBUG_VOID_RETURN; +} + +int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + DBUG_ENTER("TC_LOG_BINLOG::unlog"); + if (xid) + mark_xid_done(); + /* As ::write_transaction_to_binlog() did not rotate, do it here. */ + DBUG_RETURN(rotate_and_purge(0)); } int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) @@ -6194,9 +6802,131 @@ ulonglong mysql_bin_log_file_pos(void) { return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file; } +/* + Get the current position of the MySQL binlog for transaction currently being + committed. + + This is valid to call from within storage engine commit_ordered() and + commit() methods only. + + Since it stores the position inside THD, it is safe to call without any + locking. +*/ +void +mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file) +{ + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + if (trx_data) + { + *out_file= trx_data->last_commit_pos_file; + *out_pos= (ulonglong)(trx_data->last_commit_pos_offset); + } + else + { + *out_file= NULL; + *out_pos= 0; + } +} #endif /* INNODB_COMPATIBILITY_HOOKS */ +static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + mysql_bin_log.set_status_variables(thd); + var->type= SHOW_ARRAY; + var->value= (char *)&binlog_status_vars_detail; + return 0; +} + +static SHOW_VAR binlog_status_vars_top[]= { + {"binlog", (char *) &show_binlog_vars, SHOW_FUNC}, + {NullS, NullS, SHOW_LONG} +}; + +static MYSQL_SYSVAR_BOOL( + optimize_thread_scheduling, + opt_optimize_thread_scheduling, + PLUGIN_VAR_READONLY, + "Run fast part of group commit in a single thread, to optimize kernel " + "thread scheduling. On by default. Disable to run each transaction in group " + "commit in its own thread, which can be slower at very high concurrency. 
" + "This option is mostly for testing one algorithm versus the other, and it " + "should not normally be necessary to change it.", + NULL, + NULL, + 1); + +#ifndef DBUG_OFF +static MYSQL_SYSVAR_ULONG( + dbug_fsync_sleep, + opt_binlog_dbug_fsync_sleep, + PLUGIN_VAR_RQCMDARG, + "Extra sleep (in microseconds) to add to binlog fsync(), for debugging", + NULL, + NULL, + 0, + 0, + ULONG_MAX, + 0); +#endif + +static struct st_mysql_sys_var *binlog_sys_vars[]= +{ + MYSQL_SYSVAR(optimize_thread_scheduling), +#ifndef DBUG_OFF + MYSQL_SYSVAR(dbug_fsync_sleep), +#endif + NULL +}; + + +/* + Copy out the non-directory part of binlog position filename for the + `binlog_snapshot_file' status variable, same way as it is done for + SHOW MASTER STATUS. +*/ +static void +set_binlog_snapshot_file(const char *src) +{ + int dir_len = dirname_length(src); + strmake(binlog_snapshot_file, src + dir_len, sizeof(binlog_snapshot_file)-1); +} + +/* + Copy out current values of status variables, for SHOW STATUS or + information_schema.global_status. + + This is called only under LOCK_status, so we can fill in a static array. +*/ +void +TC_LOG_BINLOG::set_status_variables(THD *thd) +{ + binlog_trx_data *trx_data; + + if (thd) + trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton); + else + trx_data= NULL; + + bool have_snapshot= (trx_data && trx_data->last_commit_pos_file[0] != 0); + pthread_mutex_lock(&LOCK_commit_ordered); + binlog_status_var_num_commits= this->num_commits; + binlog_status_var_num_group_commits= this->num_group_commits; + if (!have_snapshot) + { + set_binlog_snapshot_file(last_commit_pos_file); + binlog_snapshot_position= last_commit_pos_offset; + } + pthread_mutex_unlock(&LOCK_commit_ordered); + + if (have_snapshot) + { + set_binlog_snapshot_file(trx_data->last_commit_pos_file); + binlog_snapshot_position= trx_data->last_commit_pos_offset; + } +} + struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -6211,8 +6941,8 @@ mysql_declare_plugin(binlog) binlog_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, - NULL, /* status variables */ - NULL, /* system variables */ + binlog_status_vars_top, /* status variables */ + binlog_sys_vars, /* system variables */ NULL /* config options */ } mysql_declare_plugin_end; @@ -6227,8 +6957,8 @@ maria_declare_plugin(binlog) binlog_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, - NULL, /* status variables */ - NULL, /* system variables */ + binlog_status_vars_top, /* status variables */ + binlog_sys_vars, /* system variables */ "1.0", /* string version */ MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ } diff --git a/sql/log.h b/sql/log.h index df475268cc3..503e94df981 100644 --- a/sql/log.h +++ b/sql/log.h @@ -38,17 +38,58 @@ class TC_LOG virtual int open(const char *opt_name)=0; virtual void close()=0; - virtual int log_xid(THD *thd, my_xid xid)=0; + virtual int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) = 0; virtual int unlog(ulong cookie, my_xid xid)=0; + +protected: + /* + These methods are meant to be invoked from log_and_order() implementations + to run any prepare_ordered() respectively commit_ordered() methods in + participating handlers. + + They must be called using suitable thread syncronisation to ensure that + they are each called in the correct commit order among all + transactions. 
However, it is only necessary to call them if the + corresponding flag passed to log_and_order is set (it is safe, but not + required, to call them when the flag is false). + + The caller must be holding LOCK_prepare_ordered respectively + LOCK_commit_ordered when calling these methods. + */ + void run_prepare_ordered(THD *thd, bool all); + void run_commit_ordered(THD *thd, bool all); }; +/* + Locks used to ensure serialised execution of TC_LOG::run_prepare_ordered() + and TC_LOG::run_commit_ordered(), or any other code that calls handler + prepare_ordered() or commit_ordered() methods. +*/ +extern pthread_mutex_t LOCK_prepare_ordered; +extern pthread_mutex_t LOCK_commit_ordered; + +extern void TC_init(); +extern void TC_destroy(); + class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging { public: TC_LOG_DUMMY() {} int open(const char *opt_name) { return 0; } void close() { } - int log_xid(THD *thd, my_xid xid) { return 1; } + /* + TC_LOG_DUMMY is only used when there are <= 1 XA-capable engines, and we + only use internal XA during commit when >= 2 XA-capable engines + participate. + */ + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered) + { + DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */); + return 1; + } int unlog(ulong cookie, my_xid xid) { return 0; } }; @@ -74,6 +115,13 @@ class TC_LOG_MMAP: public TC_LOG pthread_cond_t cond; // to wait for a sync } PAGE; + /* List of THDs for which to invoke commit_ordered(), in order. */ + struct commit_entry + { + struct commit_entry *next; + THD *thd; + }; + char logname[FN_REFLEN]; File fd; my_off_t file_length; @@ -88,16 +136,38 @@ class TC_LOG_MMAP: public TC_LOG */ pthread_mutex_t LOCK_active, LOCK_pool, LOCK_sync; pthread_cond_t COND_pool, COND_active; + /* + Queue of threads that need to call commit_ordered(). + Access to this queue must be protected by LOCK_prepare_ordered. + */ + commit_entry *commit_ordered_queue; + /* + This flag and condition is used to reserve the queue while threads in it + each run the commit_ordered() methods one after the other. Only once the + last commit_ordered() in the queue is done can we start on a new queue + run. + + Since we start this process in the first thread in the queue and finish in + the last (and possibly different) thread, we need a condition variable for + this (we cannot unlock a mutex in a different thread than the one who + locked it). + + The condition is used together with the LOCK_prepare_ordered mutex. + */ + my_bool commit_ordered_queue_busy; + pthread_cond_t COND_queue_busy; public: TC_LOG_MMAP(): inited(0) {} int open(const char *opt_name); void close(); - int log_xid(THD *thd, my_xid xid); + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); int recover(); private: + int log_one_transaction(my_xid xid); void get_active_from_pool(); int sync(); int overflow(); @@ -232,9 +302,31 @@ private: time_t last_time; }; +class binlog_trx_data; class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { private: + struct group_commit_entry + { + struct group_commit_entry *next; + THD *thd; + binlog_trx_data *trx_data; + /* + Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + written during group commit. The incident_event is only valid if + trx_data->has_incident() is true. 
+ */ + Log_event *begin_event; + Log_event *end_event; + Log_event *incident_event; + /* Set during group commit to record any per-thread error. */ + int error; + int commit_errno; + /* This is the `all' parameter for ha_commit_ordered(). */ + bool all; + /* True if we come in through XA log_and_order(), false otherwise. */ + }; + /* LOCK_log and LOCK_index are inited by init_pthread_objects() */ pthread_mutex_t LOCK_index; pthread_mutex_t LOCK_prep_xids; @@ -276,6 +368,20 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG In 5.0 it's 0 for relay logs too! */ bool no_auto_events; + /* Queue of transactions queued up to participate in group commit. */ + group_commit_entry *group_commit_queue; + /* + Condition variable to mark that the group commit queue is busy. + Used when each thread does it's own commit_ordered() (when + binlog_optimize_thread_scheduling=1). + Used with the LOCK_commit_ordered mutex. + */ + my_bool group_commit_queue_busy; + pthread_cond_t COND_queue_busy; + /* Total number of committed transactions. */ + ulonglong num_commits; + /* Number of group commits done. */ + ulonglong num_group_commits; int write_to_file(IO_CACHE *cache); /* @@ -285,6 +391,11 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG */ int new_file_without_locking(); int new_file_impl(bool need_lock); + int write_transaction(group_commit_entry *entry); + bool write_transaction_to_binlog_events(group_commit_entry *entry); + void trx_group_commit_leader(group_commit_entry *leader); + void mark_xid_done(); + void mark_xids_active(uint xid_count); public: MYSQL_LOG::generate_name; @@ -303,6 +414,12 @@ public: */ Format_description_log_event *description_event_for_exec, *description_event_for_queue; + /* + Binlog position of last commit (or non-transactional write) to the binlog. + Access to this is protected by LOCK_commit_ordered. 
+ */ + char last_commit_pos_file[FN_REFLEN]; + my_off_t last_commit_pos_offset; MYSQL_BIN_LOG(); /* @@ -313,7 +430,8 @@ public: int open(const char *opt_name); void close(); - int log_xid(THD *thd, my_xid xid); + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); int recover(IO_CACHE *log, Format_description_log_event *fdle); #if !defined(MYSQL_CLIENT) @@ -357,11 +475,14 @@ public: int new_file(); void reset_gathered_updates(THD *thd); - bool write(Log_event* event_info); // binary log write - bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); - bool write_incident(THD *thd, bool lock); - int write_cache(THD *thd, IO_CACHE *cache, bool lock_log, - bool flush_and_sync); + bool write(Log_event* event_info, + my_bool *with_annotate= 0); // binary log write + bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev, bool all); + + bool write_incident_already_locked(THD *thd); + bool write_incident(THD *thd); + int write_cache(THD *thd, IO_CACHE *cache); void set_write_error(THD *thd); bool check_write_error(THD *thd); @@ -416,6 +537,7 @@ public: inline void unlock_index() { pthread_mutex_unlock(&LOCK_index);} inline IO_CACHE *get_index_file() { return &index_file;} inline uint32 get_open_count() { return open_count; } + void set_status_variables(THD *thd); }; class Log_event_handler diff --git a/sql/log_event.cc b/sql/log_event.cc index 95cf853d9ff..0259ff762ad 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -649,6 +649,7 @@ const char* Log_event::get_type_str(Log_event_type type) case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; case INCIDENT_EVENT: return "Incident"; + case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; default: return "Unknown"; /* impossible */ } } @@ -728,7 +729,7 @@ Log_event::Log_event(const char* buf, logs are in 4.0 format, until it finds a Format_desc). */ if (description_event->binlog_version==3 && - buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) + (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) { /* If log_pos=0, don't change it. log_pos==0 is a marker to mean @@ -746,8 +747,8 @@ Log_event::Log_event(const char* buf, DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); flags= uint2korr(buf + FLAGS_OFFSET); - if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || - (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) + if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || + ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) { /* These events always have a header which stops here (i.e. 
their @@ -1168,14 +1169,14 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, /* Check the integrity */ if (event_len < EVENT_LEN_OFFSET || - buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || + (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) { *error="Sanity check failed"; // Needed to free buffer DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } - uint event_type= buf[EVENT_TYPE_OFFSET]; + uint event_type= (uchar)buf[EVENT_TYPE_OFFSET]; if (event_type > description_event->number_of_event_types && event_type != FORMAT_DESCRIPTION_EVENT) { @@ -1297,6 +1298,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case INCIDENT_EVENT: ev = new Incident_log_event(buf, event_len, description_event); break; + case ANNOTATE_ROWS_EVENT: + ev = new Annotate_rows_log_event(buf, event_len, description_event); + break; default: DBUG_PRINT("error",("Unknown event code: %d", (int) buf[EVENT_TYPE_OFFSET])); @@ -3801,6 +3805,13 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[DELETE_ROWS_EVENT-1]= 6;); post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN; + // Set header length of the reserved events to 0 + memset(post_header_len + MYSQL_EVENTS_END - 1, 0, + (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8)); + + // Set header lengths of Maria events + post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; + // Sanity-check that all post header lengths are initialized. IF_DBUG({ int i; @@ -4445,8 +4456,8 @@ Load_log_event::Load_log_event(const char *buf, uint event_len, */ if (event_len) copy_log_event(buf, event_len, - ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? - LOAD_HEADER_LEN + + (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + LOAD_HEADER_LEN + description_event->common_header_len : LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN), description_event); @@ -4483,7 +4494,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, */ if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset, buf_end, - buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) + (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) DBUG_RETURN(1); data_len = event_len - body_offset; @@ -6178,7 +6189,7 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1]; if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) || copy_log_event(event_buf,len, - ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? load_header_len + header_len : (fake_base ? (header_len+load_header_len) : (header_len+load_header_len) + @@ -7134,7 +7145,8 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, m_width(tbl_arg ? 
tbl_arg->s->fields : 1), m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) #ifdef HAVE_REPLICATION - , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) + , m_curr_row(NULL), m_curr_row_end(NULL), + m_key(NULL), m_key_info(NULL), m_key_nr(0) #endif { /* @@ -7182,7 +7194,8 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, #endif m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) + , m_curr_row(NULL), m_curr_row_end(NULL), + m_key(NULL), m_key_info(NULL), m_key_nr(0) #endif { DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); @@ -7944,6 +7957,142 @@ void Rows_log_event::print_helper(FILE *file, #endif /************************************************************************** + Annotate_rows_log_event member functions +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Annotate_rows_log_event::Annotate_rows_log_event(THD *thd) + : Log_event(thd, 0, true), + m_save_thd_query_txt(0), + m_save_thd_query_len(0) +{ + m_query_txt= thd->query(); + m_query_len= thd->query_length(); +} +#endif + +Annotate_rows_log_event::Annotate_rows_log_event(const char *buf, + uint event_len, + const Format_description_log_event *desc) + : Log_event(buf, desc), + m_save_thd_query_txt(0), + m_save_thd_query_len(0) +{ + m_query_len= event_len - desc->common_header_len; + m_query_txt= (char*) buf + desc->common_header_len; +} + +Annotate_rows_log_event::~Annotate_rows_log_event() +{ +#ifndef MYSQL_CLIENT + if (m_save_thd_query_txt) + thd->set_query(m_save_thd_query_txt, m_save_thd_query_len); +#endif +} + +int Annotate_rows_log_event::get_data_size() +{ + return m_query_len; +} + +Log_event_type Annotate_rows_log_event::get_type_code() +{ + return ANNOTATE_ROWS_EVENT; +} + +bool Annotate_rows_log_event::is_valid() const +{ + return (m_query_txt != NULL && m_query_len != 0); +} + +#ifndef MYSQL_CLIENT +bool Annotate_rows_log_event::write_data_header(IO_CACHE *file) +{ + return 0; +} +#endif + +#ifndef MYSQL_CLIENT +bool Annotate_rows_log_event::write_data_body(IO_CACHE *file) +{ + return my_b_safe_write(file, (uchar*) m_query_txt, m_query_len); +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +void Annotate_rows_log_event::pack_info(Protocol* protocol) +{ + if (m_query_txt && m_query_len) + protocol->store(m_query_txt, m_query_len, &my_charset_bin); +} +#endif + +#ifdef MYSQL_CLIENT +void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo) +{ + if (pinfo->short_form) + return; + + print_header(&pinfo->head_cache, pinfo, TRUE); + my_b_printf(&pinfo->head_cache, "\tAnnotate_rows:\n"); + + char *pbeg; // beginning of the next line + char *pend; // end of the next line + uint cnt= 0; // characters counter + + for (pbeg= m_query_txt; ; pbeg= pend) + { + // skip all \r's and \n's at the beginning of the next line + for (;; pbeg++) + { + if (++cnt > m_query_len) + return; + + if (*pbeg != '\r' && *pbeg != '\n') + break; + } + + // find end of the next line + for (pend= pbeg + 1; + ++cnt <= m_query_len && *pend != '\r' && *pend != '\n'; + pend++) + ; + + // print next line + my_b_write(&pinfo->head_cache, (const uchar*) "#Q> ", 4); + my_b_write(&pinfo->head_cache, (const uchar*) pbeg, pend - pbeg); + my_b_write(&pinfo->head_cache, (const uchar*) "\n", 1); + } +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Annotate_rows_log_event::do_apply_event(Relay_log_info 
const *rli)
+{
+  m_save_thd_query_txt= thd->query();
+  m_save_thd_query_len= thd->query_length();
+  thd->set_query(m_query_txt, m_query_len);
+  return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli)
+{
+  rli->inc_event_relay_log_pos();
+  return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+Log_event::enum_skip_reason
+Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli)
+{
+  return continue_group(rli);
+}
+#endif
+
+/**************************************************************************
 	Table_map_log_event member functions and support functions
 **************************************************************************/
@@ -9057,6 +9206,86 @@ record_compare_exit:
   return result;
 }
 
+
+/**
+  Find the best key to use when locating the row in @c find_row().
+
+  A primary key is preferred if it exists; otherwise a unique index is
+  preferred. Else we pick the index with the smallest rec_per_key value.
+
+  If a suitable key is found, set @c m_key, @c m_key_nr and @c m_key_info
+  member fields appropriately.
+
+  @returns Error code on failure, 0 on success.
+*/
+int Rows_log_event::find_key()
+{
+  uint i, best_key_nr, last_part;
+  KEY *key, *best_key;
+  ulong best_rec_per_key, tmp;
+  DBUG_ENTER("Rows_log_event::find_key");
+  DBUG_ASSERT(m_table);
+
+  best_key_nr= MAX_KEY;
+  LINT_INIT(best_key);
+  LINT_INIT(best_rec_per_key);
+
+  /*
+    Keys are sorted so that any primary key is first, followed by unique keys,
+    followed by any other. So we will automatically pick the primary key if
+    it exists.
+  */
+  for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++)
+  {
+    if (!m_table->s->keys_in_use.is_set(i))
+      continue;
+    /*
+      We cannot use a unique key with NULL-able columns to uniquely identify
+      a row (but we can still select it for range scan below if nothing better
+      is available).
+    */
+    if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)
+    {
+      best_key_nr= i;
+      best_key= key;
+      break;
+    }
+    /*
+      We can only use a non-unique key if it allows range scans (i.e. skip
+      FULLTEXT indexes and such).
+    */
+    last_part= key->key_parts - 1;
+    DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu",
+                        key->name, last_part, key->rec_per_key[last_part]));
+    if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT))
+      continue;
+
+    tmp= key->rec_per_key[last_part];
+    if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key))
+    {
+      best_key_nr= i;
+      best_key= key;
+      best_rec_per_key= tmp;
+    }
+  }
+
+  if (best_key_nr == MAX_KEY)
+  {
+    m_key_info= NULL;
+    DBUG_RETURN(0);
+  }
+
+  // Allocate buffer for key searches
+  m_key= (uchar *) my_malloc(best_key->key_length, MYF(MY_WME));
+  if (m_key == NULL)
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  m_key_info= best_key;
+  m_key_nr= best_key_nr;
+
+  DBUG_RETURN(0);
+}
+
+
 /**
   Locate the current row in event's table.
 
@@ -9156,12 +9385,17 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
   */
   store_record(table,record[1]);
 
-  if (table->s->keys > 0 && table->s->keys_in_use.is_set(0))
+  if (m_key_info)
   {
-    DBUG_PRINT("info",("locating record using primary key (index_read)"));
+    DBUG_PRINT("info",("locating record using key #%u [%s] (index_read)",
+                       m_key_nr, m_key_info->name));
+    /* We use this to test that the correct key is used in test cases.
*/ + DBUG_EXECUTE_IF("slave_crash_if_wrong_index", + if(0 != strcmp(m_key_info->name,"expected_key")) abort();); - /* The 0th key is active: search the table using the index */ - if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + /* The key is active: search the table using the index */ + if (!table->file->inited && + (error= table->file->ha_index_init(m_key_nr, FALSE))) { DBUG_PRINT("info",("ha_index_init returns error %d",error)); table->file->print_error(error, MYF(0)); @@ -9171,14 +9405,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli) /* Fill key data for the row */ DBUG_ASSERT(m_key); - key_copy(m_key, table->record[0], table->key_info, 0); + key_copy(m_key, table->record[0], m_key_info, 0); /* Don't print debug messages when running valgrind since they can trigger false warnings. */ #ifndef HAVE_valgrind - DBUG_DUMP("key data", m_key, table->key_info->key_length); + DBUG_DUMP("key data", m_key, m_key_info->key_length); #endif /* @@ -9264,6 +9498,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli) record we are looking for is stored in record[1]. */ DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); + /* We use this to test that the correct key is used in test cases. */ + DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort();); while (record_compare(table)) { @@ -9302,6 +9538,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli) else { DBUG_PRINT("info",("locating record using table scan (rnd_next)")); + /* We use this to test that the correct key is used in test cases. */ + DBUG_EXECUTE_IF("slave_crash_if_table_scan", abort();); int restart_count= 0; // Number of times scanning has restarted from top @@ -9421,14 +9659,7 @@ Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability return 0; } - if (m_table->s->keys > 0) - { - // Allocate buffer for key searches - m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); - if (!m_key) - return HA_ERR_OUT_OF_MEM; - } - return 0; + return find_key(); } int @@ -9439,6 +9670,7 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability m_table->file->ha_index_or_rnd_end(); my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); m_key= NULL; + m_key_info= NULL; return error; } @@ -9541,13 +9773,9 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, int Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) { - if (m_table->s->keys > 0) - { - // Allocate buffer for key searches - m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); - if (!m_key) - return HA_ERR_OUT_OF_MEM; - } + int err; + if ((err= find_key())) + return err; m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; @@ -9562,6 +9790,7 @@ Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability m_table->file->ha_index_or_rnd_end(); my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc m_key= NULL; + m_key_info= NULL; return error; } diff --git a/sql/log_event.h b/sql/log_event.h index 4ea511f45b5..91a2ad9693c 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -250,6 +250,7 @@ struct sql_ex_info #define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1) #define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN) #define INCIDENT_HEADER_LEN 2 +#define ANNOTATE_ROWS_HEADER_LEN 0 /* Max number of possible extra bytes in a replication event compared to a packet (i.e. 
a query) sent from client to master; @@ -582,8 +583,14 @@ enum Log_event_type */ INCIDENT_EVENT= 26, + /* New MySQL/Sun events are to be added right above this comment */ + MYSQL_EVENTS_END, + + MARIA_EVENTS_BEGIN= 160, + /* New Maria event numbers start from here */ + ANNOTATE_ROWS_EVENT= 160, + /* - Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers */ @@ -2988,6 +2995,59 @@ public: char *str_to_hex(char *to, const char *from, uint len); /** + @class Annotate_rows_log_event + + In row-based mode, if binlog_annotate_rows_events = ON, each group of + Table_map_log_events is preceded by an Annotate_rows_log_event which + contains the query which caused the subsequent rows operations. + + The Annotate_rows_log_event has no post-header and its body contains + the corresponding query (without trailing zero). Note. The query length + is to be calculated as a difference between the whole event length and + the common header length. +*/ +class Annotate_rows_log_event: public Log_event +{ +public: +#ifndef MYSQL_CLIENT + Annotate_rows_log_event(THD*); +#endif + Annotate_rows_log_event(const char *buf, uint event_len, + const Format_description_log_event*); + ~Annotate_rows_log_event(); + + virtual int get_data_size(); + virtual Log_event_type get_type_code(); + virtual bool is_valid() const; + +#ifndef MYSQL_CLIENT + virtual bool write_data_header(IO_CACHE*); + virtual bool write_data_body(IO_CACHE*); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual void pack_info(Protocol*); +#endif + +#ifdef MYSQL_CLIENT + virtual void print(FILE*, PRINT_EVENT_INFO*); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +private: + virtual int do_apply_event(Relay_log_info const*); + virtual int do_update_pos(Relay_log_info*); + virtual enum_skip_reason do_shall_skip(Relay_log_info*); +#endif + +private: + char *m_query_txt; + uint m_query_len; + char *m_save_thd_query_txt; + uint m_save_thd_query_len; +}; + +/** @class Table_map_log_event In row-based mode, every row operation event is preceded by a @@ -3592,7 +3652,10 @@ protected: const uchar *m_curr_row; /* Start of the row being processed */ const uchar *m_curr_row_end; /* One-after the end of the current row */ uchar *m_key; /* Buffer to keep key value during searches */ + KEY *m_key_info; /* Pointer to KEY info for m_key_nr */ + uint m_key_nr; /* Key number */ + int find_key(); // Find a best key to use in find_row() int find_row(const Relay_log_info *const); int write_row(const Relay_log_info *const, const bool); diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 5255d0a8403..b9bc99e9747 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -688,7 +688,11 @@ protected: /* BINLOG_DUMP options */ #define BINLOG_DUMP_NON_BLOCK 1 +#endif /* !MYSQL_CLIENT */ +#define BINLOG_SEND_ANNOTATE_ROWS_EVENT 2 + +#ifndef MYSQL_CLIENT /* sql_show.cc:show_log_files() */ #define SHOW_LOG_STATUS_FREE "FREE" #define SHOW_LOG_STATUS_INUSE "IN USE" diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1d8fadd8bb6..68140ae1b5a 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -548,6 +548,7 @@ my_bool opt_local_infile, opt_slave_compressed_protocol; my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_log_slave_updates= 0; +my_bool opt_replicate_annotate_rows_events= 0; bool slave_warning_issued = false; /* @@ -1425,6 +1426,7 @@ void clean_up(bool print_message) ha_end(); if (tc_log) 
tc_log->close();
+  TC_destroy();
   xid_cache_free();
   wt_end();
   delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
@@ -4276,6 +4278,8 @@ a file name for --log-bin-index option", opt_binlog_index_name);
   if (!errmesg[0][0])
     unireg_abort(1);
 
+  TC_init();
+
   /* We have to initialize the storage engines before CSV logging */
   if (ha_init())
   {
@@ -5905,6 +5909,8 @@ enum options_mysqld
   OPT_REPLICATE_IGNORE_DB, OPT_LOG_SLAVE_UPDATES,
   OPT_BINLOG_DO_DB, OPT_BINLOG_IGNORE_DB,
   OPT_BINLOG_FORMAT,
+  OPT_BINLOG_ANNOTATE_ROWS_EVENTS,
+  OPT_REPLICATE_ANNOTATE_ROWS_EVENTS,
 #ifndef DBUG_OFF
   OPT_BINLOG_SHOW_XID,
 #endif
@@ -6135,6 +6141,18 @@ struct my_option my_long_options[] =
 #endif
    , &opt_binlog_format, &opt_binlog_format,
    0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+  {"binlog-annotate-rows-events", OPT_BINLOG_ANNOTATE_ROWS_EVENTS,
+   "Tells the master to annotate RBR events with the statement that "
+   "caused these events.",
+   (uchar**) &global_system_variables.binlog_annotate_rows_events,
+   (uchar**) &max_system_variables.binlog_annotate_rows_events,
+   0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+  {"replicate-annotate-rows-events", OPT_REPLICATE_ANNOTATE_ROWS_EVENTS,
+   "Tells the slave to write annotate rows events received from the master "
+   "to its own binary log. Only sensible in combination with the "
+   "log-slave-updates option.",
+   (uchar**) &opt_replicate_annotate_rows_events,
+   (uchar**) &opt_replicate_annotate_rows_events,
+   0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
  {"binlog-do-db", OPT_BINLOG_DO_DB,
   "Tells the master it should log updates for the specified database, "
   "and exclude all others not explicitly mentioned.",
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6319bbb60cf..42701bc9e24 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -38,7 +38,8 @@ Relay_log_info::Relay_log_info()
   inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
   until_log_pos(0), retried_trans(0),
   tables_to_lock(0), tables_to_lock_count(0),
-  last_event_start_time(0), m_flags(0)
+  last_event_start_time(0), m_flags(0),
+  m_annotate_event(0)
 {
   DBUG_ENTER("Relay_log_info::Relay_log_info");
 
@@ -72,6 +73,7 @@ Relay_log_info::~Relay_log_info()
   pthread_cond_destroy(&stop_cond);
   pthread_cond_destroy(&log_space_cond);
   relay_log.cleanup();
+  free_annotate_event();
   DBUG_VOID_RETURN;
 }
 
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 5cafcf47086..0ea3c23bfd8 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -423,8 +423,46 @@ public:
     (m_flags & (1UL << IN_STMT));
   }
 
+  /**
+    Save pointer to Annotate_rows event and switch on the
+    binlog_annotate_rows_events for this sql thread.
+    To be called when sql thread receives an Annotate_rows event.
+  */
+  inline void set_annotate_event(Annotate_rows_log_event *event)
+  {
+    free_annotate_event();
+    m_annotate_event= event;
+    sql_thd->variables.binlog_annotate_rows_events= 1;
+  }
+
+  /**
+    Returns pointer to the saved Annotate_rows event or NULL if there is
+    no saved event.
+  */
+  inline Annotate_rows_log_event* get_annotate_event()
+  {
+    return m_annotate_event;
+  }
+
+  /**
+    Delete saved Annotate_rows event (if any) and switch off the
+    binlog_annotate_rows_events for this sql thread.
+    To be called when sql thread has applied the last (i.e. with
+    STMT_END_F flag) rbr event.
+ */ + inline void free_annotate_event() + { + if (m_annotate_event) + { + sql_thd->variables.binlog_annotate_rows_events= 0; + delete m_annotate_event; + m_annotate_event= 0; + } + } + private: uint32 m_flags; + Annotate_rows_log_event *m_annotate_event; }; diff --git a/sql/set_var.cc b/sql/set_var.cc index b02330727e9..33575de1ccf 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -183,6 +183,9 @@ static sys_var_const sys_back_log(&vars, "back_log", OPT_GLOBAL, SHOW_LONG, (uchar*) &back_log); static sys_var_const_os_str sys_basedir(&vars, "basedir", mysql_home); +static sys_var_thd_bool +sys_binlog_annotate_rows_events(&vars, "binlog_annotate_rows_events", + &SV::binlog_annotate_rows_events); static sys_var_long_ptr sys_binlog_cache_size(&vars, "binlog_cache_size", &binlog_cache_size); static sys_var_thd_binlog_format sys_binlog_format(&vars, "binlog_format", diff --git a/sql/slave.cc b/sql/slave.cc index 0d14234766b..63b7ce715c9 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1867,6 +1867,9 @@ static int request_dump(MYSQL* mysql, Master_info* mi, *suppress_warnings= FALSE; + if (opt_log_slave_updates && opt_replicate_annotate_rows_events) + binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -2261,17 +2264,41 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) } exec_res= apply_event_and_update_pos(ev, thd, rli); - /* - Format_description_log_event should not be deleted because it will be - used to read info about the relay log's format; it will be deleted when - the SQL thread does not need it, i.e. when this thread terminates. - */ - if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) - { - DBUG_PRINT("info", ("Deleting the event after it has been executed")); - delete ev; + switch (ev->get_type_code()) { + case FORMAT_DESCRIPTION_EVENT: + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; + it will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + break; + case ANNOTATE_ROWS_EVENT: + /* + Annotate_rows event should not be deleted because after it has + been applied, thd->query points to the string inside this event. + The thd->query will be used to generate new Annotate_rows event + during applying the subsequent Rows events. + */ + rli->set_annotate_event((Annotate_rows_log_event*) ev); + break; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + After the last Rows event has been applied, the saved Annotate_rows + event (if any) is not needed anymore and can be deleted. + */ + if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) + rli->free_annotate_event(); + /* fall through */ + default: + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + delete ev; + break; } + /* update_log_pos failed: this should not happen, so we don't retry. @@ -2899,6 +2926,12 @@ pthread_handler_t handle_slave_sql(void *arg) thd->init_for_queries(); thd->temporary_tables = rli->save_temporary_tables; // restore temp tables set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables + /* + binlog_annotate_rows_events must be TRUE only after an Annotate_rows event + has been recieved and only till the last corresponding rbr event has been + applied. In all other cases it must be FALSE. 
+ */ + thd->variables.binlog_annotate_rows_events= 0; pthread_mutex_lock(&LOCK_thread_count); threads.append(thd); pthread_mutex_unlock(&LOCK_thread_count); @@ -3381,7 +3414,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, If we get Load event, we need to pass a non-reusable buffer to read_log_event, so we do a trick */ - if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) { if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME))))) { @@ -3588,13 +3621,13 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) LINT_INIT(inc_pos); if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && - buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) + (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); LINT_INIT(inc_pos); pthread_mutex_lock(&mi->data_lock); - switch (buf[EVENT_TYPE_OFFSET]) { + switch ((uchar)buf[EVENT_TYPE_OFFSET]) { case STOP_EVENT: /* We needn't write this event to the relay log. Indeed, it just indicates a @@ -3697,9 +3730,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment mi->master_log_pos. */ - if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); diff --git a/sql/slave.h b/sql/slave.h index 1aa5b374e4b..3333e583559 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -106,6 +106,7 @@ extern MYSQL_PLUGIN_IMPORT char *relay_log_info_file; extern char *opt_relay_logname, *opt_relaylog_index_name; extern my_bool opt_skip_slave_start, opt_reckless_slave; extern my_bool opt_log_slave_updates; +extern my_bool opt_replicate_annotate_rows_events; extern ulonglong relay_log_space_limit; /* diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 9713ec1ef5c..a59cd631fef 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -174,7 +174,7 @@ void mysql_client_binlog_statement(THD* thd) */ if (!have_fd_event) { - int type = bufptr[EVENT_TYPE_OFFSET]; + int type = (uchar)bufptr[EVENT_TYPE_OFFSET]; if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3) have_fd_event= TRUE; else diff --git a/sql/sql_class.cc b/sql/sql_class.cc index f10655c3e51..f8905cd5f8c 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -763,6 +763,8 @@ THD::THD() active_vio = 0; #endif pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_wakeup_ready, 0); /* Variables with default values */ proc_info="login"; @@ -1149,6 +1151,8 @@ THD::~THD() free_root(&transaction.mem_root,MYF(0)); #endif mysys_var=0; // Safety (shouldn't be needed) + pthread_cond_destroy(&COND_wakeup_ready); + pthread_mutex_destroy(&LOCK_wakeup_ready); pthread_mutex_destroy(&LOCK_thd_data); #ifndef DBUG_OFF dbug_sentry= THD_SENTRY_GONE; @@ -4243,6 +4247,25 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_RETURN(0); } +void +THD::wait_for_wakeup_ready() +{ + pthread_mutex_lock(&LOCK_wakeup_ready); + while (!wakeup_ready) + pthread_cond_wait(&COND_wakeup_ready, 
&LOCK_wakeup_ready); + pthread_mutex_unlock(&LOCK_wakeup_ready); +} + +void +THD::signal_wakeup_ready() +{ + pthread_mutex_lock(&LOCK_wakeup_ready); + wakeup_ready= true; + pthread_mutex_unlock(&LOCK_wakeup_ready); + pthread_cond_signal(&COND_wakeup_ready); +} + + bool Discrete_intervals_list::append(ulonglong start, ulonglong val, ulonglong incr) { diff --git a/sql/sql_class.h b/sql/sql_class.h index 477ae9bf751..9ea4e37e610 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -451,6 +451,7 @@ struct system_variables ulong ndb_index_stat_cache_entries; ulong ndb_index_stat_update_freq; ulong binlog_format; // binlog format for this thd (see enum_binlog_format) + my_bool binlog_annotate_rows_events; my_bool binlog_direct_non_trans_update; /* In slave thread we need to know in behalf of which @@ -1577,7 +1578,8 @@ public: */ void binlog_start_trans_and_stmt(); void binlog_set_stmt_begin(); - int binlog_write_table_map(TABLE *table, bool is_transactional); + int binlog_write_table_map(TABLE *table, bool is_transactional, + my_bool *with_annotate= 0); int binlog_write_row(TABLE* table, bool is_transactional, MY_BITMAP const* cols, size_t colcnt, const uchar *buf); @@ -2551,6 +2553,14 @@ public: return backup; } + void clear_wakeup_ready() { wakeup_ready= false; } + /* + Sleep waiting for others to wake us up with signal_wakeup_ready(). + Must call clear_wakeup_ready() before waiting. + */ + void wait_for_wakeup_ready(); + /* Wake this thread up from wait_for_wakeup_ready(). */ + void signal_wakeup_ready(); private: /** The current internal error handler for this thread, or NULL. */ Internal_error_handler *m_internal_handler; @@ -2589,6 +2599,16 @@ private: */ LEX_STRING invoker_user; LEX_STRING invoker_host; + /* + Flag, mutex and condition for a thread to wait for a signal from another + thread. + + Currently used to wait for group commit to complete, can also be used for + other purposes. + */ + bool wakeup_ready; + pthread_mutex_t LOCK_wakeup_ready; + pthread_cond_t COND_wakeup_ready; }; /** A short cut for thd->main_da.set_ok_status(). */ diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index adc82d791b6..05804c2ee7e 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1970,6 +1970,11 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) pthread_mutex_lock(&LOCK_thread_count); thread_count++; pthread_mutex_unlock(&LOCK_thread_count); + /* + Annotating delayed inserts is not supported. 
+ */ + di->thd.variables.binlog_annotate_rows_events= 0; + di->thd.set_db(table_list->db, (uint) strlen(table_list->db)); di->thd.set_query(my_strdup(table_list->table_name, MYF(MY_WME)), 0); if (di->thd.db == NULL || di->thd.query() == NULL) diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 3088d807549..103cf72cf14 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1008,6 +1008,10 @@ bool dispatch_command(enum enum_server_command command, THD *thd, DBUG_ENTER("dispatch_command"); DBUG_PRINT("info", ("command: %d", command)); + DBUG_EXECUTE_IF("crash_dispatch_command_before", + { DBUG_PRINT("crash_dispatch_command_before", ("now")); + DBUG_ABORT(); }); + thd->command=command; /* Commands which always take a long time are logged into diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 95e48c531be..596f0f5c1e6 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -489,7 +489,7 @@ impossible position"; DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F); @@ -556,32 +556,36 @@ impossible position"; } #endif - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F); (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) + else if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) binlog_can_be_corrupted= FALSE; - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT || + (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + { + errmsg = "Failed on my_net_write()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) - { - if (send_file(thd)) - { - errmsg = "failed in send_file()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + DBUG_PRINT("info", ("log event code %d", + (*packet)[LOG_EVENT_OFFSET+1] )); + if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + { + if (send_file(thd)) + { + errmsg = "failed in send_file()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + } } packet->set("\0", 1, &my_charset_bin); } @@ -677,23 +681,27 @@ impossible position"; if (read_packet) { - thd_proc_info(thd, "Sending binlog event to slave"); - if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) - { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) - { - if (send_file(thd)) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT || + (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) + { + thd_proc_info(thd, "Sending binlog event to slave"); + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { - errmsg = "failed in send_file()"; + errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; goto err; } - } + + if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + { + if (send_file(thd)) + { + errmsg = "failed 
in send_file()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + } + } packet->set("\0", 1, &my_charset_bin); /* No need to net_flush because we will get to flush later when @@ -1774,6 +1782,11 @@ static sys_var_chain vars = { NULL, NULL }; static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates", OPT_GLOBAL, SHOW_MY_BOOL, (uchar*) &opt_log_slave_updates); +static sys_var_const +sys_replicate_annotate_rows_events(&vars, + "replicate_annotate_rows_events", + OPT_GLOBAL, SHOW_MY_BOOL, + (uchar*) &opt_replicate_annotate_rows_events); static sys_var_const sys_relay_log(&vars, "relay_log", OPT_GLOBAL, SHOW_CHAR_PTR, (uchar*) &opt_relay_logname); |