diff options
author | unknown <sasha@mysql.sashanet.com> | 2002-04-01 21:46:23 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2002-04-01 21:46:23 -0700 |
commit | d7cfbff5b0a53ba22bb731dec70c4886d2ff9570 (patch) | |
tree | 0156e4a56e551202f8c7a8237544114db4a8d1bc /sql | |
parent | 869e671f98b1a056595e259283bf4e5e40256a20 (diff) | |
download | mariadb-git-d7cfbff5b0a53ba22bb731dec70c4886d2ff9570.tar.gz |
relay_log_space_limit
DBUG_ macro cleanup
buffer boundary cleanup
This changeset, although not fully tested, works for me better than
anything I've had so far, including what is in the repository. I will
push it unless something crashes while I am writing this :-)
mysql-test/r/rpl000014.result:
updated result
mysql-test/r/rpl000015.result:
updated result
mysql-test/r/rpl000016.result:
updated result
mysql-test/r/rpl_log.result:
new result
mysys/mf_iocache.c:
DBUG_ cleanup
mysys/mf_iocache2.c:
DBUG_ fix
sql/log.cc:
added relay_log_space_limit
sql/mysqld.cc:
relay_log_space_limit
sql/slave.cc:
relay_log_space_limit
sql/slave.h:
relay_log_space_limit
sql/sql_class.h:
relay_log_space_limit
sql/sql_repl.cc:
fixed buffer overrun bug
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 42 | ||||
-rw-r--r-- | sql/mysqld.cc | 2 | ||||
-rw-r--r-- | sql/slave.cc | 111 | ||||
-rw-r--r-- | sql/slave.h | 8 | ||||
-rw-r--r-- | sql/sql_class.h | 17 | ||||
-rw-r--r-- | sql/sql_repl.cc | 2 |
6 files changed, 159 insertions, 23 deletions
diff --git a/sql/log.cc b/sql/log.cc index dc6b1d35cef..c393d2eb413 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -83,7 +83,7 @@ static int find_uniq_filename(char *name) MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), name(0), log_type(LOG_CLOSED),write_error(0), inited(0), file_id(1),no_rotate(0), - need_start_event(1) + need_start_event(1),bytes_written(0) { /* We don't want to intialize LOCK_Log here as the thread system may @@ -99,6 +99,7 @@ MYSQL_LOG::~MYSQL_LOG() { (void) pthread_mutex_destroy(&LOCK_log); (void) pthread_mutex_destroy(&LOCK_index); + (void) pthread_cond_destroy(&update_cond); } } @@ -233,18 +234,14 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, } else if (log_type == LOG_BIN) { - bool error; - /* - Explanation of the boolean black magic: - if we are supposed to write magic number try write - clean -up if failed - then if index_file has not been previously opened, try to open it - clean up if failed - */ - if ((do_magic && my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4)) || + bool error; + if (do_magic) + { + if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4) || open_index(O_APPEND | O_RDWR | O_CREAT)) - goto err; + goto err; + bytes_written += 4; + } if (need_start_event && !no_auto_events) { @@ -462,12 +459,30 @@ err: my_delete(fname, MYF(0)); // do not report error if the file is not there else { + MY_STAT s; my_close(index_file, MYF(MY_WME)); + if (!my_stat(rli->relay_log_name,&s,MYF(0))) + { + sql_print_error("The first log %s failed to stat during purge", + rli->relay_log_name); + error=1; + goto err; + } if (my_rename(fname,index_file_name,MYF(MY_WME)) || (index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND, MYF(MY_WME)))<0 || my_delete(rli->relay_log_name, MYF(MY_WME))) error=1; + + pthread_mutex_lock(&rli->log_space_lock); + rli->log_space_total -= s.st_size; + fprintf(stderr,"purge_first_log: %ld\n", rli->log_space_total); + pthread_mutex_unlock(&rli->log_space_lock); + // ok to broadcast after the critical region as there is no risk of + // the mutex being destroyed by this thread later - this helps save + // context switches + pthread_cond_broadcast(&rli->log_space_cond); + if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/))) { char buff[22]; @@ -695,6 +710,7 @@ void MYSQL_LOG::new_file(bool inside_mutex) if (thd && thd->slave_thread) r.flags |= LOG_EVENT_FORCED_ROTATE_F; r.write(&log_file); + bytes_written += r.get_event_len(); } // update needs to be signaled even if there is no rotate event // log rotation should give the waiting thread a signal to @@ -728,6 +744,7 @@ bool MYSQL_LOG::append(Log_event* ev) error=1; goto err; } + bytes_written += ev->get_event_len(); if ((uint)my_b_append_tell(&log_file) > max_binlog_size) { new_file(1); @@ -754,6 +771,7 @@ bool MYSQL_LOG::appendv(const char* buf, uint len,...) error = 1; break; } + bytes_written += len; } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); if ((uint) my_b_append_tell(&log_file) > max_binlog_size) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 3a441bb61dc..1c4f7aa24c5 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -3042,6 +3042,8 @@ CHANGEABLE_VAR changeable_vars[] = { 128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE }, { "record_rnd_buffer", (long*) &record_rnd_cache_size, 0, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE }, + { "relay_log_space_limit", (long*) &relay_log_space_limit, 0L, 0L,ULONG_MAX, + 0, 1}, { "slave_net_timeout", (long*) &slave_net_timeout, SLAVE_NET_TIMEOUT, 1, LONG_TIMEOUT, 0, 1 }, { "slow_launch_time", (long*) &slow_launch_time, diff --git a/sql/slave.cc b/sql/slave.cc index 919cf362238..25b29732000 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -42,6 +42,8 @@ bool do_table_inited = 0, ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool table_rules_on = 0; static TABLE* save_temporary_tables = 0; +ulong relay_log_space_limit = 0; /* TODO: fix variables to access ulonglong + values and make it ulonglong */ // when slave thread exits, we need to remember the temporary tables so we // can re-use them on slave start @@ -60,8 +62,10 @@ static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev); static int queue_old_event(MASTER_INFO* mi, const char* buf, uint event_len); +static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli); static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi); static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli); +static int count_relay_log_space(RELAY_LOG_INFO* rli); static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); @@ -264,8 +268,9 @@ void init_slave_skip_errors(char* arg) // are not running int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) { + DBUG_ENTER("purge_relay_logs"); if (!rli->inited) - return 0; /* successfully do nothing */ + DBUG_RETURN(0); /* successfully do nothing */ DBUG_ASSERT(rli->slave_running == 0); DBUG_ASSERT(rli->mi->slave_running == 0); int error=0; @@ -282,14 +287,20 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) goto err; } strnmov(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)); + sizeof(rli->relay_log_name)-1); + rli->log_space_total=4; //just first log with magic number and nothing else rli->relay_log_pos=4; + rli->relay_log.reset_bytes_written(); rli->log_pos_current=0; if (!just_reset) error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); -err: +err: +#ifndef DBUG_OFF + char buf[22]; +#endif + DBUG_PRINT("info",("log_space_total=%s",llstr(rli->log_space_total,buf))); pthread_mutex_unlock(&rli->data_lock); - return error; + DBUG_RETURN(error); } int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) @@ -953,8 +964,9 @@ void end_master_info(MASTER_INFO* mi) int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) { + DBUG_ENTER("init_relay_log_info"); if (rli->inited) - return 0; + DBUG_RETURN(0); MY_STAT stat_area; char fname[FN_REFLEN+128]; int info_fd; @@ -970,6 +982,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) rli->log_pos_current=0; rli->abort_pos_wait=0; rli->skip_log_purge=0; + rli->log_space_limit = relay_log_space_limit; + rli->log_space_total = 0; // TODO: make this work with multi-master if (!opt_relay_logname) { @@ -1001,7 +1015,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) my_close(info_fd, MYF(0)); rli->info_fd=-1; pthread_mutex_unlock(&rli->data_lock); - return 1; + DBUG_RETURN(1); } if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) goto err; @@ -1021,7 +1035,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) my_close(info_fd, MYF(0)); rli->info_fd=-1; pthread_mutex_unlock(&rli->data_lock); - return 1; + DBUG_RETURN(1); } rli->info_fd = info_fd; @@ -1052,8 +1066,13 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) // before flush_relay_log_info reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); error=test(flush_relay_log_info(rli)); + if (count_relay_log_space(rli)) + { + msg="Error counting relay log space"; + goto err; + } pthread_mutex_unlock(&rli->data_lock); - return error; + DBUG_RETURN(error); err: sql_print_error(msg); @@ -1061,9 +1080,66 @@ err: my_close(info_fd, MYF(0)); rli->info_fd=-1; pthread_mutex_unlock(&rli->data_lock); - return 1; + DBUG_RETURN(1); } +static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo) +{ + MY_STAT s; + DBUG_ENTER("add_relay_log"); + if (!my_stat(linfo->log_file_name,&s,MYF(0))) + { + sql_print_error("log %s listed in the index, but failed to stat", + linfo->log_file_name); + DBUG_RETURN(1); + } + rli->log_space_total += s.st_size; +#ifndef DBUG_OFF + char buf[22]; +#endif + DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf))); + DBUG_RETURN(0); +} + +static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) +{ + bool slave_killed; + MASTER_INFO* mi = rli->mi; + const char* save_proc_info; + THD* thd = mi->io_thd; + DBUG_ENTER("wait_for_relay_log_space"); + pthread_mutex_lock(&rli->log_space_lock); + save_proc_info = thd->proc_info; + thd->proc_info = "Waiting for relay log space to free"; + while (rli->log_space_limit < rli->log_space_total && + !(slave_killed=io_slave_killed(thd,mi))) + { + pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock); + } + thd->proc_info = save_proc_info; + pthread_mutex_unlock(&rli->log_space_lock); + DBUG_RETURN(slave_killed); +} + +static int count_relay_log_space(RELAY_LOG_INFO* rli) +{ + LOG_INFO linfo; + DBUG_ENTER("count_relay_log_space"); + rli->log_space_total = 0; + if (rli->relay_log.find_first_log(&linfo,"")) + { + sql_print_error("Could not find first log while counting relay log space"); + DBUG_RETURN(1); + } + if (add_relay_log(rli,&linfo)) + DBUG_RETURN(1); + while (!rli->relay_log.find_next_log(&linfo)) + { + if (add_relay_log(rli,&linfo)) + DBUG_RETURN(1); + } + DBUG_RETURN(0); +} int init_master_info(MASTER_INFO* mi, const char* master_info_fname, const char* slave_info_fname) @@ -1242,6 +1318,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) field_list.push_back(new Item_empty_string("Last_error", 20)); field_list.push_back(new Item_empty_string("Skip_counter", 12)); field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); + field_list.push_back(new Item_empty_string("Relay_log_space", 12)); if(send_fields(thd, field_list, 1)) DBUG_RETURN(-1); @@ -1268,6 +1345,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) net_store_data(packet, mi->rli.last_slave_error); net_store_data(packet, mi->rli.slave_skip_counter); net_store_data(packet, (longlong) mi->rli.master_log_pos); + net_store_data(packet, (longlong) mi->rli.log_space_total); pthread_mutex_unlock(&mi->rli.data_lock); pthread_mutex_unlock(&mi->data_lock); @@ -1783,6 +1861,14 @@ from master"); goto err; } flush_master_info(mi); + if (mi->rli.log_space_limit && mi->rli.log_space_limit < + mi->rli.log_space_total) + if (wait_for_relay_log_space(&mi->rli)) + { + sql_print_error("Slave I/O thread aborted while waiting for relay \ +log space"); + goto err; + } // TODO: check debugging abort code #ifndef DBUG_OFF if (abort_slave_event_count && !--events_till_abort) @@ -1986,7 +2072,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) goto err; } - /* this dummy block is so we could insantiate Append_block_log_event + /* this dummy block is so we could instantiate Append_block_log_event once and then modify it slightly instead of doing it multiple times in the loop */ @@ -2012,6 +2098,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) relay log"); goto err; } + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); break; } if (unlikely(cev_not_written)) @@ -2026,6 +2113,7 @@ relay log"); goto err; } cev_not_written=0; + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); } else { @@ -2038,6 +2126,7 @@ relay log"); relay log"); goto err; } + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ; } } } @@ -2145,6 +2234,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, DBUG_ASSERT(!tmp_buf); return 1; } + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); } delete ev; if (likely(inc_pos)) @@ -2198,6 +2288,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) { if (likely(inc_pos)) mi->master_log_pos += event_len; + mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); } if (unlikely(processed_stop_event)) mi->ignore_stop_event=1; diff --git a/sql/slave.h b/sql/slave.h index 7ae5da1a340..354fc46e99d 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -31,6 +31,7 @@ extern char* slave_load_tmpdir; extern my_string master_info_file,relay_log_info_file; extern my_string opt_relay_logname, opt_relaylog_index_name; extern bool opt_skip_slave_start; +extern ulong relay_log_space_limit; struct st_master_info; /* @@ -153,6 +154,9 @@ typedef struct st_relay_log_info bool log_pos_current; bool abort_pos_wait; bool skip_log_purge; + ulonglong log_space_limit,log_space_total; + pthread_mutex_t log_space_lock; + pthread_cond_t log_space_cond; st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0), cur_log_init_count(0), @@ -163,17 +167,21 @@ typedef struct st_relay_log_info bzero(&info_file,sizeof(info_file)); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST); pthread_cond_init(&data_cond, NULL); pthread_cond_init(&start_cond, NULL); pthread_cond_init(&stop_cond, NULL); + pthread_cond_init(&log_space_cond, NULL); } ~st_relay_log_info() { pthread_mutex_destroy(&run_lock); pthread_mutex_destroy(&data_lock); + pthread_mutex_destroy(&log_space_lock); pthread_cond_destroy(&data_cond); pthread_cond_destroy(&start_cond); pthread_cond_destroy(&stop_cond); + pthread_cond_destroy(&log_space_cond); } inline void inc_pending(ulonglong val) { diff --git a/sql/sql_class.h b/sql/sql_class.h index 289a2ab9255..d8824e80686 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -78,12 +78,29 @@ class MYSQL_LOG { bool need_start_event; pthread_cond_t update_cond; bool no_auto_events; // for relay binlog + ulonglong bytes_written; friend class Log_event; public: MYSQL_LOG(); ~MYSQL_LOG(); pthread_mutex_t* get_log_lock() { return &LOCK_log; } + void reset_bytes_written() + { + bytes_written = 0; + } + void harvest_bytes_written(ulonglong* counter) + { +#ifndef DBUG_OFF + char buf1[22],buf2[22]; +#endif + DBUG_ENTER("harvest_bytes_written"); + (*counter)+=bytes_written; + DBUG_PRINT("info",("counter=%s,bytes_written=%s", llstr(*counter,buf1), + llstr(bytes_written,buf2))); + bytes_written=0; + DBUG_VOID_RETURN; + } IO_CACHE* get_log_file() { return &log_file; } void signal_update() { pthread_cond_broadcast(&update_cond);} void wait_for_update(THD* thd); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 8216dec815a..398ff443ad4 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -751,7 +751,7 @@ int change_master(THD* thd, MASTER_INFO* mi) need_relay_log_purge = 0; mi->rli.skip_log_purge=1; strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name, - sizeof(mi->rli.relay_log_name)); + sizeof(mi->rli.relay_log_name)-1); } if (lex_mi->relay_log_pos) |