summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorSergei Golubchik <sergii@pisem.net>2013-07-18 16:46:57 +0200
committerSergei Golubchik <sergii@pisem.net>2013-07-18 16:46:57 +0200
commit5f6380adde2dac3f32b40339b9b702c0135eb7d6 (patch)
tree31068acc0b39c208d35d524688a5985831af0447 /sql/slave.cc
parent8a23ae088dc38f591efeab9eccdef5eb9094add9 (diff)
parent97e640b9ae83e07b444fceede6b0524256c7a3cc (diff)
downloadmariadb-git-5f6380adde2dac3f32b40339b9b702c0135eb7d6.tar.gz
10.0-base merge
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc105
1 files changed, 74 insertions, 31 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 46a7ddb28f3..55fe53345da 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -166,6 +166,37 @@ static bool send_show_master_info_header(THD *thd, bool full,
size_t gtid_pos_length);
static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
String *gtid_pos);
+/*
+ Function to set the slave's max_allowed_packet based on the value
+ of slave_max_allowed_packet.
+
+ @in_param thd Thread handler for slave
+ @in_param mysql MySQL connection handle
+*/
+
+static void set_slave_max_allowed_packet(THD *thd, MYSQL *mysql)
+{
+ DBUG_ENTER("set_slave_max_allowed_packet");
+ // thd and mysql must be valid
+ DBUG_ASSERT(thd && mysql);
+
+ thd->variables.max_allowed_packet= slave_max_allowed_packet;
+ thd->net.max_packet_size= slave_max_allowed_packet;
+ /*
+ Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
+ thread and the mysql->option max_allowed_packet, since a
+ replication event can become this much larger than
+ the corresponding packet (query) sent from client to master.
+ */
+ thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
+ /*
+ Skipping the setting of mysql->net.max_packet size to slave
+ max_allowed_packet since this is done during mysql_real_connect.
+ */
+ mysql->options.max_allowed_packet=
+ slave_max_allowed_packet+MAX_LOG_EVENT_HEADER;
+ DBUG_VOID_RETURN;
+}
/*
Find out which replications threads are running
@@ -887,9 +918,21 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
if (mi->using_gtid != Master_info::USE_GTID_NO &&
!mi->slave_running && !mi->rli.slave_running)
{
+ /*
+ purge_relay_logs() clears the mi->rli.group_master_log_pos.
+ So save and restore them, like we do in CHANGE MASTER.
+ (We are not going to use them for GTID, but it might be worth to
+ keep them in case connection with GTID fails and user wants to go
+ back and continue with previous old-style replication coordinates).
+ */
+ mi->master_log_pos = max(BIN_LOG_HEADER_SIZE, mi->rli.group_master_log_pos);
+ strmake(mi->master_log_name, mi->rli.group_master_log_name,
+ sizeof(mi->master_log_name)-1);
purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
- mi->master_log_name[0]= 0;
- mi->master_log_pos= 0;
+ mi->rli.group_master_log_pos= mi->master_log_pos;
+ strmake(mi->rli.group_master_log_name, mi->master_log_name,
+ sizeof(mi->rli.group_master_log_name)-1);
+
error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid ==
Master_info::USE_GTID_CURRENT_POS);
mi->events_queued_since_last_gtid= 0;
@@ -2765,12 +2808,6 @@ static int init_slave_thread(THD* thd, Master_info *mi,
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
thd->security_ctx->skip_grants();
-/*
- Adding MAX_LOG_EVENT_HEADER_LEN to the max_allowed_packet on all
- slave threads, since a replication event can become this much larger
- than the corresponding packet (query) sent from client to master.
-*/
- thd->variables.max_allowed_packet= slave_max_allowed_packet;
thd->slave_thread= 1;
thd->connection_name= mi->connection_name;
thd->enable_slow_log= opt_log_slow_slave_statements;
@@ -3546,7 +3583,13 @@ pthread_handler_t handle_slave_io(void *arg)
mi->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Unable to load replication GTID slave state from mysql.%s: %s",
rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
- goto err;
+ /*
+ If we are using old-style replication, we can continue, even though we
+ then will not be able to record the GTIDs we receive. But if using GTID,
+ we must give up.
+ */
+ if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode)
+ goto err;
}
@@ -3582,14 +3625,6 @@ pthread_handler_t handle_slave_io(void *arg)
"replication starts at GTID position '%s'",
mi->user, mi->host, mi->port, tmp.c_ptr_safe());
}
-
- /*
- Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O
- thread, since a replication event can become this much larger than
- the corresponding packet (query) sent from client to master.
- */
- thd->net.max_packet_size= slave_max_allowed_packet;
- mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER;
}
else
{
@@ -4151,7 +4186,13 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
"Unable to load replication GTID slave state from mysql.%s: %s",
rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
- goto err;
+ /*
+ If we are using old-style replication, we can continue, even though we
+ then will not be able to record the GTIDs we receive. But if using GTID,
+ we must give up.
+ */
+ if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode)
+ goto err;
}
/* execute init_slave variable */
@@ -5069,9 +5110,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
const char *errmsg;
Gtid_list_log_event *glev;
Log_event *tmp;
+ uint32 flags;
- if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID)
- goto default_action;
if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue,
opt_slave_sql_verify_checksum)))
@@ -5080,16 +5120,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
glev= static_cast<Gtid_list_log_event *>(tmp);
- if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED)
- {
- char str_buf[128];
- String str(str_buf, sizeof(str_buf), system_charset_info);
- mi->rli.until_gtid_pos.to_string(&str);
- sql_print_information("Slave IO thread stops because it reached its"
- " UNTIL master_gtid_pos %s", str.c_ptr_safe());
- mi->abort_slave= true;
- }
event_pos= glev->log_pos;
+ flags= glev->gl_flags;
delete glev;
/*
@@ -5103,6 +5135,17 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
inc_pos= 0;
else
inc_pos= event_pos - mi->master_log_pos;
+
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID &&
+ flags & Gtid_list_log_event::FLAG_UNTIL_REACHED)
+ {
+ char str_buf[128];
+ String str(str_buf, sizeof(str_buf), system_charset_info);
+ mi->rli.until_gtid_pos.to_string(&str);
+ sql_print_information("Slave IO thread stops because it reached its"
+ " UNTIL master_gtid_pos %s", str.c_ptr_safe());
+ mi->abort_slave= true;
+ }
}
break;
@@ -5415,7 +5458,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
char llbuff[22];
my_bool my_true= 1;
DBUG_ENTER("connect_to_master");
-
+ set_slave_max_allowed_packet(thd, mysql);
#ifndef DBUG_OFF
mi->events_till_disconnect = disconnect_slave_event_count;
#endif
@@ -5823,7 +5866,7 @@ static Log_event* next_event(Relay_log_info* rli)
if (ev->get_type_code() == GTID_EVENT)
{
Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
- uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+ uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id);
if (!sub_id)
{
errmsg = "slave SQL thread aborted because of out-of-memory error";