summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2011-04-08 09:39:33 +0200
committerunknown <knielsen@knielsen-hq.org>2011-04-08 09:39:33 +0200
commit64e43e1cc88c66bd89f95a5307020bcf60ba4b96 (patch)
tree1ae6299d5a75f92fa97b240b0670c81c9a301afe /sql
parent8b427b7d5548aeb214c70b004cc9056b9f7a6f7c (diff)
parent86008e0ca2b864f1ab7a30e496d7c67c8fce06c2 (diff)
downloadmariadb-git-64e43e1cc88c66bd89f95a5307020bcf60ba4b96.tar.gz
Merge various replication-related patches into MariaDB 5.3:
- MWL#116 Group commit - MWL#136 Enhancements for START TRANSACTION WITH CONSISTENT SNAPSHOT - MWL#47 Annotate_rows_log_event - MWL#163 innodb_release_locks_early - Percona patch enhancing row-based replication for tables with no primary key
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc209
-rw-r--r--sql/handler.h90
-rw-r--r--sql/log.cc1278
-rw-r--r--sql/log.h140
-rw-r--r--sql/log_event.cc293
-rw-r--r--sql/log_event.h65
-rw-r--r--sql/mysql_priv.h4
-rw-r--r--sql/mysqld.cc18
-rw-r--r--sql/rpl_rli.cc4
-rw-r--r--sql/rpl_rli.h38
-rw-r--r--sql/set_var.cc3
-rw-r--r--sql/slave.cc63
-rw-r--r--sql/slave.h1
-rw-r--r--sql/sql_binlog.cc2
-rw-r--r--sql/sql_class.cc23
-rw-r--r--sql/sql_class.h22
-rw-r--r--sql/sql_insert.cc5
-rw-r--r--sql/sql_parse.cc4
-rw-r--r--sql/sql_repl.cc75
19 files changed, 1891 insertions, 446 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index c5a870e77ad..df5acb98efe 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -79,6 +79,8 @@ TYPELIB tx_isolation_typelib= {array_elements(tx_isolation_names)-1,"",
static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
uint known_extensions_id= 0;
+static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans,
+ bool is_real_trans);
static plugin_ref ha_default_plugin(THD *thd)
@@ -1077,7 +1079,7 @@ ha_check_and_coalesce_trx_read_only(THD *thd, Ha_trx_info *ha_list,
*/
int ha_commit_trans(THD *thd, bool all)
{
- int error= 0, cookie= 0;
+ int error= 0, cookie;
/*
'all' means that this is either an explicit commit issued by
user, or an implicit commit issued by a DDL.
@@ -1092,7 +1094,8 @@ int ha_commit_trans(THD *thd, bool all)
*/
bool is_real_trans= all || thd->transaction.all.ha_list == 0;
Ha_trx_info *ha_info= trans->ha_list;
- my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
+ bool need_prepare_ordered, need_commit_ordered;
+ my_xid xid;
DBUG_ENTER("ha_commit_trans");
/* Just a random warning to test warnings pushed during autocommit. */
@@ -1131,89 +1134,114 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_RETURN(2);
}
#ifdef USING_TRANSACTIONS
- if (ha_info)
+ if (!ha_info)
{
- uint rw_ha_count;
- bool rw_trans;
+ /* Free resources and perform other cleanup even for 'empty' transactions. */
+ if (is_real_trans)
+ thd->transaction.cleanup();
+ DBUG_RETURN(0);
+ }
- DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE(););
+ DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE(););
- /* Close all cursors that can not survive COMMIT */
- if (is_real_trans) /* not a statement commit */
- thd->stmt_map.close_transient_cursors();
+ /* Close all cursors that can not survive COMMIT */
+ if (is_real_trans) /* not a statement commit */
+ thd->stmt_map.close_transient_cursors();
- rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
- /* rw_trans is TRUE when we in a transaction changing data */
- rw_trans= is_real_trans && (rw_ha_count > 0);
+ uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
+ /* rw_trans is TRUE when we are in a transaction changing data */
+ bool rw_trans= is_real_trans && (rw_ha_count > 0);
- if (rw_trans &&
- wait_if_global_read_lock(thd, 0, 0))
- {
- ha_rollback_trans(thd, all);
- DBUG_RETURN(1);
- }
+ if (rw_trans &&
+ wait_if_global_read_lock(thd, 0, 0))
+ {
+ ha_rollback_trans(thd, all);
+ DBUG_RETURN(1);
+ }
- if (rw_trans &&
- opt_readonly &&
- !(thd->security_ctx->master_access & SUPER_ACL) &&
- !thd->slave_thread)
- {
- my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
- ha_rollback_trans(thd, all);
- error= 1;
- goto end;
- }
+ if (rw_trans &&
+ opt_readonly &&
+ !(thd->security_ctx->master_access & SUPER_ACL) &&
+ !thd->slave_thread)
+ {
+ my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
+ goto err;
+ }
- if (!trans->no_2pc && (rw_ha_count > 1))
- {
- for (; ha_info && !error; ha_info= ha_info->next())
- {
- int err;
- handlerton *ht= ha_info->ht();
- /*
- Do not call two-phase commit if this particular
- transaction is read-only. This allows for simpler
- implementation in engines that are always read-only.
- */
- if (! ha_info->is_trx_read_write())
- continue;
- /*
- Sic: we know that prepare() is not NULL since otherwise
- trans->no_2pc would have been set.
- */
- if ((err= ht->prepare(ht, thd, all)))
- {
- my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
- error= 1;
- }
- status_var_increment(thd->status_var.ha_prepare_count);
- }
- DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
- if (error || (is_real_trans && xid &&
- (error= !(cookie= tc_log->log_xid(thd, xid)))))
- {
- ha_rollback_trans(thd, all);
- error= 1;
- goto end;
- }
- DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
- }
- error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0;
- DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
- if (cookie)
- if(tc_log->unlog(cookie, xid))
- {
- error= 2;
- goto end;
- }
+ if (trans->no_2pc || (rw_ha_count <= 1))
+ {
+ error= ha_commit_one_phase(thd, all);
DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););
-end:
- if (rw_trans)
- start_waiting_global_read_lock(thd);
+ goto end;
}
- /* Free resources and perform other cleanup even for 'empty' transactions. */
- else if (is_real_trans)
- thd->transaction.cleanup();
+
+ need_prepare_ordered= FALSE;
+ need_commit_ordered= FALSE;
+ xid= thd->transaction.xid_state.xid.get_my_xid();
+
+ for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
+ {
+ int err;
+ handlerton *ht= hi->ht();
+ /*
+ Do not call two-phase commit if this particular
+ transaction is read-only. This allows for simpler
+ implementation in engines that are always read-only.
+ */
+ if (! hi->is_trx_read_write())
+ continue;
+ /*
+ Sic: we know that prepare() is not NULL since otherwise
+ trans->no_2pc would have been set.
+ */
+ err= ht->prepare(ht, thd, all);
+ status_var_increment(thd->status_var.ha_prepare_count);
+ if (err)
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
+
+ if (err)
+ goto err;
+
+ need_prepare_ordered|= (ht->prepare_ordered != NULL);
+ need_commit_ordered|= (ht->commit_ordered != NULL);
+ }
+ DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
+
+ if (!is_real_trans)
+ {
+ error= commit_one_phase_2(thd, all, trans, is_real_trans);
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););
+ goto end;
+ }
+
+ cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
+ need_commit_ordered);
+ if (!cookie)
+ goto err;
+
+ DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
+
+ error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););
+
+ DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
+ if (tc_log->unlog(cookie, xid))
+ {
+ error= 2; /* Error during commit */
+ goto end;
+ }
+
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););
+ goto end;
+
+ /* Come here if error and we need to rollback. */
+err:
+ error= 1; /* Transaction was rolled back */
+ ha_rollback_trans(thd, all);
+
+end:
+ if (rw_trans)
+ start_waiting_global_read_lock(thd);
#endif /* USING_TRANSACTIONS */
DBUG_RETURN(error);
}
@@ -1224,7 +1252,6 @@ end:
*/
int ha_commit_one_phase(THD *thd, bool all)
{
- int error=0;
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
/*
"real" is a nick name for a transaction for which a commit will
@@ -1234,8 +1261,16 @@ int ha_commit_one_phase(THD *thd, bool all)
enclosing 'all' transaction is rolled back.
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
- Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
DBUG_ENTER("ha_commit_one_phase");
+ DBUG_RETURN(commit_one_phase_2(thd, all, trans, is_real_trans));
+}
+
+static int
+commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
+{
+ int error= 0;
+ Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
+ DBUG_ENTER("commit_one_phase_2");
#ifdef USING_TRANSACTIONS
if (ha_info)
{
@@ -1852,7 +1887,16 @@ int ha_start_consistent_snapshot(THD *thd)
{
bool warn= true;
+ /*
+ Holding the LOCK_commit_ordered mutex ensures that we get the same
+ snapshot for all engines (including the binary log). This allows us
+ among other things to do backups with
+ START TRANSACTION WITH CONSISTENT SNAPSHOT and
+ have a consistent binlog position.
+ */
+ pthread_mutex_lock(&LOCK_commit_ordered);
plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn);
+ pthread_mutex_unlock(&LOCK_commit_ordered);
/*
Same idea as when one wants to CREATE TABLE in one engine which does not
@@ -4622,7 +4666,8 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table)
/** @brief
Write table maps for all (manually or automatically) locked tables
- to the binary log.
+ to the binary log. Also, if binlog_annotate_rows_events is ON,
+ write Annotate_rows event before the first table map.
SYNOPSIS
write_locked_table_maps()
@@ -4659,6 +4704,9 @@ static int write_locked_table_maps(THD *thd)
locks[0]= thd->extra_lock;
locks[1]= thd->lock;
locks[2]= thd->locked_tables;
+ my_bool with_annotate= thd->variables.binlog_annotate_rows_events &&
+ thd->query() && thd->query_length();
+
for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
{
MYSQL_LOCK const *const lock= locks[i];
@@ -4676,7 +4724,8 @@ static int write_locked_table_maps(THD *thd)
check_table_binlog_row_based(thd, table))
{
int const has_trans= table->file->has_transactions();
- int const error= thd->binlog_write_table_map(table, has_trans);
+ int const error= thd->binlog_write_table_map(table, has_trans,
+ &with_annotate);
/*
If an error occurs, it is the responsibility of the caller to
roll back the transaction.
diff --git a/sql/handler.h b/sql/handler.h
index 12413d6238a..02b96ebefb8 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -771,9 +771,97 @@ struct handlerton
NOTE 'all' is also false in auto-commit mode where 'end of statement'
and 'real commit' mean the same event.
*/
- int (*commit)(handlerton *hton, THD *thd, bool all);
+ int (*commit)(handlerton *hton, THD *thd, bool all);
+ /*
+ The commit_ordered() method is called prior to the commit() method, after
+ the transaction manager has decided to commit (not rollback) the
+ transaction. Unlike commit(), commit_ordered() is called only when the
+ full transaction is committed, not for each commit of statement
+ transaction in a multi-statement transaction.
+
+ Note that like prepare(), commit_ordered() is only called when 2-phase
+ commit takes place. Ie. when no binary log and only a single engine
+ participates in a transaction, one commit() is called, no
+ commit_ordered(). So engines must be prepared for this.
+
+ The calls to commit_ordered() in multiple parallel transactions are
+ guaranteed to happen in the same order in every participating
+ handler. This can be used to ensure the same commit order among multiple
+ handlers (eg. in table handler and binlog). So if transaction T1 calls
+ into commit_ordered() of handler A before T2, then T1 will also call
+ commit_ordered() of handler B before T2.
+
+ Engines that implement this method should during this call make the
+ transaction visible to other transactions, thereby making the order of
+ transaction commits be defined by the order of commit_ordered() calls.
+
+ The intention is that commit_ordered() should do the minimal amount of
+ work that needs to happen in consistent commit order among handlers. To
+ preserve ordering, calls need to be serialised on a global mutex, so
+ doing any time-consuming or blocking operations in commit_ordered() will
+ limit scalability.
+
+ Handlers can rely on commit_ordered() calls to be serialised (no two
+ calls can run in parallel, so no extra locking on the handler part is
+ required to ensure this).
+
+ Note that commit_ordered() can be called from a different thread than the
+ one handling the transaction! So it can not do anything that depends on
+ thread local storage, in particular it can not call my_error() and
+ friends (instead it can store the error code and delay the call of
+ my_error() to the commit() method).
+
+ Similarly, since commit_ordered() returns void, any return error code
+ must be saved and returned from the commit() method instead.
+
+ The commit_ordered method is optional, and can be left unset if not
+ needed in a particular handler (then there will be no ordering guarantees
+ wrt. other engines and binary log).
+ */
+ void (*commit_ordered)(handlerton *hton, THD *thd, bool all);
int (*rollback)(handlerton *hton, THD *thd, bool all);
int (*prepare)(handlerton *hton, THD *thd, bool all);
+ /*
+ The prepare_ordered method is optional. If set, it will be called after
+ successful prepare() in all handlers participating in 2-phase
+ commit. Like commit_ordered(), it is called only when the full
+ transaction is committed, not for each commit of statement transaction.
+
+ The calls to prepare_ordered() among multiple parallel transactions are
+ ordered consistently with calls to commit_ordered(). This means that
+ calls to prepare_ordered() effectively define the commit order, and that
+ each handler will see the same sequence of transactions calling into
+ prepare_ordered() and commit_ordered().
+
+ Thus, prepare_ordered() can be used to define commit order for handlers
+ that need to do this in the prepare step (like binlog). It can also be
+ used to release transaction's locks early in an order consistent with the
+ order transactions will be eventually committed.
+
+ Like commit_ordered(), prepare_ordered() calls are serialised to maintain
+ ordering, so the intention is that they should execute fast, with only
+ the minimal amount of work needed to define commit order. Handlers can
+ rely on this serialisation, and do not need to do any extra locking to
+ avoid two prepare_ordered() calls running in parallel.
+
+ Like commit_ordered(), prepare_ordered() is not guaranteed to be called
+ in the context of the thread handling the rest of the transaction. So it
+ cannot invoke code that relies on thread local storage, in particular it
+ cannot call my_error().
+
+ prepare_ordered() cannot cause a rollback by returning an error, all
+ possible errors must be handled in prepare() (the prepare_ordered()
+ method returns void). In case of some fatal error, a record of the error
+ must be made internally by the engine and returned from commit() later.
+
+ Note that for user-level XA SQL commands, no consistent ordering among
+ prepare_ordered() and commit_ordered() is guaranteed (as that would
+ require blocking all other commits for an indefinite time).
+
+ When 2-phase commit is not used (eg. only one engine (and no binlog) in
+ transaction), neither prepare() nor prepare_ordered() is called.
+ */
+ void (*prepare_ordered)(handlerton *hton, THD *thd, bool all);
int (*recover)(handlerton *hton, XID *xid_list, uint len);
int (*commit_by_xid)(handlerton *hton, XID *xid);
int (*rollback_by_xid)(handlerton *hton, XID *xid);
diff --git a/sql/log.cc b/sql/log.cc
index 5213bdcc937..53c9843ebbc 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -38,6 +38,7 @@
#endif
#include <mysql/plugin.h>
+#include "debug_sync.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -61,6 +62,38 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
static int binlog_commit(handlerton *hton, THD *thd, bool all);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
+static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
+
+static LEX_STRING const write_error_msg=
+ { C_STRING_WITH_LEN("error writing to the binary log") };
+
+static my_bool opt_optimize_thread_scheduling= TRUE;
+#ifndef DBUG_OFF
+static ulong opt_binlog_dbug_fsync_sleep= 0;
+#endif
+
+static my_bool mutexes_inited;
+pthread_mutex_t LOCK_prepare_ordered;
+pthread_mutex_t LOCK_commit_ordered;
+
+static ulonglong binlog_status_var_num_commits;
+static ulonglong binlog_status_var_num_group_commits;
+static char binlog_snapshot_file[FN_REFLEN];
+static ulonglong binlog_snapshot_position;
+
+static SHOW_VAR binlog_status_vars_detail[]=
+{
+ {"commits",
+ (char *)&binlog_status_var_num_commits, SHOW_LONGLONG},
+ {"group_commits",
+ (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG},
+ {"snapshot_file",
+ (char *)&binlog_snapshot_file, SHOW_CHAR},
+ {"snapshot_position",
+ (char *)&binlog_snapshot_position, SHOW_LONGLONG},
+ {NullS, NullS, SHOW_LONG}
+};
+
/**
Silence all errors and warnings reported when performing a write
@@ -134,50 +167,17 @@ char *make_once_alloced_filename(const char *basename, const char *ext)
/*
- Helper class to hold a mutex for the duration of the
- block.
-
- Eliminates the need for explicit unlocking of mutexes on, e.g.,
- error returns. On passing a null pointer, the sentry will not do
- anything.
- */
-class Mutex_sentry
-{
-public:
- Mutex_sentry(pthread_mutex_t *mutex)
- : m_mutex(mutex)
- {
- if (m_mutex)
- pthread_mutex_lock(mutex);
- }
-
- ~Mutex_sentry()
- {
- if (m_mutex)
- pthread_mutex_unlock(m_mutex);
-#ifndef DBUG_OFF
- m_mutex= 0;
-#endif
- }
-
-private:
- pthread_mutex_t *m_mutex;
-
- // It's not allowed to copy this object in any way
- Mutex_sentry(Mutex_sentry const&);
- void operator=(Mutex_sentry const&);
-};
-
-/*
Helper class to store binary log transaction data.
*/
class binlog_trx_data {
public:
binlog_trx_data()
: at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
- before_stmt_pos(MY_OFF_T_UNDEF)
+ before_stmt_pos(MY_OFF_T_UNDEF), last_commit_pos_offset(0),
+ using_xa(FALSE), xa_xid(0)
{
trans_log.end_of_file= max_binlog_cache_size;
+ last_commit_pos_file[0]= 0;
}
~binlog_trx_data()
@@ -229,11 +229,14 @@ public:
completely.
*/
void reset() {
- if (!empty())
+ if (trans_log.type != WRITE_CACHE || !empty())
truncate(0);
before_stmt_pos= MY_OFF_T_UNDEF;
incident= FALSE;
trans_log.end_of_file= max_binlog_cache_size;
+ using_xa= FALSE;
+ last_commit_pos_file[0]= 0;
+ last_commit_pos_offset= 0;
DBUG_ASSERT(empty());
}
@@ -278,6 +281,22 @@ public:
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
+ /*
+ Binlog position for current transaction.
+ For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog
+ position corresponding to the snapshot taken. During (and after) commit,
+ this is set to the binlog position corresponding to just after the
+ commit (so storage engines can store it in their transaction log).
+ */
+ char last_commit_pos_file[FN_REFLEN];
+ my_off_t last_commit_pos_offset;
+
+ /*
+ Flag set true if this transaction is committed with log_xid() as part of
+ XA, false if not.
+ */
+ bool using_xa;
+ my_xid xa_xid;
};
handlerton *binlog_hton;
@@ -1421,6 +1440,7 @@ int binlog_init(void *p)
binlog_hton->commit= binlog_commit;
binlog_hton->rollback= binlog_rollback;
binlog_hton->prepare= binlog_prepare;
+ binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot;
binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
return 0;
}
@@ -1437,91 +1457,118 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
}
/*
- End a transaction.
+ End a transaction, writing events to the binary log.
SYNOPSIS
- binlog_end_trans()
+ binlog_flush_trx_cache()
thd The thread whose transaction should be ended
trx_data Pointer to the transaction data to use
- end_ev The end event to use, or NULL
all True if the entire transaction should be ended, false if
only the statement transaction should be ended.
+ end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
DESCRIPTION
End the currently open transaction. The transaction can be either
- a real transaction (if 'all' is true) or a statement transaction
- (if 'all' is false).
+ a real transaction or a statement transaction.
- If 'end_ev' is NULL, the transaction is a rollback of only
- transactional tables, so the transaction cache will be truncated
- to either just before the last opened statement transaction (if
- 'all' is false), or reset completely (if 'all' is true).
+ This can be to commit a transaction, with a COMMIT query event or an XA
+ commit XID event. But it can also be to rollback a transaction with a
+ ROLLBACK query event, used for rolling back transactions which also
+ contain updates to non-transactional tables.
*/
static int
-binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
- Log_event *end_ev, bool all)
+binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
+ Log_event *end_ev, bool all)
{
- DBUG_ENTER("binlog_end_trans");
- int error=0;
+ DBUG_ENTER("binlog_flush_trx_cache");
IO_CACHE *trans_log= &trx_data->trans_log;
- DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx",
- all ? "all" : "stmt", (long) end_ev));
DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
+ if (thd->binlog_flush_pending_rows_event(TRUE))
+ DBUG_RETURN(1);
+
/*
- NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
- only transactional tables. If the transaction contain changes to
- any non-transactiona tables, we need write the transaction and log
- a ROLLBACK last.
- */
- if (end_ev != NULL)
- {
- if (thd->binlog_flush_pending_rows_event(TRUE))
- DBUG_RETURN(1);
- /*
- Doing a commit or a rollback including non-transactional tables,
- i.e., ending a transaction where we might write the transaction
- cache to the binary log.
-
- We can always end the statement when ending a transaction since
- transactions are not allowed inside stored functions. If they
- were, we would have to ensure that we're not ending a statement
- inside a stored function.
- */
- error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev,
- trx_data->has_incident());
- trx_data->reset();
+ Doing a commit or a rollback including non-transactional tables,
+ i.e., ending a transaction where we might write the transaction
+ cache to the binary log.
+
+ We can always end the statement when ending a transaction since
+ transactions are not allowed inside stored functions. If they
+ were, we would have to ensure that we're not ending a statement
+ inside a stored function.
+ */
+ int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data,
+ end_ev, all);
- statistic_increment(binlog_cache_use, &LOCK_status);
- if (trans_log->disk_writes != 0)
- {
- statistic_increment(binlog_cache_disk_use, &LOCK_status);
- trans_log->disk_writes= 0;
- }
- }
- else
+ trx_data->reset();
+
+ statistic_increment(binlog_cache_use, &LOCK_status);
+ if (trans_log->disk_writes != 0)
{
- /*
- If rolling back an entire transaction or a single statement not
- inside a transaction, we reset the transaction cache.
+ statistic_increment(binlog_cache_disk_use, &LOCK_status);
+ trans_log->disk_writes= 0;
+ }
- If rolling back a statement in a transaction, we truncate the
- transaction cache to remove the statement.
- */
- thd->binlog_remove_pending_rows_event(TRUE);
- if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
- {
- if (trx_data->has_incident())
- error= mysql_bin_log.write_incident(thd, TRUE);
- trx_data->reset();
- }
- else // ...statement
- trx_data->truncate(trx_data->before_stmt_pos);
+ DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
+ DBUG_RETURN(error);
+}
+
+/*
+ Discard a transaction, ie. ROLLBACK with only transactional table updates.
+
+ SYNOPSIS
+ binlog_truncate_trx_cache()
+
+ thd The thread whose transaction should be ended
+ trx_data Pointer to the transaction data to use
+ all True if the entire transaction should be ended, false if
+ only the statement transaction should be ended.
+
+ DESCRIPTION
+
+ Rollback (and end) a transaction that only modifies transactional
+ tables. The transaction can be either a real transaction (if 'all' is
+ true) or a statement transaction (if 'all' is false).
+
+ The transaction cache will be truncated to either just before the last
+ opened statement transaction (if 'all' is false), or reset completely (if
+ 'all' is true).
+ */
+static int
+binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all)
+{
+ DBUG_ENTER("binlog_truncate_trx_cache");
+ int error= 0;
+ DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt"));
+ DBUG_PRINT("info", ("thd->options={ %s%s}",
+ FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
+ FLAGSTR(thd->options, OPTION_BEGIN)));
+
+ /*
+ ROLLBACK with nothing to replicate: i.e., rollback of only transactional
+ tables.
+ */
+
+ /*
+ If rolling back an entire transaction or a single statement not
+ inside a transaction, we reset the transaction cache.
+
+ If rolling back a statement in a transaction, we truncate the
+ transaction cache to remove the statement.
+ */
+ thd->binlog_remove_pending_rows_event(TRUE);
+ if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
+ {
+ if (trx_data->has_incident())
+ error= mysql_bin_log.write_incident(thd);
+ trx_data->reset();
}
+ else // ...statement
+ trx_data->truncate(trx_data->before_stmt_pos);
DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
DBUG_RETURN(error);
@@ -1533,7 +1580,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
do nothing.
just pretend we can do 2pc, so that MySQL won't
switch to 1pc.
- real work will be done in MYSQL_BIN_LOG::log_xid()
+ real work will be done in MYSQL_BIN_LOG::log_and_order()
*/
return 0;
}
@@ -1584,8 +1631,8 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
(trans_has_no_stmt_committed(thd, all) &&
!stmt_has_updated_trans_table(thd) && stmt_has_updated_non_trans_table(thd)))
{
- Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
- error= binlog_end_trans(thd, trx_data, &qev, all);
+ Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
+ error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
}
trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0;
@@ -1649,7 +1696,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
(thd->options & OPTION_KEEP_LOG)) &&
mysql_bin_log.check_write_error(thd))
trx_data->set_incident();
- error= binlog_end_trans(thd, trx_data, 0, all);
+ error= binlog_truncate_trx_cache(thd, trx_data, all);
}
else
{
@@ -1668,8 +1715,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
stmt_has_updated_non_trans_table(thd) &&
thd->current_stmt_binlog_row_based))
{
- Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
- error= binlog_end_trans(thd, trx_data, &qev, all);
+ Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
+ error= binlog_flush_trx_cache(thd, trx_data, &end_ev, all);
}
/*
Otherwise, we simply truncate the cache as there is no change on
@@ -1677,7 +1724,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
*/
else if (ending_trans(thd, all) ||
(!(thd->options & OPTION_KEEP_LOG) && !stmt_has_updated_non_trans_table(thd)))
- error= binlog_end_trans(thd, trx_data, 0, all);
+ error= binlog_truncate_trx_cache(thd, trx_data, all);
}
if (!all)
trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback
@@ -2518,6 +2565,8 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG()
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE),
+ group_commit_queue(0), group_commit_queue_busy(FALSE),
+ num_commits(0), num_group_commits(0),
is_relay_log(0),
description_event_for_exec(0), description_event_for_queue(0)
{
@@ -2574,6 +2623,7 @@ void MYSQL_BIN_LOG::init_pthread_objects()
(void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index",
MYF_NO_DEADLOCK_DETECTION);
(void) pthread_cond_init(&update_cond, 0);
+ (void) pthread_cond_init(&COND_queue_busy, 0);
}
@@ -2795,6 +2845,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (flush_io_cache(&log_file) ||
my_sync(log_file.file, MYF(MY_WME)))
goto err;
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ strmake(last_commit_pos_file, log_file_name,
+ sizeof(last_commit_pos_file)-1);
+ last_commit_pos_offset= my_b_tell(&log_file);
+ pthread_mutex_unlock(&LOCK_commit_ordered);
if (write_file_name_to_index_file)
{
@@ -4078,6 +4133,10 @@ bool MYSQL_BIN_LOG::flush_and_sync()
{
sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME));
+#ifndef DBUG_OFF
+ if (opt_binlog_dbug_fsync_sleep > 0)
+ my_sleep(opt_binlog_dbug_fsync_sleep);
+#endif
}
return err;
}
@@ -4269,12 +4328,33 @@ void THD::binlog_set_stmt_begin() {
trx_data->before_stmt_pos= pos;
}
+static int
+binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
+{
+ int err= 0;
+ binlog_trx_data *trx_data;
+ DBUG_ENTER("binlog_start_consistent_snapshot");
+
+ thd->binlog_setup_trx_data();
+ trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+
+ /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
+ strmake(trx_data->last_commit_pos_file, mysql_bin_log.last_commit_pos_file,
+ sizeof(trx_data->last_commit_pos_file)-1);
+ trx_data->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
+
+ trans_register_ha(thd, TRUE, hton);
+
+ DBUG_RETURN(err);
+}
/*
- Write a table map to the binary log.
+ Write a table map to the binary log. If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the table map.
*/
-int THD::binlog_write_table_map(TABLE *table, bool is_trans)
+int THD::binlog_write_table_map(TABLE *table, bool is_trans,
+ my_bool *with_annotate)
{
int error;
DBUG_ENTER("THD::binlog_write_table_map");
@@ -4292,7 +4372,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans)
if (is_trans && binlog_table_maps == 0)
binlog_start_trans_and_stmt();
- if ((error= mysql_bin_log.write(&the_event)))
+ if ((error= mysql_bin_log.write(&the_event, with_annotate)))
DBUG_RETURN(error);
binlog_table_maps++;
@@ -4376,44 +4456,48 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (Rows_log_event* pending= trx_data->pending())
{
- IO_CACHE *file= &log_file;
-
/*
Decide if we should write to the log file directly or to the
transaction log.
*/
if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log))
- file= &trx_data->trans_log;
-
- /*
- If we are not writing to the log file directly, we could avoid
- locking the log.
- */
- pthread_mutex_lock(&LOCK_log);
-
- /*
- Write pending event to log file or transaction cache
- */
- if (pending->write(file))
{
- pthread_mutex_unlock(&LOCK_log);
- set_write_error(thd);
- DBUG_RETURN(1);
+ /* Write to transaction log/cache. */
+ if (pending->write(&trx_data->trans_log))
+ {
+ set_write_error(thd);
+ DBUG_RETURN(1);
+ }
}
-
- delete pending;
-
- if (file == &log_file)
+ else
{
+ /* Write directly to log file. */
+ pthread_mutex_lock(&LOCK_log);
+ if (pending->write(&log_file))
+ {
+ pthread_mutex_unlock(&LOCK_log);
+ set_write_error(thd);
+ DBUG_RETURN(1);
+ }
+
error= flush_and_sync();
if (!error)
{
signal_update();
error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
+
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= my_b_tell(&log_file);
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+ pthread_mutex_unlock(&LOCK_log);
}
- pthread_mutex_unlock(&LOCK_log);
+ delete pending;
}
thd->binlog_set_pending_rows_event(event);
@@ -4422,10 +4506,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
}
/**
- Write an event to the binary log.
+ Write an event to the binary log. If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the event
+ (this should happen only if the event is a Table_map).
*/
-bool MYSQL_BIN_LOG::write(Log_event *event_info)
+bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
@@ -4443,11 +4529,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
}
/*
- Flush the pending rows event to the transaction cache or to the
- log file. Since this function potentially aquire the LOCK_log
- mutex, we do this before aquiring the LOCK_log mutex in this
- function.
-
We only end the statement if we are in a top-level statement. If
we are inside a stored function, we do not end the statement since
this will close all tables on the slave.
@@ -4457,8 +4538,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (thd->binlog_flush_pending_rows_event(end_stmt))
DBUG_RETURN(error);
- pthread_mutex_lock(&LOCK_log);
-
/*
In most cases this is only called if 'is_open()' is true; in fact this is
mostly called if is_open() *was* true a few instructions before, but it
@@ -4480,7 +4559,6 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->lex->sql_command != SQLCOM_SAVEPOINT &&
!binlog_filter->db_ok(local_db)))
{
- VOID(pthread_mutex_unlock(&LOCK_log));
DBUG_RETURN(0);
}
#endif /* HAVE_REPLICATION */
@@ -4524,15 +4602,23 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->binlog_start_trans_and_stmt();
file= trans_log;
}
- /*
- TODO as Mats suggested, for all the cases above where we write to
- trans_log, it sounds unnecessary to lock LOCK_log. We should rather
- test first if we want to write to trans_log, and if not, lock
- LOCK_log.
- */
}
#endif /* USING_TRANSACTIONS */
DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
+ if (file == &log_file)
+ {
+ pthread_mutex_lock(&LOCK_log);
+ /*
+ We did not want to take LOCK_log unless really necessary.
+ However, now that we hold LOCK_log, we must check is_open() again, lest
+ the log was closed just before.
+ */
+ if (unlikely(!is_open()))
+ {
+ pthread_mutex_unlock(&LOCK_log);
+ DBUG_RETURN(error);
+ }
+ }
/*
No check for auto events flag here - this write method should
@@ -4544,6 +4630,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
of the SQL command
*/
+ /*
+ Optionally write an Annotate_rows event in front of a Table_map event.
+
+ On failure we must leave through err_unlock rather than err: when file
+ is &log_file, LOCK_log was taken just above, and only the err_unlock
+ path releases it. Jumping straight to err would return with LOCK_log
+ still held.
+ */
+ if (with_annotate && *with_annotate)
+ {
+ DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
+ Annotate_rows_log_event anno(thd);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if (anno.write(file))
+ goto err_unlock;
+ }
+
/*
If row-based binlogging, Insert_id, Rand and other kind of "setting
context" events are not needed.
@@ -4556,7 +4652,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog);
if (e.write(file))
- goto err;
+ goto err_unlock;
}
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
{
@@ -4567,13 +4663,13 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum());
if (e.write(file))
- goto err;
+ goto err_unlock;
}
if (thd->rand_used)
{
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
if (e.write(file))
- goto err;
+ goto err_unlock;
}
if (thd->user_var_events.elements)
{
@@ -4588,7 +4684,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
user_var_event->type,
user_var_event->charset_number);
if (e.write(file))
- goto err;
+ goto err_unlock;
}
}
}
@@ -4597,7 +4693,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
/* Write the SQL command */
if (event_info->write(file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
- goto err;
+ goto err_unlock;
if (file == &log_file) // we are writing to the real log (disk)
{
@@ -4605,20 +4701,33 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
status_var_add(thd->status_var.binlog_bytes_written, data_written);
if (flush_and_sync())
- goto err;
+ goto err_unlock;
signal_update();
if ((error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED)))
- goto err;
+ goto err_unlock;
}
error=0;
+err_unlock:
+ if (file == &log_file)
+ {
+ my_off_t offset= my_b_tell(&log_file);
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+ pthread_mutex_unlock(&LOCK_log);
+ }
+
err:
if (error)
set_write_error(thd);
}
- pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -4723,7 +4832,7 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags)
We give it a shot and try to write an incident event anyway
to the current log.
*/
- if (!write_incident(current_thd, FALSE))
+ if (!write_incident_already_locked(current_thd))
flush_and_sync();
#ifdef HAVE_REPLICATION
@@ -4764,19 +4873,15 @@ uint MYSQL_BIN_LOG::next_file_id()
write_cache()
thd Current_thread
cache Cache to write to the binary log
- lock_log True if the LOCK_log mutex should be aquired, false otherwise
- sync_log True if the log should be flushed and sync:ed
DESCRIPTION
Write the contents of the cache to the binary log. The cache will
be reset as a READ_CACHE to be able to read the contents from it.
*/
-int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
- bool sync_log)
+int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
- Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
-
+ safe_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
@@ -4888,6 +4993,8 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
}
/* Write data to the binary log file */
+ DBUG_EXECUTE_IF("fail_binlog_write_1",
+ errno= 28; return ER_ERROR_ON_WRITE;);
if (my_b_write(&log_file, cache->read_pos, length))
return ER_ERROR_ON_WRITE;
status_var_add(thd->status_var.binlog_bytes_written, length);
@@ -4897,9 +5004,6 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
DBUG_ASSERT(carry == 0);
- if (sync_log)
- flush_and_sync();
-
return 0; // All OK
}
@@ -4932,31 +5036,50 @@ int query_error_code(THD *thd, bool not_killed)
return error;
}
-bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
+
+/*
+ Write an INCIDENT_LOST_EVENTS incident event to the binary log file.
+
+ This function does not take LOCK_log itself and does not flush or sync
+ the log; write_incident() calls it while holding LOCK_log and does the
+ flush afterwards.
+
+ Returns 0 if the log is not open, otherwise the result of writing the
+ event (non-zero on failure).
+*/
+bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
{
uint error= 0;
- DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
-
- if (!is_open())
- DBUG_RETURN(error);
-
- LEX_STRING const write_error_msg=
- { C_STRING_WITH_LEN("error writing to the binary log") };
+ DBUG_ENTER("MYSQL_BIN_LOG::write_incident_already_locked");
Incident incident= INCIDENT_LOST_EVENTS;
Incident_log_event ev(thd, incident, write_error_msg);
- if (lock)
- pthread_mutex_lock(&LOCK_log);
- error= ev.write(&log_file);
- status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
- if (lock)
+
+ if (likely(is_open()))
+ {
+ error= ev.write(&log_file);
+ status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+/*
+ Write an incident event to the binary log, taking LOCK_log for the
+ duration.
+
+ If the log is open: writes the event, flushes and syncs, signals readers
+ and attempts rotation, then publishes the new end-of-log offset in
+ last_commit_pos_offset under LOCK_commit_ordered.
+
+ Returns 0 on success (or if the log is not open), non-zero on error.
+*/
+bool MYSQL_BIN_LOG::write_incident(THD *thd)
+{
+ uint error= 0;
+ my_off_t offset;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
+
+ pthread_mutex_lock(&LOCK_log);
+ if (likely(is_open()))
{
- if (!error && !(error= flush_and_sync()))
+ if (!(error= write_incident_already_locked(thd)) &&
+ !(error= flush_and_sync()))
{
signal_update();
error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
- pthread_mutex_unlock(&LOCK_log);
+ offset= my_b_tell(&log_file);
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ pthread_mutex_unlock(&LOCK_commit_ordered);
}
+ pthread_mutex_unlock(&LOCK_log);
+
DBUG_RETURN(error);
}
@@ -4984,111 +5107,367 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
'cache' needs to be reinitialized after this functions returns.
*/
-bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
- bool incident)
+/*
+ Prepare a group commit entry for one transaction and submit it.
+
+ thd Thread committing the transaction
+ trx_data Binlog transaction data holding the transaction cache
+ end_ev Event that terminates the transaction in the binlog
+ all Passed through to the prepare_ordered/commit_ordered hooks
+
+ The entry and the BEGIN/incident events live on this thread's stack while
+ write_transaction_to_binlog_events() runs.
+
+ Returns the result of write_transaction_to_binlog_events().
+*/
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
+ Log_event *end_ev, bool all)
+{
+ group_commit_entry entry;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+
+ entry.thd= thd;
+ entry.trx_data= trx_data;
+ entry.error= 0;
+ entry.all= all;
+
+ /*
+ Log "BEGIN" at the beginning of every transaction. Here, a transaction is
+ either a BEGIN..COMMIT block or a single statement in autocommit mode.
+
+ Create the necessary events here, where we have the correct THD (and
+ thread context).
+
+ Due to group commit the actual writing to binlog may happen in a different
+ thread.
+ */
+ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+ entry.begin_event= &qinfo;
+ entry.end_event= end_ev;
+ /*
+ The call is repeated in both branches so that inc_ev, which the entry
+ points to, stays in scope until the write has completed.
+ */
+ if (trx_data->has_incident())
+ {
+ Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+ entry.incident_event= &inc_ev;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
+ }
+ else
+ {
+ entry.incident_event= NULL;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
+ }
+}
+
+/*
+ Queue a transaction for group commit and wait until it has been handled.
+
+ The entry is pushed onto group_commit_queue under LOCK_prepare_ordered.
+ The thread that finds the queue empty becomes the leader and calls
+ trx_group_commit_leader(); every other thread waits to be woken up.
+
+ Any error recorded in entry->error by the leader is reported here, in the
+ transaction's own thread.
+
+ Returns 0 on success, 1 if an error was reported.
+*/
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
- DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)");
+ /*
+ To facilitate group commit for the binlog, we first queue up ourselves in
+ the group commit queue. Then the first thread to enter the queue waits for
+ the LOCK_log mutex, and commits for everyone in the queue once it gets the
+ lock. Any other threads in the queue just wait for the first one to finish
+ the commit and wake them up.
+ */
+
+ entry->thd->clear_wakeup_ready();
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ group_commit_entry *orig_queue= group_commit_queue;
+ entry->next= orig_queue;
+ group_commit_queue= entry;
+
+ if (entry->trx_data->using_xa)
+ {
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ }
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+ DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
+
+ /*
+ The first in the queue handle group commit for all; the others just wait
+ to be signalled when group commit is done.
+ */
+ if (orig_queue != NULL)
+ entry->thd->wait_for_wakeup_ready();
+ else
+ trx_group_commit_leader(entry);
+
+ if (!opt_optimize_thread_scheduling)
+ {
+ /* For the leader, trx_group_commit_leader() already took the lock. */
+ if (orig_queue != NULL)
+ pthread_mutex_lock(&LOCK_commit_ordered);
+
+ DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (entry->trx_data->using_xa && !entry->error)
+ run_commit_ordered(entry->thd, entry->all);
+
+ group_commit_entry *next= entry->next;
+ /* The last entry of the group releases the queue for the next group. */
+ if (!next)
+ {
+ group_commit_queue_busy= FALSE;
+ pthread_cond_signal(&COND_queue_busy);
+ DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
+ }
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+
+ if (next)
+ {
+ next->thd->signal_wakeup_ready();
+ }
+ }
+
+ if (likely(!entry->error))
+ return 0;
+
+ switch (entry->error)
+ {
+ case ER_ERROR_ON_WRITE:
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno);
+ break;
+ case ER_ERROR_ON_READ:
+ my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+ entry->trx_data->trans_log.file_name, entry->commit_errno);
+ break;
+ default:
+ /*
+ There are not (and should not be) any errors thrown not covered above.
+ But just in case one is added later without updating the above switch
+ statement, include a catch-all.
+ */
+ my_printf_error(entry->error,
+ "Error writing transaction to binary log: %d",
+ MYF(ME_NOREFRESH), entry->error);
+ }
+
+ /*
+ Since we return error, this transaction XID will not be committed, so
+ we need to mark it as not needed for recovery (unlog() is not called
+ for a transaction if log_xid() fails).
+ */
+ if (entry->trx_data->using_xa && entry->trx_data->xa_xid)
+ mark_xid_done();
+
+ return 1;
+}
+
+/*
+ Do binlog group commit as the lead thread.
+
+ This must be called when this thread/transaction is queued at the start of
+ the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group
+ commit all the transactions in the queue (more may have entered while waiting
+ for LOCK_log). After commit is done, all other threads in the queue will be
+ signalled.
+
+ Errors are not reported here; they are stored in each entry's error and
+ commit_errno fields for the owning thread to report.
+
+ When opt_optimize_thread_scheduling is off, this function returns with
+ LOCK_commit_ordered still locked and without waking the other threads;
+ the caller is then responsible for running commit_ordered, waking the
+ next entry and releasing the mutex.
+
+ */
+void
+MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
+{
+ uint xid_count= 0;
+ uint write_count= 0;
+ my_off_t commit_offset;
+ group_commit_entry *current;
+ group_commit_entry *last_in_queue;
+ DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
+ LINT_INIT(commit_offset);
+
+ /*
+ Lock the LOCK_log(), and once we get it, collect any additional writes
+ that queued up while we were waiting.
+ */
VOID(pthread_mutex_lock(&LOCK_log));
+ DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
- /* NULL would represent nothing to replicate after ROLLBACK */
- DBUG_ASSERT(commit_event != NULL);
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ current= group_commit_queue;
+ group_commit_queue= NULL;
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+ /* As the queue is in reverse order of entering, reverse it. */
+ group_commit_entry *queue= NULL;
+ last_in_queue= current;
+ while (current)
+ {
+ group_commit_entry *next= current->next;
+ current->next= queue;
+ queue= current;
+ current= next;
+ }
+ DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
+
+ /* Now we have in queue the list of transactions to be committed in order. */
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
/*
- We only bother to write to the binary log if there is anything
- to write.
- */
- if (my_b_tell(cache) > 0)
+ Commit every transaction in the queue.
+
+ Note that we are doing this in a different thread than the one running
+ the transaction! So we are limited in the operations we can do. In
+ particular, we cannot call my_error() on behalf of a transaction, as
+ that obtains the THD from thread local storage. Instead, we must set
+ current->error and let the thread do the error reporting itself once
+ we wake it up.
+ */
+ for (current= queue; current != NULL; current= current->next)
{
- /*
- Log "BEGIN" at the beginning of every transaction. Here, a
- transaction is either a BEGIN..COMMIT block or a single
- statement in autocommit mode.
- */
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+ binlog_trx_data *trx_data= current->trx_data;
+ IO_CACHE *cache= &trx_data->trans_log;
/*
- Now this Query_log_event has artificial log_pos 0. It must be
- adjusted to reflect the real position in the log. Not doing it
- would confuse the slave: it would prevent this one from
- knowing where he is in the master's binlog, which would result
- in wrong positions being shown to the user, MASTER_POS_WAIT
- undue waiting etc.
+ We only bother to write to the binary log if there is anything
+ to write.
*/
- if (qinfo.write(&log_file))
- goto err;
- status_var_add(thd->status_var.binlog_bytes_written, qinfo.data_written);
-
- DBUG_EXECUTE_IF("crash_before_writing_xid",
- {
- if ((write_error= write_cache(thd, cache, FALSE,
- TRUE)))
- DBUG_PRINT("info", ("error writing binlog cache: %d",
- write_error));
- DBUG_PRINT("info", ("crashing before writing xid"));
- DBUG_SUICIDE();
- });
-
- if ((write_error= write_cache(thd, cache, FALSE, FALSE)))
- goto err;
-
- if (commit_event)
+ if (my_b_tell(cache) > 0)
{
- if (commit_event->write(&log_file))
- goto err;
- status_var_add(thd->status_var.binlog_bytes_written,
- commit_event->data_written);
+ if ((current->error= write_transaction(current)))
+ current->commit_errno= errno;
+
+ write_count++;
}
- if (incident && write_incident(thd, FALSE))
- goto err;
+ strmake(trx_data->last_commit_pos_file, log_file_name,
+ sizeof(trx_data->last_commit_pos_file)-1);
+ commit_offset= my_b_write_tell(&log_file);
+ trx_data->last_commit_pos_offset= commit_offset;
+ if (trx_data->using_xa && trx_data->xa_xid)
+ xid_count++;
+ }
+ if (write_count > 0)
+ {
if (flush_and_sync())
- goto err;
- DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE(););
- if (cache->error) // Error on read
{
- sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
- write_error=1; // Don't give more errors
- goto err;
+ for (current= queue; current != NULL; current= current->next)
+ {
+ if (!current->error)
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= errno;
+ }
+ }
+ }
+ else
+ {
+ signal_update();
}
- signal_update();
}
/*
- if commit_event is Xid_log_event, increase the number of
- prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
+ if any commit_events are Xid_log_event, increase the number of
+ prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated
if there're prepared xids in it - see the comment in new_file() for
an explanation.
- If the commit_event is not Xid_log_event (then it's a Query_log_event)
- rotate binlog, if necessary.
+ If no Xid_log_events (then it's all Query_log_event) rotate binlog,
+ if necessary.
*/
- if (commit_event && commit_event->get_type_code() == XID_EVENT)
+ if (xid_count > 0)
{
- pthread_mutex_lock(&LOCK_prep_xids);
- prepared_xids++;
- pthread_mutex_unlock(&LOCK_prep_xids);
+ mark_xids_active(xid_count);
}
else
+ {
if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))
- goto err;
+ {
+ /*
+ If we fail to rotate, which thread should get the error?
+ We give the error to the *last* transaction thread; that seems to
+ make the most sense, as it was the last to write to the log.
+ */
+ last_in_queue->error= ER_ERROR_ON_WRITE;
+ last_in_queue->commit_errno= errno;
+ }
+ }
}
- VOID(pthread_mutex_unlock(&LOCK_log));
- DBUG_RETURN(0);
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= commit_offset;
+ /*
+ We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
+ otherwise scheduling could allow the next group commit to run ahead of us,
+ messing up the order of commit_ordered() calls. But as soon as
+ LOCK_commit_ordered is obtained, we can let the next group commit start.
+ */
+ pthread_mutex_unlock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
+ ++num_group_commits;
-err:
- if (!write_error)
+ if (!opt_optimize_thread_scheduling)
{
- write_error= 1;
- sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
+ /*
+ If we want to run commit_ordered() each in the transaction's own thread
+ context, then we need to mark the queue reserved; we need to finish all
+ threads in one group commit before the next group commit can be allowed
+ to proceed, and we cannot unlock a simple pthreads mutex in a different
+ thread from the one that locked it.
+ */
+
+ while (group_commit_queue_busy)
+ pthread_cond_wait(&COND_queue_busy, &LOCK_commit_ordered);
+ group_commit_queue_busy= TRUE;
+
+ /* Note that we return with LOCK_commit_ordered locked! */
+ DBUG_VOID_RETURN;
}
- VOID(pthread_mutex_unlock(&LOCK_log));
- DBUG_RETURN(1);
+
+ /*
+ Wakeup each participant waiting for our group commit, first calling the
+ commit_ordered() methods for any transactions doing 2-phase commit.
+ */
+ current= queue;
+ while (current != NULL)
+ {
+ group_commit_entry *next;
+
+ DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (current->trx_data->using_xa && !current->error)
+ run_commit_ordered(current->thd, current->all);
+
+ /*
+ Careful not to access current->next after waking up the other thread! As
+ it may change immediately after wakeup.
+ */
+ next= current->next;
+ if (current != leader) // Don't wake up ourself
+ current->thd->signal_wakeup_ready();
+ current= next;
+ }
+ DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+
+ DBUG_VOID_RETURN;
}
+/*
+ Write one queued transaction to the binary log file.
+
+ Writes, in order: the entry's begin event, the contents of the
+ transaction cache, the end event and, if present, the incident event.
+ The size of every event written is added to the owning thread's
+ binlog_bytes_written status counter. Nothing is flushed or synced here.
+
+ LOCK_log must be held by the caller (write_cache() asserts this).
+
+ Returns 0 on success, ER_ERROR_ON_WRITE if writing to the log fails, or
+ ER_ERROR_ON_READ if the transaction cache reports a read error.
+*/
+int
+MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry)
+{
+ binlog_trx_data *trx_data= entry->trx_data;
+ IO_CACHE *cache= &trx_data->trans_log;
+
+ if (entry->begin_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->begin_event->data_written);
+
+ DBUG_EXECUTE_IF("crash_before_writing_xid",
+ {
+ if ((write_cache(entry->thd, cache)))
+ DBUG_PRINT("info", ("error writing binlog cache"));
+ else
+ flush_and_sync();
+
+ DBUG_PRINT("info", ("crashing before writing xid"));
+ DBUG_SUICIDE();
+ });
+
+ if (write_cache(entry->thd, cache))
+ return ER_ERROR_ON_WRITE;
+
+ if (entry->end_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->end_event->data_written);
+
+ if (entry->incident_event)
+ {
+ if (entry->incident_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+ /*
+ Count the incident event's bytes as well, in the same way as
+ write_incident_already_locked() does.
+ */
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->incident_event->data_written);
+ }
+
+ if (cache->error) // Error on read
+ return ER_ERROR_ON_READ;
+
+ return 0;
+}
/**
Wait until we get a signal that the binary log has been updated.
@@ -5467,6 +5846,171 @@ void sql_print_information(const char *format, ...)
}
+/*
+ Initialise the LOCK_prepare_ordered and LOCK_commit_ordered mutexes and
+ record that this has been done, so that TC_destroy() knows to destroy them.
+*/
+void
+TC_init()
+{
+ my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW,
+ "LOCK_prepare_ordered", MYF(0));
+ my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW,
+ "LOCK_commit_ordered", MYF(0));
+ mutexes_inited= TRUE;
+}
+
+
+/*
+ Destroy the mutexes created by TC_init(). Does nothing if they were never
+ initialised or have already been destroyed.
+*/
+void
+TC_destroy()
+{
+ if (mutexes_inited)
+ {
+ pthread_mutex_destroy(&LOCK_prepare_ordered);
+ pthread_mutex_destroy(&LOCK_commit_ordered);
+ mutexes_inited= FALSE;
+ }
+}
+
+
+/*
+ Call the prepare_ordered() hook of every handlerton participating in the
+ transaction (all) or statement (!all) that provides one.
+
+ The caller must hold LOCK_prepare_ordered.
+*/
+void
+TC_LOG::run_prepare_ordered(THD *thd, bool all)
+{
+ Ha_trx_info *ha_info=
+ all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
+ safe_mutex_assert_owner(&LOCK_prepare_ordered);
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->prepare_ordered)
+ continue;
+ ht->prepare_ordered(ht, thd, all);
+ }
+}
+
+
+/*
+ Call the commit_ordered() hook of every handlerton participating in the
+ transaction (all) or statement (!all) that provides one.
+
+ The caller must hold LOCK_commit_ordered.
+*/
+void
+TC_LOG::run_commit_ordered(THD *thd, bool all)
+{
+ Ha_trx_info *ha_info=
+ all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
+ safe_mutex_assert_owner(&LOCK_commit_ordered);
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->commit_ordered)
+ continue;
+ ht->commit_ordered(ht, thd, all);
+ DEBUG_SYNC(thd, "commit_after_run_commit_ordered");
+ }
+}
+
+
+/*
+ Log a transaction's xid and run the ordered handler hooks around it.
+
+ thd Committing thread
+ xid Transaction id to log; nothing is logged when 0
+ all Passed through to the ordered hooks
+ need_prepare_ordered Run prepare_ordered() under LOCK_prepare_ordered
+ need_commit_ordered Run commit_ordered() under LOCK_commit_ordered
+
+ When both flags are set, threads are queued while running
+ prepare_ordered() so that commit_ordered() later runs in the same
+ sequence, each thread waking the next one in the queue.
+
+ Returns the cookie from log_one_transaction(), or 0 if xid is 0.
+ commit_ordered() is only run when the cookie is non-zero.
+*/
+int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered,
+ bool need_commit_ordered)
+{
+ int cookie;
+ struct commit_entry entry;
+ bool is_group_commit_leader;
+ LINT_INIT(is_group_commit_leader);
+
+ if (need_prepare_ordered)
+ {
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ run_prepare_ordered(thd, all);
+ if (need_commit_ordered)
+ {
+ /*
+ Must put us in queue so we can run_commit_ordered() in same sequence
+ as we did run_prepare_ordered().
+ */
+ thd->clear_wakeup_ready();
+ entry.thd= thd;
+ commit_entry *previous_queue= commit_ordered_queue;
+ entry.next= previous_queue;
+ commit_ordered_queue= &entry;
+ is_group_commit_leader= (previous_queue == NULL);
+ }
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+ }
+
+ cookie= 0;
+ if (xid)
+ cookie= log_one_transaction(xid);
+
+ if (need_commit_ordered)
+ {
+ if (need_prepare_ordered)
+ {
+ /*
+ We did the run_prepare_ordered() serialised, then ran the log_xid() in
+ parallel. Now we have to do run_commit_ordered() serialised in the
+ same sequence as run_prepare_ordered().
+
+ We do this starting from the head of the queue, each thread doing
+ run_commit_ordered() and signalling the next in queue.
+ */
+ if (is_group_commit_leader)
+ {
+ /* The first in queue starts the ball rolling. */
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ while (commit_ordered_queue_busy)
+ pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered);
+ commit_entry *queue= commit_ordered_queue;
+ commit_ordered_queue= NULL;
+ /*
+ Mark the queue busy while we bounce it from one thread to the
+ next.
+ */
+ commit_ordered_queue_busy= true;
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+
+ /* Reverse the queue list so we get correct order. */
+ commit_entry *prev= NULL;
+ while (queue)
+ {
+ commit_entry *next= queue->next;
+ queue->next= prev;
+ prev= queue;
+ queue= next;
+ }
+ DBUG_ASSERT(prev == &entry && prev->thd == thd);
+ }
+ else
+ {
+ /* Not first in queue; just wait until previous thread wakes us up. */
+ thd->wait_for_wakeup_ready();
+ }
+ }
+
+ /* Only run commit_ordered() if log_xid was successful. */
+ if (cookie)
+ {
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ run_commit_ordered(thd, all);
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+ }
+
+ if (need_prepare_ordered)
+ {
+ /* Hand over to the next thread, or release the queue if we are last. */
+ commit_entry *next= entry.next;
+ if (next)
+ {
+ next->thd->signal_wakeup_ready();
+ }
+ else
+ {
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+ commit_ordered_queue_busy= false;
+ pthread_cond_signal(&COND_queue_busy);
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+ }
+ }
+ }
+
+ return cookie;
+}
+
+
/********* transaction coordinator log for 2pc - mmap() based solution *******/
/*
@@ -5603,6 +6147,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_active, 0);
pthread_cond_init(&COND_pool, 0);
+ pthread_cond_init(&COND_queue_busy, 0);
inited=6;
@@ -5610,6 +6155,8 @@ int TC_LOG_MMAP::open(const char *opt_name)
active=pages;
pool=pages+1;
pool_last=pages+npages-1;
+ commit_ordered_queue= NULL;
+ commit_ordered_queue_busy= false;
return 0;
@@ -5715,7 +6262,7 @@ int TC_LOG_MMAP::overflow()
to the position in memory where xid was logged to.
*/
-int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
+int TC_LOG_MMAP::log_one_transaction(my_xid xid)
{
int err;
PAGE *p;
@@ -5885,6 +6432,8 @@ void TC_LOG_MMAP::close()
pthread_mutex_destroy(&LOCK_active);
pthread_mutex_destroy(&LOCK_pool);
pthread_cond_destroy(&COND_pool);
+ pthread_cond_destroy(&COND_active);
+ pthread_cond_destroy(&COND_queue_busy);
case 5:
data[0]='A'; // garble the first (signature) byte, in case my_delete fails
case 4:
@@ -6093,39 +6642,98 @@ void TC_LOG_BINLOG::close()
pthread_cond_destroy (&COND_prep_xids);
}
-/**
- @todo
- group commit
-
- @retval
- 0 error
- @retval
- 1 success
+/*
+ Write the transaction of one thread to the binary log, terminating it
+ with an Xid event, or with a plain COMMIT query event when xid is 0.
+
+ Marks the thread's binlog transaction data as using XA and records the
+ xid in it before flushing the transaction cache. The
+ need_prepare_ordered and need_commit_ordered flags are not used here.
+
+ Returns 1 on success and 0 on error (the inverse of the result of
+ binlog_flush_trx_cache()).
*/
-int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
+int
+TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered __attribute__((unused)),
+ bool need_commit_ordered __attribute__((unused)))
{
- DBUG_ENTER("TC_LOG_BINLOG::log");
- Xid_log_event xle(thd, xid);
- binlog_trx_data *trx_data=
+ int err;
+ DBUG_ENTER("TC_LOG_BINLOG::log_and_order");
+
+ binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- /*
- We always commit the entire transaction when writing an XID. Also
- note that the return value is inverted.
- */
- DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE));
+
+ trx_data->using_xa= TRUE;
+ trx_data->xa_xid= xid;
+ if (xid)
+ {
+ Xid_log_event xid_event(thd, xid);
+ err= binlog_flush_trx_cache(thd, trx_data, &xid_event, all);
+ }
+ else
+ {
+ /*
+ Empty xid occurs in XA COMMIT ... ONE PHASE.
+ In this case, we do not have a MySQL xid for the transaction, and the
+ external XA transaction coordinator will have to handle recovery if
+ needed. So we end the transaction with a plain COMMIT query event.
+ */
+ Query_log_event end_event(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
+ err= binlog_flush_trx_cache(thd, trx_data, &end_event, all);
+ }
+
+ DEBUG_SYNC(thd, "binlog_after_log_and_order");
+
+ DBUG_RETURN(!err);
}
-int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+/*
+ After an XID is logged, we need to hold on to the current binlog file until
+ it is fully committed in the storage engine. The reason is that crash
+ recovery only looks at the latest binlog, so we must make sure there are no
+ outstanding prepared (but not committed) transactions before rotating the
+ binlog.
+
+ To handle this, we keep a count of outstanding XIDs. This function is used
+ to increase this count when committing one or more transactions to the
+ binary log.
+
+ xid_count Number of XIDs to add to prepared_xids; the addition is done
+ under LOCK_prep_xids.
+*/
+void
+TC_LOG_BINLOG::mark_xids_active(uint xid_count)
{
- DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
+ DBUG_PRINT("info", ("xid_count=%u", xid_count));
pthread_mutex_lock(&LOCK_prep_xids);
+ prepared_xids+= xid_count;
+ pthread_mutex_unlock(&LOCK_prep_xids);
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Once an XID is committed, it is safe to rotate the binary log, as it can no
+ longer be needed during crash recovery.
+
+ This function is called to mark an XID this way. It needs to decrease the
+ count of pending XIDs, and signal the log rotator thread when it reaches zero.
+
+ The counter is decremented under LOCK_prep_xids; COND_prep_xids is
+ signalled after the mutex has been released.
+*/
+void
+TC_LOG_BINLOG::mark_xid_done()
+{
+ my_bool send_signal;
+
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0);
- if (--prepared_xids == 0) {
+ send_signal= !--prepared_xids;
+ pthread_mutex_unlock(&LOCK_prep_xids);
+ if (send_signal) {
+ /*
+ NOTE(review): prepared_xids is read here without LOCK_prep_xids, so the
+ value printed may already have been changed by another thread.
+ */
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids);
}
- pthread_mutex_unlock(&LOCK_prep_xids);
- DBUG_RETURN(rotate_and_purge(0)); // as ::write() did not rotate
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Called once a transaction has been committed: drop its XID from the count
+ of pending XIDs (only when xid is non-zero) and attempt log rotation.
+
+ The cookie argument is not used. Returns the result of rotate_and_purge().
+*/
+int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+{
+ DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ if (xid)
+ mark_xid_done();
+ /* As ::write_transaction_to_binlog() did not rotate, do it here. */
+ DBUG_RETURN(rotate_and_purge(0));
+}
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
@@ -6194,9 +6802,131 @@ ulonglong mysql_bin_log_file_pos(void)
{
return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file;
}
+/*
+ Get the current position of the MySQL binlog for transaction currently being
+ committed.
+
+ This is valid to call from within storage engine commit_ordered() and
+ commit() methods only.
+
+ Since it stores the position inside THD, it is safe to call without any
+ locking.
+
+ thd Thread whose binlog transaction data is consulted
+ out_pos Receives the commit offset, or 0 if the thread has no binlog
+ transaction data
+ out_file Receives a pointer to the file name stored in the transaction
+ data (not a copy), or NULL if there is none
+*/
+void
+mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
+{
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ if (trx_data)
+ {
+ *out_file= trx_data->last_commit_pos_file;
+ *out_pos= (ulonglong)(trx_data->last_commit_pos_offset);
+ }
+ else
+ {
+ *out_file= NULL;
+ *out_pos= 0;
+ }
+}
#endif /* INNODB_COMPATIBILITY_HOOKS */
+/*
+ SHOW_FUNC callback for the "binlog" status variables: refresh the values
+ via set_status_variables() and point var at the binlog_status_vars_detail
+ array. The buff argument is not used. Always returns 0.
+*/
+static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff)
+{
+ mysql_bin_log.set_status_variables(thd);
+ var->type= SHOW_ARRAY;
+ var->value= (char *)&binlog_status_vars_detail;
+ return 0;
+}
+
+/*
+ Top-level status variable table for the binlog plugin; its single entry
+ resolves through show_binlog_vars().
+*/
+static SHOW_VAR binlog_status_vars_top[]= {
+ {"binlog", (char *) &show_binlog_vars, SHOW_FUNC},
+ {NullS, NullS, SHOW_LONG}
+};
+
+/* Read-only boolean system variable selecting the group commit algorithm. */
+static MYSQL_SYSVAR_BOOL(
+ optimize_thread_scheduling,
+ opt_optimize_thread_scheduling,
+ PLUGIN_VAR_READONLY,
+ "Run fast part of group commit in a single thread, to optimize kernel "
+ "thread scheduling. On by default. Disable to run each transaction in group "
+ "commit in its own thread, which can be slower at very high concurrency. "
+ "This option is mostly for testing one algorithm versus the other, and it "
+ "should not normally be necessary to change it.",
+ NULL,
+ NULL,
+ 1);
+
+#ifndef DBUG_OFF
+/* Debug builds only: artificial delay added to binlog fsync(). */
+static MYSQL_SYSVAR_ULONG(
+ dbug_fsync_sleep,
+ opt_binlog_dbug_fsync_sleep,
+ PLUGIN_VAR_RQCMDARG,
+ "Extra sleep (in microseconds) to add to binlog fsync(), for debugging",
+ NULL,
+ NULL,
+ 0,
+ 0,
+ ULONG_MAX,
+ 0);
+#endif
+
+/* NULL-terminated list of the system variables declared above. */
+static struct st_mysql_sys_var *binlog_sys_vars[]=
+{
+ MYSQL_SYSVAR(optimize_thread_scheduling),
+#ifndef DBUG_OFF
+ MYSQL_SYSVAR(dbug_fsync_sleep),
+#endif
+ NULL
+};
+
+
+/*
+ Copy out the non-directory part of binlog position filename for the
+ `binlog_snapshot_file' status variable, same way as it is done for
+ SHOW MASTER STATUS.
+
+ src Binlog file name, possibly with a directory prefix. The part after
+ the prefix is copied into binlog_snapshot_file, limited to the size
+ of that buffer minus one.
+*/
+static void
+set_binlog_snapshot_file(const char *src)
+{
+ int dir_len = dirname_length(src);
+ strmake(binlog_snapshot_file, src + dir_len, sizeof(binlog_snapshot_file)-1);
+}
+
+/*
+ Copy out current values of status variables, for SHOW STATUS or
+ information_schema.global_status.
+
+ This is called only under LOCK_status, so we can fill in a static array.
+
+ The commit counters are copied under LOCK_commit_ordered. The snapshot
+ file and position come from the thread's own binlog transaction data when
+ it has recorded a commit position file; otherwise the log-wide
+ last_commit_pos_file and last_commit_pos_offset are used, read under the
+ same mutex.
+
+ thd Thread to take the per-transaction position from; may be NULL.
+*/
+void
+TC_LOG_BINLOG::set_status_variables(THD *thd)
+{
+ binlog_trx_data *trx_data;
+
+ if (thd)
+ trx_data= (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ else
+ trx_data= NULL;
+
+ bool have_snapshot= (trx_data && trx_data->last_commit_pos_file[0] != 0);
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ binlog_status_var_num_commits= this->num_commits;
+ binlog_status_var_num_group_commits= this->num_group_commits;
+ if (!have_snapshot)
+ {
+ set_binlog_snapshot_file(last_commit_pos_file);
+ binlog_snapshot_position= last_commit_pos_offset;
+ }
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+
+ /* Per-thread data is copied after the mutex has been released. */
+ if (have_snapshot)
+ {
+ set_binlog_snapshot_file(trx_data->last_commit_pos_file);
+ binlog_snapshot_position= trx_data->last_commit_pos_offset;
+ }
+}
+
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
@@ -6211,8 +6941,8 @@ mysql_declare_plugin(binlog)
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
- NULL, /* status variables */
- NULL, /* system variables */
+ binlog_status_vars_top, /* status variables */
+ binlog_sys_vars, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
@@ -6227,8 +6957,8 @@ maria_declare_plugin(binlog)
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
- NULL, /* status variables */
- NULL, /* system variables */
+ binlog_status_vars_top, /* status variables */
+ binlog_sys_vars, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
diff --git a/sql/log.h b/sql/log.h
index df475268cc3..503e94df981 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -38,17 +38,58 @@ class TC_LOG
virtual int open(const char *opt_name)=0;
virtual void close()=0;
- virtual int log_xid(THD *thd, my_xid xid)=0;
+ virtual int log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered,
+ bool need_commit_ordered) = 0;
virtual int unlog(ulong cookie, my_xid xid)=0;
+
+protected:
+ /*
+ These methods are meant to be invoked from log_and_order() implementations
+ to run any prepare_ordered() respectively commit_ordered() methods in
+ participating handlers.
+
+ They must be called using suitable thread synchronisation to ensure that
+ they are each called in the correct commit order among all
+ transactions. However, it is only necessary to call them if the
+ corresponding flag passed to log_and_order is set (it is safe, but not
+ required, to call them when the flag is false).
+
+ The caller must be holding LOCK_prepare_ordered respectively
+ LOCK_commit_ordered when calling these methods.
+ */
+ void run_prepare_ordered(THD *thd, bool all);
+ void run_commit_ordered(THD *thd, bool all);
};
+/*
+ Locks used to ensure serialised execution of TC_LOG::run_prepare_ordered()
+ and TC_LOG::run_commit_ordered(), or any other code that calls handler
+ prepare_ordered() or commit_ordered() methods.
+*/
+extern pthread_mutex_t LOCK_prepare_ordered;
+extern pthread_mutex_t LOCK_commit_ordered;
+
+extern void TC_init();
+extern void TC_destroy();
+
class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
{
public:
TC_LOG_DUMMY() {}
int open(const char *opt_name) { return 0; }
void close() { }
- int log_xid(THD *thd, my_xid xid) { return 1; }
+ /*
+ TC_LOG_DUMMY is only used when there are <= 1 XA-capable engines, and we
+ only use internal XA during commit when >= 2 XA-capable engines
+ participate.
+ */
+ int log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered, bool need_commit_ordered)
+ {
+ DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */);
+ return 1;
+ }
int unlog(ulong cookie, my_xid xid) { return 0; }
};
@@ -74,6 +115,13 @@ class TC_LOG_MMAP: public TC_LOG
pthread_cond_t cond; // to wait for a sync
} PAGE;
+ /* List of THDs for which to invoke commit_ordered(), in order. */
+ struct commit_entry
+ {
+ struct commit_entry *next;
+ THD *thd;
+ };
+
char logname[FN_REFLEN];
File fd;
my_off_t file_length;
@@ -88,16 +136,38 @@ class TC_LOG_MMAP: public TC_LOG
*/
pthread_mutex_t LOCK_active, LOCK_pool, LOCK_sync;
pthread_cond_t COND_pool, COND_active;
+ /*
+ Queue of threads that need to call commit_ordered().
+ Access to this queue must be protected by LOCK_prepare_ordered.
+ */
+ commit_entry *commit_ordered_queue;
+ /*
+ This flag and condition is used to reserve the queue while threads in it
+ each run the commit_ordered() methods one after the other. Only once the
+ last commit_ordered() in the queue is done can we start on a new queue
+ run.
+
+ Since we start this process in the first thread in the queue and finish in
+ the last (and possibly different) thread, we need a condition variable for
+ this (we cannot unlock a mutex in a different thread than the one who
+ locked it).
+
+ The condition is used together with the LOCK_prepare_ordered mutex.
+ */
+ my_bool commit_ordered_queue_busy;
+ pthread_cond_t COND_queue_busy;
public:
TC_LOG_MMAP(): inited(0) {}
int open(const char *opt_name);
void close();
- int log_xid(THD *thd, my_xid xid);
+ int log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered, bool need_commit_ordered);
int unlog(ulong cookie, my_xid xid);
int recover();
private:
+ int log_one_transaction(my_xid xid);
void get_active_from_pool();
int sync();
int overflow();
@@ -232,9 +302,31 @@ private:
time_t last_time;
};
+class binlog_trx_data;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
+ /*
+ One transaction queued up to take part in a binlog group commit; entries
+ are chained through `next' (see group_commit_queue).
+ */
+ struct group_commit_entry
+ {
+ /* Next entry in the queue. */
+ struct group_commit_entry *next;
+ /* The connection that queued this entry and its binlog transaction data. */
+ THD *thd;
+ binlog_trx_data *trx_data;
+ /*
+ Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
+ written during group commit. The incident_event is only valid if
+ trx_data->has_incident() is true.
+ */
+ Log_event *begin_event;
+ Log_event *end_event;
+ Log_event *incident_event;
+ /* Set during group commit to record any per-thread error. */
+ int error;
+ int commit_errno;
+ /* This is the `all' parameter for ha_commit_ordered(). */
+ bool all;
+ /*
+ True if we come in through XA log_and_order(), false otherwise.
+ NOTE(review): no member follows this comment in this struct -- either the
+ comment is stale or the field it describes is missing; confirm.
+ */
+ };
+
/* LOCK_log and LOCK_index are inited by init_pthread_objects() */
pthread_mutex_t LOCK_index;
pthread_mutex_t LOCK_prep_xids;
@@ -276,6 +368,20 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
In 5.0 it's 0 for relay logs too!
*/
bool no_auto_events;
+ /* Queue of transactions queued up to participate in group commit. */
+ group_commit_entry *group_commit_queue;
+ /*
+ Condition variable to mark that the group commit queue is busy.
+ Used when each thread does its own commit_ordered() (when
+ binlog_optimize_thread_scheduling=0).
+ Used with the LOCK_commit_ordered mutex.
+ */
+ my_bool group_commit_queue_busy;
+ pthread_cond_t COND_queue_busy;
+ /* Total number of committed transactions. */
+ ulonglong num_commits;
+ /* Number of group commits done. */
+ ulonglong num_group_commits;
int write_to_file(IO_CACHE *cache);
/*
@@ -285,6 +391,11 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
*/
int new_file_without_locking();
int new_file_impl(bool need_lock);
+ int write_transaction(group_commit_entry *entry);
+ bool write_transaction_to_binlog_events(group_commit_entry *entry);
+ void trx_group_commit_leader(group_commit_entry *leader);
+ void mark_xid_done();
+ void mark_xids_active(uint xid_count);
public:
MYSQL_LOG::generate_name;
@@ -303,6 +414,12 @@ public:
*/
Format_description_log_event *description_event_for_exec,
*description_event_for_queue;
+ /*
+ Binlog position of last commit (or non-transactional write) to the binlog.
+ Access to this is protected by LOCK_commit_ordered.
+ */
+ char last_commit_pos_file[FN_REFLEN];
+ my_off_t last_commit_pos_offset;
MYSQL_BIN_LOG();
/*
@@ -313,7 +430,8 @@ public:
int open(const char *opt_name);
void close();
- int log_xid(THD *thd, my_xid xid);
+ int log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered, bool need_commit_ordered);
int unlog(ulong cookie, my_xid xid);
int recover(IO_CACHE *log, Format_description_log_event *fdle);
#if !defined(MYSQL_CLIENT)
@@ -357,11 +475,14 @@ public:
int new_file();
void reset_gathered_updates(THD *thd);
- bool write(Log_event* event_info); // binary log write
- bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident);
- bool write_incident(THD *thd, bool lock);
- int write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
- bool flush_and_sync);
+ bool write(Log_event* event_info,
+ my_bool *with_annotate= 0); // binary log write
+ bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
+ Log_event *end_ev, bool all);
+
+ bool write_incident_already_locked(THD *thd);
+ bool write_incident(THD *thd);
+ int write_cache(THD *thd, IO_CACHE *cache);
void set_write_error(THD *thd);
bool check_write_error(THD *thd);
@@ -416,6 +537,7 @@ public:
inline void unlock_index() { pthread_mutex_unlock(&LOCK_index);}
inline IO_CACHE *get_index_file() { return &index_file;}
inline uint32 get_open_count() { return open_count; }
+ void set_status_variables(THD *thd);
};
class Log_event_handler
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 95cf853d9ff..0259ff762ad 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -649,6 +649,7 @@ const char* Log_event::get_type_str(Log_event_type type)
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
case INCIDENT_EVENT: return "Incident";
+ case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
default: return "Unknown"; /* impossible */
}
}
@@ -728,7 +729,7 @@ Log_event::Log_event(const char* buf,
logs are in 4.0 format, until it finds a Format_desc).
*/
if (description_event->binlog_version==3 &&
- buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
+ (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
{
/*
If log_pos=0, don't change it. log_pos==0 is a marker to mean
@@ -746,8 +747,8 @@ Log_event::Log_event(const char* buf,
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
flags= uint2korr(buf + FLAGS_OFFSET);
- if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
- (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
+ if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
{
/*
These events always have a header which stops here (i.e. their
@@ -1168,14 +1169,14 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
/* Check the integrity */
if (event_len < EVENT_LEN_OFFSET ||
- buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
+ (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
}
- uint event_type= buf[EVENT_TYPE_OFFSET];
+ uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
if (event_type > description_event->number_of_event_types &&
event_type != FORMAT_DESCRIPTION_EVENT)
{
@@ -1297,6 +1298,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case INCIDENT_EVENT:
ev = new Incident_log_event(buf, event_len, description_event);
break;
+ case ANNOTATE_ROWS_EVENT:
+ ev = new Annotate_rows_log_event(buf, event_len, description_event);
+ break;
default:
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
@@ -3801,6 +3805,13 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[DELETE_ROWS_EVENT-1]= 6;);
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
+ // Set header length of the reserved events to 0
+ memset(post_header_len + MYSQL_EVENTS_END - 1, 0,
+ (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8));
+
+ // Set header lengths of Maria events
+ post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
+
// Sanity-check that all post header lengths are initialized.
IF_DBUG({
int i;
@@ -4445,8 +4456,8 @@ Load_log_event::Load_log_event(const char *buf, uint event_len,
*/
if (event_len)
copy_log_event(buf, event_len,
- ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
- LOAD_HEADER_LEN +
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN +
description_event->common_header_len :
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
description_event);
@@ -4483,7 +4494,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
*/
if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
buf_end,
- buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
+ (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
DBUG_RETURN(1);
data_len = event_len - body_offset;
@@ -6178,7 +6189,7 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len,
uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
copy_log_event(event_buf,len,
- ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
load_header_len + header_len :
(fake_base ? (header_len+load_header_len) :
(header_len+load_header_len) +
@@ -7134,7 +7145,8 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
m_width(tbl_arg ? tbl_arg->s->fields : 1),
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
#ifdef HAVE_REPLICATION
- , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
+ , m_curr_row(NULL), m_curr_row_end(NULL),
+ m_key(NULL), m_key_info(NULL), m_key_nr(0)
#endif
{
/*
@@ -7182,7 +7194,8 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
#endif
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
- , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
+ , m_curr_row(NULL), m_curr_row_end(NULL),
+ m_key(NULL), m_key_info(NULL), m_key_nr(0)
#endif
{
DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
@@ -7944,6 +7957,142 @@ void Rows_log_event::print_helper(FILE *file,
#endif
/**************************************************************************
+ Annotate_rows_log_event member functions
+**************************************************************************/
+
+#ifndef MYSQL_CLIENT
+Annotate_rows_log_event::Annotate_rows_log_event(THD *thd)
+ : Log_event(thd, 0, true),
+ m_save_thd_query_txt(0),
+ m_save_thd_query_len(0)
+{
+ m_query_txt= thd->query();
+ m_query_len= thd->query_length();
+}
+#endif
+
+/*
+  Decode an Annotate_rows event from a binlog buffer.
+
+  The event has no post-header, so the query text is everything after the
+  common header. The text is not copied: m_query_txt points into buf.
+
+  @param buf        Start of the raw event.
+  @param event_len  Total length of the event in bytes.
+  @param desc       Format description giving the common header length.
+
+  An event shorter than the common header is left with a NULL query and
+  zero length, which makes is_valid() return false, instead of letting the
+  unsigned subtraction wrap around to a huge length.
+*/
+Annotate_rows_log_event::Annotate_rows_log_event(const char *buf,
+                                                 uint event_len,
+                                      const Format_description_log_event *desc)
+  : Log_event(buf, desc),
+    m_save_thd_query_txt(0),
+    m_save_thd_query_len(0)
+{
+  if (event_len < desc->common_header_len)
+  {
+    /* Truncated or corrupt event: there is no body to point at. */
+    m_query_txt= 0;
+    m_query_len= 0;
+    return;
+  }
+  m_query_len= event_len - desc->common_header_len;
+  m_query_txt= (char*) buf + desc->common_header_len;
+}
+
+/*
+ In server builds, if do_apply_event() saved a non-NULL query pointer from
+ the THD, hand it back so the THD stops referring to this event's text.
+
+ NOTE(review): when the saved pointer was NULL nothing is restored here, so
+ the THD's query keeps pointing at m_query_txt after this object is gone --
+ confirm that this cannot leave a dangling pointer.
+*/
+Annotate_rows_log_event::~Annotate_rows_log_event()
+{
+#ifndef MYSQL_CLIENT
+ if (m_save_thd_query_txt)
+ thd->set_query(m_save_thd_query_txt, m_save_thd_query_len);
+#endif
+}
+
+/* Size of the event data: just the query text, as nothing else is written. */
+int Annotate_rows_log_event::get_data_size()
+{
+ return m_query_len;
+}
+
+/* Type code identifying this event: ANNOTATE_ROWS_EVENT. */
+Log_event_type Annotate_rows_log_event::get_type_code()
+{
+ return ANNOTATE_ROWS_EVENT;
+}
+
+/* The event is valid only with a non-NULL query pointer and non-zero length. */
+bool Annotate_rows_log_event::is_valid() const
+{
+ return (m_query_txt != NULL && m_query_len != 0);
+}
+
+#ifndef MYSQL_CLIENT
+/*
+ Nothing is written here: ANNOTATE_ROWS_HEADER_LEN is 0, so the event has no
+ post-header. Returns 0 without touching `file'.
+*/
+bool Annotate_rows_log_event::write_data_header(IO_CACHE *file)
+{
+ return 0;
+}
+#endif
+
+#ifndef MYSQL_CLIENT
+/*
+ Write the query text (m_query_len bytes, no terminator added) to `file'.
+ Returns whatever my_b_safe_write() returns.
+*/
+bool Annotate_rows_log_event::write_data_body(IO_CACHE *file)
+{
+ return my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+  Hand the query text to the protocol as binary data. Nothing is stored when
+  the event carries no query.
+*/
+void Annotate_rows_log_event::pack_info(Protocol* protocol)
+{
+  if (!m_query_txt || !m_query_len)
+    return;
+
+  protocol->store(m_query_txt, m_query_len, &my_charset_bin);
+}
+#endif
+
+#ifdef MYSQL_CLIENT
+/*
+  Print the event into pinfo->head_cache: the event header, an
+  "Annotate_rows:" line, then the query text with every non-empty line
+  prefixed by "#Q> ". Runs of '\r' and '\n' separate lines and are not
+  printed. Nothing at all is printed in short form.
+
+  The text is not NUL-terminated, so it is walked as the pointer range
+  [pbeg, pstop). A single range check per step replaces a separate character
+  counter, which counted each line break twice and so stopped before the end
+  of a multi-line query.
+
+  @param file   Unused; output goes to pinfo->head_cache.
+  @param pinfo  Print settings and output cache.
+*/
+void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo)
+{
+  if (pinfo->short_form)
+    return;
+
+  print_header(&pinfo->head_cache, pinfo, TRUE);
+  my_b_printf(&pinfo->head_cache, "\tAnnotate_rows:\n");
+
+  const char *pbeg= m_query_txt;                       // beginning of the next line
+  const char *const pstop= m_query_txt + m_query_len;  // one past the last character
+
+  for (;;)
+  {
+    // skip all \r's and \n's at the beginning of the next line
+    while (pbeg < pstop && (*pbeg == '\r' || *pbeg == '\n'))
+      pbeg++;
+    if (pbeg >= pstop)
+      return;
+
+    // find end of the next line
+    const char *pend= pbeg + 1;
+    while (pend < pstop && *pend != '\r' && *pend != '\n')
+      pend++;
+
+    // print next line
+    my_b_write(&pinfo->head_cache, (const uchar*) "#Q> ", 4);
+    my_b_write(&pinfo->head_cache, (const uchar*) pbeg, pend - pbeg);
+    my_b_write(&pinfo->head_cache, (const uchar*) "\n", 1);
+
+    pbeg= pend;
+  }
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+ Remember the THD's current query text and length, then make the THD's query
+ refer to this event's text. The saved values are what the destructor hands
+ back to the THD. Always returns 0; `rli' is not used.
+*/
+int Annotate_rows_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ /* Save first: set_query() below replaces what query() returns. */
+ m_save_thd_query_txt= thd->query();
+ m_save_thd_query_len= thd->query_length();
+ thd->set_query(m_query_txt, m_query_len);
+ return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+ Position update after the event has been applied: only calls
+ rli->inc_event_relay_log_pos(). Always returns 0.
+*/
+int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli)
+{
+ rli->inc_event_relay_log_pos();
+ return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/* The skip decision is delegated entirely to continue_group(rli). */
+Log_event::enum_skip_reason
+Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ return continue_group(rli);
+}
+#endif
+
+/**************************************************************************
Table_map_log_event member functions and support functions
**************************************************************************/
@@ -9057,6 +9206,86 @@ record_compare_exit:
return result;
}
+
+/**
+ Find the best key to use when locating the row in @c find_row().
+
+ A primary key is preferred if it exists; otherwise a unique index is
+ preferred. Else we pick the index with the smallest rec_per_key value.
+
+ If a suitable key is found, set @c m_key, @c m_key_nr and @c m_key_info
+ member fields appropriately. @c m_key is a newly allocated buffer of
+ @c key_length bytes.
+
+ If no usable key exists, @c m_key_info is set to NULL and 0 is returned;
+ @c m_key and @c m_key_nr are left untouched in that case.
+
+ @returns Error code on failure (HA_ERR_OUT_OF_MEM if the key buffer cannot
+ be allocated), 0 on success.
+*/
+int Rows_log_event::find_key()
+{
+ uint i, best_key_nr, last_part;
+ KEY *key, *best_key;
+ ulong best_rec_per_key, tmp;
+ DBUG_ENTER("Rows_log_event::find_key");
+ DBUG_ASSERT(m_table);
+
+ /* MAX_KEY serves as the "nothing chosen yet" marker. */
+ best_key_nr= MAX_KEY;
+ LINT_INIT(best_key);
+ LINT_INIT(best_rec_per_key);
+
+ /*
+ Keys are sorted so that any primary key is first, followed by unique keys,
+ followed by any other. So we will automatically pick the primary key if
+ it exists.
+ */
+ for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++)
+ {
+ /* Keys not marked in keys_in_use are never considered. */
+ if (!m_table->s->keys_in_use.is_set(i))
+ continue;
+ /*
+ We cannot use a unique key with NULL-able columns to uniquely identify
+ a row (but we can still select it for range scan below if nothing better
+ is available).
+ */
+ if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)
+ {
+ /* Unique key without NULL-able parts: take it and stop searching. */
+ best_key_nr= i;
+ best_key= key;
+ break;
+ }
+ /*
+ We can only use a non-unique key if it allows range scans (ie. skip
+ FULLTEXT indexes and such).
+ */
+ last_part= key->key_parts - 1;
+ DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu",
+ key->name, last_part, key->rec_per_key[last_part]));
+ if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT))
+ continue;
+
+ /*
+ The first usable key is always accepted; afterwards a key replaces the
+ current best only with a non-zero, strictly smaller rec_per_key. Note
+ that if the accepted key has rec_per_key 0, no later key can replace it.
+ NOTE(review): presumably 0 means "no statistics available" -- confirm.
+ */
+ tmp= key->rec_per_key[last_part];
+ if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key))
+ {
+ best_key_nr= i;
+ best_key= key;
+ best_rec_per_key= tmp;
+ }
+ }
+
+ /* No usable key: report success with m_key_info cleared. */
+ if (best_key_nr == MAX_KEY)
+ {
+ m_key_info= NULL;
+ DBUG_RETURN(0);
+ }
+
+ // Allocate buffer for key searches
+ m_key= (uchar *) my_malloc(best_key->key_length, MYF(MY_WME));
+ if (m_key == NULL)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ m_key_info= best_key;
+ m_key_nr= best_key_nr;
+
+ DBUG_RETURN(0);;
+}
+
+
/**
Locate the current row in event's table.
@@ -9156,12 +9385,17 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
*/
store_record(table,record[1]);
- if (table->s->keys > 0 && table->s->keys_in_use.is_set(0))
+ if (m_key_info)
{
- DBUG_PRINT("info",("locating record using primary key (index_read)"));
+ DBUG_PRINT("info",("locating record using key #%u [%s] (index_read)",
+ m_key_nr, m_key_info->name));
+ /* We use this to test that the correct key is used in test cases. */
+ DBUG_EXECUTE_IF("slave_crash_if_wrong_index",
+ if(0 != strcmp(m_key_info->name,"expected_key")) abort(););
- /* The 0th key is active: search the table using the index */
- if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
+ /* The key is active: search the table using the index */
+ if (!table->file->inited &&
+ (error= table->file->ha_index_init(m_key_nr, FALSE)))
{
DBUG_PRINT("info",("ha_index_init returns error %d",error));
table->file->print_error(error, MYF(0));
@@ -9171,14 +9405,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
/* Fill key data for the row */
DBUG_ASSERT(m_key);
- key_copy(m_key, table->record[0], table->key_info, 0);
+ key_copy(m_key, table->record[0], m_key_info, 0);
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
#ifndef HAVE_valgrind
- DBUG_DUMP("key data", m_key, table->key_info->key_length);
+ DBUG_DUMP("key data", m_key, m_key_info->key_length);
#endif
/*
@@ -9264,6 +9498,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
record we are looking for is stored in record[1].
*/
DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
+ /* We use this to test that the correct key is used in test cases. */
+ DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort(););
while (record_compare(table))
{
@@ -9302,6 +9538,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
else
{
DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
+ /* We use this to test that the correct key is used in test cases. */
+ DBUG_EXECUTE_IF("slave_crash_if_table_scan", abort(););
int restart_count= 0; // Number of times scanning has restarted from top
@@ -9421,14 +9659,7 @@ Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability
return 0;
}
- if (m_table->s->keys > 0)
- {
- // Allocate buffer for key searches
- m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
- if (!m_key)
- return HA_ERR_OUT_OF_MEM;
- }
- return 0;
+ return find_key();
}
int
@@ -9439,6 +9670,7 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability
m_table->file->ha_index_or_rnd_end();
my_free(m_key, MYF(MY_ALLOW_ZERO_PTR));
m_key= NULL;
+ m_key_info= NULL;
return error;
}
@@ -9541,13 +9773,9 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
int
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
{
- if (m_table->s->keys > 0)
- {
- // Allocate buffer for key searches
- m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
- if (!m_key)
- return HA_ERR_OUT_OF_MEM;
- }
+ int err;
+ if ((err= find_key()))
+ return err;
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
@@ -9562,6 +9790,7 @@ Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability
m_table->file->ha_index_or_rnd_end();
my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
m_key= NULL;
+ m_key_info= NULL;
return error;
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 4ea511f45b5..91a2ad9693c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -250,6 +250,7 @@ struct sql_ex_info
#define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1)
#define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN)
#define INCIDENT_HEADER_LEN 2
+#define ANNOTATE_ROWS_HEADER_LEN 0
/*
Max number of possible extra bytes in a replication event compared to a
packet (i.e. a query) sent from client to master;
@@ -582,8 +583,14 @@ enum Log_event_type
*/
INCIDENT_EVENT= 26,
+ /* New MySQL/Sun events are to be added right above this comment */
+ MYSQL_EVENTS_END,
+
+ MARIA_EVENTS_BEGIN= 160,
+ /* New Maria event numbers start from here */
+ ANNOTATE_ROWS_EVENT= 160,
+
/*
- Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
*/
@@ -2988,6 +2995,59 @@ public:
char *str_to_hex(char *to, const char *from, uint len);
/**
+ @class Annotate_rows_log_event
+
+ In row-based mode, if binlog_annotate_rows_events = ON, each group of
+ Table_map_log_events is preceded by an Annotate_rows_log_event which
+ contains the query which caused the subsequent rows operations.
+
+ The Annotate_rows_log_event has no post-header and its body contains
+ the corresponding query (without trailing zero). Note. The query length
+ is to be calculated as a difference between the whole event length and
+ the common header length.
+*/
+class Annotate_rows_log_event: public Log_event
+{
+public:
+#ifndef MYSQL_CLIENT
+ /* Server side: build the event from the THD's current query text. */
+ Annotate_rows_log_event(THD*);
+#endif
+ /*
+ Decode an event from a raw buffer. The query text is not copied; the event
+ keeps a pointer into buf.
+ */
+ Annotate_rows_log_event(const char *buf, uint event_len,
+ const Format_description_log_event*);
+ /* In server builds, restores a THD query saved by do_apply_event(). */
+ ~Annotate_rows_log_event();
+
+ /* Length of the query text. */
+ virtual int get_data_size();
+ /* Always ANNOTATE_ROWS_EVENT. */
+ virtual Log_event_type get_type_code();
+ /* True when the query pointer is non-NULL and the length is non-zero. */
+ virtual bool is_valid() const;
+
+#ifndef MYSQL_CLIENT
+ /* There is no post-header, so write_data_header() writes nothing. */
+ virtual bool write_data_header(IO_CACHE*);
+ /* Writes the query text. */
+ virtual bool write_data_body(IO_CACHE*);
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ /* Stores the query text into the protocol as binary data. */
+ virtual void pack_info(Protocol*);
+#endif
+
+#ifdef MYSQL_CLIENT
+ /* Prints the query text, each line prefixed with "#Q> ". */
+ virtual void print(FILE*, PRINT_EVENT_INFO*);
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+private:
+ /* Points the THD's query at this event's text, saving the previous one. */
+ virtual int do_apply_event(Relay_log_info const*);
+ virtual int do_update_pos(Relay_log_info*);
+ virtual enum_skip_reason do_shall_skip(Relay_log_info*);
+#endif
+
+private:
+ /* Query text (no trailing zero) and its length in bytes. */
+ char *m_query_txt;
+ uint m_query_len;
+ /* THD query text and length saved by do_apply_event(); 0 until then. */
+ char *m_save_thd_query_txt;
+ uint m_save_thd_query_len;
+};
+
+/**
@class Table_map_log_event
In row-based mode, every row operation event is preceded by a
@@ -3592,7 +3652,10 @@ protected:
const uchar *m_curr_row; /* Start of the row being processed */
const uchar *m_curr_row_end; /* One-after the end of the current row */
uchar *m_key; /* Buffer to keep key value during searches */
+ KEY *m_key_info; /* Pointer to KEY info for m_key_nr */
+ uint m_key_nr; /* Key number */
+ int find_key(); // Find a best key to use in find_row()
int find_row(const Relay_log_info *const);
int write_row(const Relay_log_info *const, const bool);
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 5255d0a8403..b9bc99e9747 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -688,7 +688,11 @@ protected:
/* BINLOG_DUMP options */
#define BINLOG_DUMP_NON_BLOCK 1
+#endif /* !MYSQL_CLIENT */
+#define BINLOG_SEND_ANNOTATE_ROWS_EVENT 2
+
+#ifndef MYSQL_CLIENT
/* sql_show.cc:show_log_files() */
#define SHOW_LOG_STATUS_FREE "FREE"
#define SHOW_LOG_STATUS_INUSE "IN USE"
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 1d8fadd8bb6..68140ae1b5a 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -548,6 +548,7 @@ my_bool opt_local_infile, opt_slave_compressed_protocol;
my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
+my_bool opt_replicate_annotate_rows_events= 0;
bool slave_warning_issued = false;
/*
@@ -1425,6 +1426,7 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
+ TC_destroy();
xid_cache_free();
wt_end();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
@@ -4276,6 +4278,8 @@ a file name for --log-bin-index option", opt_binlog_index_name);
if (!errmesg[0][0])
unireg_abort(1);
+ TC_init();
+
/* We have to initialize the storage engines before CSV logging */
if (ha_init())
{
@@ -5905,6 +5909,8 @@ enum options_mysqld
OPT_REPLICATE_IGNORE_DB, OPT_LOG_SLAVE_UPDATES,
OPT_BINLOG_DO_DB, OPT_BINLOG_IGNORE_DB,
OPT_BINLOG_FORMAT,
+ OPT_BINLOG_ANNOTATE_ROWS_EVENTS,
+ OPT_REPLICATE_ANNOTATE_ROWS_EVENTS,
#ifndef DBUG_OFF
OPT_BINLOG_SHOW_XID,
#endif
@@ -6135,6 +6141,18 @@ struct my_option my_long_options[] =
#endif
, &opt_binlog_format, &opt_binlog_format,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"binlog-annotate-rows-events", OPT_BINLOG_ANNOTATE_ROWS_EVENTS,
+ "Tells the master to annotate RBR events with the statement that "
+ "caused these events.",
+ (uchar**) &global_system_variables.binlog_annotate_rows_events,
+ (uchar**) &max_system_variables.binlog_annotate_rows_events,
+ 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+  /*
+    Slave-side switch. It is only acted upon together with log-slave-updates:
+    request_dump() asks the master for Annotate_rows events only when both
+    options are set.
+  */
+  {"replicate-annotate-rows-events", OPT_REPLICATE_ANNOTATE_ROWS_EVENTS,
+   "Tells the slave to write annotate rows events received from the master "
+   "to its own binary log. Sensible only in pair with log-slave-updates option.",
+   (uchar**) &opt_replicate_annotate_rows_events,
+   (uchar**) &opt_replicate_annotate_rows_events,
+   0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"binlog-do-db", OPT_BINLOG_DO_DB,
"Tells the master it should log updates for the specified database, "
"and exclude all others not explicitly mentioned.",
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6319bbb60cf..42701bc9e24 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -38,7 +38,8 @@ Relay_log_info::Relay_log_info()
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), m_flags(0)
+ last_event_start_time(0), m_flags(0),
+ m_annotate_event(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -72,6 +73,7 @@ Relay_log_info::~Relay_log_info()
pthread_cond_destroy(&stop_cond);
pthread_cond_destroy(&log_space_cond);
relay_log.cleanup();
+ free_annotate_event();
DBUG_VOID_RETURN;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 5cafcf47086..0ea3c23bfd8 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -423,8 +423,46 @@ public:
(m_flags & (1UL << IN_STMT));
}
+ /**
+ Save pointer to Annotate_rows event and switch on the
+ binlog_annotate_rows_events for this sql thread.
+ To be called when sql thread receives an Annotate_rows event.
+ */
+ inline void set_annotate_event(Annotate_rows_log_event *event)
+ {
+ /*
+ Must come first: it deletes any previously saved event and clears the
+ flag, which is then switched on again below for the new event.
+ */
+ free_annotate_event();
+ /* The event is now owned here; free_annotate_event() deletes it. */
+ m_annotate_event= event;
+ sql_thd->variables.binlog_annotate_rows_events= 1;
+ }
+
+ /**
+ Returns pointer to the saved Annotate_rows event or NULL if there is
+ no saved event.
+ */
+ inline Annotate_rows_log_event* get_annotate_event()
+ {
+ return m_annotate_event;
+ }
+
+ /**
+ Delete saved Annotate_rows event (if any) and switch off the
+ binlog_annotate_rows_events for this sql thread.
+ To be called when sql thread has applied the last (i.e. with
+ STMT_END_F flag) rbr event.
+ */
+  inline void free_annotate_event()
+  {
+    /* Nothing saved: the session flag is left as it is. */
+    if (!m_annotate_event)
+      return;
+
+    sql_thd->variables.binlog_annotate_rows_events= 0;
+    delete m_annotate_event;
+    m_annotate_event= 0;
+  }
+
private:
uint32 m_flags;
+ Annotate_rows_log_event *m_annotate_event;
};
diff --git a/sql/set_var.cc b/sql/set_var.cc
index b02330727e9..33575de1ccf 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -183,6 +183,9 @@ static sys_var_const sys_back_log(&vars, "back_log",
OPT_GLOBAL, SHOW_LONG,
(uchar*) &back_log);
static sys_var_const_os_str sys_basedir(&vars, "basedir", mysql_home);
+static sys_var_thd_bool
+sys_binlog_annotate_rows_events(&vars, "binlog_annotate_rows_events",
+ &SV::binlog_annotate_rows_events);
static sys_var_long_ptr sys_binlog_cache_size(&vars, "binlog_cache_size",
&binlog_cache_size);
static sys_var_thd_binlog_format sys_binlog_format(&vars, "binlog_format",
diff --git a/sql/slave.cc b/sql/slave.cc
index 0d14234766b..63b7ce715c9 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1867,6 +1867,9 @@ static int request_dump(MYSQL* mysql, Master_info* mi,
*suppress_warnings= FALSE;
+ if (opt_log_slave_updates && opt_replicate_annotate_rows_events)
+ binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2261,17 +2264,41 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
}
exec_res= apply_event_and_update_pos(ev, thd, rli);
- /*
- Format_description_log_event should not be deleted because it will be
- used to read info about the relay log's format; it will be deleted when
- the SQL thread does not need it, i.e. when this thread terminates.
- */
- if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
- {
- DBUG_PRINT("info", ("Deleting the event after it has been executed"));
- delete ev;
+ switch (ev->get_type_code()) {
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ Format_description_log_event should not be deleted because it
+ will be used to read info about the relay log's format;
+ it will be deleted when the SQL thread does not need it,
+ i.e. when this thread terminates.
+ */
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ /*
+ Annotate_rows event should not be deleted because after it has
+ been applied, thd->query points to the string inside this event.
+ The thd->query will be used to generate new Annotate_rows event
+ during applying the subsequent Rows events.
+ */
+ rli->set_annotate_event((Annotate_rows_log_event*) ev);
+ break;
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ After the last Rows event has been applied, the saved Annotate_rows
+ event (if any) is not needed anymore and can be deleted.
+ */
+ if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+ rli->free_annotate_event();
+ /* fall through */
+ default:
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ delete ev;
+ break;
}
+
/*
update_log_pos failed: this should not happen, so we don't
retry.
@@ -2899,6 +2926,12 @@ pthread_handler_t handle_slave_sql(void *arg)
thd->init_for_queries();
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
+ /*
+ binlog_annotate_rows_events must be TRUE only after an Annotate_rows event
+ has been received and only till the last corresponding rbr event has been
+ applied. In all other cases it must be FALSE.
+ */
+ thd->variables.binlog_annotate_rows_events= 0;
pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd);
pthread_mutex_unlock(&LOCK_thread_count);
@@ -3381,7 +3414,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
If we get Load event, we need to pass a non-reusable buffer
to read_log_event, so we do a trick
*/
- if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
{
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
{
@@ -3588,13 +3621,13 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
LINT_INIT(inc_pos);
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
- buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
+ (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
LINT_INIT(inc_pos);
pthread_mutex_lock(&mi->data_lock);
- switch (buf[EVENT_TYPE_OFFSET]) {
+ switch ((uchar)buf[EVENT_TYPE_OFFSET]) {
case STOP_EVENT:
/*
We needn't write this event to the relay log. Indeed, it just indicates a
@@ -3697,9 +3730,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
*/
- if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
diff --git a/sql/slave.h b/sql/slave.h
index 1aa5b374e4b..3333e583559 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -106,6 +106,7 @@ extern MYSQL_PLUGIN_IMPORT char *relay_log_info_file;
extern char *opt_relay_logname, *opt_relaylog_index_name;
extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates;
+extern my_bool opt_replicate_annotate_rows_events;
extern ulonglong relay_log_space_limit;
/*
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index 9713ec1ef5c..a59cd631fef 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -174,7 +174,7 @@ void mysql_client_binlog_statement(THD* thd)
*/
if (!have_fd_event)
{
- int type = bufptr[EVENT_TYPE_OFFSET];
+ int type = (uchar)bufptr[EVENT_TYPE_OFFSET];
if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3)
have_fd_event= TRUE;
else
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index f10655c3e51..f8905cd5f8c 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -763,6 +763,8 @@ THD::THD()
active_vio = 0;
#endif
pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&LOCK_wakeup_ready, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND_wakeup_ready, 0);
/* Variables with default values */
proc_info="login";
@@ -1149,6 +1151,8 @@ THD::~THD()
free_root(&transaction.mem_root,MYF(0));
#endif
mysys_var=0; // Safety (shouldn't be needed)
+ pthread_cond_destroy(&COND_wakeup_ready);
+ pthread_mutex_destroy(&LOCK_wakeup_ready);
pthread_mutex_destroy(&LOCK_thd_data);
#ifndef DBUG_OFF
dbug_sentry= THD_SENTRY_GONE;
@@ -4243,6 +4247,25 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_RETURN(0);
}
+void
+THD::wait_for_wakeup_ready()
+{
+ pthread_mutex_lock(&LOCK_wakeup_ready);
+ while (!wakeup_ready)
+ pthread_cond_wait(&COND_wakeup_ready, &LOCK_wakeup_ready);
+ pthread_mutex_unlock(&LOCK_wakeup_ready);
+}
+
+void
+THD::signal_wakeup_ready()
+{
+ pthread_mutex_lock(&LOCK_wakeup_ready);
+ wakeup_ready= true;
+ pthread_mutex_unlock(&LOCK_wakeup_ready);
+ pthread_cond_signal(&COND_wakeup_ready);
+}
+
+
bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr)
{
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 477ae9bf751..9ea4e37e610 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -451,6 +451,7 @@ struct system_variables
ulong ndb_index_stat_cache_entries;
ulong ndb_index_stat_update_freq;
ulong binlog_format; // binlog format for this thd (see enum_binlog_format)
+ my_bool binlog_annotate_rows_events;
my_bool binlog_direct_non_trans_update;
/*
In slave thread we need to know in behalf of which
@@ -1577,7 +1578,8 @@ public:
*/
void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin();
- int binlog_write_table_map(TABLE *table, bool is_transactional);
+ int binlog_write_table_map(TABLE *table, bool is_transactional,
+ my_bool *with_annotate= 0);
int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *buf);
@@ -2551,6 +2553,14 @@ public:
return backup;
}
+ void clear_wakeup_ready() { wakeup_ready= false; }
+ /*
+ Sleep waiting for others to wake us up with signal_wakeup_ready().
+ Must call clear_wakeup_ready() before waiting.
+ */
+ void wait_for_wakeup_ready();
+ /* Wake this thread up from wait_for_wakeup_ready(). */
+ void signal_wakeup_ready();
private:
/** The current internal error handler for this thread, or NULL. */
Internal_error_handler *m_internal_handler;
@@ -2589,6 +2599,16 @@ private:
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
+ /*
+ Flag, mutex and condition for a thread to wait for a signal from another
+ thread.
+
+ Currently used to wait for group commit to complete, can also be used for
+ other purposes.
+ */
+ bool wakeup_ready;
+ pthread_mutex_t LOCK_wakeup_ready;
+ pthread_cond_t COND_wakeup_ready;
};
/** A short cut for thd->main_da.set_ok_status(). */
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index adc82d791b6..05804c2ee7e 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -1970,6 +1970,11 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
pthread_mutex_lock(&LOCK_thread_count);
thread_count++;
pthread_mutex_unlock(&LOCK_thread_count);
+ /*
+ Annotating delayed inserts is not supported.
+ */
+ di->thd.variables.binlog_annotate_rows_events= 0;
+
di->thd.set_db(table_list->db, (uint) strlen(table_list->db));
di->thd.set_query(my_strdup(table_list->table_name, MYF(MY_WME)), 0);
if (di->thd.db == NULL || di->thd.query() == NULL)
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 3088d807549..103cf72cf14 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1008,6 +1008,10 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
DBUG_ENTER("dispatch_command");
DBUG_PRINT("info", ("command: %d", command));
+ DBUG_EXECUTE_IF("crash_dispatch_command_before",
+ { DBUG_PRINT("crash_dispatch_command_before", ("now"));
+ DBUG_ABORT(); });
+
thd->command=command;
/*
Commands which always take a long time are logged into
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 95e48c531be..596f0f5c1e6 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -489,7 +489,7 @@ impossible position";
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
(*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
@@ -556,32 +556,36 @@ impossible position";
}
#endif
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT ||
+ (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
{
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ {
+ errmsg = "Failed on my_net_write()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
- {
- if (send_file(thd))
- {
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ DBUG_PRINT("info", ("log event code %d",
+ (*packet)[LOG_EVENT_OFFSET+1] ));
+ if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ {
+ if (send_file(thd))
+ {
+ errmsg = "failed in send_file()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ }
}
packet->set("\0", 1, &my_charset_bin);
}
@@ -677,23 +681,27 @@ impossible position";
if (read_packet)
{
- thd_proc_info(thd, "Sending binlog event to slave");
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
-
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
- {
- if (send_file(thd))
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT ||
+ (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
- errmsg = "failed in send_file()";
+ errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- }
+
+ if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ {
+ if (send_file(thd))
+ {
+ errmsg = "failed in send_file()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ }
+ }
packet->set("\0", 1, &my_charset_bin);
/*
No need to net_flush because we will get to flush later when
@@ -1774,6 +1782,11 @@ static sys_var_chain vars = { NULL, NULL };
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
OPT_GLOBAL, SHOW_MY_BOOL,
(uchar*) &opt_log_slave_updates);
+static sys_var_const
+sys_replicate_annotate_rows_events(&vars,
+ "replicate_annotate_rows_events",
+ OPT_GLOBAL, SHOW_MY_BOOL,
+ (uchar*) &opt_replicate_annotate_rows_events);
static sys_var_const sys_relay_log(&vars, "relay_log",
OPT_GLOBAL, SHOW_CHAR_PTR,
(uchar*) &opt_relay_logname);