summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <guilhem@mysql.com>2003-04-24 15:29:25 +0200
committerunknown <guilhem@mysql.com>2003-04-24 15:29:25 +0200
commit9f4f19d9f77433e02e9985d9c2f3224d062173b3 (patch)
tree0264c0e9bae7ff461d619885e3e94778f1749a23
parent4ac98ec5215dedff9e24b91d305e9eec5ee3b96f (diff)
downloadmariadb-git-9f4f19d9f77433e02e9985d9c2f3224d062173b3.tar.gz
Replication: new code to not modify in-memory log positions until the COMMIT
is executed, even if the transaction spans two or more relay logs (bug #53).
New variable relay_log_purge =0|1.
New test to verify bug #53.
sql/log.cc: Now we purge a relay log only when we are sure we won't need it, i.e. we have executed the final query (if autocommit=1) or the COMMIT.
sql/log_event.cc: Better tracking of the name and position of the relay log last executed, even if we are in a transaction which spans two or more relay logs.
sql/mysql_priv.h: new option relay_log_purge (the user can now decide himself if he wants his relay logs to be automatically purged or not; we don't make unsafe guesses like before).
sql/mysqld.cc: new option --innodb (replaces --skip-innodb). Useful for the test suite: we have skip-innodb in mysql-test-run, but we can ('-opt.info' file) choose to start the server with InnoDB for this test only. New option --bdb.
sql/repl_failsafe.cc: Better tracking of the name and position of the relay log last executed, even if we are in a transaction which spans two or more relay logs.
sql/set_var.cc: new variable relay_log_purge.
sql/slave.cc: Better tracking of the name and position of the relay log last executed, even if we are in a transaction which spans two or more relay logs. Now we purge a relay log only when we are sure we won't need it, i.e. we have executed the final query (if autocommit=1) or the COMMIT.
sql/slave.h: Better tracking of the name and position of the relay log last executed, even if we are in a transaction which spans two or more relay logs.
sql/sql_class.h: prototypes change.
sql/sql_parse.cc: removed thd argument (was not used in the function's body).
sql/sql_repl.cc: Better tracking of the name and position of the relay log last executed, even if we are in a transaction which spans two or more relay logs. Turn relay_log_purge silently off when someone does CHANGE MASTER TO RELAY_LOG_*
-rw-r--r--mysql-test/r/rpl_relayrotate.result18
-rw-r--r--mysql-test/t/rpl_relayrotate-slave.opt4
-rw-r--r--mysql-test/t/rpl_relayrotate.test61
-rw-r--r--sql/log.cc167
-rw-r--r--sql/log_event.cc80
-rw-r--r--sql/mysql_priv.h2
-rw-r--r--sql/mysqld.cc56
-rw-r--r--sql/repl_failsafe.cc11
-rw-r--r--sql/set_var.cc10
-rw-r--r--sql/slave.cc206
-rw-r--r--sql/slave.h91
-rw-r--r--sql/sql_class.h10
-rw-r--r--sql/sql_parse.cc2
-rw-r--r--sql/sql_repl.cc41
14 files changed, 475 insertions, 284 deletions
diff --git a/mysql-test/r/rpl_relayrotate.result b/mysql-test/r/rpl_relayrotate.result
new file mode 100644
index 00000000000..45f425a3532
--- /dev/null
+++ b/mysql-test/r/rpl_relayrotate.result
@@ -0,0 +1,18 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+stop slave;
+create table t1 (a int) type=innodb;
+reset slave;
+start slave;
+stop slave;
+start slave;
+select master_pos_wait('master-bin.001',3000,120)=-1;
+master_pos_wait('master-bin.001',3000,120)=-1
+0
+select * from t1 where a=8000;
+a
+8000
diff --git a/mysql-test/t/rpl_relayrotate-slave.opt b/mysql-test/t/rpl_relayrotate-slave.opt
new file mode 100644
index 00000000000..8b671423363
--- /dev/null
+++ b/mysql-test/t/rpl_relayrotate-slave.opt
@@ -0,0 +1,4 @@
+-O max_binlog_size=16384
+--innodb
+--log-warnings
+
diff --git a/mysql-test/t/rpl_relayrotate.test b/mysql-test/t/rpl_relayrotate.test
new file mode 100644
index 00000000000..4c330b8c9a2
--- /dev/null
+++ b/mysql-test/t/rpl_relayrotate.test
@@ -0,0 +1,61 @@
+# When the relay log gets rotated while the I/O thread
+# is reading a transaction, the transaction spans two or more
+# relay logs. If STOP SLAVE occurs while the SQL thread is
+# executing a part of the transaction in one of the non-first relay logs,
+# we test whether START SLAVE resumes at the beginning of the
+# transaction (i.e., steps back to the first relay log).
+
+# The slave is started with max_binlog_size=16384 bytes,
+# to force many rotations (approximately 30 rotations)
+
+# If the master or slave does not support InnoDB, this test will pass
+
+source include/master-slave.inc;
+connection slave;
+stop slave;
+connection master;
+create table t1 (a int) type=innodb;
+let $1=8000;
+disable_query_log;
+begin;
+while ($1)
+{
+# eval means expand $ expressions
+ eval insert into t1 values( $1 );
+ dec $1;
+}
+commit;
+# This will generate a 500kB binlog on the master,
+# which corresponds to about 30 relay logs on the slave.
+enable_query_log;
+save_master_pos;
+connection slave;
+reset slave;
+start slave;
+# We wait 1 sec for the SQL thread to be somewhere in
+# the middle of the transaction, hopefully not in
+# the first relay log, and hopefully before the COMMIT.
+# Usually it stops when the SQL thread is around the 15th relay log.
+# We cannot use MASTER_POS_WAIT() as master's position
+# increases only when the slave executes the COMMIT.
+system sleep 1;
+stop slave;
+# We suppose the SQL thread stopped before COMMIT.
+# If so the transaction was rolled back
+# and the table is now empty.
+# Now restart
+start slave;
+# And see if the table contains '8000'
+# which proves that the transaction restarted at
+# the right place.
+# We must wait for the transaction to commit before
+# reading, MASTER_POS_WAIT() will do it for sure
+# (the only statement with position>=3000 is COMMIT).
+# Older versions of MySQL would hang forever in MASTER_POS_WAIT
+# because COMMIT was said to be position 0 in the master's log (bug).
+# Detect this with timeout.
+select master_pos_wait('master-bin.001',3000,120)=-1;
+select * from t1 where a=8000;
+# Note that the mere fact of having fewer than about 30 binlogs on the slave
+# (the slave is started with --log-slave-updates) is already
+# proof that the transaction was not properly resumed.
diff --git a/sql/log.cc b/sql/log.cc
index 51b1c572601..50471041ee1 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -592,24 +592,32 @@ err:
/*
- Delete the current log file, remove it from index file and start on next
+ Delete relay log files prior to rli->group_relay_log_name
+ (i.e. all logs which are not involved in a non-finished group
+ (transaction)), remove them from the index file and start on next relay log.
SYNOPSIS
purge_first_log()
- rli Relay log information
-
+ rli Relay log information
+ included If false, all relay logs that are strictly before
+ rli->group_relay_log_name are deleted ; if true, the latter is
+ deleted too (i.e. all relay logs
+ read by the SQL slave thread are deleted).
+
NOTE
- This is only called from the slave-execute thread when it has read
- all commands from a log and want to switch to a new log.
- - When this happens, we should never be in an active transaction as
- a transaction is always written as a single block to the binary log.
+ all commands from a relay log and wants to switch to a new relay log.
+ - When this happens, we can be in an active transaction as
+ a transaction can span two relay logs
+ (although it is always written as a single block to the master's binary
+ log, hence cannot span two of the master's binary logs).
IMPLEMENTATION
- Protects index file with LOCK_index
- - Delete first log file,
- - Copy all file names after this one to the front of the index file
+ - Delete relevant relay log files
+ - Copy all file names after these ones to the front of the index file
- If the OS has truncate, truncate the file, else fill it with \n'
- - Read the first file name from the index file and store in rli->linfo
+ - Read the next file name from the index file and store in rli->linfo
RETURN VALUES
0 ok
@@ -620,66 +628,68 @@ err:
#ifdef HAVE_REPLICATION
-int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli)
+int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli, bool included)
{
int error;
DBUG_ENTER("purge_first_log");
- /*
- Test pre-conditions.
-
- Assume that we have previously read the first log and
- stored it in rli->relay_log_name
- */
DBUG_ASSERT(is_open());
DBUG_ASSERT(rli->slave_running == 1);
- DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->relay_log_name));
- DBUG_ASSERT(rli->linfo.index_file_offset ==
- strlen(rli->relay_log_name) + 1);
+ DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
- /* We have already processed the relay log, so it's safe to delete it */
- my_delete(rli->relay_log_name, MYF(0));
pthread_mutex_lock(&LOCK_index);
- if (copy_up_file_and_fill(&index_file, rli->linfo.index_file_offset))
- {
- error= LOG_INFO_IO;
- goto err;
- }
+ pthread_mutex_lock(&rli->log_space_lock);
+ rli->relay_log.purge_logs(rli->group_relay_log_name, included,
+ 0, 0, &rli->log_space_total);
+ // Tell the I/O thread to take the relay_log_space_limit into account
+ rli->ignore_log_space_limit= 0;
+ pthread_mutex_unlock(&rli->log_space_lock);
/*
- Update the space counter used by all relay logs
Ok to broadcast after the critical region as there is no risk of
the mutex being destroyed by this thread later - this helps save
context switches
*/
- pthread_mutex_lock(&rli->log_space_lock);
- rli->log_space_total -= rli->relay_log_pos;
- //tell the I/O thread to take the relay_log_space_limit into account
- rli->ignore_log_space_limit= 0;
- pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond);
/*
Read the next log file name from the index file and pass it back to
the caller
+ If included is true, we want the first relay log;
+ otherwise we want the one after event_relay_log_name.
*/
- if ((error=find_log_pos(&rli->linfo, NullS, 0 /*no mutex*/)))
+ if ((included && (error=find_log_pos(&rli->linfo, NullS, 0))) ||
+ (!included &&
+ ((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)) ||
+ (error=find_next_log(&rli->linfo, 0)))))
{
char buff[22];
- sql_print_error("next log error: %d offset: %s log: %s",
- error,
- llstr(rli->linfo.index_file_offset,buff),
- rli->linfo.log_file_name);
+ sql_print_error("next log error: %d offset: %s log: %s included: %d",
+ error,
+ llstr(rli->linfo.index_file_offset,buff),
+ rli->group_relay_log_name,
+ included);
goto err;
}
+
/*
- Reset position to current log. This involves setting both of the
- position variables:
+ Reset rli's coordinates to the current log.
*/
- rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
- rli->pending = 0;
- strmake(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name)-1);
+ rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->event_relay_log_name)-1);
+
+ /*
+ If we removed the rli->group_relay_log_name file,
+ we must update the rli->group* coordinates; otherwise do not touch them as the
+ group's execution is not finished (e.g. COMMIT not executed)
+ */
+ if (included)
+ {
+ rli->group_relay_log_pos = BIN_LOG_HEADER_SIZE;
+ strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->group_relay_log_name)-1);
+ }
/* Store where we are in the new file for the execution thread */
flush_relay_log_info(rli);
@@ -693,13 +703,14 @@ err:
Update log index_file
*/
-int MYSQL_LOG::update_log_index(LOG_INFO* log_info)
+int MYSQL_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads)
{
if (copy_up_file_and_fill(&index_file, log_info->index_file_start_offset))
return LOG_INFO_IO;
// now update offsets in index file for running threads
- adjust_linfo_offsets(log_info->index_file_start_offset);
+ if (need_update_threads)
+ adjust_linfo_offsets(log_info->index_file_start_offset);
return 0;
}
@@ -708,9 +719,13 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info)
SYNOPSIS
purge_logs()
- thd Thread pointer
- to_log Delete all log file name before this file. This file is not
- deleted
+ to_log Delete all log files before this file.
+ included If true, to_log is deleted too.
+ need_mutex If true, lock LOCK_index while purging.
+ need_update_threads If we want to update the log coordinates of
+ all threads. False for relay logs, true otherwise.
+ decrease_log_space If not null, decrement this variable by
+ the amount of log space freed
NOTES
If any of the logs before the deleted one is in use,
@@ -722,31 +737,59 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info)
LOG_INFO_EOF to_log not found
*/
-int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
+int MYSQL_LOG::purge_logs(const char *to_log,
+ bool included,
+ bool need_mutex,
+ bool need_update_threads,
+ ulonglong *decrease_log_space)
{
int error;
+ bool exit_loop= 0;
LOG_INFO log_info;
DBUG_ENTER("purge_logs");
+ DBUG_PRINT("info",("to_log= %s",to_log));
if (no_rotate)
DBUG_RETURN(LOG_INFO_PURGE_NO_ROTATE);
- pthread_mutex_lock(&LOCK_index);
+ if (need_mutex)
+ pthread_mutex_lock(&LOCK_index);
if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/)))
goto err;
/*
- File name exists in index file; Delete until we find this file
+ File name exists in index file; delete until we find this file
or a file that is used.
*/
if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/)))
goto err;
- while (strcmp(to_log,log_info.log_file_name) &&
- !log_in_use(log_info.log_file_name))
+ while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) &&
+ !log_in_use(log_info.log_file_name))
{
- /* It's not fatal even if we can't delete a log file */
- my_delete(log_info.log_file_name, MYF(0));
- if (find_next_log(&log_info, 0))
+ ulong tmp;
+ if (decrease_log_space) //stat the file we want to delete
+ {
+ MY_STAT s;
+ if (my_stat(log_info.log_file_name,&s,MYF(0)))
+ tmp= s.st_size;
+ else
+ {
+ /*
+ If we could not stat, we can't know the amount
+ of space that deletion will free. In most cases,
+ deletion won't work either, so it's not a problem.
+ */
+ tmp= 0;
+ }
+ }
+ /*
+ It's not fatal if we can't delete a log file ;
+ if we could delete it, take its size into account
+ */
+ DBUG_PRINT("info",("purging %s",log_info.log_file_name));
+ if (!my_delete(log_info.log_file_name, MYF(0)) && decrease_log_space)
+ *decrease_log_space-= tmp;
+ if (find_next_log(&log_info, 0) || exit_loop)
break;
}
@@ -754,10 +797,11 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
If we get killed -9 here, the sysadmin would have to edit
the log index file after restart - otherwise, this should be safe
*/
- error= update_log_index(&log_info);
+ error= update_log_index(&log_info, need_update_threads);
err:
- pthread_mutex_unlock(&LOCK_index);
+ if (need_mutex)
+ pthread_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
@@ -779,7 +823,7 @@ err:
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated
*/
-int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time)
+int MYSQL_LOG::purge_logs_before_date(time_t purge_time)
{
int error;
LOG_INFO log_info;
@@ -816,7 +860,7 @@ int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time)
If we get killed -9 here, the sysadmin would have to edit
the log index file after restart - otherwise, this should be safe
*/
- error= update_log_index(&log_info);
+ error= update_log_index(&log_info, 1);
err:
pthread_mutex_unlock(&LOCK_index);
@@ -1269,7 +1313,7 @@ err:
{
long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
- error= purge_logs_before_date(current_thd, purge_time);
+ error= purge_logs_before_date(purge_time);
}
#endif
@@ -1534,7 +1578,6 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
If you don't do it this way, you will get a deadlock in THD::awake()
*/
-
void MYSQL_LOG:: wait_for_update(THD* thd)
{
safe_mutex_assert_owner(&LOCK_log);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index d4efb65bf42..39db264d898 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -310,11 +310,36 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
*/
if (rli)
{
- if (rli->inside_transaction)
- rli->inc_pending(get_event_len());
+ /*
+ If in a transaction, and if the slave supports transactions,
+ just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
+ (not OPTION_NOT_AUTOCOMMIT) as transactions are logged
+ with BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+
+ CAUTION: opt_using_transactions means
+ innodb || bdb ; suppose the master supports InnoDB and BDB,
+ but the slave supports only BDB, problems
+ will arise:
+ - suppose an InnoDB table is created on the master,
+ - then it will be MyISAM on the slave
+ - but as opt_using_transactions is true, the slave will believe he is
+ transactional with the MyISAM table. And problems will come when one
+ does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at BEGIN
+ whereas there has not been any rollback). This is the problem of
+ using opt_using_transactions instead of a finer
+ "does the slave support _the_transactional_handler_used_on_the_master_".
+
+ More generally, we'll have problems when a query mixes a transactional
+ handler and MyISAM and STOP SLAVE is issued in the middle of the
+ "transaction". START SLAVE will resume at BEGIN while the MyISAM table has
+ already been updated.
+
+ */
+ if ((thd->options & OPTION_BEGIN) && opt_using_transactions)
+ rli->inc_event_relay_log_pos(get_event_len());
else
{
- rli->inc_pos(get_event_len(),log_pos);
+ rli->inc_group_relay_log_pos(get_event_len(),log_pos);
flush_relay_log_info(rli);
}
}
@@ -878,9 +903,13 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
thd->db = rewrite_db((char*)db);
/*
- InnoDB internally stores the master log position it has processed so far;
- position to store is really pos + pending + event_len
- since we must store the pos of the END of the current log event
+ InnoDB internally stores the master log position it has executed so far,
+ i.e. the position just after the COMMIT event.
+ When InnoDB wants to store it, the positions in rli will not have
+ been updated yet, so group_master_log_* will point to the old BEGIN
+ and event_master_log_* will point to the beginning of the current COMMIT.
+ So the position to store is event_master_log_pos + event_len
+ since we must store the pos of the END of the current log event (COMMIT).
*/
rli->event_len= get_event_len();
@@ -909,18 +938,6 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
DBUG_PRINT("query",("%s",thd->query));
mysql_parse(thd, thd->query, q_len);
- /*
- Set a flag if we are inside an transaction so that we can restart
- the transaction from the start if we are killed
-
- This will only be done if we are supporting transactional tables
- in the slave.
- */
- if (!strcmp(thd->query,"BEGIN"))
- rli->inside_transaction= opt_using_transactions;
- else if (!strcmp(thd->query,"COMMIT"))
- rli->inside_transaction=0;
-
DBUG_PRINT("info",("expected_error: %d last_errno: %d",
expected_error, thd->net.last_errno));
if ((expected_error != (actual_error= thd->net.last_errno)) &&
@@ -1776,14 +1793,15 @@ int Rotate_log_event::write_data(IO_CACHE* file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{
- char* log_name = rli->master_log_name;
+ char* log_name = rli->group_master_log_name;
DBUG_ENTER("Rotate_log_event::exec_event");
pthread_mutex_lock(&rli->data_lock);
memcpy(log_name, new_log_ident, ident_len+1);
- rli->master_log_pos = pos;
- rli->relay_log_pos += get_event_len();
- DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos));
+ rli->group_master_log_pos = pos;
+ rli->event_relay_log_pos += get_event_len();
+ rli->group_relay_log_pos = rli->event_relay_log_pos;
+ DBUG_PRINT("info", ("group_master_log_pos: %d", (ulong) rli->group_master_log_pos));
pthread_mutex_unlock(&rli->data_lock);
pthread_cond_broadcast(&rli->data_cond);
flush_relay_log_info(rli);
@@ -1905,7 +1923,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
thd->next_insert_id = val;
break;
}
- rli->inc_pending(get_event_len());
+ rli->inc_event_relay_log_pos(get_event_len());
return 0;
}
#endif
@@ -1967,7 +1985,7 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
{
thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2;
- rli->inc_pending(get_event_len());
+ rli->inc_event_relay_log_pos(get_event_len());
return 0;
}
#endif // !MYSQL_CLIENT
@@ -2199,7 +2217,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
e.update_hash(val, val_len, type, charset, Item::COER_NOCOLL);
free_root(&thd->mem_root,0);
- rli->inc_pending(get_event_len());
+ rli->inc_event_relay_log_pos(get_event_len());
return 0;
}
#endif // !MYSQL_CLIENT
@@ -2241,7 +2259,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
pthread_mutex_lock(&mi->data_lock);
pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
- master_log_len = strlen(rli->master_log_name);
+ master_log_len = strlen(rli->group_master_log_name);
// on OOM, just do not initialize the structure and print the error
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME))))
@@ -2249,9 +2267,9 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1;
- memcpy(master_log, rli->master_log_name, master_log_len + 1);
+ memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
master_port = mi->port;
- master_pos = rli->master_log_pos;
+ master_pos = rli->group_master_log_pos;
DBUG_PRINT("info", ("master_log: %s pos: %d", master_log,
(ulong) master_pos));
}
@@ -2381,19 +2399,19 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{
// do not clean up immediately after rotate event
- if (rli->master_log_pos > BIN_LOG_HEADER_SIZE)
+ if (rli->group_master_log_pos > BIN_LOG_HEADER_SIZE)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
/*
We do not want to update master_log pos because we get a rotate event
- before stop, so by now master_log_name is set to the next log.
+ before stop, so by now group_master_log_name is set to the next log.
If we updated it, we will have incorrect master coordinates and this
could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not.
*/
- rli->inc_pos(get_event_len(), 0);
+ rli->inc_group_relay_log_pos(get_event_len(), 0);
flush_relay_log_info(rli);
return 0;
}
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 4203d440667..d17faa3cea5 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -721,7 +721,7 @@ extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size;
extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log;
extern ulong specialflag, current_pid;
extern ulong expire_logs_days;
-
+extern my_bool relay_log_purge;
extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version,dropping_tables;
extern uint delay_key_write_options;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index ea0311bafc6..5a6ea939878 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -314,7 +314,7 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool lower_case_table_names, opt_old_rpl_compat;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0, opt_old_passwords=0, use_old_passwords=0;
-my_bool opt_console= 0;
+my_bool opt_console= 0, opt_bdb, opt_innodb;
volatile bool mqh_used = 0;
FILE *bootstrap_file=0;
@@ -442,6 +442,7 @@ const char **errmesg; /* Error messages */
const char *myisam_recover_options_str="OFF";
const char *sql_mode_str="OFF";
ulong rpl_recovery_rank=0;
+my_bool relay_log_purge=1;
my_string mysql_unix_port=NULL, opt_mysql_tmpdir=NULL;
MY_TMPDIR mysql_tmpdir_list;
@@ -2208,7 +2209,7 @@ static int init_server_components()
{
long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
- mysql_bin_log.purge_logs_before_date(current_thd, purge_time);
+ mysql_bin_log.purge_logs_before_date(purge_time);
}
#endif
}
@@ -3400,7 +3401,7 @@ enum options
OPT_DELAY_KEY_WRITE, OPT_CHARSETS_DIR,
OPT_BDB_HOME, OPT_BDB_LOG,
OPT_BDB_TMP, OPT_BDB_NOSYNC,
- OPT_BDB_LOCK, OPT_BDB_SKIP,
+ OPT_BDB_LOCK, OPT_BDB,
OPT_BDB_NO_RECOVER, OPT_BDB_SHARED,
OPT_MASTER_HOST, OPT_MASTER_USER,
OPT_MASTER_PASSWORD, OPT_MASTER_PORT,
@@ -3430,7 +3431,7 @@ enum options
OPT_INNODB_FLUSH_METHOD,
OPT_INNODB_FAST_SHUTDOWN,
OPT_SAFE_SHOW_DB,
- OPT_INNODB_SKIP, OPT_SKIP_SAFEMALLOC,
+ OPT_INNODB, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
@@ -3468,7 +3469,7 @@ enum options
OPT_OPEN_FILES_LIMIT,
OPT_QUERY_CACHE_LIMIT, OPT_QUERY_CACHE_MIN_RES_UNIT, OPT_QUERY_CACHE_SIZE,
OPT_QUERY_CACHE_TYPE, OPT_RECORD_BUFFER,
- OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT,
+ OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RELAY_LOG_PURGE,
OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME,
OPT_SORT_BUFFER, OPT_TABLE_CACHE,
OPT_THREAD_CONCURRENCY, OPT_THREAD_CACHE_SIZE,
@@ -3529,8 +3530,10 @@ struct my_option my_long_options[] =
(gptr*) &berkeley_tmpdir, (gptr*) &berkeley_tmpdir, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#endif /* HAVE_BERKELEY_DB */
- {"skip-bdb", OPT_BDB_SKIP, "Don't use berkeley db (will save memory)",
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"bdb", OPT_BDB, "Enable Berkeley DB (if this version of MySQL supports it). \
+Disable with --skip-bdb (will save memory)",
+ (gptr*) &opt_bdb, (gptr*) &opt_bdb, 0, GET_BOOL, NO_ARG, 1, 0, 0,
+ 0, 0, 0},
{"big-tables", OPT_BIG_TABLES,
"Allow big result sets by saving all temporary sets on file (Solves most 'table full' errors)",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
@@ -3885,8 +3888,10 @@ struct my_option my_long_options[] =
"Start without grant tables. This gives all users FULL ACCESS to all tables!",
(gptr*) &opt_noacl, (gptr*) &opt_noacl, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0,
0},
- {"skip-innodb", OPT_INNODB_SKIP, "Don't use Innodb (will save memory)",
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"innodb", OPT_INNODB, "Enable InnoDB (if this version of MySQL supports it). \
+Disable with --skip-innodb (will save memory)",
+ (gptr*) &opt_innodb, (gptr*) &opt_innodb, 0, GET_BOOL, NO_ARG, 1, 0, 0,
+ 0, 0, 0},
{"skip-locking", OPT_SKIP_LOCK,
"Deprecated option, use --skip-external-locking instead",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
@@ -4272,6 +4277,11 @@ struct my_option my_long_options[] =
(gptr*) &max_system_variables.read_buff_size,0, GET_ULONG, REQUIRED_ARG,
128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE, 0},
#ifdef HAVE_REPLICATION
+ {"relay_log_purge", OPT_RELAY_LOG_PURGE,
+ "0 = do not purge relay logs. 1 = purge them as soon as they are no more needed.",
+ (gptr*) &relay_log_purge,
+ (gptr*) &relay_log_purge, 0, GET_BOOL, NO_ARG,
+ 1, 0, 1, 0, 1, 0},
{"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT,
"Max space to use for all relay logs",
(gptr*) &relay_log_space_limit,
@@ -5016,16 +5026,32 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
berkeley_shared_data=1;
break;
#endif /* HAVE_BERKELEY_DB */
- case OPT_BDB_SKIP:
+ case OPT_BDB:
#ifdef HAVE_BERKELEY_DB
- berkeley_skip=1;
- have_berkeley_db=SHOW_OPTION_DISABLED;
+ if (opt_bdb)
+ {
+ berkeley_skip=0;
+ have_berkeley_db=SHOW_OPTION_YES;
+ }
+ else
+ {
+ berkeley_skip=1;
+ have_berkeley_db=SHOW_OPTION_DISABLED;
+ }
#endif
break;
- case OPT_INNODB_SKIP:
+ case OPT_INNODB:
#ifdef HAVE_INNOBASE_DB
- innodb_skip=1;
- have_innodb=SHOW_OPTION_DISABLED;
+ if (opt_innodb)
+ {
+ innodb_skip=0;
+ have_innodb=SHOW_OPTION_YES;
+ }
+ else
+ {
+ innodb_skip=1;
+ have_innodb=SHOW_OPTION_DISABLED;
+ }
#endif
break;
case OPT_INNODB_DATA_FILE_PATH:
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 46791c13219..58769827bed 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -873,7 +873,6 @@ int load_master_data(THD* thd)
// don't hit the magic number
if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
- active_mi->rli.pending = 0;
flush_master_info(active_mi);
}
mc_mysql_free_result(master_status_res);
@@ -897,9 +896,13 @@ int load_master_data(THD* thd)
return 1;
}
pthread_mutex_lock(&active_mi->rli.data_lock);
- active_mi->rli.master_log_pos = active_mi->master_log_pos;
- strmake(active_mi->rli.master_log_name,active_mi->master_log_name,
- sizeof(active_mi->rli.master_log_name)-1);
+ active_mi->rli.group_master_log_pos = active_mi->master_log_pos;
+ strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name,
+ sizeof(active_mi->rli.group_master_log_name)-1);
+ /*
+ No need to update rli.event* coordinates, they will be updated when the slave
+ threads start; only rli.group* coordinates are necessary here.
+ */
flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock);
diff --git a/sql/set_var.cc b/sql/set_var.cc
index d39a506c82d..dc6a2f71133 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -201,6 +201,10 @@ sys_var_thd_ulong sys_read_buff_size("read_buffer_size",
&SV::read_buff_size);
sys_var_thd_ulong sys_read_rnd_buff_size("read_rnd_buffer_size",
&SV::read_rnd_buff_size);
+#ifdef HAVE_REPLICATION
+sys_var_bool_ptr sys_relay_log_purge("relay_log_purge",
+ &relay_log_purge);
+#endif
sys_var_long_ptr sys_rpl_recovery_rank("rpl_recovery_rank",
&rpl_recovery_rank);
sys_var_long_ptr sys_query_cache_size("query_cache_size",
@@ -407,6 +411,9 @@ sys_var *sys_variables[]=
&sys_rand_seed2,
&sys_read_buff_size,
&sys_read_rnd_buff_size,
+#ifdef HAVE_REPLICATION
+ &sys_relay_log_purge,
+#endif
&sys_rpl_recovery_rank,
&sys_safe_updates,
&sys_select_limit,
@@ -563,6 +570,9 @@ struct show_var_st init_vars[]= {
{sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id, SHOW_SYS},
{sys_read_buff_size.name, (char*) &sys_read_buff_size, SHOW_SYS},
{sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size, SHOW_SYS},
+#ifdef HAVE_REPLICATION
+ {sys_relay_log_purge.name, (char*) &sys_relay_log_purge, SHOW_SYS},
+#endif
{sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank, SHOW_SYS},
#ifdef HAVE_QUERY_CACHE
{sys_query_cache_limit.name,(char*) &sys_query_cache_limit, SHOW_SYS},
diff --git a/sql/slave.cc b/sql/slave.cc
index fa7ab180eaa..eae795ae760 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -217,11 +217,7 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
- If not, open the 'log' binary file.
TODO
- - check proper initialization of master_log_name/master_log_pos
- - We may always want to delete all logs before 'log'.
- Currently if we are not calling this with 'log' as NULL or the first
- log we will never delete relay logs.
- If we want this we should not set skip_log_purge to 1.
+ - check proper initialization of group_master_log_name/group_master_log_pos
RETURN VALUES
0 ok
@@ -248,7 +244,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
rli->cur_log_fd = -1;
}
- rli->relay_log_pos = pos;
+ rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
/*
Test to see if the previous run was with the skip of purging
@@ -260,18 +256,15 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
goto err;
}
- if (log) // If not first log
+ if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{
- if (strcmp(log, rli->linfo.log_file_name))
- rli->skip_log_purge= 1; // Different name; Don't purge
- if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
- {
- *errmsg="Could not find target log during relay log initialization";
- goto err;
- }
+ *errmsg="Could not find target log during relay log initialization";
+ goto err;
}
- strmake(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name)-1);
+ strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->event_relay_log_name)-1);
if (rli->relay_log.is_active(rli->linfo.log_file_name))
{
/*
@@ -302,7 +295,7 @@ err:
If we don't purge, we can't honour relay_log_space_limit ;
silently discard it
*/
- if (rli->skip_log_purge)
+ if (!relay_log_purge)
rli->log_space_limit= 0;
pthread_cond_broadcast(&rli->data_cond);
if (need_data_lock)
@@ -383,9 +376,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
to display fine in any case.
*/
- rli->master_log_name[0]= 0;
- rli->master_log_pos= 0;
- rli->pending= 0;
+ rli->group_master_log_name[0]= 0;
+ rli->group_master_log_pos= 0;
if (!rli->inited)
DBUG_RETURN(0);
@@ -402,16 +394,18 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
goto err;
}
/* Save name of used relay log file */
- strmake(rli->relay_log_name, rli->relay_log.get_log_fname(),
- sizeof(rli->relay_log_name)-1);
+ strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->event_relay_log_name)-1);
// Just first log with magic number and nothing else
rli->log_space_total= BIN_LOG_HEADER_SIZE;
- rli->relay_log_pos= BIN_LOG_HEADER_SIZE;
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
rli->relay_log.reset_bytes_written();
if (!just_reset)
- error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
- 0 /* do not need data lock */, errmsg);
-
+ error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos,
+ 0 /* do not need data lock */, errmsg);
+
err:
#ifndef DBUG_OFF
char buf[22];
@@ -1238,13 +1232,11 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
pthread_mutex_lock(&rli->data_lock);
info_fd = rli->info_fd;
- rli->pending = 0;
rli->cur_log_fd = -1;
rli->slave_skip_counter=0;
rli->abort_pos_wait=0;
- rli->skip_log_purge=0;
- rli->log_space_limit = relay_log_space_limit;
- rli->log_space_total = 0;
+ rli->log_space_limit= relay_log_space_limit;
+ rli->log_space_total= 0;
// TODO: make this work with multi-master
if (!opt_relay_logname)
@@ -1285,8 +1277,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
&msg))
goto err;
- rli->master_log_name[0]= 0;
- rli->master_log_pos= 0;
+ rli->group_master_log_name[0]= 0;
+ rli->group_master_log_pos= 0;
rli->info_fd= info_fd;
}
else // file exists
@@ -1307,31 +1299,33 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->info_fd = info_fd;
int relay_log_pos, master_log_pos;
- if (init_strvar_from_file(rli->relay_log_name,
- sizeof(rli->relay_log_name), &rli->info_file,
+ if (init_strvar_from_file(rli->group_relay_log_name,
+ sizeof(rli->group_relay_log_name), &rli->info_file,
"") ||
init_intvar_from_file(&relay_log_pos,
&rli->info_file, BIN_LOG_HEADER_SIZE) ||
- init_strvar_from_file(rli->master_log_name,
- sizeof(rli->master_log_name), &rli->info_file,
+ init_strvar_from_file(rli->group_master_log_name,
+ sizeof(rli->group_master_log_name), &rli->info_file,
"") ||
init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
{
msg="Error reading slave log configuration";
goto err;
}
- rli->relay_log_pos= relay_log_pos;
- rli->master_log_pos= master_log_pos;
+ strmake(rli->event_relay_log_name,rli->group_relay_log_name,
+ sizeof(rli->event_relay_log_name)-1);
+ rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
+ rli->group_master_log_pos= master_log_pos;
if (init_relay_log_pos(rli,
- rli->relay_log_name,
- rli->relay_log_pos,
+ rli->group_relay_log_name,
+ rli->group_relay_log_pos,
0 /* no data lock*/,
&msg))
goto err;
}
- DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
/*
Now change the cache from READ to WRITE - must do this
before flush_relay_log_info
@@ -1407,7 +1401,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space");
- rli->log_space_total = 0;
+ rli->log_space_total= 0;
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
{
sql_print_error("Could not find first log while counting relay log space");
@@ -1631,10 +1625,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
protocol->store((uint32) mi->connect_retry);
protocol->store(mi->master_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->master_log_pos);
- protocol->store(mi->rli.relay_log_name +
- dirname_length(mi->rli.relay_log_name), &my_charset_bin);
- protocol->store((ulonglong) mi->rli.relay_log_pos);
- protocol->store(mi->rli.master_log_name, &my_charset_bin);
+ protocol->store(mi->rli.group_relay_log_name +
+ dirname_length(mi->rli.group_relay_log_name), &my_charset_bin);
+ protocol->store((ulonglong) mi->rli.group_relay_log_pos);
+ protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
protocol->store(mi->slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(&replicate_do_db);
@@ -1642,7 +1636,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
protocol->store((uint32) mi->rli.last_slave_errno);
protocol->store(mi->rli.last_slave_error, &my_charset_bin);
protocol->store((uint32) mi->rli.slave_skip_counter);
- protocol->store((ulonglong) mi->rli.master_log_pos);
+ protocol->store((ulonglong) mi->rli.group_master_log_pos);
protocol->store((ulonglong) mi->rli.log_space_total);
pthread_mutex_unlock(&mi->rli.data_lock);
pthread_mutex_unlock(&mi->data_lock);
@@ -1673,17 +1667,15 @@ bool flush_master_info(MASTER_INFO* mi)
st_relay_log_info::st_relay_log_info()
- :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0),
- cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0),
- slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0),
- sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
- slave_running(0), skip_log_purge(0),
- inside_transaction(0) /* the default is autocommit=1 */
-{
- relay_log_name[0] = master_log_name[0] = 0;
+ :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
+ cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
+ ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0),
+ slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
+ slave_running(0)
+{
+ group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0;
last_slave_error[0]=0;
-
bzero(&info_file,sizeof(info_file));
bzero(&cache_buf, sizeof(cache_buf));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
@@ -1745,8 +1737,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
set_timespec(abstime,timeout);
DBUG_ENTER("wait_for_pos");
- DBUG_PRINT("enter",("master_log_name: '%s' pos: %lu timeout: %ld",
- master_log_name, (ulong) master_log_pos,
+ DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld",
+ group_master_log_name, (ulong) group_master_log_pos,
(long) timeout));
pthread_mutex_lock(&data_lock);
@@ -1796,10 +1788,10 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
{
bool pos_reached;
int cmp_result= 0;
- DBUG_ASSERT(*master_log_name || master_log_pos == 0);
- if (*master_log_name)
+ DBUG_ASSERT(*group_master_log_name || group_master_log_pos == 0);
+ if (*group_master_log_name)
{
- char *basename= master_log_name + dirname_length(master_log_name);
+ char *basename= group_master_log_name + dirname_length(group_master_log_name);
/*
First compare the parts before the extension.
Find the dot in the master's log basename,
@@ -1814,13 +1806,13 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
}
// Now compare extensions.
char *q_end;
- ulong master_log_name_extension= strtoul(q, &q_end, 10);
- if (master_log_name_extension < log_name_extension)
+ ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
+ if (group_master_log_name_extension < log_name_extension)
cmp_result = -1 ;
else
- cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ;
+ cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
}
- pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) ||
+ pos_reached = ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
cmp_result > 0);
if (pos_reached || thd->killed)
break;
@@ -2127,7 +2119,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
(rli->slave_skip_counter && type_code != ROTATE_EVENT))
{
/* TODO: I/O thread should not even log events with the same server id */
- rli->inc_pos(ev->get_event_len(),
+ rli->inc_group_relay_log_pos(ev->get_event_len(),
type_code != STOP_EVENT ? ev->log_pos : LL(0),
1/* skip lock*/);
flush_relay_log_info(rli);
@@ -2497,15 +2489,13 @@ slave_begin:
rli->abort_slave = 0;
pthread_mutex_unlock(&rli->run_lock);
pthread_cond_broadcast(&rli->start_cond);
- // This should always be set to 0 when the slave thread is started
- rli->pending = 0;
//tell the I/O thread to take relay_log_space_limit into account from now on
rli->ignore_log_space_limit= 0;
if (init_relay_log_pos(rli,
- rli->relay_log_name,
- rli->relay_log_pos,
+ rli->group_relay_log_name,
+ rli->group_relay_log_pos,
1 /*need data lock*/, &errmsg))
{
sql_print_error("Error initializing relay log position: %s",
@@ -2513,18 +2503,18 @@ slave_begin:
goto err;
}
THD_CHECK_SENTRY(thd);
- DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
- rli->master_log_name,
- llstr(rli->master_log_pos,llbuff)));
+ rli->group_master_log_name,
+ llstr(rli->group_master_log_pos,llbuff)));
if (global_system_variables.log_warnings)
sql_print_error("Slave SQL thread initialized, starting replication in \
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
- llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
- llstr(rli->relay_log_pos,llbuff1));
+ llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
+ llstr(rli->group_relay_log_pos,llbuff1));
/* Read queries from the IO/THREAD until this thread is killed */
@@ -2541,7 +2531,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \
'%s' position %s",
- RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
+ RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
goto err;
}
}
@@ -2549,7 +2539,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
/* Thread stopped. Print the current replication position to the log */
sql_print_error("Slave SQL thread exiting, replication stopped in log \
'%s' at position %s",
- RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
+ RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
err:
VOID(pthread_mutex_lock(&LOCK_thread_count));
@@ -2699,7 +2689,7 @@ err:
rev The rotate log event read from the binary log
DESCRIPTION
- Updates the master info and relay data with the place in the next binary
+ Updates the master info with the place in the next binary
log where we should start reading.
NOTES
@@ -3073,18 +3063,14 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli)
IO_CACHE *file = &rli->info_file;
char buff[FN_REFLEN*2+22*2+4], *pos;
- /* sql_thd is not set when calling from init_slave() */
- if ((rli->sql_thd && rli->sql_thd->options & OPTION_BEGIN))
- return 0; // Wait for COMMIT
-
my_b_seek(file, 0L);
- pos=strmov(buff, rli->relay_log_name);
+ pos=strmov(buff, rli->group_relay_log_name);
*pos++='\n';
- pos=longlong2str(rli->relay_log_pos, pos, 10);
+ pos=longlong2str(rli->group_relay_log_pos, pos, 10);
*pos++='\n';
- pos=strmov(pos, rli->master_log_name);
+ pos=strmov(pos, rli->group_master_log_name);
*pos++='\n';
- pos=longlong2str(rli->master_log_pos, pos, 10);
+ pos=longlong2str(rli->group_master_log_pos, pos, 10);
*pos='\n';
if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1))
error=1;
@@ -3107,7 +3093,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
DBUG_ENTER("reopen_relay_log");
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
- if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name,
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
errmsg)) <0)
DBUG_RETURN(0);
/*
@@ -3115,7 +3101,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
relay_log_pos Current log pos
pending Number of bytes already processed from the event
*/
- my_b_seek(cur_log,rli->relay_log_pos + rli->pending);
+ my_b_seek(cur_log,rli->event_relay_log_pos);
DBUG_RETURN(cur_log);
}
@@ -3124,7 +3110,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
{
Log_event* ev;
IO_CACHE* cur_log = rli->cur_log;
- pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
+ pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
DBUG_ENTER("next_event");
@@ -3173,7 +3159,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
}
}
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
+ DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
/*
Relay log is always in new format - if the master is 3.23, the
I/O thread will convert the format for us
@@ -3240,8 +3226,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
// prevent the I/O thread from blocking next times
rli->ignore_log_space_limit= 1;
// If the I/O thread is blocked, unblock it
- pthread_cond_broadcast(&rli->log_space_cond);
pthread_mutex_unlock(&rli->log_space_lock);
+ pthread_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update unlocks lock_log !
rli->relay_log.wait_for_update(rli->sql_thd);
// re-acquire data lock since we released it earlier
@@ -3258,16 +3244,25 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
my_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
- /*
- TODO: make skip_log_purge a start-up option. At this point this
- is not critical priority
- */
- if (!rli->skip_log_purge)
+ if (relay_log_purge)
{
- // purge_first_log will properly set up relay log coordinates in rli
- if (rli->relay_log.purge_first_log(rli))
+ /*
+ purge_first_log will properly set up relay log coordinates in rli.
+ If the group's coordinates are equal to the event's coordinates
+ (i.e. the relay log was not rotated in the middle of a group),
+ we can purge this relay log too.
+ We do ulonglong and string comparisons, this may be slow but
+ - purging the last relay log is nice (it can save 1GB of disk), so we
+ like to detect the case where we can do it, and given this,
+ - I see no better detection method
+ - purge_first_log is not called that often
+ */
+ if (rli->relay_log.purge_first_log
+ (rli,
+ rli->group_relay_log_pos == rli->event_relay_log_pos
+ && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
{
- errmsg = "Error purging processed log";
+ errmsg = "Error purging processed logs";
goto err;
}
}
@@ -3285,10 +3280,9 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
errmsg = "error switching to the next log";
goto err;
}
- rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
- rli->pending=0;
- strmake(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name)-1);
+ rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
+ strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->event_relay_log_name)-1);
flush_relay_log_info(rli);
}
@@ -3336,7 +3330,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
event(errno: %d cur_log->error: %d)",
my_errno,cur_log->error);
// set read position to the beginning of the event
- my_b_seek(cur_log,rli->relay_log_pos+rli->pending);
+ my_b_seek(cur_log,rli->event_relay_log_pos);
/* otherwise, we have had a partial read */
errmsg = "Aborting slave SQL thread because of partial event read";
break; // To end of function
diff --git a/sql/slave.h b/sql/slave.h
index a4db7388be5..16ba7f80471 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -92,12 +92,6 @@ typedef struct st_relay_log_info
cur_log_fd - file descriptor of the current read relay log
*/
File info_fd,cur_log_fd;
- /* name of current read relay log */
- char relay_log_name[FN_REFLEN];
- /* master log name corresponding to current read position */
- char master_log_name[FN_REFLEN];
- /* original log position of last processed event */
- volatile my_off_t master_log_pos;
/*
Protected with internal locks.
@@ -142,20 +136,36 @@ typedef struct st_relay_log_info
uint32 cur_log_old_open_count;
/*
- relay_log_pos - Current offset in the relay log.
- pending - In some cases we do not increment offset immediately
- after processing an event, because the following event
- needs to be processed atomically together with this one
- such as:
-
- Intvar_event - sets auto_increment value
- Rand_event - sets the random seed
-
- However, once both events have been processed, we need to
- increment by the cumulative offset. 'pending' stores the
- extra offset to be added to the position.
+ Let's call a group (of events) :
+ - a transaction
+ or
+ - an autocommitting query + its associated events (INSERT_ID,
+ TIMESTAMP...)
+ We need these rli coordinates :
+ - relay log name and position of the beginning of the group we currently are
+ executing. Needed to know where we have to restart when replication has
+ stopped in the middle of a group (which has been rolled back by the slave).
+ - relay log name and position just after the event we have just
+ executed. This event is part of the current group.
+ Formerly we only had the immediately above coordinates, plus a 'pending'
+ variable, but this mishandled the case of a transaction starting on a
+ relay log and finishing (committing) on another relay log. This case can
+ happen when, for example, the relay log gets rotated because of
+ max_binlog_size.
+ */
+ char group_relay_log_name[FN_REFLEN];
+ ulonglong group_relay_log_pos;
+ char event_relay_log_name[FN_REFLEN];
+ ulonglong event_relay_log_pos;
+ /*
+ Original log name and position of the group we're currently executing
+ (whose coordinates are group_relay_log_name/pos in the relay log)
+ in the master's binlog. These concern the *group*, because in the master's
+ binlog the log_pos that comes with each event is the position of the
+ beginning of the group.
*/
- ulonglong relay_log_pos, pending;
+ char group_master_log_name[FN_REFLEN];
+ volatile my_off_t group_master_log_pos;
/*
Handling of the relay_log_space_limit optional constraint.
@@ -193,38 +203,39 @@ typedef struct st_relay_log_info
/* if not set, the value of other members of the structure are undefined */
bool inited;
volatile bool abort_slave, slave_running;
- bool skip_log_purge;
- bool inside_transaction;
st_relay_log_info();
~st_relay_log_info();
- inline void inc_pending(ulonglong val)
+
+ inline void inc_event_relay_log_pos(ulonglong val)
{
- pending += val;
+ event_relay_log_pos+= val;
}
- /* TODO: this probably needs to be fixed */
- inline void inc_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0)
+
+ void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0)
{
if (!skip_lock)
pthread_mutex_lock(&data_lock);
- relay_log_pos += val+pending;
- pending = 0;
- if (log_pos)
- master_log_pos = log_pos+ val;
+ inc_event_relay_log_pos(val);
+ group_relay_log_pos= event_relay_log_pos;
+ strmake(group_relay_log_name,event_relay_log_name,
+ sizeof(group_relay_log_name)-1);
+ /*
+ If the slave does not support transactions and replicates a transaction,
+ users should not trust group_master_log_pos (which they can display with
+ SHOW SLAVE STATUS or read from relay-log.info), because to compute
+ group_master_log_pos the slave relies on log_pos stored in the master's
+ binlog, but if we are in a master's transaction these positions are always
+ the BEGIN's one (except for the COMMIT), so group_master_log_pos does
+ not advance as it should on the non-transactional slave (it advances by
+ big leaps, whereas it should advance by small leaps).
+ */
+ if (log_pos) // 3.23 binlogs don't have log_pos
+ group_master_log_pos= log_pos+ val;
pthread_cond_broadcast(&data_cond);
if (!skip_lock)
pthread_mutex_unlock(&data_lock);
}
- /*
- thread safe read of position - not needed if we are in the slave thread,
- but required otherwise as var is a longlong
- */
- inline void read_pos(ulonglong& var)
- {
- pthread_mutex_lock(&data_lock);
- var = relay_log_pos;
- pthread_mutex_unlock(&data_lock);
- }
int wait_for_pos(THD* thd, String* log_name, longlong log_pos,
longlong timeout);
@@ -334,7 +345,7 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_ARR_SIZE 16
#define MAX_SLAVE_ERRMSG 1024
-#define RPL_LOG_NAME (rli->master_log_name[0] ? rli->master_log_name :\
+#define RPL_LOG_NAME (rli->group_master_log_name[0] ? rli->group_master_log_name :\
"FIRST")
#define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\
"FIRST")
diff --git a/sql/sql_class.h b/sql/sql_class.h
index d3cb843ad85..7688d21c33e 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -145,10 +145,12 @@ public:
int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name);
- int update_log_index(LOG_INFO* linfo);
- int purge_logs(THD* thd, const char* to_log);
- int purge_logs_before_date(THD* thd, time_t purge_time);
- int purge_first_log(struct st_relay_log_info* rli);
+ int update_log_index(LOG_INFO* linfo, bool need_update_threads);
+ int purge_logs(const char *to_log, bool included,
+ bool need_mutex, bool need_update_threads,
+ ulonglong *decrease_log_space);
+ int purge_logs_before_date(time_t purge_time);
+ int purge_first_log(struct st_relay_log_info* rli, bool included);
bool reset_logs(THD* thd);
// if we are exiting, we also want to close the index file
void close(bool exiting = 0);
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index d0a970c98b7..ba8a4af794a 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -3980,7 +3980,7 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables)
{
long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
- mysql_bin_log.purge_logs_before_date(thd, purge_time);
+ mysql_bin_log.purge_logs_before_date(purge_time);
}
#endif
mysql_slow_log.new_file(1);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 7e9b6aea7b5..0eb444b85c0 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -292,7 +292,7 @@ int purge_master_logs(THD* thd, const char* to_log)
char search_file_name[FN_REFLEN];
mysql_bin_log.make_log_name(search_file_name, to_log);
- int res = mysql_bin_log.purge_logs(thd, search_file_name);
+ int res = mysql_bin_log.purge_logs(search_file_name, 0, 1, 1, NULL);
return purge_error_message(thd, res);
}
@@ -300,7 +300,7 @@ int purge_master_logs(THD* thd, const char* to_log)
int purge_master_logs_before_date(THD* thd, time_t purge_time)
{
- int res = mysql_bin_log.purge_logs_before_date(thd, purge_time);
+ int res = mysql_bin_log.purge_logs_before_date(purge_time);
return purge_error_message(thd ,res);
}
@@ -776,24 +776,25 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
error=1;
goto err;
}
- //delete relay logs, clear relay log coordinates
+ // delete relay logs, clear relay log coordinates
if ((error= purge_relay_logs(&mi->rli, thd,
1 /* just reset */,
&errmsg)))
goto err;
- //Clear master's log coordinates (only for good display of SHOW SLAVE STATUS)
+ // Clear master's log coordinates (only for good display of SHOW SLAVE STATUS)
mi->master_log_name[0]= 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
- //close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
+ // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi);
- //and delete these two files
+ // and delete these two files
fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ // delete relay_log_info_file
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{
@@ -874,7 +875,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
// if we change host or port, we must reset the postion
mi->master_log_name[0] = 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
- mi->rli.pending = 0;
}
if (lex_mi->log_file_name)
@@ -883,7 +883,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->pos)
{
mi->master_log_pos= lex_mi->pos;
- mi->rli.pending = 0;
}
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
@@ -901,20 +900,22 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->relay_log_name)
{
need_relay_log_purge= 0;
- strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
- sizeof(mi->rli.relay_log_name)-1);
+ strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
+ sizeof(mi->rli.group_relay_log_name)-1);
+ strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
+ sizeof(mi->rli.event_relay_log_name)-1);
}
if (lex_mi->relay_log_pos)
{
need_relay_log_purge= 0;
- mi->rli.relay_log_pos=lex_mi->relay_log_pos;
+ mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
}
flush_master_info(mi);
if (need_relay_log_purge)
{
- mi->rli.skip_log_purge= 0;
+ relay_log_purge= 1;
thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */,
@@ -928,11 +929,11 @@ int change_master(THD* thd, MASTER_INFO* mi)
else
{
const char* msg;
- mi->rli.skip_log_purge= 1;
+ relay_log_purge= 0;
/* Relay log is already initialized */
if (init_relay_log_pos(&mi->rli,
- mi->rli.relay_log_name,
- mi->rli.relay_log_pos,
+ mi->rli.group_relay_log_name,
+ mi->rli.group_relay_log_pos,
0 /*no data lock*/,
&msg))
{
@@ -941,12 +942,12 @@ int change_master(THD* thd, MASTER_INFO* mi)
DBUG_RETURN(1);
}
}
- mi->rli.master_log_pos = mi->master_log_pos;
+ mi->rli.group_master_log_pos = mi->master_log_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
- strmake(mi->rli.master_log_name,mi->master_log_name,
- sizeof(mi->rli.master_log_name)-1);
- if (!mi->rli.master_log_name[0]) // uninitialized case
- mi->rli.master_log_pos=0;
+ strmake(mi->rli.group_master_log_name,mi->master_log_name,
+ sizeof(mi->rli.group_master_log_name)-1);
+ if (!mi->rli.group_master_log_name[0]) // uninitialized case
+ mi->rli.group_master_log_pos=0;
pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++;