diff options
author | unknown <guilhem@mysql.com> | 2003-04-24 15:29:25 +0200 |
---|---|---|
committer | unknown <guilhem@mysql.com> | 2003-04-24 15:29:25 +0200 |
commit | 9f4f19d9f77433e02e9985d9c2f3224d062173b3 (patch) | |
tree | 0264c0e9bae7ff461d619885e3e94778f1749a23 | |
parent | 4ac98ec5215dedff9e24b91d305e9eec5ee3b96f (diff) | |
download | mariadb-git-9f4f19d9f77433e02e9985d9c2f3224d062173b3.tar.gz |
Replication: new code which does not modify the in-memory log positions until
the COMMIT is executed, even if the transaction spans two or more relay logs (bug #53).
New variable relay_log_purge =0|1
New test to verify bug #53
sql/log.cc:
Now we purge a relay log only when we are sure we won't need it,
i.e. we have executed the final query (if autocommit=1) or the COMMIT.
sql/log_event.cc:
Better tracking of the relay log name and position
last executed, even if we are in a transaction which spans
two or more relay logs.
sql/mysql_priv.h:
new option relay_log_purge (the user can now decide for himself
whether his relay logs are purged automatically or not;
we no longer make unsafe guesses as before)
sql/mysqld.cc:
new option --innodb (replaces --skip-innodb).
Useful for the test suite : we have skip-innodb in mysql-test-run,
but we can ('-opt.info' file) choose to start the server with
InnoDB for this test only.
New option --bdb
sql/repl_failsafe.cc:
Better tracking of the relay log name and position
last executed, even if we are in a transaction which spans
two or more relay logs.
sql/set_var.cc:
new variable relay_log_purge
sql/slave.cc:
Better tracking of the relay log name and position
last executed, even if we are in a transaction which spans
two or more relay logs.
Now we purge a relay log only when we are sure we won't need it,
i.e. we have executed the final query (if autocommit=1) or the COMMIT
sql/slave.h:
Better tracking of the relay log name and position
last executed, even if we are in a transaction which spans
two or more relay logs.
sql/sql_class.h:
prototype changes
sql/sql_parse.cc:
removed thd argument (was not used in the function's body)
sql/sql_repl.cc:
Better tracking of the relay log name and position
last executed, even if we are in a transaction which spans
two or more relay logs.
Turn relay_log_purge silently off when someone does CHANGE
MASTER TO RELAY_LOG_*
-rw-r--r-- | mysql-test/r/rpl_relayrotate.result | 18 | ||||
-rw-r--r-- | mysql-test/t/rpl_relayrotate-slave.opt | 4 | ||||
-rw-r--r-- | mysql-test/t/rpl_relayrotate.test | 61 | ||||
-rw-r--r-- | sql/log.cc | 167 | ||||
-rw-r--r-- | sql/log_event.cc | 80 | ||||
-rw-r--r-- | sql/mysql_priv.h | 2 | ||||
-rw-r--r-- | sql/mysqld.cc | 56 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 11 | ||||
-rw-r--r-- | sql/set_var.cc | 10 | ||||
-rw-r--r-- | sql/slave.cc | 206 | ||||
-rw-r--r-- | sql/slave.h | 91 | ||||
-rw-r--r-- | sql/sql_class.h | 10 | ||||
-rw-r--r-- | sql/sql_parse.cc | 2 | ||||
-rw-r--r-- | sql/sql_repl.cc | 41 |
14 files changed, 475 insertions, 284 deletions
diff --git a/mysql-test/r/rpl_relayrotate.result b/mysql-test/r/rpl_relayrotate.result new file mode 100644 index 00000000000..45f425a3532 --- /dev/null +++ b/mysql-test/r/rpl_relayrotate.result @@ -0,0 +1,18 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +stop slave; +create table t1 (a int) type=innodb; +reset slave; +start slave; +stop slave; +start slave; +select master_pos_wait('master-bin.001',3000,120)=-1; +master_pos_wait('master-bin.001',3000,120)=-1 +0 +select * from t1 where a=8000; +a +8000 diff --git a/mysql-test/t/rpl_relayrotate-slave.opt b/mysql-test/t/rpl_relayrotate-slave.opt new file mode 100644 index 00000000000..8b671423363 --- /dev/null +++ b/mysql-test/t/rpl_relayrotate-slave.opt @@ -0,0 +1,4 @@ +-O max_binlog_size=16384 +--innodb +--log-warnings + diff --git a/mysql-test/t/rpl_relayrotate.test b/mysql-test/t/rpl_relayrotate.test new file mode 100644 index 00000000000..4c330b8c9a2 --- /dev/null +++ b/mysql-test/t/rpl_relayrotate.test @@ -0,0 +1,61 @@ +# When the relay log gets rotated while the I/O thread +# is reading a transaction, the transaction spans on two or more +# relay logs. 
If STOP SLAVE occurs while the SQL thread is +# executing a part of the transaction in the non-first relay logs, +# we test if START SLAVE will resume in the beginning of the +# transaction (i.e., step back to the first relay log) + +# The slave is started with max_binlog_size=16384 bytes, +# to force many rotations (approximately 30 rotations) + +# If the master or slave does not support InnoDB, this test will pass + +source include/master-slave.inc; +connection slave; +stop slave; +connection master; +create table t1 (a int) type=innodb; +let $1=8000; +disable_query_log; +begin; +while ($1) +{ +# eval means expand $ expressions + eval insert into t1 values( $1 ); + dec $1; +} +commit; +# This will generate a 500kB master's binlog, +# which corresponds to 30 slave's relay logs. +enable_query_log; +save_master_pos; +connection slave; +reset slave; +start slave; +# We wait 1 sec for the SQL thread to be somewhere in +# the middle of the transaction, hopefully not in +# the first relay log, and hopefully before the COMMIT. +# Usually it stops when the SQL thread is around the 15th relay log. +# We cannot use MASTER_POS_WAIT() as master's position +# increases only when the slave executes the COMMIT. +system sleep 1; +stop slave; +# We suppose the SQL thread stopped before COMMIT. +# If so the transaction was rolled back +# and the table is now empty. +# Now restart +start slave; +# And see if the table contains '8000' +# which proves that the transaction restarted at +# the right place. +# We must wait for the transaction to commit before +# reading, MASTER_POS_WAIT() will do it for sure +# (the only statement with position>=3000 is COMMIT). +# Older versions of MySQL would hang forever in MASTER_POS_WAIT +# because COMMIT was said to be position 0 in the master's log (bug). +# Detect this with timeout. 
+select master_pos_wait('master-bin.001',3000,120)=-1; +select * from t1 where a=8000; +# Note that the simple fact to have less than around 30 slave's binlogs +# (the slave is started with --log-slave-updates) is already +# a proof that the transaction was not properly resumed. diff --git a/sql/log.cc b/sql/log.cc index 51b1c572601..50471041ee1 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -592,24 +592,32 @@ err: /* - Delete the current log file, remove it from index file and start on next + Delete relay log files prior to rli->group_relay_log_name + (i.e. all logs which are not involved in a non-finished group + (transaction)), remove them from the index file and start on next relay log. SYNOPSIS purge_first_log() - rli Relay log information - + rli Relay log information + included If false, all relay logs that are strictly before + rli->group_relay_log_name are deleted ; if true, the latter is + deleted too (i.e. all relay logs + read by the SQL slave thread are deleted). + NOTE - This is only called from the slave-execute thread when it has read - all commands from a log and want to switch to a new log. - - When this happens, we should never be in an active transaction as - a transaction is always written as a single block to the binary log. + all commands from a relay log and want to switch to a new relay log. + - When this happens, we can be in an active transaction as + a transaction can span over two relay logs + (although it is always written as a single block to the master's binary + log, hence cannot span over two master's binary logs). 
IMPLEMENTATION - Protects index file with LOCK_index - - Delete first log file, - - Copy all file names after this one to the front of the index file + - Delete relevant relay log files + - Copy all file names after these ones to the front of the index file - If the OS has truncate, truncate the file, else fill it with \n' - - Read the first file name from the index file and store in rli->linfo + - Read the next file name from the index file and store in rli->linfo RETURN VALUES 0 ok @@ -620,66 +628,68 @@ err: #ifdef HAVE_REPLICATION -int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli) +int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli, bool included) { int error; DBUG_ENTER("purge_first_log"); - /* - Test pre-conditions. - - Assume that we have previously read the first log and - stored it in rli->relay_log_name - */ DBUG_ASSERT(is_open()); DBUG_ASSERT(rli->slave_running == 1); - DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->relay_log_name)); - DBUG_ASSERT(rli->linfo.index_file_offset == - strlen(rli->relay_log_name) + 1); + DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name)); - /* We have already processed the relay log, so it's safe to delete it */ - my_delete(rli->relay_log_name, MYF(0)); pthread_mutex_lock(&LOCK_index); - if (copy_up_file_and_fill(&index_file, rli->linfo.index_file_offset)) - { - error= LOG_INFO_IO; - goto err; - } + pthread_mutex_lock(&rli->log_space_lock); + rli->relay_log.purge_logs(rli->group_relay_log_name, included, + 0, 0, &rli->log_space_total); + // Tell the I/O thread to take the relay_log_space_limit into account + rli->ignore_log_space_limit= 0; + pthread_mutex_unlock(&rli->log_space_lock); /* - Update the space counter used by all relay logs Ok to broadcast after the critical region as there is no risk of the mutex being destroyed by this thread later - this helps save context switches */ - pthread_mutex_lock(&rli->log_space_lock); - rli->log_space_total -= rli->relay_log_pos; - //tell 
the I/O thread to take the relay_log_space_limit into account - rli->ignore_log_space_limit= 0; - pthread_mutex_unlock(&rli->log_space_lock); pthread_cond_broadcast(&rli->log_space_cond); /* Read the next log file name from the index file and pass it back to the caller + If included is true, we want the first relay log; + otherwise we want the one after event_relay_log_name. */ - if ((error=find_log_pos(&rli->linfo, NullS, 0 /*no mutex*/))) + if ((included && (error=find_log_pos(&rli->linfo, NullS, 0))) || + (!included && + ((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)) || + (error=find_next_log(&rli->linfo, 0))))) { char buff[22]; - sql_print_error("next log error: %d offset: %s log: %s", - error, - llstr(rli->linfo.index_file_offset,buff), - rli->linfo.log_file_name); + sql_print_error("next log error: %d offset: %s log: %s included: %d", + error, + llstr(rli->linfo.index_file_offset,buff), + rli->group_relay_log_name, + included); goto err; } + /* - Reset position to current log. This involves setting both of the - position variables: + Reset rli's coordinates to the current log. */ - rli->relay_log_pos = BIN_LOG_HEADER_SIZE; - rli->pending = 0; - strmake(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)-1); + rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; + strmake(rli->event_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->event_relay_log_name)-1); + + /* + If we removed the rli->group_relay_log_name file, + we must update the rli->group* coordinates, otherwise do not touch it as the + group's execution is not finished (e.g. 
COMMIT not executed) + */ + if (included) + { + rli->group_relay_log_pos = BIN_LOG_HEADER_SIZE; + strmake(rli->group_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->group_relay_log_name)-1); + } /* Store where we are in the new file for the execution thread */ flush_relay_log_info(rli); @@ -693,13 +703,14 @@ err: Update log index_file */ -int MYSQL_LOG::update_log_index(LOG_INFO* log_info) +int MYSQL_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads) { if (copy_up_file_and_fill(&index_file, log_info->index_file_start_offset)) return LOG_INFO_IO; // now update offsets in index file for running threads - adjust_linfo_offsets(log_info->index_file_start_offset); + if (need_update_threads) + adjust_linfo_offsets(log_info->index_file_start_offset); return 0; } @@ -708,9 +719,13 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info) SYNOPSIS purge_logs() - thd Thread pointer - to_log Delete all log file name before this file. This file is not - deleted + to_log Delete all log file name before this file. + included If true, to_log is deleted too. + need_mutex + need_update_threads If we want to update the log coordinates of + all threads. False for relay logs, true otherwise. 
+ freed_log_space If not null, decrement this variable of + the amount of log space freed NOTES If any of the logs before the deleted one is in use, @@ -722,31 +737,59 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info) LOG_INFO_EOF to_log not found */ -int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) +int MYSQL_LOG::purge_logs(const char *to_log, + bool included, + bool need_mutex, + bool need_update_threads, + ulonglong *decrease_log_space) { int error; + bool exit_loop= 0; LOG_INFO log_info; DBUG_ENTER("purge_logs"); + DBUG_PRINT("info",("to_log= %s",to_log)); if (no_rotate) DBUG_RETURN(LOG_INFO_PURGE_NO_ROTATE); - pthread_mutex_lock(&LOCK_index); + if (need_mutex) + pthread_mutex_lock(&LOCK_index); if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/))) goto err; /* - File name exists in index file; Delete until we find this file + File name exists in index file; delete until we find this file or a file that is used. */ if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/))) goto err; - while (strcmp(to_log,log_info.log_file_name) && - !log_in_use(log_info.log_file_name)) + while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) && + !log_in_use(log_info.log_file_name)) { - /* It's not fatal even if we can't delete a log file */ - my_delete(log_info.log_file_name, MYF(0)); - if (find_next_log(&log_info, 0)) + ulong tmp; + if (decrease_log_space) //stat the file we want to delete + { + MY_STAT s; + if (my_stat(log_info.log_file_name,&s,MYF(0))) + tmp= s.st_size; + else + { + /* + If we could not stat, we can't know the amount + of space that deletion will free. In most cases, + deletion won't work either, so it's not a problem. 
+ */ + tmp= 0; + } + } + /* + It's not fatal if we can't delete a log file ; + if we could delete it, take its size into account + */ + DBUG_PRINT("info",("purging %s",log_info.log_file_name)); + if (!my_delete(log_info.log_file_name, MYF(0)) && decrease_log_space) + *decrease_log_space-= tmp; + if (find_next_log(&log_info, 0) || exit_loop) break; } @@ -754,10 +797,11 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) If we get killed -9 here, the sysadmin would have to edit the log index file after restart - otherwise, this should be safe */ - error= update_log_index(&log_info); + error= update_log_index(&log_info, need_update_threads); err: - pthread_mutex_unlock(&LOCK_index); + if (need_mutex) + pthread_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } @@ -779,7 +823,7 @@ err: LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated */ -int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time) +int MYSQL_LOG::purge_logs_before_date(time_t purge_time) { int error; LOG_INFO log_info; @@ -816,7 +860,7 @@ int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time) If we get killed -9 here, the sysadmin would have to edit the log index file after restart - otherwise, this should be safe */ - error= update_log_index(&log_info); + error= update_log_index(&log_info, 1); err: pthread_mutex_unlock(&LOCK_index); @@ -1269,7 +1313,7 @@ err: { long purge_time= time(0) - expire_logs_days*24*60*60; if (purge_time >= 0) - error= purge_logs_before_date(current_thd, purge_time); + error= purge_logs_before_date(purge_time); } #endif @@ -1534,7 +1578,6 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, If you don't do it this way, you will get a deadlock in THD::awake() */ - void MYSQL_LOG:: wait_for_update(THD* thd) { safe_mutex_assert_owner(&LOCK_log); diff --git a/sql/log_event.cc b/sql/log_event.cc index d4efb65bf42..39db264d898 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -310,11 +310,36 @@ int 
Log_event::exec_event(struct st_relay_log_info* rli) */ if (rli) { - if (rli->inside_transaction) - rli->inc_pending(get_event_len()); + /* + If in a transaction, and if the slave supports transactions, + just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN + (not OPTION_NOT_AUTOCOMMIT) as transactions are logged + with BEGIN/COMMIT, not with SET AUTOCOMMIT= . + + CAUTION: opt_using_transactions means + innodb || bdb ; suppose the master supports InnoDB and BDB, + but the slave supports only BDB, problems + will arise: + - suppose an InnoDB table is created on the master, + - then it will be MyISAM on the slave + - but as opt_using_transactions is true, the slave will believe he is + transactional with the MyISAM table. And problems will come when one + does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at BEGIN + whereas there has not been any rollback). This is the problem of + using opt_using_transactions instead of a finer + "does the slave support _the_transactional_handler_used_on_the_master_". + + More generally, we'll have problems when a query mixes a transactional + handler and MyISAM and STOP SLAVE is issued in the middle of the + "transaction". START SLAVE will resume at BEGIN while the MyISAM table has + already been updated. + + */ + if ((thd->options & OPTION_BEGIN) && opt_using_transactions) + rli->inc_event_relay_log_pos(get_event_len()); else { - rli->inc_pos(get_event_len(),log_pos); + rli->inc_group_relay_log_pos(get_event_len(),log_pos); flush_relay_log_info(rli); } } @@ -878,9 +903,13 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) thd->db = rewrite_db((char*)db); /* - InnoDB internally stores the master log position it has processed so far; - position to store is really pos + pending + event_len - since we must store the pos of the END of the current log event + InnoDB internally stores the master log position it has executed so far, + i.e. the position just after the COMMIT event. 
+ When InnoDB will want to store, the positions in rli won't have + been updated yet, so group_master_log_* will point to old BEGIN + and event_master_log* will point to the beginning of current COMMIT. + So the position to store is event_master_log_pos + event_len + since we must store the pos of the END of the current log event (COMMIT). */ rli->event_len= get_event_len(); @@ -909,18 +938,6 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) DBUG_PRINT("query",("%s",thd->query)); mysql_parse(thd, thd->query, q_len); - /* - Set a flag if we are inside an transaction so that we can restart - the transaction from the start if we are killed - - This will only be done if we are supporting transactional tables - in the slave. - */ - if (!strcmp(thd->query,"BEGIN")) - rli->inside_transaction= opt_using_transactions; - else if (!strcmp(thd->query,"COMMIT")) - rli->inside_transaction=0; - DBUG_PRINT("info",("expected_error: %d last_errno: %d", expected_error, thd->net.last_errno)); if ((expected_error != (actual_error= thd->net.last_errno)) && @@ -1776,14 +1793,15 @@ int Rotate_log_event::write_data(IO_CACHE* file) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) int Rotate_log_event::exec_event(struct st_relay_log_info* rli) { - char* log_name = rli->master_log_name; + char* log_name = rli->group_master_log_name; DBUG_ENTER("Rotate_log_event::exec_event"); pthread_mutex_lock(&rli->data_lock); memcpy(log_name, new_log_ident, ident_len+1); - rli->master_log_pos = pos; - rli->relay_log_pos += get_event_len(); - DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos)); + rli->group_master_log_pos = pos; + rli->event_relay_log_pos += get_event_len(); + rli->group_relay_log_pos = rli->event_relay_log_pos; + DBUG_PRINT("info", ("group_master_log_pos: %d", (ulong) rli->group_master_log_pos)); pthread_mutex_unlock(&rli->data_lock); pthread_cond_broadcast(&rli->data_cond); flush_relay_log_info(rli); @@ -1905,7 +1923,7 @@ int 
Intvar_log_event::exec_event(struct st_relay_log_info* rli) thd->next_insert_id = val; break; } - rli->inc_pending(get_event_len()); + rli->inc_event_relay_log_pos(get_event_len()); return 0; } #endif @@ -1967,7 +1985,7 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli) { thd->rand.seed1= (ulong) seed1; thd->rand.seed2= (ulong) seed2; - rli->inc_pending(get_event_len()); + rli->inc_event_relay_log_pos(get_event_len()); return 0; } #endif // !MYSQL_CLIENT @@ -2199,7 +2217,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) e.update_hash(val, val_len, type, charset, Item::COER_NOCOLL); free_root(&thd->mem_root,0); - rli->inc_pending(get_event_len()); + rli->inc_event_relay_log_pos(get_event_len()); return 0; } #endif // !MYSQL_CLIENT @@ -2241,7 +2259,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg, pthread_mutex_lock(&mi->data_lock); pthread_mutex_lock(&rli->data_lock); master_host_len = strlen(mi->host); - master_log_len = strlen(rli->master_log_name); + master_log_len = strlen(rli->group_master_log_name); // on OOM, just do not initialize the structure and print the error if ((mem_pool = (char*)my_malloc(get_data_size() + 1, MYF(MY_WME)))) @@ -2249,9 +2267,9 @@ Slave_log_event::Slave_log_event(THD* thd_arg, master_host = mem_pool + SL_MASTER_HOST_OFFSET ; memcpy(master_host, mi->host, master_host_len + 1); master_log = master_host + master_host_len + 1; - memcpy(master_log, rli->master_log_name, master_log_len + 1); + memcpy(master_log, rli->group_master_log_name, master_log_len + 1); master_port = mi->port; - master_pos = rli->master_log_pos; + master_pos = rli->group_master_log_pos; DBUG_PRINT("info", ("master_log: %s pos: %d", master_log, (ulong) master_pos)); } @@ -2381,19 +2399,19 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db) int Stop_log_event::exec_event(struct st_relay_log_info* rli) { // do not clean up immediately after rotate event - if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) + if 
(rli->group_master_log_pos > BIN_LOG_HEADER_SIZE) { close_temporary_tables(thd); cleanup_load_tmpdir(); } /* We do not want to update master_log pos because we get a rotate event - before stop, so by now master_log_name is set to the next log. + before stop, so by now group_master_log_name is set to the next log. If we updated it, we will have incorrect master coordinates and this could give false triggers in MASTER_POS_WAIT() that we have reached the target position when in fact we have not. */ - rli->inc_pos(get_event_len(), 0); + rli->inc_group_relay_log_pos(get_event_len(), 0); flush_relay_log_info(rli); return 0; } diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 4203d440667..d17faa3cea5 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -721,7 +721,7 @@ extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size; extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log; extern ulong specialflag, current_pid; extern ulong expire_logs_days; - +extern my_bool relay_log_purge; extern uint test_flags,select_errors,ha_open_options; extern uint protocol_version,dropping_tables; extern uint delay_key_write_options; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index ea0311bafc6..5a6ea939878 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -314,7 +314,7 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; my_bool lower_case_table_names, opt_old_rpl_compat; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_log_slave_updates= 0, opt_old_passwords=0, use_old_passwords=0; -my_bool opt_console= 0; +my_bool opt_console= 0, opt_bdb, opt_innodb; volatile bool mqh_used = 0; FILE *bootstrap_file=0; @@ -442,6 +442,7 @@ const char **errmesg; /* Error messages */ const char *myisam_recover_options_str="OFF"; const char *sql_mode_str="OFF"; ulong rpl_recovery_rank=0; +my_bool relay_log_purge=1; my_string mysql_unix_port=NULL, opt_mysql_tmpdir=NULL; MY_TMPDIR mysql_tmpdir_list; @@ -2208,7 +2209,7 @@ static int init_server_components() { 
long purge_time= time(0) - expire_logs_days*24*60*60; if (purge_time >= 0) - mysql_bin_log.purge_logs_before_date(current_thd, purge_time); + mysql_bin_log.purge_logs_before_date(purge_time); } #endif } @@ -3400,7 +3401,7 @@ enum options OPT_DELAY_KEY_WRITE, OPT_CHARSETS_DIR, OPT_BDB_HOME, OPT_BDB_LOG, OPT_BDB_TMP, OPT_BDB_NOSYNC, - OPT_BDB_LOCK, OPT_BDB_SKIP, + OPT_BDB_LOCK, OPT_BDB, OPT_BDB_NO_RECOVER, OPT_BDB_SHARED, OPT_MASTER_HOST, OPT_MASTER_USER, OPT_MASTER_PASSWORD, OPT_MASTER_PORT, @@ -3430,7 +3431,7 @@ enum options OPT_INNODB_FLUSH_METHOD, OPT_INNODB_FAST_SHUTDOWN, OPT_SAFE_SHOW_DB, - OPT_INNODB_SKIP, OPT_SKIP_SAFEMALLOC, + OPT_INNODB, OPT_SKIP_SAFEMALLOC, OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, @@ -3468,7 +3469,7 @@ enum options OPT_OPEN_FILES_LIMIT, OPT_QUERY_CACHE_LIMIT, OPT_QUERY_CACHE_MIN_RES_UNIT, OPT_QUERY_CACHE_SIZE, OPT_QUERY_CACHE_TYPE, OPT_RECORD_BUFFER, - OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, + OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RELAY_LOG_PURGE, OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME, OPT_SORT_BUFFER, OPT_TABLE_CACHE, OPT_THREAD_CONCURRENCY, OPT_THREAD_CACHE_SIZE, @@ -3529,8 +3530,10 @@ struct my_option my_long_options[] = (gptr*) &berkeley_tmpdir, (gptr*) &berkeley_tmpdir, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #endif /* HAVE_BERKELEY_DB */ - {"skip-bdb", OPT_BDB_SKIP, "Don't use berkeley db (will save memory)", - 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"bdb", OPT_BDB, "Enable Berkeley DB (if this version of MySQL supports it). 
\ +Disable with --skip-bdb (will save memory)", + (gptr*) &opt_bdb, (gptr*) &opt_bdb, 0, GET_BOOL, NO_ARG, 1, 0, 0, + 0, 0, 0}, {"big-tables", OPT_BIG_TABLES, "Allow big result sets by saving all temporary sets on file (Solves most 'table full' errors)", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -3885,8 +3888,10 @@ struct my_option my_long_options[] = "Start without grant tables. This gives all users FULL ACCESS to all tables!", (gptr*) &opt_noacl, (gptr*) &opt_noacl, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, - {"skip-innodb", OPT_INNODB_SKIP, "Don't use Innodb (will save memory)", - 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"innodb", OPT_INNODB, "Enable InnoDB (if this version of MySQL supports it). \ +Disable with --skip-innodb (will save memory)", + (gptr*) &opt_innodb, (gptr*) &opt_innodb, 0, GET_BOOL, NO_ARG, 1, 0, 0, + 0, 0, 0}, {"skip-locking", OPT_SKIP_LOCK, "Deprecated option, use --skip-external-locking instead", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -4272,6 +4277,11 @@ struct my_option my_long_options[] = (gptr*) &max_system_variables.read_buff_size,0, GET_ULONG, REQUIRED_ARG, 128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE, 0}, #ifdef HAVE_REPLICATION + {"relay_log_purge", OPT_RELAY_LOG_PURGE, + "0 = do not purge relay logs. 
1 = purge them as soon as they are no more needed.", + (gptr*) &relay_log_purge, + (gptr*) &relay_log_purge, 0, GET_BOOL, NO_ARG, + 1, 0, 1, 0, 1, 0}, {"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT, "Max space to use for all relay logs", (gptr*) &relay_log_space_limit, @@ -5016,16 +5026,32 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), berkeley_shared_data=1; break; #endif /* HAVE_BERKELEY_DB */ - case OPT_BDB_SKIP: + case OPT_BDB: #ifdef HAVE_BERKELEY_DB - berkeley_skip=1; - have_berkeley_db=SHOW_OPTION_DISABLED; + if (opt_bdb) + { + berkeley_skip=0; + have_berkeley_db=SHOW_OPTION_YES; + } + else + { + berkeley_skip=1; + have_berkeley_db=SHOW_OPTION_DISABLED; + } #endif break; - case OPT_INNODB_SKIP: + case OPT_INNODB: #ifdef HAVE_INNOBASE_DB - innodb_skip=1; - have_innodb=SHOW_OPTION_DISABLED; + if (opt_innodb) + { + innodb_skip=0; + have_innodb=SHOW_OPTION_YES; + } + else + { + innodb_skip=1; + have_innodb=SHOW_OPTION_DISABLED; + } #endif break; case OPT_INNODB_DATA_FILE_PATH: diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 46791c13219..58769827bed 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -873,7 +873,6 @@ int load_master_data(THD* thd) // don't hit the magic number if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE) active_mi->master_log_pos = BIN_LOG_HEADER_SIZE; - active_mi->rli.pending = 0; flush_master_info(active_mi); } mc_mysql_free_result(master_status_res); @@ -897,9 +896,13 @@ int load_master_data(THD* thd) return 1; } pthread_mutex_lock(&active_mi->rli.data_lock); - active_mi->rli.master_log_pos = active_mi->master_log_pos; - strmake(active_mi->rli.master_log_name,active_mi->master_log_name, - sizeof(active_mi->rli.master_log_name)-1); + active_mi->rli.group_master_log_pos = active_mi->master_log_pos; + strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name, + sizeof(active_mi->rli.group_master_log_name)-1); + /* + No need to update rli.event* coordinates, 
they will be when the slave + threads start ; only rli.group* coordinates are necessary here. + */ flush_relay_log_info(&active_mi->rli); pthread_cond_broadcast(&active_mi->rli.data_cond); pthread_mutex_unlock(&active_mi->rli.data_lock); diff --git a/sql/set_var.cc b/sql/set_var.cc index d39a506c82d..dc6a2f71133 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -201,6 +201,10 @@ sys_var_thd_ulong sys_read_buff_size("read_buffer_size", &SV::read_buff_size); sys_var_thd_ulong sys_read_rnd_buff_size("read_rnd_buffer_size", &SV::read_rnd_buff_size); +#ifdef HAVE_REPLICATION +sys_var_bool_ptr sys_relay_log_purge("relay_log_purge", + &relay_log_purge); +#endif sys_var_long_ptr sys_rpl_recovery_rank("rpl_recovery_rank", &rpl_recovery_rank); sys_var_long_ptr sys_query_cache_size("query_cache_size", @@ -407,6 +411,9 @@ sys_var *sys_variables[]= &sys_rand_seed2, &sys_read_buff_size, &sys_read_rnd_buff_size, +#ifdef HAVE_REPLICATION + &sys_relay_log_purge, +#endif &sys_rpl_recovery_rank, &sys_safe_updates, &sys_select_limit, @@ -563,6 +570,9 @@ struct show_var_st init_vars[]= { {sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id, SHOW_SYS}, {sys_read_buff_size.name, (char*) &sys_read_buff_size, SHOW_SYS}, {sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size, SHOW_SYS}, +#ifdef HAVE_REPLICATION + {sys_relay_log_purge.name, (char*) &sys_relay_log_purge, SHOW_SYS}, +#endif {sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank, SHOW_SYS}, #ifdef HAVE_QUERY_CACHE {sys_query_cache_limit.name,(char*) &sys_query_cache_limit, SHOW_SYS}, diff --git a/sql/slave.cc b/sql/slave.cc index fa7ab180eaa..eae795ae760 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -217,11 +217,7 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, - If not, open the 'log' binary file. TODO - - check proper initialization of master_log_name/master_log_pos - - We may always want to delete all logs before 'log'. 
- Currently if we are not calling this with 'log' as NULL or the first - log we will never delete relay logs. - If we want this we should not set skip_log_purge to 1. + - check proper initialization of group_master_log_name/group_master_log_pos RETURN VALUES 0 ok @@ -248,7 +244,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, rli->cur_log_fd = -1; } - rli->relay_log_pos = pos; + rli->group_relay_log_pos = rli->event_relay_log_pos = pos; /* Test to see if the previous run was with the skip of purging @@ -260,18 +256,15 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, goto err; } - if (log) // If not first log + if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1)) { - if (strcmp(log, rli->linfo.log_file_name)) - rli->skip_log_purge= 1; // Different name; Don't purge - if (rli->relay_log.find_log_pos(&rli->linfo, log, 1)) - { - *errmsg="Could not find target log during relay log initialization"; - goto err; - } + *errmsg="Could not find target log during relay log initialization"; + goto err; } - strmake(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)-1); + strmake(rli->group_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->group_relay_log_name)-1); + strmake(rli->event_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->event_relay_log_name)-1); if (rli->relay_log.is_active(rli->linfo.log_file_name)) { /* @@ -302,7 +295,7 @@ err: If we don't purge, we can't honour relay_log_space_limit ; silently discard it */ - if (rli->skip_log_purge) + if (!relay_log_purge) rli->log_space_limit= 0; pthread_cond_broadcast(&rli->data_cond); if (need_data_lock) @@ -383,9 +376,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, to display fine in any case. 
*/ - rli->master_log_name[0]= 0; - rli->master_log_pos= 0; - rli->pending= 0; + rli->group_master_log_name[0]= 0; + rli->group_master_log_pos= 0; if (!rli->inited) DBUG_RETURN(0); @@ -402,16 +394,18 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, goto err; } /* Save name of used relay log file */ - strmake(rli->relay_log_name, rli->relay_log.get_log_fname(), - sizeof(rli->relay_log_name)-1); + strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(rli->group_relay_log_name)-1); + strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(rli->event_relay_log_name)-1); // Just first log with magic number and nothing else rli->log_space_total= BIN_LOG_HEADER_SIZE; - rli->relay_log_pos= BIN_LOG_HEADER_SIZE; + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; rli->relay_log.reset_bytes_written(); if (!just_reset) - error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos, - 0 /* do not need data lock */, errmsg); - + error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, + 0 /* do not need data lock */, errmsg); + err: #ifndef DBUG_OFF char buf[22]; @@ -1238,13 +1232,11 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) fn_format(fname, info_fname, mysql_data_home, "", 4+32); pthread_mutex_lock(&rli->data_lock); info_fd = rli->info_fd; - rli->pending = 0; rli->cur_log_fd = -1; rli->slave_skip_counter=0; rli->abort_pos_wait=0; - rli->skip_log_purge=0; - rli->log_space_limit = relay_log_space_limit; - rli->log_space_total = 0; + rli->log_space_limit= relay_log_space_limit; + rli->log_space_total= 0; // TODO: make this work with multi-master if (!opt_relay_logname) @@ -1285,8 +1277,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, &msg)) goto err; - rli->master_log_name[0]= 0; - rli->master_log_pos= 0; + 
rli->group_master_log_name[0]= 0; + rli->group_master_log_pos= 0; rli->info_fd= info_fd; } else // file exists @@ -1307,31 +1299,33 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) rli->info_fd = info_fd; int relay_log_pos, master_log_pos; - if (init_strvar_from_file(rli->relay_log_name, - sizeof(rli->relay_log_name), &rli->info_file, + if (init_strvar_from_file(rli->group_relay_log_name, + sizeof(rli->group_relay_log_name), &rli->info_file, "") || init_intvar_from_file(&relay_log_pos, &rli->info_file, BIN_LOG_HEADER_SIZE) || - init_strvar_from_file(rli->master_log_name, - sizeof(rli->master_log_name), &rli->info_file, + init_strvar_from_file(rli->group_master_log_name, + sizeof(rli->group_master_log_name), &rli->info_file, "") || init_intvar_from_file(&master_log_pos, &rli->info_file, 0)) { msg="Error reading slave log configuration"; goto err; } - rli->relay_log_pos= relay_log_pos; - rli->master_log_pos= master_log_pos; + strmake(rli->event_relay_log_name,rli->group_relay_log_name, + sizeof(rli->event_relay_log_name)-1); + rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos; + rli->group_master_log_pos= master_log_pos; if (init_relay_log_pos(rli, - rli->relay_log_name, - rli->relay_log_pos, + rli->group_relay_log_name, + rli->group_relay_log_pos, 0 /* no data lock*/, &msg)) goto err; } - DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); /* Now change the cache from READ to WRITE - must do this before flush_relay_log_info @@ -1407,7 +1401,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) { LOG_INFO linfo; DBUG_ENTER("count_relay_log_space"); - rli->log_space_total = 0; + rli->log_space_total= 0; if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) { sql_print_error("Could not find first log while counting relay log space"); 
@@ -1631,10 +1625,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi) protocol->store((uint32) mi->connect_retry); protocol->store(mi->master_log_name, &my_charset_bin); protocol->store((ulonglong) mi->master_log_pos); - protocol->store(mi->rli.relay_log_name + - dirname_length(mi->rli.relay_log_name), &my_charset_bin); - protocol->store((ulonglong) mi->rli.relay_log_pos); - protocol->store(mi->rli.master_log_name, &my_charset_bin); + protocol->store(mi->rli.group_relay_log_name + + dirname_length(mi->rli.group_relay_log_name), &my_charset_bin); + protocol->store((ulonglong) mi->rli.group_relay_log_pos); + protocol->store(mi->rli.group_master_log_name, &my_charset_bin); protocol->store(mi->slave_running ? "Yes":"No", &my_charset_bin); protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); protocol->store(&replicate_do_db); @@ -1642,7 +1636,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) protocol->store((uint32) mi->rli.last_slave_errno); protocol->store(mi->rli.last_slave_error, &my_charset_bin); protocol->store((uint32) mi->rli.slave_skip_counter); - protocol->store((ulonglong) mi->rli.master_log_pos); + protocol->store((ulonglong) mi->rli.group_master_log_pos); protocol->store((ulonglong) mi->rli.log_space_total); pthread_mutex_unlock(&mi->rli.data_lock); pthread_mutex_unlock(&mi->data_lock); @@ -1673,17 +1667,15 @@ bool flush_master_info(MASTER_INFO* mi) st_relay_log_info::st_relay_log_info() - :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0), - cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0), - slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), - sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), - slave_running(0), skip_log_purge(0), - inside_transaction(0) /* the default is autocommit=1 */ -{ - relay_log_name[0] = master_log_name[0] = 0; + :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), + cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0), + 
ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0), + slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), + slave_running(0) +{ + group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; last_slave_error[0]=0; - bzero(&info_file,sizeof(info_file)); bzero(&cache_buf, sizeof(cache_buf)); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); @@ -1745,8 +1737,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, set_timespec(abstime,timeout); DBUG_ENTER("wait_for_pos"); - DBUG_PRINT("enter",("master_log_name: '%s' pos: %lu timeout: %ld", - master_log_name, (ulong) master_log_pos, + DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld", + group_master_log_name, (ulong) group_master_log_pos, (long) timeout)); pthread_mutex_lock(&data_lock); @@ -1796,10 +1788,10 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, { bool pos_reached; int cmp_result= 0; - DBUG_ASSERT(*master_log_name || master_log_pos == 0); - if (*master_log_name) + DBUG_ASSERT(*group_master_log_name || group_master_log_pos == 0); + if (*group_master_log_name) { - char *basename= master_log_name + dirname_length(master_log_name); + char *basename= group_master_log_name + dirname_length(group_master_log_name); /* First compare the parts before the extension. Find the dot in the master's log basename, @@ -1814,13 +1806,13 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, } // Now compare extensions. char *q_end; - ulong master_log_name_extension= strtoul(q, &q_end, 10); - if (master_log_name_extension < log_name_extension) + ulong group_master_log_name_extension= strtoul(q, &q_end, 10); + if (group_master_log_name_extension < log_name_extension) cmp_result = -1 ; else - cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ; + cmp_result= (group_master_log_name_extension > log_name_extension) ? 
1 : 0 ; } - pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) || + pos_reached = ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) || cmp_result > 0); if (pos_reached || thd->killed) break; @@ -2127,7 +2119,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) (rli->slave_skip_counter && type_code != ROTATE_EVENT)) { /* TODO: I/O thread should not even log events with the same server id */ - rli->inc_pos(ev->get_event_len(), + rli->inc_group_relay_log_pos(ev->get_event_len(), type_code != STOP_EVENT ? ev->log_pos : LL(0), 1/* skip lock*/); flush_relay_log_info(rli); @@ -2497,15 +2489,13 @@ slave_begin: rli->abort_slave = 0; pthread_mutex_unlock(&rli->run_lock); pthread_cond_broadcast(&rli->start_cond); - // This should always be set to 0 when the slave thread is started - rli->pending = 0; //tell the I/O thread to take relay_log_space_limit into account from now on rli->ignore_log_space_limit= 0; if (init_relay_log_pos(rli, - rli->relay_log_name, - rli->relay_log_pos, + rli->group_relay_log_name, + rli->group_relay_log_pos, 1 /*need data lock*/, &errmsg)) { sql_print_error("Error initializing relay log position: %s", @@ -2513,18 +2503,18 @@ slave_begin: goto err; } THD_CHECK_SENTRY(thd); - DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); DBUG_ASSERT(rli->sql_thd == thd); DBUG_PRINT("master_info",("log_file_name: %s position: %s", - rli->master_log_name, - llstr(rli->master_log_pos,llbuff))); + rli->group_master_log_name, + llstr(rli->group_master_log_pos,llbuff))); if (global_system_variables.log_warnings) sql_print_error("Slave SQL thread initialized, starting replication in \ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, - llstr(rli->master_log_pos,llbuff),rli->relay_log_name, - 
llstr(rli->relay_log_pos,llbuff1)); + llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name, + llstr(rli->group_relay_log_pos,llbuff1)); /* Read queries from the IO/THREAD until this thread is killed */ @@ -2541,7 +2531,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, Error running query, slave SQL thread aborted. Fix the problem, and restart \ the slave SQL thread with \"SLAVE START\". We stopped at log \ '%s' position %s", - RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); + RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); goto err; } } @@ -2549,7 +2539,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ /* Thread stopped. Print the current replication position to the log */ sql_print_error("Slave SQL thread exiting, replication stopped in log \ '%s' at position %s", - RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff)); + RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); err: VOID(pthread_mutex_lock(&LOCK_thread_count)); @@ -2699,7 +2689,7 @@ err: rev The rotate log event read from the binary log DESCRIPTION - Updates the master info and relay data with the place in the next binary + Updates the master info with the place in the next binary log where we should start reading. 
NOTES @@ -3073,18 +3063,14 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli) IO_CACHE *file = &rli->info_file; char buff[FN_REFLEN*2+22*2+4], *pos; - /* sql_thd is not set when calling from init_slave() */ - if ((rli->sql_thd && rli->sql_thd->options & OPTION_BEGIN)) - return 0; // Wait for COMMIT - my_b_seek(file, 0L); - pos=strmov(buff, rli->relay_log_name); + pos=strmov(buff, rli->group_relay_log_name); *pos++='\n'; - pos=longlong2str(rli->relay_log_pos, pos, 10); + pos=longlong2str(rli->group_relay_log_pos, pos, 10); *pos++='\n'; - pos=strmov(pos, rli->master_log_name); + pos=strmov(pos, rli->group_master_log_name); *pos++='\n'; - pos=longlong2str(rli->master_log_pos, pos, 10); + pos=longlong2str(rli->group_master_log_pos, pos, 10); *pos='\n'; if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1)) error=1; @@ -3107,7 +3093,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) DBUG_ENTER("reopen_relay_log"); IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; - if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, + if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name, errmsg)) <0) DBUG_RETURN(0); /* @@ -3115,7 +3101,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) relay_log_pos Current log pos pending Number of bytes already processed from the event */ - my_b_seek(cur_log,rli->relay_log_pos + rli->pending); + my_b_seek(cur_log,rli->event_relay_log_pos); DBUG_RETURN(cur_log); } @@ -3124,7 +3110,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) { Log_event* ev; IO_CACHE* cur_log = rli->cur_log; - pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); + pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); const char* errmsg=0; THD* thd = rli->sql_thd; DBUG_ENTER("next_event"); @@ -3173,7 +3159,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) } } DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); + 
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); /* Relay log is always in new format - if the master is 3.23, the I/O thread will convert the format for us @@ -3240,8 +3226,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli) // prevent the I/O thread from blocking next times rli->ignore_log_space_limit= 1; // If the I/O thread is blocked, unblock it - pthread_cond_broadcast(&rli->log_space_cond); pthread_mutex_unlock(&rli->log_space_lock); + pthread_cond_broadcast(&rli->log_space_cond); // Note that wait_for_update unlocks lock_log ! rli->relay_log.wait_for_update(rli->sql_thd); // re-acquire data lock since we released it earlier @@ -3258,16 +3244,25 @@ Log_event* next_event(RELAY_LOG_INFO* rli) my_close(rli->cur_log_fd, MYF(MY_WME)); rli->cur_log_fd = -1; - /* - TODO: make skip_log_purge a start-up option. At this point this - is not critical priority - */ - if (!rli->skip_log_purge) + if (relay_log_purge) { - // purge_first_log will properly set up relay log coordinates in rli - if (rli->relay_log.purge_first_log(rli)) + /* + purge_first_log will properly set up relay log coordinates in rli. + If the group's coordinates are equal to the event's coordinates + (i.e. the relay log was not rotated in the middle of a group), + we can purge this relay log too. 
+ We do ulonglong and string comparisons, this may be slow but + - purging the last relay log is nice (it can save 1GB of disk), so we + like to detect the case where we can do it, and given this, + - I see no better detection method + - purge_first_log is not called that often + */ + if (rli->relay_log.purge_first_log + (rli, + rli->group_relay_log_pos == rli->event_relay_log_pos + && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name))) { - errmsg = "Error purging processed log"; + errmsg = "Error purging processed logs"; goto err; } } @@ -3285,10 +3280,9 @@ Log_event* next_event(RELAY_LOG_INFO* rli) errmsg = "error switching to the next log"; goto err; } - rli->relay_log_pos = BIN_LOG_HEADER_SIZE; - rli->pending=0; - strmake(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)-1); + rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE; + strmake(rli->event_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->event_relay_log_name)-1); flush_relay_log_info(rli); } @@ -3336,7 +3330,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) event(errno: %d cur_log->error: %d)", my_errno,cur_log->error); // set read position to the beginning of the event - my_b_seek(cur_log,rli->relay_log_pos+rli->pending); + my_b_seek(cur_log,rli->event_relay_log_pos); /* otherwise, we have had a partial read */ errmsg = "Aborting slave SQL thread because of partial event read"; break; // To end of function diff --git a/sql/slave.h b/sql/slave.h index a4db7388be5..16ba7f80471 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -92,12 +92,6 @@ typedef struct st_relay_log_info cur_log_fd - file descriptor of the current read relay log */ File info_fd,cur_log_fd; - /* name of current read relay log */ - char relay_log_name[FN_REFLEN]; - /* master log name corresponding to current read position */ - char master_log_name[FN_REFLEN]; - /* original log position of last processed event */ - volatile my_off_t master_log_pos; /* Protected with internal locks. 
@@ -142,20 +136,36 @@ typedef struct st_relay_log_info uint32 cur_log_old_open_count; /* - relay_log_pos - Current offset in the relay log. - pending - In some cases we do not increment offset immediately - after processing an event, because the following event - needs to be processed atomically together with this one - such as: - - Intvar_event - sets auto_increment value - Rand_event - sets the random seed - - However, once both events have been processed, we need to - increment by the cumulative offset. 'pending' stores the - extra offset to be added to the position. + Let's call a group (of events) : + - a transaction + or + - an autocommitting query + its associated events (INSERT_ID, + TIMESTAMP...) + We need these rli coordinates : + - relay log name and position of the beginning of the group we currently are + executing. Needed to know where we have to restart when replication has + stopped in the middle of a group (which has been rolled back by the slave). + - relay log name and position just after the event we have just + executed. This event is part of the current group. + Formerly we only had the immediately above coordinates, plus a 'pending' + variable, but this dealt wrong with the case of a transaction starting on a + relay log and finishing (committing) on another relay log. Case which can + happen when, for example, the relay log gets rotated because of + max_binlog_size. + */ + char group_relay_log_name[FN_REFLEN]; + ulonglong group_relay_log_pos; + char event_relay_log_name[FN_REFLEN]; + ulonglong event_relay_log_pos; + /* + Original log name and position of the group we're currently executing + (whose coordinates are group_relay_log_name/pos in the relay log) + in the master's binlog. These concern the *group*, because in the master's + binlog the log_pos that comes with each event is the position of the + beginning of the group.
*/ - ulonglong relay_log_pos, pending; + char group_master_log_name[FN_REFLEN]; + volatile my_off_t group_master_log_pos; /* Handling of the relay_log_space_limit optional constraint. @@ -193,38 +203,39 @@ typedef struct st_relay_log_info /* if not set, the value of other members of the structure are undefined */ bool inited; volatile bool abort_slave, slave_running; - bool skip_log_purge; - bool inside_transaction; st_relay_log_info(); ~st_relay_log_info(); - inline void inc_pending(ulonglong val) + + inline void inc_event_relay_log_pos(ulonglong val) { - pending += val; + event_relay_log_pos+= val; } - /* TODO: this probably needs to be fixed */ - inline void inc_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0) + + void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0) { if (!skip_lock) pthread_mutex_lock(&data_lock); - relay_log_pos += val+pending; - pending = 0; - if (log_pos) - master_log_pos = log_pos+ val; + inc_event_relay_log_pos(val); + group_relay_log_pos= event_relay_log_pos; + strmake(group_relay_log_name,event_relay_log_name, + sizeof(group_relay_log_name)-1); + /* + If the slave does not support transactions and replicates a transaction, + users should not trust group_master_log_pos (which they can display with + SHOW SLAVE STATUS or read from relay-log.info), because to compute + group_master_log_pos the slave relies on log_pos stored in the master's + binlog, but if we are in a master's transaction these positions are always + the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does + not advance as it should on the non-transactional slave (it advances by + big leaps, whereas it should advance by small leaps). 
+ */ + if (log_pos) // 3.23 binlogs don't have log_pos + group_master_log_pos= log_pos+ val; pthread_cond_broadcast(&data_cond); if (!skip_lock) pthread_mutex_unlock(&data_lock); } - /* - thread safe read of position - not needed if we are in the slave thread, - but required otherwise as var is a longlong - */ - inline void read_pos(ulonglong& var) - { - pthread_mutex_lock(&data_lock); - var = relay_log_pos; - pthread_mutex_unlock(&data_lock); - } int wait_for_pos(THD* thd, String* log_name, longlong log_pos, longlong timeout); @@ -334,7 +345,7 @@ typedef struct st_table_rule_ent #define TABLE_RULE_ARR_SIZE 16 #define MAX_SLAVE_ERRMSG 1024 -#define RPL_LOG_NAME (rli->master_log_name[0] ? rli->master_log_name :\ +#define RPL_LOG_NAME (rli->group_master_log_name[0] ? rli->group_master_log_name :\ "FIRST") #define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\ "FIRST") diff --git a/sql/sql_class.h b/sql/sql_class.h index d3cb843ad85..7688d21c33e 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -145,10 +145,12 @@ public: int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); bool is_active(const char* log_file_name); - int update_log_index(LOG_INFO* linfo); - int purge_logs(THD* thd, const char* to_log); - int purge_logs_before_date(THD* thd, time_t purge_time); - int purge_first_log(struct st_relay_log_info* rli); + int update_log_index(LOG_INFO* linfo, bool need_update_threads); + int purge_logs(const char *to_log, bool included, + bool need_mutex, bool need_update_threads, + ulonglong *decrease_log_space); + int purge_logs_before_date(time_t purge_time); + int purge_first_log(struct st_relay_log_info* rli, bool included); bool reset_logs(THD* thd); // if we are exiting, we also want to close the index file void close(bool exiting = 0); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index d0a970c98b7..ba8a4af794a 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -3980,7 +3980,7 
@@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables) { long purge_time= time(0) - expire_logs_days*24*60*60; if (purge_time >= 0) - mysql_bin_log.purge_logs_before_date(thd, purge_time); + mysql_bin_log.purge_logs_before_date(purge_time); } #endif mysql_slow_log.new_file(1); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 7e9b6aea7b5..0eb444b85c0 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -292,7 +292,7 @@ int purge_master_logs(THD* thd, const char* to_log) char search_file_name[FN_REFLEN]; mysql_bin_log.make_log_name(search_file_name, to_log); - int res = mysql_bin_log.purge_logs(thd, search_file_name); + int res = mysql_bin_log.purge_logs(search_file_name, 0, 1, 1, NULL); return purge_error_message(thd, res); } @@ -300,7 +300,7 @@ int purge_master_logs(THD* thd, const char* to_log) int purge_master_logs_before_date(THD* thd, time_t purge_time) { - int res = mysql_bin_log.purge_logs_before_date(thd, purge_time); + int res = mysql_bin_log.purge_logs_before_date(purge_time); return purge_error_message(thd ,res); } @@ -776,24 +776,25 @@ int reset_slave(THD *thd, MASTER_INFO* mi) error=1; goto err; } - //delete relay logs, clear relay log coordinates + // delete relay logs, clear relay log coordinates if ((error= purge_relay_logs(&mi->rli, thd, 1 /* just reset */, &errmsg))) goto err; - //Clear master's log coordinates (only for good display of SHOW SLAVE STATUS) + // Clear master's log coordinates (only for good display of SHOW SLAVE STATUS) mi->master_log_name[0]= 0; mi->master_log_pos= BIN_LOG_HEADER_SIZE; - //close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 + // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 end_master_info(mi); - //and delete these two files + // and delete these two files fn_format(fname, master_info_file, mysql_data_home, "", 4+32); if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) { error=1; goto err; } + // delete relay_log_info_file 
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) { @@ -874,7 +875,6 @@ int change_master(THD* thd, MASTER_INFO* mi) // if we change host or port, we must reset the postion mi->master_log_name[0] = 0; mi->master_log_pos= BIN_LOG_HEADER_SIZE; - mi->rli.pending = 0; } if (lex_mi->log_file_name) @@ -883,7 +883,6 @@ int change_master(THD* thd, MASTER_INFO* mi) if (lex_mi->pos) { mi->master_log_pos= lex_mi->pos; - mi->rli.pending = 0; } DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); @@ -901,20 +900,22 @@ int change_master(THD* thd, MASTER_INFO* mi) if (lex_mi->relay_log_name) { need_relay_log_purge= 0; - strmake(mi->rli.relay_log_name,lex_mi->relay_log_name, - sizeof(mi->rli.relay_log_name)-1); + strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name, + sizeof(mi->rli.group_relay_log_name)-1); + strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name, + sizeof(mi->rli.event_relay_log_name)-1); } if (lex_mi->relay_log_pos) { need_relay_log_purge= 0; - mi->rli.relay_log_pos=lex_mi->relay_log_pos; + mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos; } flush_master_info(mi); if (need_relay_log_purge) { - mi->rli.skip_log_purge= 0; + relay_log_purge= 1; thd->proc_info="purging old relay logs"; if (purge_relay_logs(&mi->rli, thd, 0 /* not only reset, but also reinit */, @@ -928,11 +929,11 @@ int change_master(THD* thd, MASTER_INFO* mi) else { const char* msg; - mi->rli.skip_log_purge= 1; + relay_log_purge= 0; /* Relay log is already initialized */ if (init_relay_log_pos(&mi->rli, - mi->rli.relay_log_name, - mi->rli.relay_log_pos, + mi->rli.group_relay_log_name, + mi->rli.group_relay_log_pos, 0 /*no data lock*/, &msg)) { @@ -941,12 +942,12 @@ int change_master(THD* thd, MASTER_INFO* mi) DBUG_RETURN(1); } } - mi->rli.master_log_pos = mi->master_log_pos; + mi->rli.group_master_log_pos = mi->master_log_pos; DBUG_PRINT("info", 
("master_log_pos: %d", (ulong) mi->master_log_pos)); - strmake(mi->rli.master_log_name,mi->master_log_name, - sizeof(mi->rli.master_log_name)-1); - if (!mi->rli.master_log_name[0]) // uninitialized case - mi->rli.master_log_pos=0; + strmake(mi->rli.group_master_log_name,mi->master_log_name, + sizeof(mi->rli.group_master_log_name)-1); + if (!mi->rli.group_master_log_name[0]) // uninitialized case + mi->rli.group_master_log_pos=0; pthread_mutex_lock(&mi->rli.data_lock); mi->rli.abort_pos_wait++; |