summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc337
1 files changed, 228 insertions, 109 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 6b9c376a625..e8ffb15110b 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -20,6 +20,7 @@
#include <myisam.h>
#include "mini_client.h"
#include "slave.h"
+#include "sql_repl.h"
#include <thr_alarm.h>
#include <my_dir.h>
@@ -55,7 +56,7 @@ static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_sleep(THD* thd, int sec);
-static int request_table_dump(MYSQL* mysql, char* db, char* table);
+static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
inline char* rewrite_db(char* db);
@@ -314,28 +315,31 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name)
{
uint packet_len = my_net_read(net); // read create table statement
+ Vio* save_vio;
+ HA_CHECK_OPT check_opt;
TABLE_LIST tables;
- int error = 0;
+ int error= 1;
+ handler *file;
- if(packet_len == packet_error)
- {
- send_error(&thd->net, ER_MASTER_NET_READ);
- return 1;
- }
- if(net->read_pos[0] == 255) // error from master
- {
- net->read_pos[packet_len] = 0;
- net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
- return 1;
- }
+ if (packet_len == packet_error)
+ {
+ send_error(&thd->net, ER_MASTER_NET_READ);
+ return 1;
+ }
+ if (net->read_pos[0] == 255) // error from master
+ {
+ net->read_pos[packet_len] = 0;
+ net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
+ return 1;
+ }
thd->command = COM_TABLE_DUMP;
thd->query = sql_alloc(packet_len + 1);
- if(!thd->query)
- {
- sql_print_error("create_table_from_dump: out of memory");
- net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
- return 1;
- }
+ if (!thd->query)
+ {
+ sql_print_error("create_table_from_dump: out of memory");
+ net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
+ return 1;
+ }
memcpy(thd->query, net->read_pos, packet_len);
thd->query[packet_len] = 0;
thd->current_tablenr = 0;
@@ -344,15 +348,12 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->proc_info = "Creating table from master dump";
// save old db in case we are creating in a different database
char* save_db = thd->db;
- thd->db = thd->last_nx_db;
+ thd->db = (char*)db;
mysql_parse(thd, thd->query, packet_len); // run create table
- thd->db = save_db; // leave things the way the were before
+ thd->db = save_db; // leave things the way the were before
- if(thd->query_error)
- {
- close_thread_tables(thd); // mysql_parse takes care of the error send
- return 1;
- }
+ if (thd->query_error)
+ goto err; // mysql_parse took care of the error send
bzero((char*) &tables,sizeof(tables));
tables.db = (char*)db;
@@ -361,83 +362,90 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->proc_info = "Opening master dump table";
if (!open_ltable(thd, &tables, TL_WRITE))
{
- // open tables will send the error
+ send_error(&thd->net,0,0); // Send error from open_ltable
sql_print_error("create_table_from_dump: could not open created table");
- close_thread_tables(thd);
- return 1;
+ goto err;
}
- handler *file = tables.table->file;
+ file = tables.table->file;
thd->proc_info = "Reading master dump table data";
if (file->net_read_dump(net))
{
net_printf(&thd->net, ER_MASTER_NET_READ);
sql_print_error("create_table_from_dump::failed in\
handler::net_read_dump()");
- close_thread_tables(thd);
- return 1;
+ goto err;
}
- HA_CHECK_OPT check_opt;
check_opt.init();
check_opt.flags|= T_VERY_SILENT;
check_opt.quick = 1;
thd->proc_info = "Rebuilding the index on master dump table";
- Vio* save_vio = thd->net.vio;
// we do not want repair() to spam us with messages
// just send them to the error log, and report the failure in case of
// problems
+ save_vio = thd->net.vio;
thd->net.vio = 0;
- if (file->repair(thd,&check_opt ))
- {
- net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name );
- error = 1;
- }
+ error=file->repair(thd,&check_opt) != 0;
thd->net.vio = save_vio;
+ if (error)
+ net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name);
+
+err:
close_thread_tables(thd);
-
thd->net.no_send_ok = 0;
return error;
}
-int fetch_nx_table(THD* thd, MASTER_INFO* mi)
+int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
+ MASTER_INFO* mi, MYSQL* mysql)
{
- MYSQL* mysql = mc_mysql_init(NULL);
int error = 1;
int nx_errno = 0;
- if(!mysql)
- {
- sql_print_error("fetch_nx_table: Error in mysql_init()");
- nx_errno = ER_GET_ERRNO;
- goto err;
- }
-
- safe_connect(thd, mysql, mi);
- if(slave_killed(thd))
+ bool called_connected = (mysql != NULL);
+ if (!called_connected && !(mysql = mc_mysql_init(NULL)))
+ {
+ sql_print_error("fetch_nx_table: Error in mysql_init()");
+ nx_errno = ER_GET_ERRNO;
goto err;
+ }
- if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
+ if (!called_connected)
+ {
+ if (connect_to_master(thd, mysql, mi))
{
- nx_errno = ER_GET_ERRNO;
- sql_print_error("fetch_nx_table: failed on table dump request ");
+ sql_print_error("Could not connect to master while fetching table\
+ '%-64s.%-64s'", db_name, table_name);
+ nx_errno = ER_CONNECT_TO_MASTER;
goto err;
}
+ }
+ if (slave_killed(thd))
+ goto err;
- if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
- thd->last_nx_table))
- {
- // create_table_from_dump will have sent the error alread
- sql_print_error("fetch_nx_table: failed on create table ");
- goto err;
- }
+ if (request_table_dump(mysql, db_name, table_name))
+ {
+ nx_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_nx_table: failed on table dump request ");
+ goto err;
+ }
+
+ if (create_table_from_dump(thd, &mysql->net, db_name,
+ table_name))
+ {
+ // create_table_from_dump will have sent the error alread
+ sql_print_error("fetch_nx_table: failed on create table ");
+ goto err;
+ }
error = 0;
err:
- if (mysql)
+ if (mysql && !called_connected)
mc_mysql_close(mysql);
if (nx_errno && thd->net.vio)
send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
+ thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
return error;
}
@@ -460,7 +468,7 @@ int init_master_info(MASTER_INFO* mi)
MY_STAT stat_area;
char fname[FN_REFLEN+128];
const char *msg;
- fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
// we need a mutex while we are changing master info parameters to
// keep other threads from reading bogus info
@@ -537,7 +545,9 @@ int init_master_info(MASTER_INFO* mi)
master_password) ||
init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
- master_connect_retry))
+ master_connect_retry) ||
+ init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0)
+ )
{
msg="Error reading master configuration";
goto error;
@@ -560,6 +570,44 @@ error:
return 1;
}
+int register_slave_on_master(MYSQL* mysql)
+{
+ String packet;
+ uint len;
+ char buf[4];
+
+ if(!report_host)
+ return 0;
+
+ int4store(buf, server_id);
+ packet.append(buf, 4);
+
+ net_store_data(&packet, report_host);
+ if(report_user)
+ net_store_data(&packet, report_user);
+ else
+ packet.append((char)0);
+
+ if(report_password)
+ net_store_data(&packet, report_user);
+ else
+ packet.append((char)0);
+
+ int2store(buf, (uint16)report_port);
+ packet.append(buf, 2);
+
+ if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
+ packet.length(), 0))
+ {
+ sql_print_error("Error on COM_REGISTER_SLAVE: '%s'",
+ mc_mysql_error(mysql));
+ return 1;
+ }
+
+ return 0;
+}
+
+
int show_master_info(THD* thd)
{
DBUG_ENTER("show_master_info");
@@ -579,10 +627,12 @@ int show_master_info(THD* thd)
field_list.push_back(new Item_empty_string("Last_errno", 4));
field_list.push_back(new Item_empty_string("Last_error", 20));
field_list.push_back(new Item_empty_string("Skip_counter", 12));
+ field_list.push_back(new Item_empty_string("Last_log_seq", 12));
if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
String* packet = &thd->packet;
+ uint32 last_log_seq;
packet->length(0);
pthread_mutex_lock(&glob_mi.lock);
@@ -591,7 +641,8 @@ int show_master_info(THD* thd)
net_store_data(packet, (uint32) glob_mi.port);
net_store_data(packet, (uint32) glob_mi.connect_retry);
net_store_data(packet, glob_mi.log_file_name);
- net_store_data(packet, (uint32) glob_mi.pos); // QQ: Should be fixed
+ net_store_data(packet, (longlong) glob_mi.pos);
+ last_log_seq = glob_mi.last_log_seq;
pthread_mutex_unlock(&glob_mi.lock);
pthread_mutex_lock(&LOCK_slave);
net_store_data(packet, slave_running ? "Yes":"No");
@@ -601,6 +652,7 @@ int show_master_info(THD* thd)
net_store_data(packet, (uint32)last_slave_errno);
net_store_data(packet, last_slave_error);
net_store_data(packet, slave_skip_counter);
+ net_store_data(packet, last_log_seq);
if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
DBUG_RETURN(-1);
@@ -613,11 +665,13 @@ int flush_master_info(MASTER_INFO* mi)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
+ char lbuf1[22];
my_b_seek(file, 0L);
- my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
+ my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n",
mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user,
- mi->password, mi->port, mi->connect_retry);
+ mi->password, mi->port, mi->connect_retry,
+ llstr(mi->last_log_seq, lbuf1));
flush_io_cache(file);
return 0;
}
@@ -764,7 +818,7 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
return 0;
}
-static int request_table_dump(MYSQL* mysql, char* db, char* table)
+static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
char buf[1024];
char * p = buf;
@@ -882,7 +936,10 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
- if(!ev->when)
+ if(!thd->log_seq)
+ thd->log_seq = ev->log_seq;
+
+ if (!ev->when)
ev->when = time(NULL);
switch(type_code) {
@@ -901,7 +958,6 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
- thd->last_nx_table = thd->last_nx_db = 0;
thd->query_error = 0; // clear error
thd->net.last_errno = 0;
thd->net.last_error[0] = 0;
@@ -909,36 +965,37 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
// sanity check to make sure the master did not get a really bad
// error on the query
- if(!check_expected_error(thd, (expected_error = qev->error_code)))
+ if (!check_expected_error(thd, (expected_error = qev->error_code)))
+ {
+ mysql_parse(thd, thd->query, q_len);
+ if (expected_error !=
+ (actual_error = thd->net.last_errno) && expected_error)
{
- mysql_parse(thd, thd->query, q_len);
- if (expected_error !=
- (actual_error = thd->net.last_errno) && expected_error)
- {
- const char* errmsg = "Slave: did not get the expected error\
- running query from master - expected: '%s', got '%s'";
- sql_print_error(errmsg, ER(expected_error),
- actual_error ? thd->net.last_error:"no error"
- );
- thd->query_error = 1;
- }
- else if (expected_error == actual_error)
- {
- thd->query_error = 0;
- *last_slave_error = 0;
- last_slave_errno = 0;
- }
+ const char* errmsg = "Slave: did not get the expected error\
+ running query from master - expected: '%s'(%d), got '%s'(%d)";
+ sql_print_error(errmsg, ER_SAFE(expected_error),
+ expected_error,
+ actual_error ? thd->net.last_error:"no error",
+ actual_error);
+ thd->query_error = 1;
}
- else // master could be inconsistent, abort and tell DBA to
- // check/fix it
+ else if (expected_error == actual_error)
{
- thd->db = thd->query = 0;
- thd->convert_set = 0;
- close_thread_tables(thd);
- free_root(&thd->mem_root,0);
- delete ev;
- return 1;
+ thd->query_error = 0;
+ *last_slave_error = 0;
+ last_slave_errno = 0;
}
+ }
+ else
+ {
+ // master could be inconsistent, abort and tell DBA to check/fix it
+ thd->db = thd->query = 0;
+ thd->convert_set = 0;
+ close_thread_tables(thd);
+ free_root(&thd->mem_root,0);
+ delete ev;
+ return 1;
+ }
}
thd->db = 0; // prevent db from being freed
thd->query = 0; // just to be sure
@@ -962,8 +1019,25 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
return 1;
}
free_root(&thd->mem_root,0);
+ mi->last_log_seq = ev->log_seq;
delete ev;
+ thd->log_seq = 0;
+ mi->inc_pos(event_len);
+ flush_master_info(mi);
+ break;
+ }
+ case SLAVE_EVENT:
+ {
+ if(mysql_bin_log.is_open())
+ {
+ Slave_log_event *sev = (Slave_log_event*)ev;
+ mysql_bin_log.write(sev);
+ }
+
+ mi->last_log_seq = ev->log_seq;
+ delete ev;
+ thd->log_seq = 0;
mi->inc_pos(event_len);
flush_master_info(mi);
break;
@@ -1076,7 +1150,9 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
return 1;
}
+ mi->last_log_seq = ev->log_seq;
delete ev;
+ thd->log_seq = 0;
free_root(&thd->mem_root,0);
if(thd->fatal_error)
@@ -1094,8 +1170,10 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
case START_EVENT:
close_temporary_tables(thd);
mi->inc_pos(event_len);
+ mi->last_log_seq = ev->log_seq;
flush_master_info(mi);
delete ev;
+ thd->log_seq = 0;
break;
case STOP_EVENT:
@@ -1105,24 +1183,56 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
mi->inc_pos(event_len);
flush_master_info(mi);
}
+ mi->last_log_seq = ev->log_seq;
delete ev;
+ thd->log_seq = 0;
break;
case ROTATE_EVENT:
{
Rotate_log_event* rev = (Rotate_log_event*)ev;
int ident_len = rev->ident_len;
+ bool rotate_binlog = 0, write_slave_event = 0;
+ char* log_name = mi->log_file_name;
pthread_mutex_lock(&mi->lock);
- memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
- mi->log_file_name[ident_len] = 0;
- mi->pos = 4; // skip magic number
+
+ // rotate local binlog only if the name of remote has changed
+ if (!*log_name || !(log_name[ident_len] == 0 &&
+ !memcmp(log_name, rev->new_log_ident, ident_len)))
+ {
+ write_slave_event = (!(rev->flags & LOG_EVENT_FORCED_ROTATE_F)
+ && mysql_bin_log.is_open());
+ rotate_binlog = (*log_name && write_slave_event);
+ memcpy(log_name, rev->new_log_ident,ident_len );
+ log_name[ident_len] = 0;
+ }
+ mi->pos = rev->pos;
+ mi->last_log_seq = ev->log_seq;
pthread_cond_broadcast(&mi->cond);
pthread_mutex_unlock(&mi->lock);
- flush_master_info(mi);
#ifndef DBUG_OFF
- if(abort_slave_event_count)
+ if (abort_slave_event_count)
++events_till_abort;
-#endif
+#endif
+ if (rotate_binlog)
+ {
+ mi->last_log_seq = 0;
+ mysql_bin_log.new_file();
+ }
+ flush_master_info(mi);
+
+ if (write_slave_event)
+ {
+ Slave_log_event s(thd, mi);
+ if (s.master_host)
+ {
+ s.set_log_seq(0, &mysql_bin_log);
+ s.server_id = ::server_id;
+ mysql_bin_log.write(&s);
+ }
+ }
+
delete ev;
+ thd->log_seq = 0;
break;
}
@@ -1142,6 +1252,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
}
mi->inc_pending(event_len);
delete ev;
+ // do not reset log_seq
break;
}
}
@@ -1237,6 +1348,14 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
sql_print_error("Slave thread killed while connecting to master");
goto err;
}
+
+connected:
+
+ // register ourselves with the master
+ // if fails, this is not fatal - we just print the error message and go
+ // on with life
+ thd->proc_info = "Registering slave on master";
+ register_slave_on_master(mysql);
while (!slave_killed(thd))
{
@@ -1280,7 +1399,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
goto err;
}
- continue;
+ goto connected;
}
@@ -1331,8 +1450,9 @@ reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
reconnect done to recover from failed read");
goto err;
}
- break;
- }
+
+ goto connected;
+ } // if(event_len == packet_error)
thd->proc_info = "Processing master log event";
if(exec_event(thd, &mysql->net, &glob_mi, event_len))
@@ -1369,15 +1489,14 @@ the slave thread with \"mysqladmin start-slave\". We stopped at log \
{
// show a little mercy, allow slave to read one more event
// before cutting him off - otherwise he gets stuck
- // on Invar events, since they do not advance the offset
+ // on Intvar events, since they do not advance the offset
// immediately
if (++stuck_count > 2)
events_till_disconnect++;
}
#endif
-
- }
- }
+ } // while(!slave_killed(thd)) - read/exec loop
+ } // while(!slave_killed(thd)) - slave loop
// error = 0;
err: