diff options
author | unknown <guilhem@mysql.com> | 2005-10-12 13:29:55 +0200 |
---|---|---|
committer | unknown <guilhem@mysql.com> | 2005-10-12 13:29:55 +0200 |
commit | 7ff27a614dc60b969cfb99ef4653e0e478f83eca (patch) | |
tree | 3a9668cc5d377733fb17fc9db602a2c9c4b43e74 /sql | |
parent | e0c38d58f573164eff8a0865127cd87b9b3c42b3 (diff) | |
download | mariadb-git-7ff27a614dc60b969cfb99ef4653e0e478f83eca.tar.gz |
Fix for BUG#13023: "SQL Thread is up but doesn't move forward". Details in slave.cc;
in short we now record whenever the slave I/O thread ignores a master's event because of its server id,
and use this info in the slave SQL thread to advance Exec_master_log_pos. Because if we
do not, this variable stays at the position of the last executed event, i.e. the last *non-ignored*
executed one, which may not be the last of the master's binlog (and so the slave *looks* behind
the master though it's data-wise it's not).
mysql-test/t/rpl_dual_pos_advance-master.opt:
empty; its goal is just to trigger a server restart after running the test,
so that the master forgets that it was a slave (otherwise it affects the following tests).
sql/log.cc:
No more default arguments for Rotate_log_event constructor.
MYSQL_LOG::appendv() is now called without mutex.
sql/log_event.cc:
Moving one Rotate_log_event constructor from log_event.h. Support for on-demand choice of
duplicating the string argument of the constructor or not (because there now are needs for both
alternatives, see slave.cc).
sql/log_event.h:
We now have a case where a Rotate_log_event is executed by the slave SQL thread while
not being in the relay log, so it needs to pretend its length is 0: a ZERO_LEN flag for that;
a flag DUP_NAME (replaces "bool alloced") to be able to choose if we want the constructor
to duplicate the string argument or not.
sql/slave.cc:
A comment for BUG#13861 (to be fixed). llstr() instead of %ld as the number is ulonglong.
mi->rli becomes rli in some places.
Fix for BUG#13023:
- in the slave I/O thread, whenever we ignore an event because of its server id we update
a couple of coordinates in memory
- in the slave SQL thread, whenever we bump into the end of the latest relay log, we check
this couple of coordinates to see if we should advance our Exec_master_log_pos.
- when the slave I/O thread terminates it saves these in-memory coordinates into a Rotate event
in the relay log, so that they are durable.
sql/slave.h:
A couple of coordinates in RELAY_LOG_INFO to keep track of the last ignored events received
by the slave I/O thread (ignored because of the server id).
mysql-test/r/rpl_dual_pos_advance.result:
New BitKeeper file ``mysql-test/r/rpl_dual_pos_advance.result''
mysql-test/t/rpl_dual_pos_advance.test:
Test for BUG#13023 (with a part, disabled, to test BUG#13861 when I fix it).
Before the fix, this test used to hang.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 6 | ||||
-rw-r--r-- | sql/log_event.cc | 42 | ||||
-rw-r--r-- | sql/log_event.h | 28 | ||||
-rw-r--r-- | sql/slave.cc | 128 | ||||
-rw-r--r-- | sql/slave.h | 11 |
5 files changed, 174 insertions, 41 deletions
diff --git a/sql/log.cc b/sql/log.cc index a67f35e30bf..c530f15a84f 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1028,7 +1028,8 @@ void MYSQL_LOG::new_file(bool need_lock) to change base names at some point. */ THD *thd = current_thd; /* may be 0 if we are reacting to SIGHUP */ - Rotate_log_event r(thd,new_name+dirname_length(new_name)); + Rotate_log_event r(thd,new_name+dirname_length(new_name), + 0, LOG_EVENT_OFFSET, 0); r.set_log_pos(this); r.write(&log_file); bytes_written += r.get_event_len(); @@ -1106,7 +1107,7 @@ bool MYSQL_LOG::appendv(const char* buf, uint len,...) DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); - pthread_mutex_lock(&LOCK_log); + safe_mutex_assert_owner(&LOCK_log); do { if (my_b_append(&log_file,(byte*) buf,len)) @@ -1125,7 +1126,6 @@ bool MYSQL_LOG::appendv(const char* buf, uint len,...) } err: - pthread_mutex_unlock(&LOCK_log); if (!error) signal_update(); DBUG_RETURN(error); diff --git a/sql/log_event.cc b/sql/log_event.cc index b08439a20b8..be4654bccd3 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2013,18 +2013,44 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) #endif /* MYSQL_CLIENT */ + /* - Rotate_log_event::Rotate_log_event() + Rotate_log_event::Rotate_log_event() (2 constructors) */ + +#ifndef MYSQL_CLIENT +Rotate_log_event::Rotate_log_event(THD* thd_arg, + const char* new_log_ident_arg, + uint ident_len_arg, ulonglong pos_arg, + uint flags_arg) + :Log_event(), new_log_ident(new_log_ident_arg), + pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg : + (uint) strlen(new_log_ident_arg)), flags(flags_arg) +{ +#ifndef DBUG_OFF + char buff[22]; + DBUG_ENTER("Rotate_log_event::Rotate_log_event(THD*,...)"); + DBUG_PRINT("enter",("new_log_ident %s pos %s flags %lu", new_log_ident_arg, + llstr(pos_arg, buff), flags)); +#endif + if (flags & DUP_NAME) + new_log_ident= my_strdup_with_length(new_log_ident_arg, + ident_len, + MYF(MY_WME)); + DBUG_VOID_RETURN; +} +#endif + + Rotate_log_event::Rotate_log_event(const char* buf, int event_len, bool old_format) - :Log_event(buf, old_format),new_log_ident(NULL),alloced(0) + :Log_event(buf, old_format), new_log_ident(0), flags(DUP_NAME) { // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; uint ident_offset; - DBUG_ENTER("Rotate_log_event"); + DBUG_ENTER("Rotate_log_event::Rotate_log_event(char*,...)"); if (event_len < header_size) DBUG_VOID_RETURN; @@ -2043,12 +2069,9 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len, ident_offset = ROTATE_HEADER_LEN; } set_if_smaller(ident_len,FN_REFLEN-1); - if (!(new_log_ident= my_strdup_with_length((byte*) buf + - ident_offset, - (uint) ident_len, - MYF(MY_WME)))) - DBUG_VOID_RETURN; - alloced = 1; + new_log_ident= my_strdup_with_length((byte*) buf + ident_offset, + (uint) ident_len, + MYF(MY_WME)); DBUG_VOID_RETURN; } @@ -2060,6 +2083,7 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len, int Rotate_log_event::write_data(IO_CACHE* file) { char buf[ROTATE_HEADER_LEN]; + DBUG_ASSERT(!(flags & ZERO_LEN)); // such an event cannot be written int8store(buf + R_POS_OFFSET, pos); return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); diff --git a/sql/log_event.h b/sql/log_event.h index 5c81d0c92f0..6c4e65f7460 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -434,7 +434,7 @@ public: } virtual int get_data_size() { return 0;} virtual int get_data_body_offset() { return 0; } - int get_event_len() + virtual int get_event_len() { return (cached_event_len ? cached_event_len : (cached_event_len = LOG_EVENT_HEADER_LEN + get_data_size())); @@ -849,18 +849,18 @@ public: class Rotate_log_event: public Log_event { public: + enum { + ZERO_LEN= 1, // if event should report 0 as its length + DUP_NAME= 2 // if constructor should dup the string argument + }; const char* new_log_ident; ulonglong pos; uint ident_len; - bool alloced; + uint flags; #ifndef MYSQL_CLIENT Rotate_log_event(THD* thd_arg, const char* new_log_ident_arg, - uint ident_len_arg = 0, - ulonglong pos_arg = LOG_EVENT_OFFSET) - :Log_event(), new_log_ident(new_log_ident_arg), - pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg : - (uint) strlen(new_log_ident_arg)), alloced(0) - {} + uint ident_len_arg, + ulonglong pos_arg, uint flags); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); int exec_event(struct st_relay_log_info* rli); @@ -872,10 +872,18 @@ public: Rotate_log_event(const char* buf, int event_len, bool old_format); ~Rotate_log_event() { - if (alloced) - my_free((gptr) new_log_ident, MYF(0)); + if (flags & DUP_NAME) + my_free((gptr) new_log_ident, MYF(MY_ALLOW_ZERO_PTR)); } Log_event_type get_type_code() { return ROTATE_EVENT;} + virtual int get_event_len() + { + if (flags & ZERO_LEN) + return 0; + if (cached_event_len == 0) + cached_event_len= LOG_EVENT_HEADER_LEN + get_data_size(); + return cached_event_len; + } int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} bool is_valid() { return new_log_ident != 0; } int write_data(IO_CACHE* file); diff --git a/sql/slave.cc b/sql/slave.cc index 1be3e3b4a17..ebae8461f11 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1739,6 +1739,55 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) } +/* + Builds a Rotate from the ignored events' info and writes it to relay log. + + SYNOPSIS + write_ignored_events_info_to_relay_log() + thd pointer to I/O thread's thd + mi + + DESCRIPTION + Slave I/O thread, going to die, must leave a durable trace of the + ignored events' end position for the use of the slave SQL thread, by + calling this function. Only that thread can call it (see assertion). + */ +static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi) +{ + RELAY_LOG_INFO *rli= &mi->rli; + pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); + DBUG_ASSERT(thd == mi->io_thd); + pthread_mutex_lock(log_lock); + if (rli->ign_master_log_name_end[0]) + { + DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); + Rotate_log_event *ev= new Rotate_log_event(thd, rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME); + rli->ign_master_log_name_end[0]= 0; + /* can unlock before writing as slave SQL thd will soon see our Rotate */ + pthread_mutex_unlock(log_lock); + if (likely((bool)ev)) + { + ev->server_id= 0; // don't be ignored by slave SQL thread + if (unlikely(rli->relay_log.append(ev))) + sql_print_error("Slave I/O thread failed to write a Rotate event" + " to the relay log, " + "SHOW SLAVE STATUS may be inaccurate"); + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + flush_master_info(mi, 1); + delete ev; + } + else + sql_print_error("Slave I/O thread failed to create a Rotate event" + " (out of memory?), " + "SHOW SLAVE STATUS may be inaccurate"); + } + else + pthread_mutex_unlock(log_lock); +} + + void init_master_info_with_options(MASTER_INFO* mi) { mi->master_log_name[0] = 0; @@ -2334,7 +2383,7 @@ st_relay_log_info::st_relay_log_info() { group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; - last_slave_error[0]=0; until_log_name[0]= 0; + last_slave_error[0]= until_log_name[0]= ign_master_log_name_end[0]= 0; bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &cache_buf, sizeof(cache_buf)); @@ -2886,11 +2935,20 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) */ pthread_mutex_lock(&rli->data_lock); + /* + This tests if the position of the end of the last previous executed event + hits the UNTIL barrier. + We would prefer to test if the position of the start (or possibly) end of + the to-be-read event hits the UNTIL barrier, this is different if there + was an event ignored by the I/O thread just before (BUG#13861 to be + fixed). + */ if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE && rli->is_until_satisfied()) { + char buf[22]; sql_print_error("Slave SQL thread stopped because it reached its" - " UNTIL position %ld", (long) rli->until_pos()); + " UNTIL position %s", llstr(rli->until_pos(), buf)); /* Setting abort_slave flag because we do not want additional message about error in query execution to be printed. @@ -2925,7 +2983,6 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) if ((ev->server_id == (uint32) ::server_id && !replicate_same_server_id) || (rli->slave_skip_counter && type_code != ROTATE_EVENT)) { - /* TODO: I/O thread should not even log events with the same server id */ rli->inc_group_relay_log_pos(ev->get_event_len(), type_code != STOP_EVENT ? ev->log_pos : LL(0), 1/* skip lock*/); @@ -3033,7 +3090,8 @@ extern "C" pthread_handler_decl(handle_slave_io,arg) { THD *thd; // needs to be first for thread_stack MYSQL *mysql; - MASTER_INFO *mi = (MASTER_INFO*)arg; + MASTER_INFO *mi = (MASTER_INFO*)arg; + RELAY_LOG_INFO *rli= &mi->rli; char llbuff[22]; uint retry_count; @@ -3276,16 +3334,16 @@ reconnect done to recover from failed read"); char llbuf1[22], llbuf2[22]; DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \ ignore_log_space_limit=%d", - llstr(mi->rli.log_space_limit,llbuf1), - llstr(mi->rli.log_space_total,llbuf2), - (int) mi->rli.ignore_log_space_limit)); + llstr(rli->log_space_limit,llbuf1), + llstr(rli->log_space_total,llbuf2), + (int) rli->ignore_log_space_limit)); } #endif - if (mi->rli.log_space_limit && mi->rli.log_space_limit < - mi->rli.log_space_total && - !mi->rli.ignore_log_space_limit) - if (wait_for_relay_log_space(&mi->rli)) + if (rli->log_space_limit && rli->log_space_limit < + rli->log_space_total && + !rli->ignore_log_space_limit) + if (wait_for_relay_log_space(rli)) { sql_print_error("Slave I/O thread aborted while waiting for relay \ log space"); @@ -3316,6 +3374,7 @@ err: mysql_close(mysql); mi->mysql=0; } + write_ignored_events_info_to_relay_log(thd, mi); thd->proc_info = "Waiting for slave mutex on exit"; pthread_mutex_lock(&mi->run_lock); mi->slave_running = 0; @@ -3668,6 +3727,7 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev) if (unlikely(!rev->is_valid())) DBUG_RETURN(1); + /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */ memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1); mi->master_log_pos= rev->pos; DBUG_PRINT("info", ("master_log_pos: '%s' %d", @@ -3813,6 +3873,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) int error= 0; ulong inc_pos; RELAY_LOG_INFO *rli= &mi->rli; + pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); DBUG_ENTER("queue_event"); if (mi->old_format) @@ -3820,11 +3881,6 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) pthread_mutex_lock(&mi->data_lock); - /* - TODO: figure out if other events in addition to Rotate - require special processing. - Guilhem 2003-06 : I don't think so. - */ switch (buf[EVENT_TYPE_OFFSET]) { case STOP_EVENT: /* @@ -3873,16 +3929,27 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) direct master (an unsupported, useless setup!). */ + pthread_mutex_lock(log_lock); + if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) && !replicate_same_server_id) { /* Do not write it to the relay log. - We still want to increment, so that we won't re-read this event from the - master if the slave IO thread is now stopped/restarted (more efficient if - the events we are ignoring are big LOAD DATA INFILE). + a) We still want to increment mi->master_log_pos, so that we won't + re-read this event from the master if the slave IO thread is now + stopped/restarted (more efficient if the events we are ignoring are big + LOAD DATA INFILE). + b) We want to record that we are skipping events, for the information of + the slave SQL thread, otherwise that thread may let + rli->group_relay_log_pos stay too small if the last binlog's event is + ignored. */ mi->master_log_pos+= inc_pos; + memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); + DBUG_ASSERT(rli->ign_master_log_name_end[0]); + rli->ign_master_log_pos_end= mi->master_log_pos; + rli->relay_log.signal_update(); // the slave SQL thread needs to re-check DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos)); } else @@ -3894,7 +3961,9 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); rli->relay_log.harvest_bytes_written(&rli->log_space_total); } + rli->ign_master_log_name_end[0]= 0; // last event is not ignored } + pthread_mutex_unlock(log_lock); err: pthread_mutex_unlock(&mi->data_lock); @@ -4262,6 +4331,27 @@ Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s", rli->last_master_timestamp= 0; DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count); + + if (rli->ign_master_log_name_end[0]) + { + /* We generate and return a Rotate, to make our positions advance */ + DBUG_PRINT("info",("seeing an ignored end segment")); + ev= new Rotate_log_event(thd, rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME | + Rotate_log_event::ZERO_LEN); + rli->ign_master_log_name_end[0]= 0; + if (unlikely(!ev)) + { + errmsg= "Slave SQL thread failed to create a Rotate event " + "(out of memory?), SHOW SLAVE STATUS may be inaccurate"; + goto err; + } + pthread_mutex_unlock(log_lock); + ev->server_id= 0; // don't be ignored by slave SQL thread + DBUG_RETURN(ev); + } + /* We can, and should release data_lock while we are waiting for update. If we do not, show slave status will block diff --git a/sql/slave.h b/sql/slave.h index 5a85e26d9ad..f780b7c8473 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -304,6 +304,17 @@ typedef struct st_relay_log_info */ ulong trans_retries, retried_trans; + /* + If the end of the hot relay log is made of master's events ignored by the + slave I/O thread, these two keep track of the coords (in the master's + binlog) of the last of these events seen by the slave I/O thread. If not, + ign_master_log_name_end[0] == 0. + As they are like a Rotate event read/written from/to the relay log, they + are both protected by rli->relay_log.LOCK_log. + */ + char ign_master_log_name_end[FN_REFLEN]; + ulonglong ign_master_log_pos_end; + st_relay_log_info(); ~st_relay_log_info(); |