diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 497 |
1 files changed, 489 insertions, 8 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index e5039d118be..6153c4bd0f9 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -21,17 +21,54 @@ #include "sql_repl.h" #include "sql_acl.h" #include "log_event.h" +#include "mini_client.h" #include <thr_alarm.h> #include <my_dir.h> +#define SLAVE_LIST_CHUNK 128 + extern const char* any_db; extern pthread_handler_decl(handle_slave,arg); +HASH slave_list; + +#ifndef DBUG_OFF +int max_binlog_dump_events = 0; // unlimited +bool opt_sporadic_binlog_dump_fail = 0; +static int binlog_dump_count = 0; +#endif + +static uint32* slave_list_key(SLAVE_INFO* si, uint* len, + my_bool not_used __attribute__((unused))) +{ + *len = 4; + return &si->server_id; +} + +static void slave_info_free(void *s) +{ + my_free((byte*)s, MYF(MY_WME)); +} + +void init_slave_list() +{ + hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0, + (hash_get_key) slave_list_key, slave_info_free, 0); + pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST); +} + +void end_slave_list() +{ + pthread_mutex_lock(&LOCK_slave_list); + hash_free(&slave_list); + pthread_mutex_unlock(&LOCK_slave_list); + pthread_mutex_destroy(&LOCK_slave_list); +} static int fake_rotate_event(NET* net, String* packet, char* log_file_name, const char**errmsg) { - char header[LOG_EVENT_HEADER_LEN]; + char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN]; memset(header, 0, 4); // when does not matter header[EVENT_TYPE_OFFSET] = ROTATE_EVENT; char* p = strrchr(log_file_name, FN_LIBCHAR); @@ -42,10 +79,14 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, p = log_file_name; uint ident_len = (uint) strlen(p); - ulong event_len = ident_len + sizeof(header); - int4store(header + EVENT_TYPE_OFFSET + 1, server_id); + ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD; + int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, 0); + int4store(header + LOG_SEQ_OFFSET, 0); packet->append(header, sizeof(header)); + int8store(buf, 4); // tell slave to skip magic number + packet->append(buf, ROTATE_HEADER_LEN); packet->append(p,ident_len); if(my_net_write(net, (char*)packet->ptr(), packet->length())) { @@ -55,6 +96,55 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, return 0; } +int register_slave(THD* thd, uchar* packet, uint packet_length) +{ + uint len; + SLAVE_INFO* si, *old_si; + int res = 1; + uchar* p = packet, *p_end = packet + packet_length; + + if(check_access(thd, FILE_ACL, any_db)) + return 1; + + if(!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) + goto err; + + si->server_id = uint4korr(p); + p += 4; + len = (uint)*p++; + if(p + len > p_end || len > sizeof(si->host) - 1) + goto err; + memcpy(si->host, p, len); + si->host[len] = 0; + p += len; + len = *p++; + if(p + len > p_end || len > sizeof(si->user) - 1) + goto err; + memcpy(si->user, p, len); + si->user[len] = 0; + p += len; + len = *p++; + if(p + len > p_end || len > sizeof(si->password) - 1) + goto err; + memcpy(si->password, p, len); + si->password[len] = 0; + p += len; + si->port = uint2korr(p); + pthread_mutex_lock(&LOCK_slave_list); + + if((old_si = (SLAVE_INFO*)hash_search(&slave_list, + (byte*)&si->server_id, 4))) + hash_delete(&slave_list, (byte*)old_si); + + res = hash_insert(&slave_list, (byte*)si); + pthread_mutex_unlock(&LOCK_slave_list); + return res; +err: + if(si) + my_free((byte*)si, MYF(MY_WME)); + return res; +} + static int send_file(THD *thd) { @@ -265,8 +355,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) int error; const char *errmsg = "Unknown error"; NET* net = &thd->net; +#ifndef DBUG_OFF + int left_events = max_binlog_dump_events; +#endif DBUG_ENTER("mysql_binlog_send"); +#ifndef DBUG_OFF + if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) + { + errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover"; + goto err; + } +#endif + bzero((char*) &log,sizeof(log)); if(!mysql_bin_log.is_open()) @@ -297,10 +398,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0) goto err; - if(pos < 4) + if (pos < 4) { - errmsg = "Congratulations! You have hit the magic number and can win \ -sweepstakes if you report the bug"; + errmsg = "Client requested master to start repliction from \ +impossible position"; goto err; } @@ -326,6 +427,14 @@ sweepstakes if you report the bug"; while (!(error = Log_event::read_log_event(&log, packet, log_lock))) { +#ifndef DBUG_OFF + if(max_binlog_dump_events && !left_events--) + { + net_flush(net); + errmsg = "Debugging binlog dump abort"; + goto err; + } +#endif if (my_net_write(net, (char*)packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; @@ -400,6 +509,15 @@ sweepstakes if you report the bug"; bool read_packet = 0, fatal_error = 0; +#ifndef DBUG_OFF + if(max_binlog_dump_events && !left_events--) + { + net_flush(net); + errmsg = "Debugging binlog dump abort"; + goto err; + } +#endif + // no one will update the log while we are reading // now, but we'll be quick and just read one record pthread_mutex_lock(log_lock); @@ -614,7 +732,7 @@ void reset_slave() pthread_mutex_unlock(&LOCK_slave); end_master_info(&glob_mi); - fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); + fn_format(fname, master_info_file, mysql_data_home, "", 4+32); if(my_stat(fname, &stat_area, MYF(0))) if(my_delete(fname, MYF(MY_WME))) return; @@ -685,14 +803,18 @@ int change_master(THD* thd) // if we change host or port, we must reset the postion glob_mi.log_file_name[0] = 0; glob_mi.pos = 4; // skip magic number + glob_mi.pending = 0; } if(lex_mi->log_file_name) strmake(glob_mi.log_file_name, lex_mi->log_file_name, sizeof(glob_mi.log_file_name)); if(lex_mi->pos) + { glob_mi.pos = lex_mi->pos; - + glob_mi.pending = 0; + } + if(lex_mi->host) { strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host)); @@ -741,6 +863,149 @@ void reset_master() } + +int show_binlog_events(THD* thd) +{ + DBUG_ENTER("show_binlog_events"); + List<Item> field_list; + const char* errmsg = 0; + IO_CACHE log; + File file = -1; + + Log_event::init_show_field_list(&field_list); + if (send_fields(thd, field_list, 1)) + DBUG_RETURN(-1); + + if (mysql_bin_log.is_open()) + { + LOG_INFO linfo; + char search_file_name[FN_REFLEN]; + LEX_MASTER_INFO* lex_mi = &thd->lex.mi; + uint event_count, limit_start, limit_end; + const char* log_file_name = lex_mi->log_file_name; + Log_event* ev; + ulong pos = (ulong) lex_mi->pos; + + limit_start = thd->lex.select->offset_limit; + limit_end = thd->lex.select->select_limit + limit_start; + + if (log_file_name) + mysql_bin_log.make_log_name(search_file_name, log_file_name); + else + search_file_name[0] = 0; + + linfo.index_file_offset = 0; + thd->current_linfo = &linfo; + + if (mysql_bin_log.find_first_log(&linfo, search_file_name)) + { + errmsg = "Could not find target log"; + goto err; + } + + if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0) + goto err; + + if (pos < 4) + { + errmsg = "Invalid log position"; + goto err; + } + + pthread_mutex_lock(mysql_bin_log.get_log_lock()); + + my_b_seek(&log, pos); + + for (event_count = 0; + (ev = Log_event::read_log_event(&log, 0));) + { + if (event_count >= limit_start && + ev->net_send(thd, linfo.log_file_name, pos)) + { + errmsg = "Net error"; + delete ev; + pthread_mutex_unlock(mysql_bin_log.get_log_lock()); + goto err; + } + + pos = my_b_tell(&log); + delete ev; + + if (++event_count >= limit_end) + break; + } + + if (event_count < limit_end && log.error) + { + errmsg = "Wrong offset or I/O error"; + goto err; + } + + pthread_mutex_unlock(mysql_bin_log.get_log_lock()); + } + +err: + if (file >= 0) + { + end_io_cache(&log); + (void) my_close(file, MYF(MY_WME)); + } + + if (errmsg) + { + net_printf(&thd->net, ER_SHOW_BINLOG_EVENTS, errmsg); + DBUG_RETURN(1); + } + + send_eof(&thd->net); + DBUG_RETURN(0); +} + + +int show_slave_hosts(THD* thd) +{ + DBUG_ENTER("show_slave_hosts"); + List<Item> field_list; + field_list.push_back(new Item_empty_string("Server_id", 20)); + field_list.push_back(new Item_empty_string("Host", 20)); + if(opt_show_slave_auth_info) + { + field_list.push_back(new Item_empty_string("User",20)); + field_list.push_back(new Item_empty_string("Password",20)); + } + field_list.push_back(new Item_empty_string("Port",20)); + + if(send_fields(thd, field_list, 1)) + DBUG_RETURN(-1); + String* packet = &thd->packet; + uint i; + NET* net = &thd->net; + + pthread_mutex_lock(&LOCK_slave_list); + + for(i = 0; i < slave_list.records; ++i) + { + SLAVE_INFO* si = (SLAVE_INFO*)hash_element(&slave_list, i); + packet->length(0); + net_store_data(packet, si->server_id); + net_store_data(packet, si->host); + if(opt_show_slave_auth_info) + { + net_store_data(packet, si->user); + net_store_data(packet, si->password); + } + net_store_data(packet, (uint)si->port); + if(my_net_write(net, (char*)packet->ptr(), packet->length())) + { + pthread_mutex_unlock(&LOCK_slave_list); + DBUG_RETURN(-1); + } + } + pthread_mutex_unlock(&LOCK_slave_list); + send_eof(net); + DBUG_RETURN(0); +} + int show_binlog_info(THD* thd) { DBUG_ENTER("show_binlog_info"); @@ -845,5 +1110,221 @@ err: return 1; } +int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi) +{ + if(!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + sql_print_error("Connection to master failed: %s", + mc_mysql_error(mysql)); + return 1; + } + return 0; +} + +static inline void cleanup_mysql_results(MYSQL_RES* db_res, + MYSQL_RES** cur, MYSQL_RES** start) +{ + for( ; cur >= start; --cur) + if(*cur) + mc_mysql_free_result(*cur); + mc_mysql_free_result(db_res); +} + +static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db, + MYSQL_RES* table_res) +{ + MYSQL_ROW row; + + for( row = mc_mysql_fetch_row(table_res); row; + row = mc_mysql_fetch_row(table_res)) + { + TABLE_LIST table; + const char* table_name = row[0]; + int error; + if(table_rules_on) + { + table.next = 0; + table.db = (char*)db; + table.real_name = (char*)table_name; + table.updating = 1; + if(!tables_ok(thd, &table)) + continue; + } + + if((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql))) + return error; + } + + return 0; +} + +int load_master_data(THD* thd) +{ + MYSQL mysql; + MYSQL_RES* master_status_res = 0; + bool slave_was_running = 0; + int error = 0; + + mc_mysql_init(&mysql); + + pthread_mutex_lock(&LOCK_slave); + // we do not want anyone messing with the slave at all for the entire + // duration of the data load; + + // first, kill the slave + if((slave_was_running = slave_running)) + { + abort_slave = 1; + thr_alarm_kill(slave_real_id); + thd->proc_info = "waiting for slave to die"; + while(slave_running) + pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done + } + + + if(connect_to_master(thd, &mysql, &glob_mi)) + { + net_printf(&thd->net, error = ER_CONNECT_TO_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + // now that we are connected, get all database and tables in each + { + MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res; + uint num_dbs; + MYSQL_ROW row; + + if(mc_mysql_query(&mysql, "show databases", 0) || + !(db_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + if(!(num_dbs = mc_mysql_num_rows(db_res))) + goto err; + // in theory, the master could have no databases at all + // and run with skip-grant + + if(!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) + { + net_printf(&thd->net, error = ER_OUTOFMEMORY); + goto err; + } + + // this is a temporary solution until we have online backup + // capabilities - to be replaced once online backup is working + // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we + // can to minimize the lock time + if(mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) + || mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) || + !(master_status_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + // go through every table in every database, and if the replication + // rules allow replicating it, get it + + table_res_end = table_res + num_dbs; + + for(cur_table_res = table_res; cur_table_res < table_res_end; + ++cur_table_res) + { + MYSQL_ROW row = mc_mysql_fetch_row(db_res); + // since we know how many rows we have, this can never be NULL + + char* db = row[0]; + int drop_error = 0; + + // do not replicate databases excluded by rules + // also skip mysql database - in most cases the user will + // mess up and not exclude mysql database with the rules when + // he actually means to - in this case, he is up for a surprise if + // his priv tables get dropped and downloaded from master + // TO DO - add special option, not enabled + // by default, to allow inclusion of mysql database into load + // data from master + if(!db_ok(db, replicate_do_db, replicate_ignore_db) || + !strcmp(db,"mysql")) + { + *cur_table_res = 0; + continue; + } + + if((drop_error = mysql_rm_db(0, db, 1)) || + mysql_create_db(0, db, 0)) + { + error = (drop_error) ? ER_DB_DROP_DELETE : ER_CANT_CREATE_DB; + net_printf(&thd->net, error, db, my_error); + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + goto err; + } + + if(mc_mysql_select_db(&mysql, db) || + mc_mysql_query(&mysql, "show tables", 0) || + !(*cur_table_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + goto err; + } + + if((error = fetch_db_tables(thd, &mysql, db, *cur_table_res))) + { + // we do not report the error - fetch_db_tables handles it + cleanup_mysql_results(db_res, cur_table_res, table_res); + goto err; + } + } + + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + + // adjust position in the master + if(master_status_res) + { + MYSQL_ROW row = mc_mysql_fetch_row(master_status_res); + + // we need this check because the master may not be running with + // log-bin, but it will still allow us to do all the steps + // of LOAD DATA FROM MASTER - no reason to forbid it, really, + // although it does not make much sense for the user to do it + if(row[0] && row[1]) + { + strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name)); + glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB + if(glob_mi.pos < 4) + glob_mi.pos = 4; // don't hit the magic number + glob_mi.pending = 0; + flush_master_info(&glob_mi); + } + + mc_mysql_free_result(master_status_res); + } + + if(mc_mysql_query(&mysql, "UNLOCK TABLES", 0)) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + } +err: + pthread_mutex_unlock(&LOCK_slave); + if(slave_was_running) + start_slave(0, 0); + mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init() + if(!error) + send_ok(&thd->net); + + return error; +} + |