diff options
author | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
commit | 5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch) | |
tree | 87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/log.cc | |
parent | 0831ce1c616296196eff82065da469156b4def82 (diff) | |
download | mariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz |
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work.
The main change is the new replication code - now we have two slave threads
SQL thread and I/O thread. I have also re-written a lot of the code to
prepare for multi-master implementation.
I also documented IO_CACHE quite extensively and to some extend, THD class.
Makefile.am:
moved tags target script into a separate file
include/my_sys.h:
fixes in IO_CACHE for SEQ_READ_APPEND + some documentation
libmysqld/lib_sql.cc:
updated replication locks, but now I see I did it wrong and it won't compile. Will fix
before the push.
mysql-test/r/rpl000014.result:
test result update
mysql-test/r/rpl000015.result:
test result update
mysql-test/r/rpl000016.result:
test result update
mysql-test/r/rpl_log.result:
test result update
mysql-test/t/rpl000016-slave.sh:
remove relay logs
mysql-test/t/rpl000017-slave.sh:
remove relay logs
mysql-test/t/rpl_log.test:
updated test
mysys/mf_iocache.c:
IO_CACHE updates to make replication work
mysys/mf_iocache2.c:
IO_CACHE update to make replication work
mysys/thr_mutex.c:
cosmetic change
sql/item_func.cc:
new replication code
sql/lex.h:
new replication
sql/log.cc:
new replication
sql/log_event.cc:
new replication
sql/log_event.h:
new replication
sql/mini_client.cc:
new replication
sql/mini_client.h:
new replication
sql/mysql_priv.h:
new replication
sql/mysqld.cc:
new replication
sql/repl_failsafe.cc:
new replication
sql/slave.cc:
new replication
sql/slave.h:
new replication
sql/sql_class.cc:
new replication
sql/sql_class.h:
new replication
sql/sql_lex.h:
new replication
sql/sql_parse.cc:
new replication
sql/sql_repl.cc:
new replication
sql/sql_repl.h:
new replication
sql/sql_show.cc:
new replication
sql/sql_yacc.yy:
new replication
sql/stacktrace.c:
more robust stack tracing
sql/structs.h:
new replication code
BitKeeper/etc/ignore:
Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 296 |
1 files changed, 233 insertions, 63 deletions
diff --git a/sql/log.cc b/sql/log.cc index fa45d938b24..ec7396bda3c 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -29,6 +29,7 @@ #include <my_dir.h> #include <stdarg.h> #include <m_ctype.h> // For test_if_number +#include <assert.h> MYSQL_LOG mysql_log,mysql_update_log,mysql_slow_log,mysql_bin_log; extern I_List<i_string> binlog_do_db, binlog_ignore_db; @@ -81,7 +82,7 @@ static int find_uniq_filename(char *name) MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), name(0), log_type(LOG_CLOSED),write_error(0), - inited(0), log_seq(1), file_id(1),no_rotate(0), + inited(0), file_id(1),no_rotate(0), need_start_event(1) { /* @@ -138,15 +139,18 @@ bool MYSQL_LOG::open_index( int options) } void MYSQL_LOG::init(enum_log_type log_type_arg, - enum cache_type io_cache_type_arg) + enum cache_type io_cache_type_arg, + bool no_auto_events_arg) { log_type = log_type_arg; io_cache_type = io_cache_type_arg; + no_auto_events = no_auto_events_arg; if (!inited) { inited=1; (void) pthread_mutex_init(&LOCK_log,MY_MUTEX_INIT_SLOW); (void) pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW); + (void) pthread_cond_init(&update_cond, 0); } } @@ -160,16 +164,17 @@ void MYSQL_LOG::close_index() } void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, - const char *new_name) + const char *new_name, enum cache_type io_cache_type_arg, + bool no_auto_events_arg) { MY_STAT tmp_stat; char buff[512]; File file= -1; bool do_magic; - + int open_flags = O_CREAT | O_APPEND | O_BINARY; if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name)) no_rotate = 1; - init(log_type_arg); + init(log_type_arg,io_cache_type_arg,no_auto_events_arg); if (!(name=my_strdup(log_name,MYF(MY_WME)))) goto err; @@ -177,7 +182,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, strmov(log_file_name,new_name); else if (generate_new_name(log_file_name, name)) goto err; - + + if (io_cache_type == SEQ_READ_APPEND) + open_flags |= O_RDWR; + else + open_flags |= O_WRONLY; + if (log_type == LOG_BIN && !index_file_name[0]) fn_format(index_file_name, name, mysql_data_home, ".index", 6); @@ -185,7 +195,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name, &tmp_stat, MYF(0))); - if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY, + if ((file=my_open(log_file_name,open_flags, MYF(MY_WME | ME_WAITTANG))) < 0 || init_io_cache(&log_file, file, IO_SIZE, io_cache_type, my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP))) @@ -235,11 +245,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, open_index(O_APPEND | O_RDWR | O_CREAT)) goto err; - log_seq = 1; - if (need_start_event) + if (need_start_event && !no_auto_events) { Start_log_event s; - s.set_log_seq(0, this); + s.set_log_pos(this); s.write(&log_file); need_start_event=0; } @@ -264,9 +273,7 @@ err: end_io_cache(&log_file); x_free(name); name=0; log_type=LOG_CLOSED; - return; - } int MYSQL_LOG::get_current_log(LOG_INFO* linfo) @@ -279,7 +286,8 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo) } // if log_name is "" we stop at the first entry -int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) +int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name, + bool need_mutex) { if (index_file < 0) return LOG_INFO_INVALID; @@ -290,7 +298,8 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) // mutex needed because we need to make sure the file pointer does not move // from under our feet - pthread_mutex_lock(&LOCK_index); + if (need_mutex) + pthread_mutex_lock(&LOCK_index); if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0, 0, MYF(MY_WME))) { @@ -319,14 +328,15 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) error = 0; err: - pthread_mutex_unlock(&LOCK_index); + if (need_mutex) + pthread_mutex_unlock(&LOCK_index); end_io_cache(&io_cache); return error; } -int MYSQL_LOG::find_next_log(LOG_INFO* linfo) +int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) { // mutex needed because we need to make sure the file pointer does not move // from under our feet @@ -335,8 +345,8 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo) char* fname = linfo->log_file_name; IO_CACHE io_cache; uint length; - - pthread_mutex_lock(&LOCK_index); + if (need_lock) + pthread_mutex_lock(&LOCK_index); if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) linfo->index_file_offset, 0, MYF(MY_WME))) @@ -354,12 +364,126 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo) error = 0; err: - pthread_mutex_unlock(&LOCK_index); + if (need_lock) + pthread_mutex_unlock(&LOCK_index); end_io_cache(&io_cache); return error; } - +int MYSQL_LOG::reset_logs(THD* thd) +{ + LOG_INFO linfo; + int error=0; + const char* save_name; + enum_log_type save_log_type; + pthread_mutex_lock(&LOCK_log); + if (find_first_log(&linfo,"")) + { + error=1; + goto err; + } + + for(;;) + { + my_delete(linfo.log_file_name, MYF(MY_WME)); + if (find_next_log(&linfo)) + break; + } + save_name=name; + name=0; + save_log_type=log_type; + close(1); + my_delete(index_file_name, MYF(MY_WME)); + if (thd && !thd->slave_thread) + need_start_event=1; + open(save_name,save_log_type,0,io_cache_type,no_auto_events); + my_free((gptr)save_name,MYF(0)); +err: + pthread_mutex_unlock(&LOCK_log); + return error; +} + +int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli) +{ + // pre-conditions + DBUG_ASSERT(is_open()); + DBUG_ASSERT(index_file >= 0); + DBUG_ASSERT(rli->slave_running == 1); + DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->relay_log_name)); + // assume that we have previously read the first log and + // stored it in rli->relay_log_name + DBUG_ASSERT(rli->linfo.index_file_offset == + strlen(rli->relay_log_name) + 1); + + int tmp_fd; + + + char* fname, *io_buf; + int error = 0; + if (!(fname = (char*)my_malloc(IO_SIZE+FN_REFLEN, MYF(MY_WME)))) + return 1; + pthread_mutex_lock(&LOCK_index); + my_seek(index_file,rli->linfo.index_file_offset, + MY_SEEK_SET, MYF(MY_WME)); + io_buf = fname + FN_REFLEN; + strxmov(fname,rli->relay_log_name,".tmp",NullS); + + if ((tmp_fd = my_open(fname,O_CREAT|O_BINARY|O_RDWR, MYF(MY_WME))) < 0) + { + error = 1; + goto err; + } + for (;;) + { + int bytes_read; + bytes_read = my_read(index_file, io_buf, IO_SIZE, MYF(0)); + if (bytes_read < 0) // error + { + error=1; + goto err; + } + if (!bytes_read) + break; // end of file + // otherwise, we've read something and need to write it out + if (my_write(tmp_fd, io_buf, bytes_read, MYF(MY_WME|MY_NABP))) + { + error=1; + goto err; + } + } +err: + if (tmp_fd) + my_close(tmp_fd, MYF(MY_WME)); + if (error) + my_delete(fname, MYF(0)); // do not report error if the file is not there + else + { + my_close(index_file, MYF(MY_WME)); + if (my_rename(fname,index_file_name,MYF(MY_WME)) || + (index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND, + MYF(MY_WME)))<0 || + my_delete(rli->relay_log_name, MYF(MY_WME))) + error=1; + if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/))) + { + char buff[22]; + sql_print_error("next log error=%d,offset=%s,log=%s",error, + llstr(rli->linfo.index_file_offset,buff), + rli->linfo.log_file_name); + goto err2; + } + rli->relay_log_pos = 4; + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + } + // no need to free io_buf because we allocated both fname and io_buf in + // one malloc() +err2: + pthread_mutex_unlock(&LOCK_index); + my_free(fname, MYF(MY_WME)); + return error; +} + int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) { if (index_file < 0) return LOG_INFO_INVALID; @@ -395,9 +519,8 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) goto err; } logs_to_keep_inited = 1; - - for(;;) + for (;;) { my_off_t init_purge_offset= my_b_tell(&io_cache); if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN))) @@ -409,14 +532,14 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) } fname[--fname_len]=0; // kill \n - if(!memcmp(fname, to_log, fname_len + 1 )) + if (!memcmp(fname, to_log, fname_len + 1 )) { found_log = 1; purge_offset = init_purge_offset; } // if one of the logs before the target is in use - if(!found_log && log_in_use(fname)) + if (!found_log && log_in_use(fname)) { error = LOG_INFO_IN_USE; goto err; @@ -432,13 +555,13 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) } end_io_cache(&io_cache); - if(!found_log) + if (!found_log) { error = LOG_INFO_EOF; goto err; } - for(i = 0; i < logs_to_purge.elements; i++) + for (i = 0; i < logs_to_purge.elements; i++) { char* l; get_dynamic(&logs_to_purge, (gptr)&l, i); @@ -461,9 +584,9 @@ during log purge for write"); #else my_close(index_file, MYF(MY_WME)); my_delete(index_file_name, MYF(MY_WME)); - if(!(index_file = my_open(index_file_name, + if ((index_file = my_open(index_file_name, O_CREAT | O_BINARY | O_RDWR | O_APPEND, - MYF(MY_WME)))) + MYF(MY_WME)))<0) { sql_print_error("Could not re-open the binlog index file \ during log purge for write"); @@ -472,7 +595,7 @@ during log purge for write"); } #endif - for(i = 0; i < logs_to_keep.elements; i++) + for (i = 0; i < logs_to_keep.elements; i++) { char* l; get_dynamic(&logs_to_keep, (gptr)&l, i); @@ -490,15 +613,14 @@ during log purge for write"); err: pthread_mutex_unlock(&LOCK_index); - if(logs_to_purge_inited) + if (logs_to_purge_inited) delete_dynamic(&logs_to_purge); - if(logs_to_keep_inited) + if (logs_to_keep_inited) delete_dynamic(&logs_to_keep); end_io_cache(&io_cache); return error; } - // we assume that buf has at least FN_REFLEN bytes alloced void MYSQL_LOG::make_log_name(char* buf, const char* log_ident) { @@ -543,30 +665,36 @@ void MYSQL_LOG::new_file(bool inside_mutex) } if (log_type == LOG_BIN) { - /* - We log the whole file name for log file as the user may decide - to change base names at some point. - */ - THD* thd = current_thd; - Rotate_log_event r(thd,new_name+dirname_length(new_name)); - r.set_log_seq(0, this); - - /* - This log rotation could have been initiated by a master of - the slave running with log-bin we set the flag on rotate - event to prevent inifinite log rotation loop - */ - if (thd && slave_thd && thd == slave_thd) - r.flags |= LOG_EVENT_FORCED_ROTATE_F; - r.write(&log_file); - VOID(pthread_cond_broadcast(&COND_binlog_update)); + if (!no_auto_events) + { + /* + We log the whole file name for log file as the user may decide + to change base names at some point. + */ + THD* thd = current_thd; + Rotate_log_event r(thd,new_name+dirname_length(new_name)); + r.set_log_pos(this); + + /* + This log rotation could have been initiated by a master of + the slave running with log-bin we set the flag on rotate + event to prevent inifinite log rotation loop + */ + if (thd && thd->slave_thread) + r.flags |= LOG_EVENT_FORCED_ROTATE_F; + r.write(&log_file); + } + // update needs to be signaled even if there is no rotate event + // log rotation should give the waiting thread a signal to + // discover EOF and move on to the next log + signal_update(); } else strmov(new_name, old_name); // Reopen old file name } name=0; close(); - open(old_name, log_type, new_name); + open(old_name, log_type, new_name, io_cache_type, no_auto_events); my_free(old_name,MYF(0)); last_time=query_start=0; write_error=0; @@ -575,6 +703,31 @@ void MYSQL_LOG::new_file(bool inside_mutex) } } +bool MYSQL_LOG::appendv(const char* buf, uint len,...) +{ + bool error = 0; + va_list(args); + va_start(args,len); + + pthread_mutex_lock(&LOCK_log); + do + { + if (my_b_append(&log_file,buf,len)) + { + error = 1; + break; + } + if ((uint)my_b_append_tell(&log_file) > max_binlog_size) + { + new_file(1); + } + } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); + + if (!error) + signal_update(); + pthread_mutex_unlock(&LOCK_log); + return error; +} bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, const char *format,...) @@ -684,11 +837,12 @@ bool MYSQL_LOG::write(Log_event* event_info) return 0; } error=1; - + // no check for auto events flag here - this write method should + // never be called if auto-events are enabled if (thd && thd->last_insert_id_used) { Intvar_log_event e(thd,(uchar)LAST_INSERT_ID_EVENT,thd->last_insert_id); - e.set_log_seq(thd, this); + e.set_log_pos(this); if (thd->server_id) e.server_id = thd->server_id; if (e.write(file)) @@ -697,7 +851,7 @@ bool MYSQL_LOG::write(Log_event* event_info) if (thd && thd->insert_id_used) { Intvar_log_event e(thd,(uchar)INSERT_ID_EVENT,thd->last_insert_id); - e.set_log_seq(thd, this); + e.set_log_pos(this); if (thd->server_id) e.server_id = thd->server_id; if (e.write(file)) @@ -712,12 +866,12 @@ bool MYSQL_LOG::write(Log_event* event_info) // just in case somebody wants it later thd->query_length = (uint)(p - buf); Query_log_event e(thd, buf); - e.set_log_seq(thd, this); + e.set_log_pos(this); if (e.write(file)) goto err; thd->query_length = save_query_length; // clean up } - event_info->set_log_seq(thd, this); + event_info->set_log_pos(this); if (event_info->write(file) || file == &log_file && flush_io_cache(file)) goto err; @@ -734,7 +888,7 @@ err: write_error=1; } if (file == &log_file) - VOID(pthread_cond_broadcast(&COND_binlog_update)); + signal_update(); } if (should_rotate) new_file(1); // inside mutex @@ -765,7 +919,7 @@ bool MYSQL_LOG::write(IO_CACHE *cache) if (is_open()) { uint length; - + //QQ: this looks like a bug - why READ_CACHE? if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) { sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno); @@ -800,7 +954,7 @@ err: if (error) write_error=1; else - VOID(pthread_cond_broadcast(&COND_binlog_update)); + signal_update(); VOID(pthread_mutex_unlock(&LOCK_log)); @@ -930,21 +1084,37 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, return error; } +void MYSQL_LOG:: wait_for_update(THD* thd) +{ + const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log, + "Slave: waiting for binlog update"); + pthread_cond_wait(&update_cond, &LOCK_log); + // this is not a bug - we unlock the mutex for the caller, and expect him + // to lock it and then not unlock it upon return. This is a rather odd + // way of doing things, but this is the cleanest way I could think of to + // solve the race deadlock caused by THD::awake() first acquiring mysys_var + // mutex and then the current mutex, while wait_for_update being called with + // the current mutex already aquired and THD::exit_cond() trying to acquire + // mysys_var mutex. We do need the mutex to be acquired prior to the + // invocation of wait_for_update in all cases, so mutex acquisition inside + // wait_for_update() is not an option + pthread_mutex_unlock(&LOCK_log); + thd->exit_cond(old_msg); +} void MYSQL_LOG::close(bool exiting) { // One can't set log_type here! if (is_open()) { - File file=log_file.file; - if (log_type == LOG_BIN) + if (log_type == LOG_BIN && !no_auto_events) { Stop_log_event s; - s.set_log_seq(0, this); + s.set_log_pos(this); s.write(&log_file); - VOID(pthread_cond_broadcast(&COND_binlog_update)); + signal_update(); } end_io_cache(&log_file); - if (my_close(file,MYF(0)) < 0 && ! write_error) + if (my_close(log_file.file,MYF(0)) < 0 && ! write_error) { write_error=1; sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno); |