summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2002-04-01 21:46:23 -0700
committerunknown <sasha@mysql.sashanet.com>2002-04-01 21:46:23 -0700
commitd7cfbff5b0a53ba22bb731dec70c4886d2ff9570 (patch)
tree0156e4a56e551202f8c7a8237544114db4a8d1bc /sql
parent869e671f98b1a056595e259283bf4e5e40256a20 (diff)
downloadmariadb-git-d7cfbff5b0a53ba22bb731dec70c4886d2ff9570.tar.gz
relay_log_space_limit
DBUG_ macro cleanup buffer boundary cleanup This changeset, although not fully tested, works for me better than anything I've had so far, including what is in the repository. I will push it unless something crashes while I am writing this :-) mysql-test/r/rpl000014.result: updated result mysql-test/r/rpl000015.result: updated result mysql-test/r/rpl000016.result: updated result mysql-test/r/rpl_log.result: new result mysys/mf_iocache.c: DBUG_ cleanup mysys/mf_iocache2.c: DBUG_ fix sql/log.cc: added relay_log_space_limit sql/mysqld.cc: relay_log_space_limit sql/slave.cc: relay_log_space_limit sql/slave.h: relay_log_space_limit sql/sql_class.h: relay_log_space_limit sql/sql_repl.cc: fixed buffer overrun bug
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc42
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/slave.cc111
-rw-r--r--sql/slave.h8
-rw-r--r--sql/sql_class.h17
-rw-r--r--sql/sql_repl.cc2
6 files changed, 159 insertions, 23 deletions
diff --git a/sql/log.cc b/sql/log.cc
index dc6b1d35cef..c393d2eb413 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -83,7 +83,7 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0),
inited(0), file_id(1),no_rotate(0),
- need_start_event(1)
+ need_start_event(1),bytes_written(0)
{
/*
We don't want to intialize LOCK_Log here as the thread system may
@@ -99,6 +99,7 @@ MYSQL_LOG::~MYSQL_LOG()
{
(void) pthread_mutex_destroy(&LOCK_log);
(void) pthread_mutex_destroy(&LOCK_index);
+ (void) pthread_cond_destroy(&update_cond);
}
}
@@ -233,18 +234,14 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
}
else if (log_type == LOG_BIN)
{
- bool error;
- /*
- Explanation of the boolean black magic:
- if we are supposed to write magic number try write
- clean
-up if failed
- then if index_file has not been previously opened, try to open it
- clean up if failed
- */
- if ((do_magic && my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4)) ||
+ bool error;
+ if (do_magic)
+ {
+ if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4) ||
open_index(O_APPEND | O_RDWR | O_CREAT))
- goto err;
+ goto err;
+ bytes_written += 4;
+ }
if (need_start_event && !no_auto_events)
{
@@ -462,12 +459,30 @@ err:
my_delete(fname, MYF(0)); // do not report error if the file is not there
else
{
+ MY_STAT s;
my_close(index_file, MYF(MY_WME));
+ if (!my_stat(rli->relay_log_name,&s,MYF(0)))
+ {
+ sql_print_error("The first log %s failed to stat during purge",
+ rli->relay_log_name);
+ error=1;
+ goto err;
+ }
if (my_rename(fname,index_file_name,MYF(MY_WME)) ||
(index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND,
MYF(MY_WME)))<0 ||
my_delete(rli->relay_log_name, MYF(MY_WME)))
error=1;
+
+ pthread_mutex_lock(&rli->log_space_lock);
+ rli->log_space_total -= s.st_size;
+ fprintf(stderr,"purge_first_log: %ld\n", rli->log_space_total);
+ pthread_mutex_unlock(&rli->log_space_lock);
+ // ok to broadcast after the critical region as there is no risk of
+ // the mutex being destroyed by this thread later - this helps save
+ // context switches
+ pthread_cond_broadcast(&rli->log_space_cond);
+
if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/)))
{
char buff[22];
@@ -695,6 +710,7 @@ void MYSQL_LOG::new_file(bool inside_mutex)
if (thd && thd->slave_thread)
r.flags |= LOG_EVENT_FORCED_ROTATE_F;
r.write(&log_file);
+ bytes_written += r.get_event_len();
}
// update needs to be signaled even if there is no rotate event
// log rotation should give the waiting thread a signal to
@@ -728,6 +744,7 @@ bool MYSQL_LOG::append(Log_event* ev)
error=1;
goto err;
}
+ bytes_written += ev->get_event_len();
if ((uint)my_b_append_tell(&log_file) > max_binlog_size)
{
new_file(1);
@@ -754,6 +771,7 @@ bool MYSQL_LOG::appendv(const char* buf, uint len,...)
error = 1;
break;
}
+ bytes_written += len;
} while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
if ((uint) my_b_append_tell(&log_file) > max_binlog_size)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 3a441bb61dc..1c4f7aa24c5 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -3042,6 +3042,8 @@ CHANGEABLE_VAR changeable_vars[] = {
128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE },
{ "record_rnd_buffer", (long*) &record_rnd_cache_size,
0, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE },
+ { "relay_log_space_limit", (long*) &relay_log_space_limit, 0L, 0L,ULONG_MAX,
+ 0, 1},
{ "slave_net_timeout", (long*) &slave_net_timeout,
SLAVE_NET_TIMEOUT, 1, LONG_TIMEOUT, 0, 1 },
{ "slow_launch_time", (long*) &slow_launch_time,
diff --git a/sql/slave.cc b/sql/slave.cc
index 919cf362238..25b29732000 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -42,6 +42,8 @@ bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0;
static TABLE* save_temporary_tables = 0;
+ulong relay_log_space_limit = 0; /* TODO: fix variables to access ulonglong
+ values and make it ulonglong */
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
@@ -60,8 +62,10 @@ static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len);
+static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
static inline bool io_slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool sql_slave_killed(THD* thd,RELAY_LOG_INFO* rli);
+static int count_relay_log_space(RELAY_LOG_INFO* rli);
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
@@ -264,8 +268,9 @@ void init_slave_skip_errors(char* arg)
// are not running
int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
{
+ DBUG_ENTER("purge_relay_logs");
if (!rli->inited)
- return 0; /* successfully do nothing */
+ DBUG_RETURN(0); /* successfully do nothing */
DBUG_ASSERT(rli->slave_running == 0);
DBUG_ASSERT(rli->mi->slave_running == 0);
int error=0;
@@ -282,14 +287,20 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
goto err;
}
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name));
+ sizeof(rli->relay_log_name)-1);
+ rli->log_space_total=4; //just first log with magic number and nothing else
rli->relay_log_pos=4;
+ rli->relay_log.reset_bytes_written();
rli->log_pos_current=0;
if (!just_reset)
error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg);
-err:
+err:
+#ifndef DBUG_OFF
+ char buf[22];
+#endif
+ DBUG_PRINT("info",("log_space_total=%s",llstr(rli->log_space_total,buf)));
pthread_mutex_unlock(&rli->data_lock);
- return error;
+ DBUG_RETURN(error);
}
int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
@@ -953,8 +964,9 @@ void end_master_info(MASTER_INFO* mi)
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
+ DBUG_ENTER("init_relay_log_info");
if (rli->inited)
- return 0;
+ DBUG_RETURN(0);
MY_STAT stat_area;
char fname[FN_REFLEN+128];
int info_fd;
@@ -970,6 +982,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->log_pos_current=0;
rli->abort_pos_wait=0;
rli->skip_log_purge=0;
+ rli->log_space_limit = relay_log_space_limit;
+ rli->log_space_total = 0;
// TODO: make this work with multi-master
if (!opt_relay_logname)
{
@@ -1001,7 +1015,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
my_close(info_fd, MYF(0));
rli->info_fd=-1;
pthread_mutex_unlock(&rli->data_lock);
- return 1;
+ DBUG_RETURN(1);
}
if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg))
goto err;
@@ -1021,7 +1035,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
my_close(info_fd, MYF(0));
rli->info_fd=-1;
pthread_mutex_unlock(&rli->data_lock);
- return 1;
+ DBUG_RETURN(1);
}
rli->info_fd = info_fd;
@@ -1052,8 +1066,13 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
// before flush_relay_log_info
reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
error=test(flush_relay_log_info(rli));
+ if (count_relay_log_space(rli))
+ {
+ msg="Error counting relay log space";
+ goto err;
+ }
pthread_mutex_unlock(&rli->data_lock);
- return error;
+ DBUG_RETURN(error);
err:
sql_print_error(msg);
@@ -1061,9 +1080,66 @@ err:
my_close(info_fd, MYF(0));
rli->info_fd=-1;
pthread_mutex_unlock(&rli->data_lock);
- return 1;
+ DBUG_RETURN(1);
}
+static inline int add_relay_log(RELAY_LOG_INFO* rli,LOG_INFO* linfo)
+{
+ MY_STAT s;
+ DBUG_ENTER("add_relay_log");
+ if (!my_stat(linfo->log_file_name,&s,MYF(0)))
+ {
+ sql_print_error("log %s listed in the index, but failed to stat",
+ linfo->log_file_name);
+ DBUG_RETURN(1);
+ }
+ rli->log_space_total += s.st_size;
+#ifndef DBUG_OFF
+ char buf[22];
+#endif
+ DBUG_PRINT("info",("log_space_total: %s", llstr(rli->log_space_total,buf)));
+ DBUG_RETURN(0);
+}
+
+static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
+{
+ bool slave_killed;
+ MASTER_INFO* mi = rli->mi;
+ const char* save_proc_info;
+ THD* thd = mi->io_thd;
+ DBUG_ENTER("wait_for_relay_log_space");
+ pthread_mutex_lock(&rli->log_space_lock);
+ save_proc_info = thd->proc_info;
+ thd->proc_info = "Waiting for relay log space to free";
+ while (rli->log_space_limit < rli->log_space_total &&
+ !(slave_killed=io_slave_killed(thd,mi)))
+ {
+ pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
+ }
+ thd->proc_info = save_proc_info;
+ pthread_mutex_unlock(&rli->log_space_lock);
+ DBUG_RETURN(slave_killed);
+}
+
+static int count_relay_log_space(RELAY_LOG_INFO* rli)
+{
+ LOG_INFO linfo;
+ DBUG_ENTER("count_relay_log_space");
+ rli->log_space_total = 0;
+ if (rli->relay_log.find_first_log(&linfo,""))
+ {
+ sql_print_error("Could not find first log while counting relay log space");
+ DBUG_RETURN(1);
+ }
+ if (add_relay_log(rli,&linfo))
+ DBUG_RETURN(1);
+ while (!rli->relay_log.find_next_log(&linfo))
+ {
+ if (add_relay_log(rli,&linfo))
+ DBUG_RETURN(1);
+ }
+ DBUG_RETURN(0);
+}
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname)
@@ -1242,6 +1318,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
field_list.push_back(new Item_empty_string("Last_error", 20));
field_list.push_back(new Item_empty_string("Skip_counter", 12));
field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_log_space", 12));
if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
@@ -1268,6 +1345,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
net_store_data(packet, mi->rli.last_slave_error);
net_store_data(packet, mi->rli.slave_skip_counter);
net_store_data(packet, (longlong) mi->rli.master_log_pos);
+ net_store_data(packet, (longlong) mi->rli.log_space_total);
pthread_mutex_unlock(&mi->rli.data_lock);
pthread_mutex_unlock(&mi->data_lock);
@@ -1783,6 +1861,14 @@ from master");
goto err;
}
flush_master_info(mi);
+ if (mi->rli.log_space_limit && mi->rli.log_space_limit <
+ mi->rli.log_space_total)
+ if (wait_for_relay_log_space(&mi->rli))
+ {
+ sql_print_error("Slave I/O thread aborted while waiting for relay \
+log space");
+ goto err;
+ }
// TODO: check debugging abort code
#ifndef DBUG_OFF
if (abort_slave_event_count && !--events_till_abort)
@@ -1986,7 +2072,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
goto err;
}
- /* this dummy block is so we could insantiate Append_block_log_event
+ /* this dummy block is so we could instantiate Append_block_log_event
once and then modify it slightly instead of doing it multiple times
in the loop
*/
@@ -2012,6 +2098,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
relay log");
goto err;
}
+ mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
break;
}
if (unlikely(cev_not_written))
@@ -2026,6 +2113,7 @@ relay log");
goto err;
}
cev_not_written=0;
+ mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
}
else
{
@@ -2038,6 +2126,7 @@ relay log");
relay log");
goto err;
}
+ mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
}
}
}
@@ -2145,6 +2234,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_ASSERT(!tmp_buf);
return 1;
}
+ mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
}
delete ev;
if (likely(inc_pos))
@@ -2198,6 +2288,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
if (likely(inc_pos))
mi->master_log_pos += event_len;
+ mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
}
if (unlikely(processed_stop_event))
mi->ignore_stop_event=1;
diff --git a/sql/slave.h b/sql/slave.h
index 7ae5da1a340..354fc46e99d 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -31,6 +31,7 @@ extern char* slave_load_tmpdir;
extern my_string master_info_file,relay_log_info_file;
extern my_string opt_relay_logname, opt_relaylog_index_name;
extern bool opt_skip_slave_start;
+extern ulong relay_log_space_limit;
struct st_master_info;
/*
@@ -153,6 +154,9 @@ typedef struct st_relay_log_info
bool log_pos_current;
bool abort_pos_wait;
bool skip_log_purge;
+ ulonglong log_space_limit,log_space_total;
+ pthread_mutex_t log_space_lock;
+ pthread_cond_t log_space_cond;
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
cur_log_init_count(0),
@@ -163,17 +167,21 @@ typedef struct st_relay_log_info
bzero(&info_file,sizeof(info_file));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
pthread_cond_init(&data_cond, NULL);
pthread_cond_init(&start_cond, NULL);
pthread_cond_init(&stop_cond, NULL);
+ pthread_cond_init(&log_space_cond, NULL);
}
~st_relay_log_info()
{
pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock);
+ pthread_mutex_destroy(&log_space_lock);
pthread_cond_destroy(&data_cond);
pthread_cond_destroy(&start_cond);
pthread_cond_destroy(&stop_cond);
+ pthread_cond_destroy(&log_space_cond);
}
inline void inc_pending(ulonglong val)
{
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 289a2ab9255..d8824e80686 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -78,12 +78,29 @@ class MYSQL_LOG {
bool need_start_event;
pthread_cond_t update_cond;
bool no_auto_events; // for relay binlog
+ ulonglong bytes_written;
friend class Log_event;
public:
MYSQL_LOG();
~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; }
+ void reset_bytes_written()
+ {
+ bytes_written = 0;
+ }
+ void harvest_bytes_written(ulonglong* counter)
+ {
+#ifndef DBUG_OFF
+ char buf1[22],buf2[22];
+#endif
+ DBUG_ENTER("harvest_bytes_written");
+ (*counter)+=bytes_written;
+ DBUG_PRINT("info",("counter=%s,bytes_written=%s", llstr(*counter,buf1),
+ llstr(bytes_written,buf2)));
+ bytes_written=0;
+ DBUG_VOID_RETURN;
+ }
IO_CACHE* get_log_file() { return &log_file; }
void signal_update() { pthread_cond_broadcast(&update_cond);}
void wait_for_update(THD* thd);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 8216dec815a..398ff443ad4 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -751,7 +751,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
need_relay_log_purge = 0;
mi->rli.skip_log_purge=1;
strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name,
- sizeof(mi->rli.relay_log_name));
+ sizeof(mi->rli.relay_log_name)-1);
}
if (lex_mi->relay_log_pos)