diff options
author | monty@hundin.mysql.fi <> | 2002-06-05 23:04:38 +0300 |
---|---|---|
committer | monty@hundin.mysql.fi <> | 2002-06-05 23:04:38 +0300 |
commit | e4130b11c9728ff328f18560e8c7755b73f61a0d (patch) | |
tree | 9260753468997c0d4b3b8c78aea517507e2791eb /sql/slave.cc | |
parent | 8f29ae77bff7e758081f5588f34fc221fba29b1e (diff) | |
download | mariadb-git-e4130b11c9728ff328f18560e8c7755b73f61a0d.tar.gz |
removed init_count from IO_CACHE.
Added missing mutex_unlock to slave replication code.
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 673 |
1 files changed, 366 insertions, 307 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 93e711f2e14..66837436a09 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -42,12 +42,16 @@ bool do_table_inited = 0, ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool table_rules_on = 0; static TABLE* save_temporary_tables = 0; -ulong relay_log_space_limit = 0; /* TODO: fix variables to access ulonglong - values and make it ulonglong */ -// when slave thread exits, we need to remember the temporary tables so we -// can re-use them on slave start +/* TODO: fix variables to access ulonglong values and make it ulonglong */ +ulong relay_log_space_limit = 0; + +/* + When slave thread exits, we need to remember the temporary tables so we + can re-use them on slave start. + + TODO: move the vars below under MASTER_INFO +*/ -// TODO: move the vars below under MASTER_INFO int disconnect_slave_event_count = 0, abort_slave_event_count = 0; static int events_till_disconnect = -1; int events_till_abort = -1; @@ -83,9 +87,10 @@ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; if (inverse) { - /* This makes me think of the Russian idiom "I am not I, and this is - not my horse", which is used to deny reponsibility for - one's actions. + /* + This makes me think of the Russian idiom "I am not I, and this is + not my horse", which is used to deny reponsibility for + one's actions. */ set_io = !set_io; set_sql = !set_sql; @@ -164,13 +169,16 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, } // TODO: check proper initialization of master_log_name/master_log_pos + int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ulonglong pos, bool need_data_lock, const char** errmsg) { + DBUG_ENTER("init_relay_log_pos"); + *errmsg=0; if (rli->log_pos_current) - return 0; + DBUG_RETURN(0); pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); pthread_mutex_lock(log_lock); if (need_data_lock) @@ -190,8 +198,10 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, else rli->relay_log_pos = pos; - // test to see if the previous run was with the skip of purging - // if yes, we do not purge when we restart + /* + Test to see if the previous run was with the skip of purging + If yes, we do not purge when we restart + */ if (rli->relay_log.find_first_log(&rli->linfo,"")) { *errmsg="Could not find first log during relay log initialization"; @@ -213,34 +223,31 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, if (rli->relay_log.is_active(rli->linfo.log_file_name)) { if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 && - check_binlog_magic(rli->cur_log,errmsg)) - { + check_binlog_magic(rli->cur_log,errmsg)) goto err; - } - rli->cur_log_init_count=rli->cur_log->init_count; + rli->cur_log_old_open_count=rli->relay_log.get_open_count(); } else { if (rli->inited) end_io_cache(&rli->cache_buf); - if (rli->cur_log_fd>=0) + if (rli->cur_log_fd >= 0) my_close(rli->cur_log_fd,MYF(MY_WME)); if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, rli->linfo.log_file_name,errmsg)) < 0) - { goto err; - } rli->cur_log = &rli->cache_buf; } - if (pos > 4) - my_b_seek(rli->cur_log,(off_t)pos); - rli->log_pos_current=1; + if (pos > BIN_LOG_HEADER_SIZE) + my_b_seek(rli->cur_log,(off_t)pos); + rli->log_pos_current=1; + err: - pthread_cond_broadcast(&rli->data_cond); - if (need_data_lock) - pthread_mutex_unlock(&rli->data_lock); - pthread_mutex_unlock(log_lock); - return (*errmsg) ? 1 : 0; + pthread_cond_broadcast(&rli->data_cond); + if (need_data_lock) + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(log_lock); + DBUG_RETURN ((*errmsg) ? 1 : 0); } /* called from get_options() in mysqld.cc on start-up */ @@ -274,8 +281,9 @@ void init_slave_skip_errors(const char* arg) } } + /* - We assume we have a run lock on rli and that the both slave thread + We assume we have a run lock on rli and that both slave thread are not running */ @@ -284,9 +292,11 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) DBUG_ENTER("purge_relay_logs"); if (!rli->inited) DBUG_RETURN(0); /* successfully do nothing */ + int error=0; + DBUG_ASSERT(rli->slave_running == 0); DBUG_ASSERT(rli->mi->slave_running == 0); - int error=0; + rli->slave_skip_counter=0; pthread_mutex_lock(&rli->data_lock); rli->pending=0; @@ -301,17 +311,19 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) } strnmov(rli->relay_log_name,rli->linfo.log_file_name, sizeof(rli->relay_log_name)-1); - rli->log_space_total=4; //just first log with magic number and nothing else - rli->relay_log_pos=4; + // Just first log with magic number and nothing else + rli->log_space_total= BIN_LOG_HEADER_SIZE; + rli->relay_log_pos= BIN_LOG_HEADER_SIZE; rli->relay_log.reset_bytes_written(); rli->log_pos_current=0; if (!just_reset) - error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); + error = init_relay_log_pos(rli,0,0,0 /* do not need data lock */,errmsg); + err: #ifndef DBUG_OFF char buf[22]; #endif - DBUG_PRINT("info",("log_space_total=%s",llstr(rli->log_space_total,buf))); + DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf))); pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(error); } @@ -451,10 +463,12 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, "Waiting for slave thread to start"); pthread_cond_wait(start_cond,cond_lock); thd->exit_cond(old_msg); - // TODO: in a very rare case of init_slave_thread failing, it is - // possible that we can get stuck here since slave_running will not - // be set. We need to change slave_running to int and have -1 as - // error code + /* + TODO: in a very rare case of init_slave_thread failing, it is + possible that we can get stuck here since slave_running will not + be set. We need to change slave_running to int and have -1 as + error code. + */ if (thd->killed) { pthread_mutex_unlock(cond_lock); @@ -466,10 +480,14 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, pthread_mutex_unlock(start_lock); return 0; } -/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make - sense to do that for starting a slave - we always care if it actually - started the threads that were not previously running + + +/* + SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave - we always care if it actually + started the threads that were not previously running */ + int start_slave_threads(bool need_slave_mutex, bool wait_for_start, MASTER_INFO* mi, const char* master_info_fname, const char* slave_info_fname, int thread_mask) @@ -567,9 +585,10 @@ int tables_ok(THD* thd, TABLE_LIST* tables) return 0; } - // if no explicit rule found - // and there was a do list, do not replicate. If there was - // no do list, go ahead + /* + If no explicit rule found and there was a do list, do not replicate. + If there was no do list, go ahead + */ return !do_table_inited && !wild_do_table_inited; } @@ -577,12 +596,12 @@ int tables_ok(THD* thd, TABLE_LIST* tables) int add_table_rule(HASH* h, const char* table_spec) { const char* dot = strchr(table_spec, '.'); - if(!dot) return 1; + if (!dot) return 1; // len is always > 0 because we know the there exists a '.' uint len = (uint)strlen(table_spec); TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT) + len, MYF(MY_WME)); - if(!e) return 1; + if (!e) return 1; e->db = (char*)e + sizeof(TABLE_RULE_ENT); e->tbl_name = e->db + (dot - table_spec) + 1; e->key_len = len; @@ -594,11 +613,11 @@ int add_table_rule(HASH* h, const char* table_spec) int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) { const char* dot = strchr(table_spec, '.'); - if(!dot) return 1; + if (!dot) return 1; uint len = (uint)strlen(table_spec); TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT) + len, MYF(MY_WME)); - if(!e) return 1; + if (!e) return 1; e->db = (char*)e + sizeof(TABLE_RULE_ENT); e->tbl_name = e->db + (dot - table_spec) + 1; e->key_len = len; @@ -627,9 +646,11 @@ static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) void end_slave() { - // TODO: replace the line below with - // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); - // once multi-master code is ready + /* + TODO: replace the line below with + list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); + once multi-master code is ready. + */ terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); end_master_info(active_mi); if (do_table_inited) @@ -671,65 +692,68 @@ void skip_load_data_infile(NET* net) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); - (void)my_net_read(net); // discard response - send_ok(net); // the master expects it + (void)my_net_read(net); // discard response + send_ok(net); // the master expects it } char* rewrite_db(char* db) { - if(replicate_rewrite_db.is_empty() || !db) return db; + if (replicate_rewrite_db.is_empty() || !db) + return db; I_List_iterator<i_string_pair> it(replicate_rewrite_db); i_string_pair* tmp; - while((tmp=it++)) - { - if(!strcmp(tmp->key, db)) - return tmp->val; - } - + while ((tmp=it++)) + { + if (!strcmp(tmp->key, db)) + return tmp->val; + } return db; } + int db_ok(const char* db, I_List<i_string> &do_list, I_List<i_string> &ignore_list ) { if (do_list.is_empty() && ignore_list.is_empty()) return 1; // ok to replicate if the user puts no constraints - // if the user has specified restrictions on which databases to replicate - // and db was not selected, do not replicate - if(!db) + /* + If the user has specified restrictions on which databases to replicate + and db was not selected, do not replicate. + */ + if (!db) return 0; - if(!do_list.is_empty()) // if the do's are not empty - { - I_List_iterator<i_string> it(do_list); - i_string* tmp; + if (!do_list.is_empty()) // if the do's are not empty + { + I_List_iterator<i_string> it(do_list); + i_string* tmp; - while((tmp=it++)) - { - if(!strcmp(tmp->ptr, db)) - return 1; // match - } - return 0; + while ((tmp=it++)) + { + if (!strcmp(tmp->ptr, db)) + return 1; // match } + return 0; + } else // there are some elements in the don't, otherwise we cannot get here - { - I_List_iterator<i_string> it(ignore_list); - i_string* tmp; + { + I_List_iterator<i_string> it(ignore_list); + i_string* tmp; - while((tmp=it++)) - { - if(!strcmp(tmp->ptr, db)) - return 0; // match - } - - return 1; + while ((tmp=it++)) + { + if (!strcmp(tmp->ptr, db)) + return 0; // match } + + return 1; + } } -static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, - char* default_val) +static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, + const char *default_val) { uint length; if ((length=my_b_gets(f,var, max_size))) @@ -739,10 +763,12 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, *last_p = 0; // if we stopped on newline, kill it else { - // if we truncated a line or stopped on last char, remove all chars - // up to and including newline + /* + If we truncated a line or stopped on last char, remove all chars + up to and including newline. + */ int c; - while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF)); + while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)); } return 0; } @@ -763,7 +789,7 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) *var = atoi(buf); return 0; } - else if(default_val) + else if (default_val) { *var = default_val; return 0; @@ -796,12 +822,12 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi) goto err; } - switch (*version) - { + switch (*version) { case '3': mi->old_format = 1; break; case '4': + case '5': mi->old_format = 0; break; default: @@ -895,9 +921,11 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, check_opt.init(); check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK; thd->proc_info = "Rebuilding the index on master dump table"; - // we do not want repair() to spam us with messages - // just send them to the error log, and report the failure in case of - // problems + /* + We do not want repair() to spam us with messages + just send them to the error log, and report the failure in case of + problems. + */ save_vio = thd->net.vio; thd->net.vio = 0; error=file->repair(thd,&check_opt) != 0; @@ -978,15 +1006,15 @@ void end_master_info(MASTER_INFO* mi) int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) { DBUG_ENTER("init_relay_log_info"); - if (rli->inited) - DBUG_RETURN(0); MY_STAT stat_area; char fname[FN_REFLEN+128]; int info_fd; const char* msg = 0; int error = 0; - fn_format(fname, info_fname, - mysql_data_home, "", 4+32); + + if (rli->inited) + DBUG_RETURN(0); + fn_format(fname, info_fname, mysql_data_home, "", 4+32); pthread_mutex_lock(&rli->data_lock); info_fd = rli->info_fd; rli->pending = 0; @@ -1001,8 +1029,9 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) if (!opt_relay_logname) { char tmp[FN_REFLEN]; - /* TODO: The following should be using fn_format(); We just need to - first change fn_format() to cut the file name if it's too long. + /* + TODO: The following should be using fn_format(); We just need to + first change fn_format() to cut the file name if it's too long. */ strmake(tmp,glob_hostname,FN_REFLEN-5); strmov(strcend(tmp,'.'),"-relay-bin"); @@ -1011,72 +1040,76 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) rli->relay_log.set_index_file_name(opt_relaylog_index_name); open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin", LOG_BIN, 1 /* read_append cache */, - 1 /* no auto events*/); + 1 /* no auto events */); /* if file does not exist */ if (!my_stat(fname, &stat_area, MYF(0))) { - // if someone removed the file from underneath our feet, just close - // the old descriptor and re-create the old file + /* + If someone removed the file from underneath our feet, just close + the old descriptor and re-create the old file + */ if (info_fd >= 0) my_close(info_fd, MYF(MY_WME)); - if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, - MYF(MY_WME))) + if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) { - if(info_fd >= 0) + if (info_fd >= 0) my_close(info_fd, MYF(0)); - rli->info_fd=-1; + rli->info_fd= -1; pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); } - if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) + if (init_relay_log_pos(rli,"",BIN_LOG_HEADER_SIZE,0 /*no data mutex*/, + &msg)) goto err; rli->master_log_pos = 0; // uninitialized rli->info_fd = info_fd; } else // file exists { - if(info_fd >= 0) + if (info_fd >= 0) reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0); - else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&rli->info_file, info_fd, - IO_SIZE*2, READ_CACHE, 0L, - 0, MYF(MY_WME))) + else if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&rli->info_file, info_fd, + IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) { if (info_fd >= 0) my_close(info_fd, MYF(0)); - rli->info_fd=-1; + rli->info_fd= -1; pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); } rli->info_fd = info_fd; if (init_strvar_from_file(rli->relay_log_name, - sizeof(rli->relay_log_name), &rli->info_file, - (char*)"") || + sizeof(rli->relay_log_name), &rli->info_file, + "") || init_intvar_from_file((int*)&rli->relay_log_pos, - &rli->info_file, 4) || + &rli->info_file, BIN_LOG_HEADER_SIZE) || init_strvar_from_file(rli->master_log_name, sizeof(rli->master_log_name), &rli->info_file, - (char*)"") || + "") || init_intvar_from_file((int*)&rli->master_log_pos, &rli->info_file, 0)) { msg="Error reading slave log configuration"; goto err; } - if (init_relay_log_pos(rli,0 /*log already inited*/, - 0 /*pos already inited*/, + if (init_relay_log_pos(rli,0 /* log already inited */, + 0 /* pos already inited */, 0 /* no data lock*/, &msg)) goto err; } - DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); rli->inited = 1; - // now change the cache from READ to WRITE - must do this - // before flush_relay_log_info + /* + Now change the cache from READ to WRITE - must do this + before flush_relay_log_info + */ reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); error=test(flush_relay_log_info(rli)); if (count_relay_log_space(rli)) @@ -1091,7 +1124,7 @@ err: sql_print_error(msg); end_io_cache(&rli->info_file); my_close(info_fd, MYF(0)); - rli->info_fd=-1; + rli->info_fd= -1; pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); } @@ -1135,6 +1168,7 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) DBUG_RETURN(slave_killed); } + static int count_relay_log_space(RELAY_LOG_INFO* rli) { LOG_INFO linfo; @@ -1145,31 +1179,32 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) sql_print_error("Could not find first log while counting relay log space"); DBUG_RETURN(1); } - if (add_relay_log(rli,&linfo)) - DBUG_RETURN(1); - while (!rli->relay_log.find_next_log(&linfo)) + do { if (add_relay_log(rli,&linfo)) DBUG_RETURN(1); - } + } while (!rli->relay_log.find_next_log(&linfo)); DBUG_RETURN(0); } + int init_master_info(MASTER_INFO* mi, const char* master_info_fname, const char* slave_info_fname) { + int fd,error; + MY_STAT stat_area; + char fname[FN_REFLEN+128]; + const char *msg; + DBUG_ENTER("init_master_info"); + if (mi->inited) - return 0; + DBUG_RETURN(0); if (init_relay_log_info(&mi->rli, slave_info_fname)) - return 1; + DBUG_RETURN(1); mi->rli.mi = mi; mi->mysql=0; mi->file_id=1; mi->ignore_stop_event=0; - int fd,error; - MY_STAT stat_area; - char fname[FN_REFLEN+128]; - const char *msg; fn_format(fname, master_info_fname, mysql_data_home, "", 4+32); /* @@ -1183,23 +1218,19 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, // we do not want any messages if the file does not exist if (!my_stat(fname, &stat_area, MYF(0))) { - // if someone removed the file from underneath our feet, just close - // the old descriptor and re-create the old file + /* + if someone removed the file from underneath our feet, just close + the old descriptor and re-create the old file + */ if (fd >= 0) my_close(fd, MYF(MY_WME)); - if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0, - MYF(MY_WME))) - { - if(fd >= 0) - my_close(fd, MYF(0)); - mi->fd=-1; - end_relay_log_info(&mi->rli); - pthread_mutex_unlock(&mi->data_lock); - return 1; - } + if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) + goto err; + mi->master_log_name[0] = 0; - mi->master_log_pos = 4; // skip magic number + mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number mi->fd = fd; if (master_host) @@ -1213,24 +1244,17 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, } else // file exists { - if(fd >= 0) + if (fd >= 0) reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0); - else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L, - 0, MYF(MY_WME))) - { - if(fd >= 0) - my_close(fd, MYF(0)); - mi->fd=-1; - end_relay_log_info(&mi->rli); - pthread_mutex_unlock(&mi->data_lock); - return 1; - } + else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L, + 0, MYF(MY_WME))) + goto err; mi->fd = fd; if (init_strvar_from_file(mi->master_log_name, sizeof(mi->master_log_name), &mi->file, - (char*)"") || + "") || init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4) || init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, master_host) || @@ -1242,7 +1266,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, init_intvar_from_file((int*)&mi->connect_retry, &mi->file, master_connect_retry)) { - msg="Error reading master configuration"; + sql_print_error("Error reading master configuration"); goto err; } } @@ -1252,17 +1276,18 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); error=test(flush_master_info(mi)); pthread_mutex_unlock(&mi->data_lock); - return error; + DBUG_RETURN(error); err: - sql_print_error(msg); - end_io_cache(&mi->file); end_relay_log_info(&mi->rli); - DBUG_ASSERT(fd>=0); - my_close(fd, MYF(0)); - mi->fd=-1; + if (fd >= 0) + { + my_close(fd, MYF(0)); + end_io_cache(&mi->file); + } + mi->fd= -1; pthread_mutex_unlock(&mi->data_lock); - return 1; + DBUG_RETURN(1); } int register_slave_on_master(MYSQL* mysql) @@ -1282,7 +1307,7 @@ int register_slave_on_master(MYSQL* mysql) else packet.append((char)0); - if(report_password) + if (report_password) net_store_data(&packet, report_user); else packet.append((char)0); @@ -1333,7 +1358,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) field_list.push_back(new Item_empty_string("Skip_counter", 12)); field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); field_list.push_back(new Item_empty_string("Relay_log_space", 12)); - if(send_fields(thd, field_list, 1)) + if (send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; @@ -1502,8 +1527,10 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, */ thr_alarm(&alarmed, 2 * nap_time,&alarm_buff); sleep(nap_time); - // if we wake up before the alarm goes off, hit the button - // so it will not wake up the wife and kids :-) + /* + If we wake up before the alarm goes off, hit the button + so it will not wake up the wife and kids :-) + */ if (thr_alarm_in_use(&alarmed)) thr_end_alarm(&alarmed); @@ -1528,9 +1555,11 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi) memcpy(buf + 10, logname,len); if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) { - // something went wrong, so we will just reconnect and retry later - // in the future, we should do a better error analysis, but for - // now we just fill up the error log :-) + /* + Something went wrong, so we will just reconnect and retry later + in the future, we should do a better error analysis, but for + now we just fill up the error log :-) + */ sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs", mc_mysql_error(mysql), master_connect_retry); return 1; @@ -1545,7 +1574,7 @@ static int request_table_dump(MYSQL* mysql, const char* db, const char* table) char * p = buf; uint table_len = (uint) strlen(table); uint db_len = (uint) strlen(db); - if(table_len + db_len > sizeof(buf) - 2) + if (table_len + db_len > sizeof(buf) - 2) { sql_print_error("request_table_dump: Buffer overrun"); return 1; @@ -1571,8 +1600,10 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) { ulong len = packet_error; - // my_real_read() will time us out - // we check if we were told to die, and if not, try reading again + /* + my_real_read() will time us out + We check if we were told to die, and if not, try reading again + */ #ifndef DBUG_OFF if (disconnect_slave_event_count && !(events_till_disconnect--)) return packet_error; @@ -1643,12 +1674,14 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) type_code != STOP_EVENT ? ev->log_pos : LL(0), 1/* skip lock*/); flush_relay_log_info(rli); - if (rli->slave_skip_counter && /* protect against common user error of - setting the counter to 1 instead of 2 - while recovering from an failed - auto-increment insert */ - !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && - rli->slave_skip_counter == 1)) + + /* + Protect against common user error of setting the counter to 1 + instead of 2 while recovering from an failed auto-increment insert + */ + if (rli->slave_skip_counter && + !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && + rli->slave_skip_counter == 1)) --rli->slave_skip_counter; pthread_mutex_unlock(&rli->data_lock); delete ev; @@ -1718,7 +1751,7 @@ slave_begin: pthread_cond_broadcast(&mi->start_cond); pthread_mutex_unlock(&mi->run_lock); - DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + DBUG_PRINT("info",("master info: log_file_name='%s', position=%s", mi->master_log_name, llstr(mi->master_log_pos,llbuff))); if (!(mi->mysql = mysql = mc_mysql_init(NULL))) @@ -1768,7 +1801,7 @@ connected: if (request_dump(mysql, mi)) { sql_print_error("Failed on request_dump()"); - if(io_slave_killed(thd,mi)) + if (io_slave_killed(thd,mi)) { sql_print_error("Slave I/O thread killed while requesting master \ dump"); @@ -1855,7 +1888,7 @@ reconnect done to recover from failed read"); goto err; } goto connected; - } // if(event_len == packet_error) + } // if (event_len == packet_error) thd->proc_info = "Queueing event from master"; if (queue_event(mi,(const char*)mysql->net.read_pos + 1, @@ -1909,11 +1942,11 @@ err: THD_CHECK_SENTRY(thd); delete thd; pthread_mutex_unlock(&LOCK_thread_count); - my_thread_end(); // clean-up before broadcast - pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done + my_thread_end(); // clean-up before broadcast + pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done pthread_mutex_unlock(&mi->run_lock); #ifndef DBUG_OFF - if(abort_slave_event_count && !events_till_abort) + if (abort_slave_event_count && !events_till_abort) goto slave_begin; #endif pthread_exit(0); @@ -1970,22 +2003,22 @@ slave_begin: rli->abort_slave = 0; pthread_cond_broadcast(&rli->start_cond); pthread_mutex_unlock(&rli->run_lock); - rli->pending = 0; //this should always be set to 0 when the slave thread - // is started - if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg)) + // This should always be set to 0 when the slave thread is started + rli->pending = 0; + if (init_relay_log_pos(rli,0,0,1 /*need data lock*/, &errmsg)) { sql_print_error("Error initializing relay log position: %s", errmsg); goto err; } - DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); - DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + DBUG_PRINT("info",("master info: log_file_name: %s, position: %s", rli->master_log_name, llstr(rli->master_log_pos,llbuff))); DBUG_ASSERT(rli->sql_thd == thd); sql_print_error("Slave SQL thread initialized, starting replication in \ -log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME, +log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff),rli->relay_log_name, llstr(rli->relay_log_pos,llbuff1)); while (!sql_slave_killed(thd,rli)) @@ -2004,7 +2037,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); goto err; } - } // while(!sql_slave_killed(thd,rli)) - read/exec loop + } // while (!sql_slave_killed(thd,rli)) - read/exec loop // error = 0; err: @@ -2053,16 +2086,17 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) bool cev_not_written; THD* thd; NET* net = &mi->mysql->net; + DBUG_ENTER("process_io_create_file"); if (unlikely(!cev->is_valid())) - return 1; + DBUG_RETURN(1); /* TODO: fix to honor table rules, not only db rules */ if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db)) { skip_load_data_infile(net); - return 0; + DBUG_RETURN(0); } DBUG_ASSERT(cev->inited_from_old); thd = mi->io_thd; @@ -2137,10 +2171,13 @@ relay log"); } error=0; err: - return error; + DBUG_RETURN(error); } -// We assume we already locked mi->data_lock +/* + We assume we already locked mi->data_lock +*/ + static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev) { if (unlikely(!rev->is_valid())) @@ -2175,6 +2212,8 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, bool inc_pos = 1; bool processed_stop_event = 0; char* tmp_buf = 0; + DBUG_ENTER("queue_old_event"); + /* if we get Load event, we need to pass a non-reusable buffer to read_log_event, so we do a trick */ @@ -2183,7 +2222,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME))))) { sql_print_error("Slave I/O: out of memory for Load event"); - return 1; + DBUG_RETURN(1); } memcpy(tmp_buf,buf,event_len); tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer @@ -2196,8 +2235,8 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, sql_print_error("Read invalid event from master: '%s',\ master could be corrupt but a more likely cause of this is a bug", errmsg); - my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); - return 1; + my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_RETURN(1); } pthread_mutex_lock(&mi->data_lock); ev->log_pos = mi->master_log_pos; @@ -2208,7 +2247,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, delete ev; pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(!tmp_buf); - return 1; + DBUG_RETURN(1); } mi->ignore_stop_event=1; inc_pos = 0; @@ -2224,7 +2263,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(tmp_buf); my_free((char*)tmp_buf, MYF(0)); - return error; + DBUG_RETURN(error); } default: mi->ignore_stop_event=0; @@ -2237,7 +2276,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, delete ev; pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(!tmp_buf); - return 1; + DBUG_RETURN(1); } mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); } @@ -2248,7 +2287,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, mi->ignore_stop_event=1; pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(!tmp_buf); - return 0; + DBUG_RETURN(0); } /* @@ -2261,8 +2300,10 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) int error=0; bool inc_pos = 1; bool processed_stop_event = 0; + DBUG_ENTER("queue_event"); + if (mi->old_format) - return queue_old_event(mi,buf,event_len); + DBUG_RETURN(queue_old_event(mi,buf,event_len)); pthread_mutex_lock(&mi->data_lock); @@ -2278,7 +2319,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) { Rotate_log_event rev(buf,event_len,0); if (unlikely(process_io_rotate(mi,&rev))) - return 1; + DBUG_RETURN(1); inc_pos=0; mi->ignore_stop_event=1; break; @@ -2298,7 +2339,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) if (unlikely(processed_stop_event)) mi->ignore_stop_event=1; pthread_mutex_unlock(&mi->data_lock); - return error; + DBUG_RETURN(error); } @@ -2425,18 +2466,27 @@ int flush_relay_log_info(RELAY_LOG_INFO* rli) return 0; } -IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg) + +/* + This function is called when we notice that the current "hot" log + got rotated under our feet. +*/ + +static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) { DBUG_ASSERT(rli->cur_log != &rli->cache_buf); - IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf; DBUG_ASSERT(rli->cur_log_fd == -1); + DBUG_ENTER("reopen_relay_log"); + + IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, errmsg)) <0) - return 0; + DBUG_RETURN(0); my_b_seek(cur_log,rli->relay_log_pos); - return cur_log; + DBUG_RETURN(cur_log); } + Log_event* next_event(RELAY_LOG_INFO* rli) { Log_event* ev; @@ -2445,6 +2495,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) const char* errmsg=0; THD* thd = rli->sql_thd; bool was_killed; + DBUG_ENTER("next_event"); DBUG_ASSERT(thd != 0); /* @@ -2456,16 +2507,18 @@ Log_event* next_event(RELAY_LOG_INFO* rli) */ pthread_mutex_lock(&rli->data_lock); - for (; !(was_killed=sql_slave_killed(thd,rli)) ;) + while (!(was_killed=sql_slave_killed(thd,rli))) { /* We can have two kinds of log reading: - hot_log - rli->cur_log points at the IO_CACHE of relay_log, which - is actively being updated by the I/O thread. We need to be careful - in this case and make sure that we are not looking at a stale log that - has already been rotated. If it has been, we reopen the log - the other case is much simpler - we just have a read only log that - nobody else will be updating. + hot_log: + rli->cur_log points at the IO_CACHE of relay_log, which + is actively being updated by the I/O thread. We need to be careful + in this case and make sure that we are not looking at a stale log that + has already been rotated. If it has been, we reopen the log. + + The other case is much simpler: + We just have a read only log that nobody else will be updating. */ bool hot_log; if ((hot_log = (cur_log != &rli->cache_buf))) @@ -2474,43 +2527,43 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_lock(log_lock); /* - Reading cur_log->init_count here is safe because the log will only + Reading xxx_file_id is safe because the log will only be rotated when we hold relay_log.LOCK_log */ - if (cur_log->init_count != rli->cur_log_init_count) + if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count) { - if (!(cur_log=reopen_relay_log(rli,&errmsg))) - { - pthread_mutex_unlock(log_lock); + // The master has switched to a new log file; Reopen the old log file + cur_log=reopen_relay_log(rli, &errmsg); + pthread_mutex_unlock(log_lock); + if (!cur_log) // No more log files goto err; - } - pthread_mutex_unlock(log_lock); - hot_log=0; + hot_log=0; // Using old binary log } } - DBUG_ASSERT(my_b_tell(cur_log) >= 4); + DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); - /* relay log is always in new format - if the master is 3.23, the - I/O thread will convert the format for us + /* + Relay log is always in new format - if the master is 3.23, the + I/O thread will convert the format for us */ - if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/))) + if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */))) { DBUG_ASSERT(thd==rli->sql_thd); if (hot_log) pthread_mutex_unlock(log_lock); pthread_mutex_unlock(&rli->data_lock); - return ev; + DBUG_RETURN(ev); } DBUG_ASSERT(thd==rli->sql_thd); - if (opt_reckless_slave) + if (opt_reckless_slave) // For mysql-test cur_log->error = 0; - if ( cur_log->error < 0) + if (cur_log->error < 0) { errmsg = "slave SQL thread aborted because of I/O error"; + if (hot_log) + pthread_mutex_unlock(log_lock); goto err; } - - if (!cur_log->error) /* EOF */ { /* @@ -2520,7 +2573,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) */ if (hot_log) { - DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count); + DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count); /* We can, and should release data_lock while we are waiting for update. If we do not, show slave status will block @@ -2528,7 +2581,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_unlock(&rli->data_lock); /* - IMPORTANT: note that wait_for_update will unlock LOCK_log, but + IMPORTANT: note that wait_for_update will unlock lock_log, but expects the caller to lock it */ rli->relay_log.wait_for_update(rli->sql_thd); @@ -2537,102 +2590,108 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_lock(&rli->data_lock); continue; } + /* + If the log was not hot, we need to move to the next log in + sequence. The next log could be hot or cold, we deal with both + cases separately after doing some common initialization + */ + end_io_cache(cur_log); + DBUG_ASSERT(rli->cur_log_fd >= 0); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + + /* + TODO: make skip_log_purge a start-up option. At this point this + is not critical priority + */ + if (!rli->skip_log_purge) + { + // purge_first_log will properly set up relay log coordinates in rli + if (rli->relay_log.purge_first_log(rli)) + { + errmsg = "Error purging processed log"; + goto err; + } + } else { /* - If the log was not hot, we need to move to the next log in - sequence. The next log could be hot or cold, we deal with both - cases separately after doing some common initialization + TODO: verify that no lock is ok here. At this point, if we + get this wrong, this is actually no big deal - the only time + this code will ever be executed is if we are recovering from + a bug when a full reload of the slave is not feasible or + desirable. */ - end_io_cache(cur_log); - DBUG_ASSERT(rli->cur_log_fd >= 0); - my_close(rli->cur_log_fd, MYF(MY_WME)); - rli->cur_log_fd = -1; - - // TODO: make skip_log_purge a start-up option. At this point this - // is not critical priority - if (!rli->skip_log_purge) - { - // purge_first_log will properly set up relay log coordinates in rli - if (rli->relay_log.purge_first_log(rli)) - { - errmsg = "Error purging processed log"; - goto err; - } - } - else + if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/)) { - // TODO: verify that no lock is ok here. At this point, if we - // get this wrong, this is actually no big deal - the only time - // this code will ever be executed is if we are recovering from - // a bug when a full reload of the slave is not feasible or - // desirable. - if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/)) - { - errmsg = "error switching to the next log"; - goto err; - } - rli->relay_log_pos = 4; - rli->pending=0; - strnmov(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)); - flush_relay_log_info(rli); + errmsg = "error switching to the next log"; + goto err; } + rli->relay_log_pos = BIN_LOG_HEADER_SIZE; + rli->pending=0; + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + flush_relay_log_info(rli); + } - // next log is hot - if (rli->relay_log.is_active(rli->linfo.log_file_name)) - { + // next log is hot + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { #ifdef EXTRA_DEBUG - sql_print_error("next log '%s' is currently active", - rli->linfo.log_file_name); + sql_print_error("next log '%s' is currently active", + rli->linfo.log_file_name); #endif - rli->cur_log = cur_log = rli->relay_log.get_log_file(); - rli->cur_log_init_count = cur_log->init_count; - DBUG_ASSERT(rli->cur_log_fd == -1); + rli->cur_log= cur_log= rli->relay_log.get_log_file(); + rli->cur_log_old_open_count= rli->relay_log.get_open_count(); + DBUG_ASSERT(rli->cur_log_fd == -1); - /* - Read pointer has to be at the start since we are the only - reader - */ - if (check_binlog_magic(cur_log,&errmsg)) - goto err; - continue; - } /* - if we get here, the log was not hot, so we will have to - open it ourselves + Read pointer has to be at the start since we are the only + reader */ -#ifdef EXTRA_DEBUG - sql_print_error("next log '%s' is not active", - rli->linfo.log_file_name); -#endif - // open_binlog() will check the magic header - if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, - &errmsg)) <0) + if (check_binlog_magic(cur_log,&errmsg)) goto err; + continue; } + /* + if we get here, the log was not hot, so we will have to + open it ourselves + */ +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is not active", + rli->linfo.log_file_name); +#endif + // open_binlog() will check the magic header + if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, + &errmsg)) <0) + goto err; } - else // read failed with a non-EOF error + else { - // TODO: come up with something better to handle this error + /* + Read failed with a non-EOF error. + TODO: come up with something better to handle this error + */ + if (hot_log) + pthread_mutex_unlock(log_lock); sql_print_error("Slave SQL thread: I/O error reading \ -event(errno=%d,cur_log->error=%d)", +event(errno: %d cur_log->error: %d)", my_errno,cur_log->error); // set read position to the beginning of the event my_b_seek(cur_log,rli->relay_log_pos+rli->pending); /* otherwise, we have had a partial read */ - /* TODO; see if there is a way to do this without this goto */ errmsg = "Aborting slave SQL thread because of partial event read"; + /* TODO; see if there is a way to do this without this goto */ goto err; } - } if (!errmsg && was_killed) errmsg = "slave SQL thread was killed"; + err: pthread_mutex_unlock(&rli->data_lock); sql_print_error("Error reading relay log event: %s", errmsg); - return 0; + DBUG_RETURN(0); } |