diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 337 |
1 files changed, 228 insertions, 109 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 6b9c376a625..e8ffb15110b 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -20,6 +20,7 @@ #include <myisam.h> #include "mini_client.h" #include "slave.h" +#include "sql_repl.h" #include <thr_alarm.h> #include <my_dir.h> @@ -55,7 +56,7 @@ static int init_slave_thread(THD* thd); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_sleep(THD* thd, int sec); -static int request_table_dump(MYSQL* mysql, char* db, char* table); +static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); inline char* rewrite_db(char* db); @@ -314,28 +315,31 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name) { uint packet_len = my_net_read(net); // read create table statement + Vio* save_vio; + HA_CHECK_OPT check_opt; TABLE_LIST tables; - int error = 0; + int error= 1; + handler *file; - if(packet_len == packet_error) - { - send_error(&thd->net, ER_MASTER_NET_READ); - return 1; - } - if(net->read_pos[0] == 255) // error from master - { - net->read_pos[packet_len] = 0; - net_printf(&thd->net, ER_MASTER, net->read_pos + 3); - return 1; - } + if (packet_len == packet_error) + { + send_error(&thd->net, ER_MASTER_NET_READ); + return 1; + } + if (net->read_pos[0] == 255) // error from master + { + net->read_pos[packet_len] = 0; + net_printf(&thd->net, ER_MASTER, net->read_pos + 3); + return 1; + } thd->command = COM_TABLE_DUMP; thd->query = sql_alloc(packet_len + 1); - if(!thd->query) - { - sql_print_error("create_table_from_dump: out of memory"); - net_printf(&thd->net, ER_GET_ERRNO, "Out of memory"); - return 1; - } + if (!thd->query) + { + sql_print_error("create_table_from_dump: out of memory"); + net_printf(&thd->net, ER_GET_ERRNO, "Out of memory"); + return 1; + } memcpy(thd->query, net->read_pos, packet_len); thd->query[packet_len] = 0; thd->current_tablenr = 0; @@ -344,15 +348,12 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, thd->proc_info = "Creating table from master dump"; // save old db in case we are creating in a different database char* save_db = thd->db; - thd->db = thd->last_nx_db; + thd->db = (char*)db; mysql_parse(thd, thd->query, packet_len); // run create table - thd->db = save_db; // leave things the way the were before + thd->db = save_db; // leave things the way the were before - if(thd->query_error) - { - close_thread_tables(thd); // mysql_parse takes care of the error send - return 1; - } + if (thd->query_error) + goto err; // mysql_parse took care of the error send bzero((char*) &tables,sizeof(tables)); tables.db = (char*)db; @@ -361,83 +362,90 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, thd->proc_info = "Opening master dump table"; if (!open_ltable(thd, &tables, TL_WRITE)) { - // open tables will send the error + send_error(&thd->net,0,0); // Send error from open_ltable sql_print_error("create_table_from_dump: could not open created table"); - close_thread_tables(thd); - return 1; + goto err; } - handler *file = tables.table->file; + file = tables.table->file; thd->proc_info = "Reading master dump table data"; if (file->net_read_dump(net)) { net_printf(&thd->net, ER_MASTER_NET_READ); sql_print_error("create_table_from_dump::failed in\ handler::net_read_dump()"); - close_thread_tables(thd); - return 1; + goto err; } - HA_CHECK_OPT check_opt; check_opt.init(); check_opt.flags|= T_VERY_SILENT; check_opt.quick = 1; thd->proc_info = "Rebuilding the index on master dump table"; - Vio* save_vio = thd->net.vio; // we do not want repair() to spam us with messages // just send them to the error log, and report the failure in case of // problems + save_vio = thd->net.vio; thd->net.vio = 0; - if (file->repair(thd,&check_opt )) - { - net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name ); - error = 1; - } + error=file->repair(thd,&check_opt) != 0; thd->net.vio = save_vio; + if (error) + net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name); + +err: close_thread_tables(thd); - thd->net.no_send_ok = 0; return error; } -int fetch_nx_table(THD* thd, MASTER_INFO* mi) +int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, + MASTER_INFO* mi, MYSQL* mysql) { - MYSQL* mysql = mc_mysql_init(NULL); int error = 1; int nx_errno = 0; - if(!mysql) - { - sql_print_error("fetch_nx_table: Error in mysql_init()"); - nx_errno = ER_GET_ERRNO; - goto err; - } - - safe_connect(thd, mysql, mi); - if(slave_killed(thd)) + bool called_connected = (mysql != NULL); + if (!called_connected && !(mysql = mc_mysql_init(NULL))) + { + sql_print_error("fetch_nx_table: Error in mysql_init()"); + nx_errno = ER_GET_ERRNO; goto err; + } - if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table)) + if (!called_connected) + { + if (connect_to_master(thd, mysql, mi)) { - nx_errno = ER_GET_ERRNO; - sql_print_error("fetch_nx_table: failed on table dump request "); + sql_print_error("Could not connect to master while fetching table\ + '%-64s.%-64s'", db_name, table_name); + nx_errno = ER_CONNECT_TO_MASTER; goto err; } + } + if (slave_killed(thd)) + goto err; - if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db, - thd->last_nx_table)) - { - // create_table_from_dump will have sent the error alread - sql_print_error("fetch_nx_table: failed on create table "); - goto err; - } + if (request_table_dump(mysql, db_name, table_name)) + { + nx_errno = ER_GET_ERRNO; + sql_print_error("fetch_nx_table: failed on table dump request "); + goto err; + } + + if (create_table_from_dump(thd, &mysql->net, db_name, + table_name)) + { + // create_table_from_dump will have sent the error alread + sql_print_error("fetch_nx_table: failed on create table "); + goto err; + } error = 0; err: - if (mysql) + if (mysql && !called_connected) mc_mysql_close(mysql); if (nx_errno && thd->net.vio) send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); + thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump return error; } @@ -460,7 +468,7 @@ int init_master_info(MASTER_INFO* mi) MY_STAT stat_area; char fname[FN_REFLEN+128]; const char *msg; - fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); + fn_format(fname, master_info_file, mysql_data_home, "", 4+32); // we need a mutex while we are changing master info parameters to // keep other threads from reading bogus info @@ -537,7 +545,9 @@ int init_master_info(MASTER_INFO* mi) master_password) || init_intvar_from_file((int*)&mi->port, &mi->file, master_port) || init_intvar_from_file((int*)&mi->connect_retry, &mi->file, - master_connect_retry)) + master_connect_retry) || + init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0) + ) { msg="Error reading master configuration"; goto error; @@ -560,6 +570,44 @@ error: return 1; } +int register_slave_on_master(MYSQL* mysql) +{ + String packet; + uint len; + char buf[4]; + + if(!report_host) + return 0; + + int4store(buf, server_id); + packet.append(buf, 4); + + net_store_data(&packet, report_host); + if(report_user) + net_store_data(&packet, report_user); + else + packet.append((char)0); + + if(report_password) + net_store_data(&packet, report_user); + else + packet.append((char)0); + + int2store(buf, (uint16)report_port); + packet.append(buf, 2); + + if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), + packet.length(), 0)) + { + sql_print_error("Error on COM_REGISTER_SLAVE: '%s'", + mc_mysql_error(mysql)); + return 1; + } + + return 0; +} + + int show_master_info(THD* thd) { DBUG_ENTER("show_master_info"); @@ -579,10 +627,12 @@ int show_master_info(THD* thd) field_list.push_back(new Item_empty_string("Last_errno", 4)); field_list.push_back(new Item_empty_string("Last_error", 20)); field_list.push_back(new Item_empty_string("Skip_counter", 12)); + field_list.push_back(new Item_empty_string("Last_log_seq", 12)); if(send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; + uint32 last_log_seq; packet->length(0); pthread_mutex_lock(&glob_mi.lock); @@ -591,7 +641,8 @@ int show_master_info(THD* thd) net_store_data(packet, (uint32) glob_mi.port); net_store_data(packet, (uint32) glob_mi.connect_retry); net_store_data(packet, glob_mi.log_file_name); - net_store_data(packet, (uint32) glob_mi.pos); // QQ: Should be fixed + net_store_data(packet, (longlong) glob_mi.pos); + last_log_seq = glob_mi.last_log_seq; pthread_mutex_unlock(&glob_mi.lock); pthread_mutex_lock(&LOCK_slave); net_store_data(packet, slave_running ? "Yes":"No"); @@ -601,6 +652,7 @@ int show_master_info(THD* thd) net_store_data(packet, (uint32)last_slave_errno); net_store_data(packet, last_slave_error); net_store_data(packet, slave_skip_counter); + net_store_data(packet, last_log_seq); if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) DBUG_RETURN(-1); @@ -613,11 +665,13 @@ int flush_master_info(MASTER_INFO* mi) { IO_CACHE* file = &mi->file; char lbuf[22]; + char lbuf1[22]; my_b_seek(file, 0L); - my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n", + my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n", mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, - mi->password, mi->port, mi->connect_retry); + mi->password, mi->port, mi->connect_retry, + llstr(mi->last_log_seq, lbuf1)); flush_io_cache(file); return 0; } @@ -764,7 +818,7 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi) return 0; } -static int request_table_dump(MYSQL* mysql, char* db, char* table) +static int request_table_dump(MYSQL* mysql, const char* db, const char* table) { char buf[1024]; char * p = buf; @@ -882,7 +936,10 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query - if(!ev->when) + if(!thd->log_seq) + thd->log_seq = ev->log_seq; + + if (!ev->when) ev->when = time(NULL); switch(type_code) { @@ -901,7 +958,6 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) VOID(pthread_mutex_lock(&LOCK_thread_count)); thd->query_id = query_id++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); - thd->last_nx_table = thd->last_nx_db = 0; thd->query_error = 0; // clear error thd->net.last_errno = 0; thd->net.last_error[0] = 0; @@ -909,36 +965,37 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) // sanity check to make sure the master did not get a really bad // error on the query - if(!check_expected_error(thd, (expected_error = qev->error_code))) + if (!check_expected_error(thd, (expected_error = qev->error_code))) + { + mysql_parse(thd, thd->query, q_len); + if (expected_error != + (actual_error = thd->net.last_errno) && expected_error) { - mysql_parse(thd, thd->query, q_len); - if (expected_error != - (actual_error = thd->net.last_errno) && expected_error) - { - const char* errmsg = "Slave: did not get the expected error\ - running query from master - expected: '%s', got '%s'"; - sql_print_error(errmsg, ER(expected_error), - actual_error ? thd->net.last_error:"no error" - ); - thd->query_error = 1; - } - else if (expected_error == actual_error) - { - thd->query_error = 0; - *last_slave_error = 0; - last_slave_errno = 0; - } + const char* errmsg = "Slave: did not get the expected error\ + running query from master - expected: '%s'(%d), got '%s'(%d)"; + sql_print_error(errmsg, ER_SAFE(expected_error), + expected_error, + actual_error ? thd->net.last_error:"no error", + actual_error); + thd->query_error = 1; } - else // master could be inconsistent, abort and tell DBA to - // check/fix it + else if (expected_error == actual_error) { - thd->db = thd->query = 0; - thd->convert_set = 0; - close_thread_tables(thd); - free_root(&thd->mem_root,0); - delete ev; - return 1; + thd->query_error = 0; + *last_slave_error = 0; + last_slave_errno = 0; } + } + else + { + // master could be inconsistent, abort and tell DBA to check/fix it + thd->db = thd->query = 0; + thd->convert_set = 0; + close_thread_tables(thd); + free_root(&thd->mem_root,0); + delete ev; + return 1; + } } thd->db = 0; // prevent db from being freed thd->query = 0; // just to be sure @@ -962,8 +1019,25 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) return 1; } free_root(&thd->mem_root,0); + mi->last_log_seq = ev->log_seq; delete ev; + thd->log_seq = 0; + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } + case SLAVE_EVENT: + { + if(mysql_bin_log.is_open()) + { + Slave_log_event *sev = (Slave_log_event*)ev; + mysql_bin_log.write(sev); + } + + mi->last_log_seq = ev->log_seq; + delete ev; + thd->log_seq = 0; mi->inc_pos(event_len); flush_master_info(mi); break; @@ -1076,7 +1150,9 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) return 1; } + mi->last_log_seq = ev->log_seq; delete ev; + thd->log_seq = 0; free_root(&thd->mem_root,0); if(thd->fatal_error) @@ -1094,8 +1170,10 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) case START_EVENT: close_temporary_tables(thd); mi->inc_pos(event_len); + mi->last_log_seq = ev->log_seq; flush_master_info(mi); delete ev; + thd->log_seq = 0; break; case STOP_EVENT: @@ -1105,24 +1183,56 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) mi->inc_pos(event_len); flush_master_info(mi); } + mi->last_log_seq = ev->log_seq; delete ev; + thd->log_seq = 0; break; case ROTATE_EVENT: { Rotate_log_event* rev = (Rotate_log_event*)ev; int ident_len = rev->ident_len; + bool rotate_binlog = 0, write_slave_event = 0; + char* log_name = mi->log_file_name; pthread_mutex_lock(&mi->lock); - memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); - mi->log_file_name[ident_len] = 0; - mi->pos = 4; // skip magic number + + // rotate local binlog only if the name of remote has changed + if (!*log_name || !(log_name[ident_len] == 0 && + !memcmp(log_name, rev->new_log_ident, ident_len))) + { + write_slave_event = (!(rev->flags & LOG_EVENT_FORCED_ROTATE_F) + && mysql_bin_log.is_open()); + rotate_binlog = (*log_name && write_slave_event); + memcpy(log_name, rev->new_log_ident,ident_len ); + log_name[ident_len] = 0; + } + mi->pos = rev->pos; + mi->last_log_seq = ev->log_seq; pthread_cond_broadcast(&mi->cond); pthread_mutex_unlock(&mi->lock); - flush_master_info(mi); #ifndef DBUG_OFF - if(abort_slave_event_count) + if (abort_slave_event_count) ++events_till_abort; -#endif +#endif + if (rotate_binlog) + { + mi->last_log_seq = 0; + mysql_bin_log.new_file(); + } + flush_master_info(mi); + + if (write_slave_event) + { + Slave_log_event s(thd, mi); + if (s.master_host) + { + s.set_log_seq(0, &mysql_bin_log); + s.server_id = ::server_id; + mysql_bin_log.write(&s); + } + } + delete ev; + thd->log_seq = 0; break; } @@ -1142,6 +1252,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) } mi->inc_pending(event_len); delete ev; + // do not reset log_seq break; } } @@ -1237,6 +1348,14 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) sql_print_error("Slave thread killed while connecting to master"); goto err; } + +connected: + + // register ourselves with the master + // if fails, this is not fatal - we just print the error message and go + // on with life + thd->proc_info = "Registering slave on master"; + register_slave_on_master(mysql); while (!slave_killed(thd)) { @@ -1280,7 +1399,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME, goto err; } - continue; + goto connected; } @@ -1331,8 +1450,9 @@ reconnecting to retry, log '%s' position %s", RPL_LOG_NAME, reconnect done to recover from failed read"); goto err; } - break; - } + + goto connected; + } // if(event_len == packet_error) thd->proc_info = "Processing master log event"; if(exec_event(thd, &mysql->net, &glob_mi, event_len)) @@ -1369,15 +1489,14 @@ the slave thread with \"mysqladmin start-slave\". We stopped at log \ { // show a little mercy, allow slave to read one more event // before cutting him off - otherwise he gets stuck - // on Invar events, since they do not advance the offset + // on Intvar events, since they do not advance the offset // immediately if (++stuck_count > 2) events_till_disconnect++; } #endif - - } - } + } // while(!slave_killed(thd)) - read/exec loop + } // while(!slave_killed(thd)) - slave loop // error = 0; err: |