summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc497
1 files changed, 489 insertions, 8 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index e5039d118be..6153c4bd0f9 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -21,17 +21,54 @@
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
+#include "mini_client.h"
#include <thr_alarm.h>
#include <my_dir.h>
+#define SLAVE_LIST_CHUNK 128
+
extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg);
+HASH slave_list;
+
+#ifndef DBUG_OFF
+int max_binlog_dump_events = 0; // unlimited
+bool opt_sporadic_binlog_dump_fail = 0;
+static int binlog_dump_count = 0;
+#endif
+
+static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
+ my_bool not_used __attribute__((unused)))
+{
+ *len = 4;
+ return &si->server_id;
+}
+
+static void slave_info_free(void *s)
+{
+ my_free((byte*)s, MYF(MY_WME));
+}
+
+void init_slave_list()
+{
+ hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
+ (hash_get_key) slave_list_key, slave_info_free, 0);
+ pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
+}
+
+void end_slave_list()
+{
+ pthread_mutex_lock(&LOCK_slave_list);
+ hash_free(&slave_list);
+ pthread_mutex_unlock(&LOCK_slave_list);
+ pthread_mutex_destroy(&LOCK_slave_list);
+}
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
{
- char header[LOG_EVENT_HEADER_LEN];
+ char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN];
memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR);
@@ -42,10 +79,14 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
p = log_file_name;
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + sizeof(header);
- int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
+ ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD;
+ int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, 0);
+ int4store(header + LOG_SEQ_OFFSET, 0);
packet->append(header, sizeof(header));
+ int8store(buf, 4); // tell slave to skip magic number
+ packet->append(buf, ROTATE_HEADER_LEN);
packet->append(p,ident_len);
if(my_net_write(net, (char*)packet->ptr(), packet->length()))
{
@@ -55,6 +96,55 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
return 0;
}
+int register_slave(THD* thd, uchar* packet, uint packet_length)
+{
+ uint len;
+ SLAVE_INFO* si, *old_si;
+ int res = 1;
+ uchar* p = packet, *p_end = packet + packet_length;
+
+ if(check_access(thd, FILE_ACL, any_db))
+ return 1;
+
+ if(!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
+ goto err;
+
+ si->server_id = uint4korr(p);
+ p += 4;
+ len = (uint)*p++;
+ if(p + len > p_end || len > sizeof(si->host) - 1)
+ goto err;
+ memcpy(si->host, p, len);
+ si->host[len] = 0;
+ p += len;
+ len = *p++;
+ if(p + len > p_end || len > sizeof(si->user) - 1)
+ goto err;
+ memcpy(si->user, p, len);
+ si->user[len] = 0;
+ p += len;
+ len = *p++;
+ if(p + len > p_end || len > sizeof(si->password) - 1)
+ goto err;
+ memcpy(si->password, p, len);
+ si->password[len] = 0;
+ p += len;
+ si->port = uint2korr(p);
+ pthread_mutex_lock(&LOCK_slave_list);
+
+ if((old_si = (SLAVE_INFO*)hash_search(&slave_list,
+ (byte*)&si->server_id, 4)))
+ hash_delete(&slave_list, (byte*)old_si);
+
+ res = hash_insert(&slave_list, (byte*)si);
+ pthread_mutex_unlock(&LOCK_slave_list);
+ return res;
+err:
+ if(si)
+ my_free((byte*)si, MYF(MY_WME));
+ return res;
+}
+
static int send_file(THD *thd)
{
@@ -265,8 +355,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
int error;
const char *errmsg = "Unknown error";
NET* net = &thd->net;
+#ifndef DBUG_OFF
+ int left_events = max_binlog_dump_events;
+#endif
DBUG_ENTER("mysql_binlog_send");
+#ifndef DBUG_OFF
+ if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
+ {
+ errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
+ goto err;
+ }
+#endif
+
bzero((char*) &log,sizeof(log));
if(!mysql_bin_log.is_open())
@@ -297,10 +398,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
goto err;
- if(pos < 4)
+ if (pos < 4)
{
- errmsg = "Congratulations! You have hit the magic number and can win \
-sweepstakes if you report the bug";
+ errmsg = "Client requested master to start repliction from \
+impossible position";
goto err;
}
@@ -326,6 +427,14 @@ sweepstakes if you report the bug";
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
+#ifndef DBUG_OFF
+ if(max_binlog_dump_events && !left_events--)
+ {
+ net_flush(net);
+ errmsg = "Debugging binlog dump abort";
+ goto err;
+ }
+#endif
if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -400,6 +509,15 @@ sweepstakes if you report the bug";
bool read_packet = 0, fatal_error = 0;
+#ifndef DBUG_OFF
+ if(max_binlog_dump_events && !left_events--)
+ {
+ net_flush(net);
+ errmsg = "Debugging binlog dump abort";
+ goto err;
+ }
+#endif
+
// no one will update the log while we are reading
// now, but we'll be quick and just read one record
pthread_mutex_lock(log_lock);
@@ -614,7 +732,7 @@ void reset_slave()
pthread_mutex_unlock(&LOCK_slave);
end_master_info(&glob_mi);
- fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
if(my_stat(fname, &stat_area, MYF(0)))
if(my_delete(fname, MYF(MY_WME)))
return;
@@ -685,14 +803,18 @@ int change_master(THD* thd)
// if we change host or port, we must reset the postion
glob_mi.log_file_name[0] = 0;
glob_mi.pos = 4; // skip magic number
+ glob_mi.pending = 0;
}
if(lex_mi->log_file_name)
strmake(glob_mi.log_file_name, lex_mi->log_file_name,
sizeof(glob_mi.log_file_name));
if(lex_mi->pos)
+ {
glob_mi.pos = lex_mi->pos;
-
+ glob_mi.pending = 0;
+ }
+
if(lex_mi->host)
{
strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host));
@@ -741,6 +863,149 @@ void reset_master()
}
+
+int show_binlog_events(THD* thd)
+{
+ DBUG_ENTER("show_binlog_events");
+ List<Item> field_list;
+ const char* errmsg = 0;
+ IO_CACHE log;
+ File file = -1;
+
+ Log_event::init_show_field_list(&field_list);
+ if (send_fields(thd, field_list, 1))
+ DBUG_RETURN(-1);
+
+ if (mysql_bin_log.is_open())
+ {
+ LOG_INFO linfo;
+ char search_file_name[FN_REFLEN];
+ LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+ uint event_count, limit_start, limit_end;
+ const char* log_file_name = lex_mi->log_file_name;
+ Log_event* ev;
+ ulong pos = (ulong) lex_mi->pos;
+
+ limit_start = thd->lex.select->offset_limit;
+ limit_end = thd->lex.select->select_limit + limit_start;
+
+ if (log_file_name)
+ mysql_bin_log.make_log_name(search_file_name, log_file_name);
+ else
+ search_file_name[0] = 0;
+
+ linfo.index_file_offset = 0;
+ thd->current_linfo = &linfo;
+
+ if (mysql_bin_log.find_first_log(&linfo, search_file_name))
+ {
+ errmsg = "Could not find target log";
+ goto err;
+ }
+
+ if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
+ goto err;
+
+ if (pos < 4)
+ {
+ errmsg = "Invalid log position";
+ goto err;
+ }
+
+ pthread_mutex_lock(mysql_bin_log.get_log_lock());
+
+ my_b_seek(&log, pos);
+
+ for (event_count = 0;
+ (ev = Log_event::read_log_event(&log, 0));)
+ {
+ if (event_count >= limit_start &&
+ ev->net_send(thd, linfo.log_file_name, pos))
+ {
+ errmsg = "Net error";
+ delete ev;
+ pthread_mutex_unlock(mysql_bin_log.get_log_lock());
+ goto err;
+ }
+
+ pos = my_b_tell(&log);
+ delete ev;
+
+ if (++event_count >= limit_end)
+ break;
+ }
+
+ if (event_count < limit_end && log.error)
+ {
+ errmsg = "Wrong offset or I/O error";
+ goto err;
+ }
+
+ pthread_mutex_unlock(mysql_bin_log.get_log_lock());
+ }
+
+err:
+ if (file >= 0)
+ {
+ end_io_cache(&log);
+ (void) my_close(file, MYF(MY_WME));
+ }
+
+ if (errmsg)
+ {
+ net_printf(&thd->net, ER_SHOW_BINLOG_EVENTS, errmsg);
+ DBUG_RETURN(1);
+ }
+
+ send_eof(&thd->net);
+ DBUG_RETURN(0);
+}
+
+
+int show_slave_hosts(THD* thd)
+{
+ DBUG_ENTER("show_slave_hosts");
+ List<Item> field_list;
+ field_list.push_back(new Item_empty_string("Server_id", 20));
+ field_list.push_back(new Item_empty_string("Host", 20));
+ if(opt_show_slave_auth_info)
+ {
+ field_list.push_back(new Item_empty_string("User",20));
+ field_list.push_back(new Item_empty_string("Password",20));
+ }
+ field_list.push_back(new Item_empty_string("Port",20));
+
+ if(send_fields(thd, field_list, 1))
+ DBUG_RETURN(-1);
+ String* packet = &thd->packet;
+ uint i;
+ NET* net = &thd->net;
+
+ pthread_mutex_lock(&LOCK_slave_list);
+
+ for(i = 0; i < slave_list.records; ++i)
+ {
+ SLAVE_INFO* si = (SLAVE_INFO*)hash_element(&slave_list, i);
+ packet->length(0);
+ net_store_data(packet, si->server_id);
+ net_store_data(packet, si->host);
+ if(opt_show_slave_auth_info)
+ {
+ net_store_data(packet, si->user);
+ net_store_data(packet, si->password);
+ }
+ net_store_data(packet, (uint)si->port);
+ if(my_net_write(net, (char*)packet->ptr(), packet->length()))
+ {
+ pthread_mutex_unlock(&LOCK_slave_list);
+ DBUG_RETURN(-1);
+ }
+ }
+ pthread_mutex_unlock(&LOCK_slave_list);
+ send_eof(net);
+ DBUG_RETURN(0);
+}
+
int show_binlog_info(THD* thd)
{
DBUG_ENTER("show_binlog_info");
@@ -845,5 +1110,221 @@ err:
return 1;
}
+int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
+{
+ if(!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ sql_print_error("Connection to master failed: %s",
+ mc_mysql_error(mysql));
+ return 1;
+ }
+ return 0;
+}
+
+static inline void cleanup_mysql_results(MYSQL_RES* db_res,
+ MYSQL_RES** cur, MYSQL_RES** start)
+{
+ for( ; cur >= start; --cur)
+ if(*cur)
+ mc_mysql_free_result(*cur);
+ mc_mysql_free_result(db_res);
+}
+
+static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
+ MYSQL_RES* table_res)
+{
+ MYSQL_ROW row;
+
+ for( row = mc_mysql_fetch_row(table_res); row;
+ row = mc_mysql_fetch_row(table_res))
+ {
+ TABLE_LIST table;
+ const char* table_name = row[0];
+ int error;
+ if(table_rules_on)
+ {
+ table.next = 0;
+ table.db = (char*)db;
+ table.real_name = (char*)table_name;
+ table.updating = 1;
+ if(!tables_ok(thd, &table))
+ continue;
+ }
+
+ if((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
+ return error;
+ }
+
+ return 0;
+}
+
+int load_master_data(THD* thd)
+{
+ MYSQL mysql;
+ MYSQL_RES* master_status_res = 0;
+ bool slave_was_running = 0;
+ int error = 0;
+
+ mc_mysql_init(&mysql);
+
+ pthread_mutex_lock(&LOCK_slave);
+ // we do not want anyone messing with the slave at all for the entire
+ // duration of the data load;
+
+ // first, kill the slave
+ if((slave_was_running = slave_running))
+ {
+ abort_slave = 1;
+ thr_alarm_kill(slave_real_id);
+ thd->proc_info = "waiting for slave to die";
+ while(slave_running)
+ pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+ }
+
+
+ if(connect_to_master(thd, &mysql, &glob_mi))
+ {
+ net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+
+ // now that we are connected, get all database and tables in each
+ {
+ MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
+ uint num_dbs;
+ MYSQL_ROW row;
+
+ if(mc_mysql_query(&mysql, "show databases", 0) ||
+ !(db_res = mc_mysql_store_result(&mysql)))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+
+ if(!(num_dbs = mc_mysql_num_rows(db_res)))
+ goto err;
+ // in theory, the master could have no databases at all
+ // and run with skip-grant
+
+ if(!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
+ {
+ net_printf(&thd->net, error = ER_OUTOFMEMORY);
+ goto err;
+ }
+
+ // this is a temporary solution until we have online backup
+ // capabilities - to be replaced once online backup is working
+ // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
+ // can to minimize the lock time
+ if(mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0)
+ || mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
+ !(master_status_res = mc_mysql_store_result(&mysql)))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+
+ // go through every table in every database, and if the replication
+ // rules allow replicating it, get it
+
+ table_res_end = table_res + num_dbs;
+
+ for(cur_table_res = table_res; cur_table_res < table_res_end;
+ ++cur_table_res)
+ {
+ MYSQL_ROW row = mc_mysql_fetch_row(db_res);
+ // since we know how many rows we have, this can never be NULL
+
+ char* db = row[0];
+ int drop_error = 0;
+
+ // do not replicate databases excluded by rules
+ // also skip mysql database - in most cases the user will
+ // mess up and not exclude mysql database with the rules when
+ // he actually means to - in this case, he is up for a surprise if
+ // his priv tables get dropped and downloaded from master
+ // TO DO - add special option, not enabled
+ // by default, to allow inclusion of mysql database into load
+ // data from master
+ if(!db_ok(db, replicate_do_db, replicate_ignore_db) ||
+ !strcmp(db,"mysql"))
+ {
+ *cur_table_res = 0;
+ continue;
+ }
+
+ if((drop_error = mysql_rm_db(0, db, 1)) ||
+ mysql_create_db(0, db, 0))
+ {
+ error = (drop_error) ? ER_DB_DROP_DELETE : ER_CANT_CREATE_DB;
+ net_printf(&thd->net, error, db, my_error);
+ cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+ goto err;
+ }
+
+ if(mc_mysql_select_db(&mysql, db) ||
+ mc_mysql_query(&mysql, "show tables", 0) ||
+ !(*cur_table_res = mc_mysql_store_result(&mysql)))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+ goto err;
+ }
+
+ if((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
+ {
+ // we do not report the error - fetch_db_tables handles it
+ cleanup_mysql_results(db_res, cur_table_res, table_res);
+ goto err;
+ }
+ }
+
+ cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+
+ // adjust position in the master
+ if(master_status_res)
+ {
+ MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
+
+ // we need this check because the master may not be running with
+ // log-bin, but it will still allow us to do all the steps
+ // of LOAD DATA FROM MASTER - no reason to forbid it, really,
+ // although it does not make much sense for the user to do it
+ if(row[0] && row[1])
+ {
+ strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
+ glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
+ if(glob_mi.pos < 4)
+ glob_mi.pos = 4; // don't hit the magic number
+ glob_mi.pending = 0;
+ flush_master_info(&glob_mi);
+ }
+
+ mc_mysql_free_result(master_status_res);
+ }
+
+ if(mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
+ {
+ net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+ mc_mysql_error(&mysql));
+ goto err;
+ }
+ }
+err:
+ pthread_mutex_unlock(&LOCK_slave);
+ if(slave_was_running)
+ start_slave(0, 0);
+ mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
+ if(!error)
+ send_ok(&thd->net);
+
+ return error;
+}
+