summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2016-09-22 08:26:45 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2016-10-14 23:15:58 +0200
commit19abe79fd15ab6d8ac0c2f65476bc3c7d29a008d (patch)
treef5a34e32e5bceba084584efb471c2ff7b369eed4 /sql/slave.cc
parent50f19ca8099994e992e1b411c7c05287855a7bdd (diff)
downloadmariadb-git-19abe79fd15ab6d8ac0c2f65476bc3c7d29a008d.tar.gz
MDEV-7145: Delayed replication, intermediate commit.
Initial merge of delayed replication from MySQL git. The code from the initial push into MySQL is merged, and the associated test case passes. A number of tasks are still pending: 1. Check full test suite run for any regressions or .result file updates. 2. Extend the feature to also work for parallel replication. 3. There are some todo-comments about future refactoring left from MySQL, these should be located and merged on top. 4. There are some later related MySQL commits, these should be checked and merged. These include: e134b9362ba0b750d6ac1b444780019622d14aa5 b38f0f7857c073edfcc0a64675b7f7ede04be00f fd2b210383358fe7697f201e19ac9779879ba72a afc397376ec50e96b2918ee64e48baf4dda0d37d 5. The testcase from MySQL relies heavily on sleep and timing for testing, and seems likely to sporadically fail on heavily loaded test servers in buildbot or distro build farms. Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc245
1 files changed, 192 insertions, 53 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index a55d26b1aa0..d1d9abd4027 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1634,8 +1634,10 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
+ mysql_mutex_lock(&mi->data_lock);
mi->clock_diff_with_master=
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
+ mysql_mutex_unlock(&mi->data_lock);
}
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
@@ -1647,7 +1649,9 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
}
else
{
+ mysql_mutex_lock(&mi->data_lock);
mi->clock_diff_with_master= 0; /* The "most sensible" value */
+ mysql_mutex_unlock(&mi->data_lock);
sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
"do not trust column Seconds_Behind_Master of SHOW "
"SLAVE STATUS. Error: %s (%d)",
@@ -2797,6 +2801,15 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
Item_empty_string(thd, "Parallel_Mode",
sizeof("conservative")-1),
mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "SQL_Delay", 10,
+ MYSQL_TYPE_LONG));
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "SQL_Remaining_Delay", 8,
+ MYSQL_TYPE_LONG));
+ field_list->push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_SQL_Running_State",
+ 20));
if (full)
{
field_list->push_back(new (mem_root)
@@ -2986,6 +2999,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
prot_store_ids(thd, &mi->ignore_server_ids);
// Master_Server_id
protocol->store((uint32) mi->master_id);
+ // SQL_Delay
// Master_Ssl_Crl
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
@@ -3008,6 +3022,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(mode_name, strlen(mode_name), &my_charset_bin);
}
+ protocol->store((uint32) mi->rli.get_sql_delay());
+ // SQL_Remaining_Delay
+ // THD::proc_info is not protected by any lock, so we read it once
+ // to ensure that we use the same value throughout this function.
+ const char *slave_sql_running_state=
+ mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : "";
+ if (slave_sql_running_state == Relay_log_info::state_delaying_string)
+ {
+ time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end();
+ protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0));
+ }
+ else
+ protocol->store_null();
+ // Slave_SQL_Running_State
+ protocol->store(slave_sql_running_state, &my_charset_bin);
+
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3377,6 +3407,69 @@ has_temporary_error(THD *thd)
}
+/**
+ If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock.
+
+ Design note: this is the place to unlock rli->data_lock here since
+ it should be held when reading delay info from rli, but it should
+ not be held while sleeping.
+
+ @param ev Event that is about to be executed.
+
+ @param thd The sql thread's THD object.
+
+ @param rli The sql thread's Relay_log_info structure.
+*/
+static void sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi)
+{
+ Relay_log_info* rli= rgi->rli;
+ long sql_delay= rli->get_sql_delay();
+
+ DBUG_ENTER("sql_delay_event");
+ mysql_mutex_assert_owner(&rli->data_lock);
+ DBUG_ASSERT(!rli->belongs_to_client());
+
+ int type= ev->get_type_code();
+ if (sql_delay && type != ROTATE_EVENT &&
+ type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3)
+ {
+ // The time when we should execute the event.
+ time_t sql_delay_end=
+ ev->when + rli->mi->clock_diff_with_master + sql_delay;
+ // The current time.
+ time_t now= my_time(0);
+ // The time we will have to sleep before executing the event.
+ unsigned long nap_time= 0;
+ if (sql_delay_end > now)
+ nap_time= sql_delay_end - now;
+
+ DBUG_PRINT("info", ("sql_delay= %lu "
+ "ev->when= %lu "
+ "rli->mi->clock_diff_with_master= %lu "
+ "now= %ld "
+ "sql_delay_end= %lu "
+ "nap_time= %ld",
+ sql_delay, (long)ev->when,
+ rli->mi->clock_diff_with_master,
+ (long)now, sql_delay_end, (long)nap_time));
+
+ if (sql_delay_end > now)
+ {
+ DBUG_PRINT("info", ("delaying replication event %lu secs",
+ nap_time));
+ rli->start_sql_delay(sql_delay_end);
+ mysql_mutex_unlock(&rli->data_lock);
+ slave_sleep(thd, nap_time, sql_slave_killed, rgi);
+ DBUG_VOID_RETURN;
+ }
+ }
+
+ mysql_mutex_unlock(&rli->data_lock);
+
+ DBUG_VOID_RETURN;
+}
+
+
/*
First half of apply_event_and_update_pos(), see below.
Setup some THD variables for applying the event.
@@ -3500,16 +3593,16 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
if (exec_res == 0)
{
int error= ev->update_pos(rgi);
-#ifdef HAVE_valgrind
- if (!rli->is_fake)
-#endif
+ #ifndef DBUG_OFF
+ DBUG_PRINT("info", ("update_pos error = %d", error));
+ if (!rli->belongs_to_client())
{
- DBUG_PRINT("info", ("update_pos error = %d", error));
DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos,
rli->group_relay_log_name));
DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos,
rli->event_relay_log_name));
}
+#endif
/*
The update should not fail, so print an error message and
return an error code.
@@ -3544,21 +3637,39 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
/**
Applies the given event and advances the relay log position.
- In essence, this function does:
+ This is needed by the sql thread to execute events from the binlog,
+ and by clients executing BINLOG statements. Conceptually, this
+ function does:
@code
ev->apply_event(rli);
ev->update_pos(rli);
@endcode
- But it also does some maintainance, such as skipping events if
- needed and reporting errors.
+ It also does the following maintainance:
- If the @c skip flag is set, then it is tested whether the event
- should be skipped, by looking at the slave_skip_counter and the
- server id. The skip flag should be set when calling this from a
- replication thread but not set when executing an explicit BINLOG
- statement.
+ - Initializes the thread's server_id and time; and the event's
+ thread.
+
+ - If !rli->belongs_to_client() (i.e., if it belongs to the slave
+ sql thread instead of being used for executing BINLOG
+ statements), it does the following things: (1) skips events if it
+ is needed according to the server id or slave_skip_counter; (2)
+ unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER
+ TO MASTER_DELAY=X'; (4) maintains the running state of the sql
+ thread (rli->thread_state).
+
+ - Reports errors as needed.
+
+ @param ev The event to apply.
+
+ @param thd The client thread that executes the event (i.e., the
+ slave sql thread if called from a replication slave, or the client
+ thread if called to execute a BINLOG statement).
+
+ @param rli The relay log info (i.e., the slave's rli if called from
+ a replication slave, or the client's thd->rli_fake if called to
+ execute a BINLOG statement).
@retval 0 OK.
@@ -3581,7 +3692,15 @@ apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi)
DBUG_ASSERT(rli->slave_skip_counter > 0);
rli->slave_skip_counter--;
}
- mysql_mutex_unlock(&rli->data_lock);
+
+ if (reason == Log_event::EVENT_SKIP_NOT)
+ {
+ // Sleeps if needed, and unlocks rli->data_lock.
+ sql_delay_event(ev, thd, rgi);
+ }
+ else
+ mysql_mutex_unlock(&rli->data_lock);
+
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
@@ -3603,6 +3722,10 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
driver thread, so 23 should never see EVENT_SKIP_COUNT here.
*/
DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT);
+ /*
+ Calling sql_delay_event() was handled in the SQL driver thread when
+ doing parallel replication.
+ */
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
@@ -3682,7 +3805,8 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
/**
- Top-level function for executing the next event from the relay log.
+ Top-level function for executing the next event in the relay log.
+ This is called from the SQL thread.
This function reads the event from the relay log, executes it, and
advances the relay log position. It also handles errors, etc.
@@ -4229,8 +4353,10 @@ connected:
};);
#endif
- // TODO: the assignment below should be under mutex (5.0)
+ mysql_mutex_lock(&mi->run_lock);
mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
+ mysql_mutex_unlock(&mi->run_lock);
+
thd->slave_net = &mysql->net;
THD_STAGE_INFO(thd, stage_checking_master_version);
ret= get_master_version_and_clock(mysql, mi);
@@ -6582,67 +6708,80 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
#endif
-/*
- Store the file and position where the execute-slave thread are in the
+/**
+ Store the file and position where the slave's SQL thread are in the
relay log.
- SYNOPSIS
- flush_relay_log_info()
- rli Relay log information
+ Notes:
- NOTES
- - As this is only called by the slave thread or on STOP SLAVE, with the
- log_lock grabbed and the slave thread stopped, we don't need to have
- a lock here.
- - If there is an active transaction, then we don't update the position
- in the relay log. This is to ensure that we re-execute statements
- if we die in the middle of an transaction that was rolled back.
- - As a transaction never spans binary logs, we don't have to handle the
- case where we do a relay-log-rotation in the middle of the transaction.
- If this would not be the case, we would have to ensure that we
- don't delete the relay log file where the transaction started when
- we switch to a new relay log file.
-
- TODO
- - Change the log file information to a binary format to avoid calling
- longlong2str.
+ - This function should be called either from the slave SQL thread,
+ or when the slave thread is not running. (It reads the
+ group_{relay|master}_log_{pos|name} and delay fields in the rli
+ object. These may only be modified by the slave SQL thread or by
+ a client thread when the slave SQL thread is not running.)
- RETURN VALUES
- 0 ok
- 1 write error
-*/
+ - If there is an active transaction, then we do not update the
+ position in the relay log. This is to ensure that we re-execute
+ statements if we die in the middle of an transaction that was
+ rolled back.
+
+ - As a transaction never spans binary logs, we don't have to handle
+ the case where we do a relay-log-rotation in the middle of the
+ transaction. If transactions could span several binlogs, we would
+ have to ensure that we do not delete the relay log file where the
+ transaction started before switching to a new relay log file.
+
+ - Error can happen if writing to file fails or if flushing the file
+ fails.
+
+ @param rli The object representing the Relay_log_info.
+ @todo Change the log file information to a binary format to avoid
+ calling longlong2str.
+
+ @todo Move the member function into rpl_rli.cc and get rid of the
+ global function. /SVEN
+
+ @return 0 on success, 1 on error.
+*/
bool flush_relay_log_info(Relay_log_info* rli)
{
- bool error=0;
- DBUG_ENTER("flush_relay_log_info");
+ return rli->flush();
+}
- if (unlikely(rli->no_storage))
- DBUG_RETURN(0);
+bool Relay_log_info::flush()
+{
+ bool error=0;
- IO_CACHE *file = &rli->info_file;
- char buff[FN_REFLEN*2+22*2+4], *pos;
+ DBUG_ENTER("Relay_log_info::flush()");
+ IO_CACHE *file = &info_file;
+ // 2*file name, 2*long long, 2*unsigned long, 6*'\n'
+ char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos;
my_b_seek(file, 0L);
- pos=strmov(buff, rli->group_relay_log_name);
+ pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10);
+ *pos++='\n';
+ pos=strmov(pos, group_relay_log_name);
*pos++='\n';
- pos= longlong10_to_str(rli->group_relay_log_pos, pos, 10);
+ pos=longlong10_to_str(group_relay_log_pos, pos, 10);
*pos++='\n';
- pos=strmov(pos, rli->group_master_log_name);
+ pos=strmov(pos, group_master_log_name);
*pos++='\n';
- pos=longlong10_to_str(rli->group_master_log_pos, pos, 10);
+ pos=longlong10_to_str(group_master_log_pos, pos, 10);
*pos='\n';
+ pos= longlong10_to_str(sql_delay, pos, 10);
+ *pos= '\n';
if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
error=1;
if (flush_io_cache(file))
error=1;
if (sync_relayloginfo_period &&
!error &&
- ++(rli->sync_counter) >= sync_relayloginfo_period)
+ ++sync_counter >= sync_relayloginfo_period)
{
- if (my_sync(rli->info_fd, MYF(MY_WME)))
+ if (my_sync(info_fd, MYF(MY_WME)))
error=1;
- rli->sync_counter= 0;
+ sync_counter= 0;
}
/*
Flushing the relay log is done by the slave I/O thread