diff options
author | unknown <sasha@mysql.sashanet.com> | 2001-10-23 13:28:03 -0600 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2001-10-23 13:28:03 -0600 |
commit | 8fc78e08b0e68f82121b0bba8d930bc5ec57a29f (patch) | |
tree | c5c654aca3970830687c68cb22289835e1719233 /sql | |
parent | 74f49f9f34b7a4cc1b87aa1cb657b72f8c6c856e (diff) | |
download | mariadb-git-8fc78e08b0e68f82121b0bba8d930bc5ec57a29f.tar.gz |
cleanup
removal of duplicate code in mf_iocache.cc
work on failsafe replication
work on SEQ_READ_APPEND io cache
include/my_sys.h:
updates for SEQ_READ_APPEND
libmysql/Makefile.am:
fix for mysys/mf_iocache.c
libmysql/libmysql.c:
updates for new format of SHOW SLAVE HOSTS
mysql-test/r/rpl000001.result:
test replication of LOAD DATA LOCAL INFILE
mysql-test/r/rpl000002.result:
updated test result
mysql-test/t/rpl000001.test:
test LOAD DATA LOCAL INFILE
mysys/mf_iocache.c:
cleanup to remove duplicate functionality
some work on SEQ_READ_APPEND
sql/mf_iocache.cc:
cleanup to remove duplicate functionality
sql/repl_failsafe.cc:
more work on failsafe replication
sql/repl_failsafe.h:
more work on failsafe replication
sql/slave.cc:
cleanup
more work on failsafe replication
sql/sql_load.cc:
fixed bug on replicating empty file loads
got LOAD DATA LOCAL INFILE to work again, and to be replicated
sql/sql_repl.cc:
cleanup
more work on failsafe replication
sql/sql_repl.h:
more work on failsafe replication
Diffstat (limited to 'sql')
-rw-r--r-- | sql/mf_iocache.cc | 624 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 177 | ||||
-rw-r--r-- | sql/repl_failsafe.h | 6 | ||||
-rw-r--r-- | sql/slave.cc | 70 | ||||
-rw-r--r-- | sql/sql_load.cc | 22 | ||||
-rw-r--r-- | sql/sql_repl.cc | 17 | ||||
-rw-r--r-- | sql/sql_repl.h | 1 |
7 files changed, 255 insertions, 662 deletions
diff --git a/sql/mf_iocache.cc b/sql/mf_iocache.cc index 40b98983291..55e953687a3 100644 --- a/sql/mf_iocache.cc +++ b/sql/mf_iocache.cc @@ -42,295 +42,6 @@ static void my_aiowait(my_aio_result *result); extern "C" { /* - ** if cachesize == 0 then use default cachesize (from s-file) - ** if file == -1 then real_open_cached_file() will be called. - ** returns 0 if ok - */ - -int init_io_cache(IO_CACHE *info, File file, uint cachesize, - enum cache_type type, my_off_t seek_offset, - pbool use_async_io, myf cache_myflags) -{ - uint min_cache; - DBUG_ENTER("init_io_cache"); - DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); - - /* There is no file in net_reading */ - info->file= file; - info->pre_close = info->pre_read = info->post_read = 0; - info->arg = 0; - if (!cachesize) - if (! (cachesize= my_default_record_cache_size)) - DBUG_RETURN(1); /* No cache requested */ - min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; - if (type == READ_CACHE) - { /* Assume file isn't growing */ - if (cache_myflags & MY_DONT_CHECK_FILESIZE) - { - cache_myflags &= ~MY_DONT_CHECK_FILESIZE; - } - else - { - my_off_t file_pos,end_of_file; - if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR)) - DBUG_RETURN(1); - end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0)); - if (end_of_file < seek_offset) - end_of_file=seek_offset; - VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0))); - if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1) - { - cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; - use_async_io=0; /* No need to use async */ - } - } - } - if ((int) type < (int) READ_NET) - { - for (;;) - { - cachesize=(uint) ((ulong) (cachesize + min_cache-1) & - (ulong) ~(min_cache-1)); - if (cachesize < min_cache) - cachesize = min_cache; - if ((info->buffer= - (byte*) my_malloc(cachesize, - MYF((cache_myflags & ~ MY_WME) | - (cachesize == min_cache ? MY_WME : 0)))) != 0) - break; /* Enough memory found */ - if (cachesize == min_cache) - DBUG_RETURN(2); /* Can't alloc cache */ - cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */ - } - } - else - info->buffer=0; - DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize)); - info->pos_in_file= seek_offset; - info->read_length=info->buffer_length=cachesize; - info->seek_not_done= test(file >= 0 && type != READ_FIFO && - type != READ_NET); - info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); - info->rc_request_pos=info->rc_pos=info->buffer; - - if (type == READ_CACHE || type == READ_NET || type == READ_FIFO) - { - info->rc_end=info->buffer; /* Nothing in cache */ - } - else /* type == WRITE_CACHE */ - { - info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); - } - /* end_of_file may be changed by user later */ - info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0 - : ~(my_off_t) 0); - info->type=type; - info->error=0; - info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; /* net | file */ -#ifdef HAVE_AIOWAIT - if (use_async_io && ! my_disable_async_io) - { - DBUG_PRINT("info",("Using async io")); - info->read_length/=2; - info->read_function=_my_b_async_read; - } - info->inited=info->aio_result.pending=0; -#endif - DBUG_RETURN(0); -} /* init_io_cache */ - - - /* Wait until current request is ready */ - -#ifdef HAVE_AIOWAIT -static void my_aiowait(my_aio_result *result) -{ - if (result->pending) - { - struct aio_result_t *tmp; - for (;;) - { - if ((int) (tmp=aiowait((struct timeval *) 0)) == -1) - { - if (errno == EINTR) - continue; - DBUG_PRINT("error",("No aio request, error: %d",errno)); - result->pending=0; /* Assume everythings is ok */ - break; - } - ((my_aio_result*) tmp)->pending=0; - if ((my_aio_result*) tmp == result) - break; - } - } - return; -} -#endif - - /* Use this to reset cache to start or other type */ - /* Some simple optimizing is done when reinit in current buffer */ - -my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, - my_off_t seek_offset, - pbool use_async_io __attribute__((unused)), - pbool clear_cache) -{ - DBUG_ENTER("reinit_io_cache"); - - info->seek_not_done= test(info->file >= 0); /* Seek not done */ - - /* If the whole file is in memory, avoid flushing to disk */ - if (! clear_cache && - seek_offset >= info->pos_in_file && - seek_offset <= info->pos_in_file + - (uint) (info->rc_end - info->rc_request_pos)) - { /* use current buffer */ - if (info->type == WRITE_CACHE && type == READ_CACHE) - { - info->rc_end=info->rc_pos; - info->end_of_file=my_b_tell(info); - } - else if (type == WRITE_CACHE) - { - if (info->type == READ_CACHE) - info->rc_end=info->buffer+info->buffer_length; - info->end_of_file = ~(my_off_t) 0; - } - info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); -#ifdef HAVE_AIOWAIT - my_aiowait(&info->aio_result); /* Wait for outstanding req */ -#endif - } - else - { - /* - If we change from WRITE_CACHE to READ_CACHE, assume that everything - after the current positions should be ignored - */ - if (info->type == WRITE_CACHE && type == READ_CACHE) - info->end_of_file=my_b_tell(info); - /* No need to flush cache if we want to reuse it */ - if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info)) - DBUG_RETURN(1); - if (info->pos_in_file != seek_offset) - { - info->pos_in_file=seek_offset; - info->seek_not_done=1; - } - info->rc_request_pos=info->rc_pos=info->buffer; - if (type == READ_CACHE || type == READ_NET || type == READ_FIFO) - { - info->rc_end=info->buffer; /* Nothing in cache */ - } - else - { - info->rc_end=info->buffer+info->buffer_length- - (seek_offset & (IO_SIZE-1)); - info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 : - ~(my_off_t) 0); - } - } - info->type=type; - info->error=0; - info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; -#ifdef HAVE_AIOWAIT - if (type != READ_NET) - { - if (use_async_io && ! my_disable_async_io && - ((ulong) info->buffer_length < - (ulong) (info->end_of_file - seek_offset))) - { - info->read_length=info->buffer_length/2; - info->read_function=_my_b_async_read; - } - } - info->inited=0; -#endif - DBUG_RETURN(0); -} /* init_io_cache */ - - - - /* - Read buffered. Returns 1 if can't read requested characters - This function is only called from the my_b_read() macro - when there isn't enough characters in the buffer to - satisfy the request. - Returns 0 we succeeded in reading all data - */ - -int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) -{ - uint length,diff_length,left_length; - my_off_t max_length, pos_in_file; - - if ((left_length=(uint) (info->rc_end-info->rc_pos))) - { - dbug_assert(Count >= left_length); /* User is not using my_b_read() */ - memcpy(Buffer,info->rc_pos, (size_t) (left_length)); - Buffer+=left_length; - Count-=left_length; - } - /* pos_in_file always point on where info->buffer was read */ - pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); - if (info->seek_not_done) - { /* File touched, do seek */ - VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); - info->seek_not_done=0; - } - diff_length=(uint) (pos_in_file & (IO_SIZE-1)); - if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) - { /* Fill first intern buffer */ - uint read_length; - if (info->end_of_file == pos_in_file) - { /* End of file */ - info->error=(int) left_length; - return 1; - } - length=(Count & (uint) ~(IO_SIZE-1))-diff_length; - if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) - != (uint) length) - { - info->error= read_length == (uint) -1 ? -1 : - (int) (read_length+left_length); - return 1; - } - Count-=length; - Buffer+=length; - pos_in_file+=length; - left_length+=length; - diff_length=0; - } - max_length=info->read_length-diff_length; - if (info->type != READ_FIFO && - (info->end_of_file - pos_in_file) < max_length) - max_length = info->end_of_file - pos_in_file; - if (!max_length) - { - if (Count) - { - info->error= left_length; /* We only got this many char */ - return 1; - } - length=0; /* Didn't read any chars */ - } - else if ((length=my_read(info->file,info->buffer,(uint) max_length, - info->myflags)) < Count || - length == (uint) -1) - { - if (length != (uint) -1) - memcpy(Buffer,info->buffer,(size_t) length); - info->error= length == (uint) -1 ? -1 : (int) (length+left_length); - return 1; - } - info->rc_pos=info->buffer+Count; - info->rc_end=info->buffer+length; - info->pos_in_file=pos_in_file; - memcpy(Buffer,info->buffer,(size_t) Count); - return 0; -} - - /* ** Read buffered from the net. ** Returns 1 if can't read requested characters ** Returns 0 if record read @@ -359,341 +70,8 @@ int _my_b_net_read(register IO_CACHE *info, byte *Buffer, info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length; Buffer[0] = info->rc_pos[0]; /* length is always 1 */ info->rc_pos++; + info->buffer = info->rc_pos; return 0; } -#ifdef HAVE_AIOWAIT - -int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count) -{ - uint length,read_length,diff_length,left_length,use_length,org_Count; - my_off_t max_length; - my_off_t next_pos_in_file; - byte *read_buffer; - - memcpy(Buffer,info->rc_pos, - (size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); - Buffer+=left_length; - org_Count=Count; - Count-=left_length; - - if (info->inited) - { /* wait for read block */ - info->inited=0; /* No more block to read */ - my_aiowait(&info->aio_result); /* Wait for outstanding req */ - if (info->aio_result.result.aio_errno) - { - if (info->myflags & MY_WME) - my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG), - my_filename(info->file), - info->aio_result.result.aio_errno); - my_errno=info->aio_result.result.aio_errno; - info->error= -1; - return(1); - } - if (! (read_length = (uint) info->aio_result.result.aio_return) || - read_length == (uint) -1) - { - my_errno=0; /* For testing */ - info->error= (read_length == (uint) -1 ? -1 : - (int) (read_length+left_length)); - return(1); - } - info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos); - - if (info->rc_request_pos != info->buffer) - info->rc_request_pos=info->buffer; - else - info->rc_request_pos=info->buffer+info->read_length; - info->rc_pos=info->rc_request_pos; - next_pos_in_file=info->aio_read_pos+read_length; - - /* Check if pos_in_file is changed - (_ni_read_cache may have skipped some bytes) */ - - if (info->aio_read_pos < info->pos_in_file) - { /* Fix if skipped bytes */ - if (info->aio_read_pos + read_length < info->pos_in_file) - { - read_length=0; /* Skipp block */ - next_pos_in_file=info->pos_in_file; - } - else - { - my_off_t offset= (info->pos_in_file - info->aio_read_pos); - info->pos_in_file=info->aio_read_pos; /* Whe are here */ - info->rc_pos=info->rc_request_pos+offset; - read_length-=offset; /* Bytes left from rc_pos */ - } - } -#ifndef DBUG_OFF - if (info->aio_read_pos > info->pos_in_file) - { - my_errno=EINVAL; - return(info->read_length= -1); - } -#endif - /* Copy found bytes to buffer */ - length=min(Count,read_length); - memcpy(Buffer,info->rc_pos,(size_t) length); - Buffer+=length; - Count-=length; - left_length+=length; - info->rc_end=info->rc_pos+read_length; - info->rc_pos+=length; - } - else - next_pos_in_file=(info->pos_in_file+ (uint) - (info->rc_end - info->rc_request_pos)); - - /* If reading large blocks, or first read or read with skipp */ - if (Count) - { - if (next_pos_in_file == info->end_of_file) - { - info->error=(int) (read_length+left_length); - return 1; - } - VOID(my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0))); - read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1)); - if (Count < read_length) - { /* Small block, read to cache */ - if ((read_length=my_read(info->file,info->rc_request_pos, - read_length, info->myflags)) == (uint) -1) - return info->error= -1; - use_length=min(Count,read_length); - memcpy(Buffer,info->rc_request_pos,(size_t) use_length); - info->rc_pos=info->rc_request_pos+Count; - info->rc_end=info->rc_request_pos+read_length; - info->pos_in_file=next_pos_in_file; /* Start of block in cache */ - next_pos_in_file+=read_length; - - if (Count != use_length) - { /* Didn't find hole block */ - if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count) - my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), - my_filename(info->file),my_errno); - info->error=(int) (read_length+left_length); - return 1; - } - } - else - { /* Big block, don't cache it */ - if ((read_length=my_read(info->file,Buffer,(uint) Count,info->myflags)) - != Count) - { - info->error= read_length == (uint) -1 ? -1 : read_length+left_length; - return 1; - } - info->rc_pos=info->rc_end=info->rc_request_pos; - info->pos_in_file=(next_pos_in_file+=Count); - } - } - - /* Read next block with asyncronic io */ - max_length=info->end_of_file - next_pos_in_file; - diff_length=(next_pos_in_file & (IO_SIZE-1)); - - if (max_length > (my_off_t) info->read_length - diff_length) - max_length= (my_off_t) info->read_length - diff_length; - if (info->rc_request_pos != info->buffer) - read_buffer=info->buffer; - else - read_buffer=info->buffer+info->read_length; - info->aio_read_pos=next_pos_in_file; - if (max_length) - { - info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */ - DBUG_PRINT("aioread",("filepos: %ld length: %ld", - (ulong) next_pos_in_file,(ulong) max_length)); - if (aioread(info->file,read_buffer,(int) max_length, - (my_off_t) next_pos_in_file,MY_SEEK_SET, - &info->aio_result.result)) - { /* Skipp async io */ - my_errno=errno; - DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped", - errno, info->aio_result.result.aio_errno)); - if (info->rc_request_pos != info->buffer) - { - bmove(info->buffer,info->rc_request_pos, - (uint) (info->rc_end - info->rc_pos)); - info->rc_request_pos=info->buffer; - info->rc_pos-=info->read_length; - info->rc_end-=info->read_length; - } - info->read_length=info->buffer_length; /* Use hole buffer */ - info->read_function=_my_b_read; /* Use normal IO_READ next */ - } - else - info->inited=info->aio_result.pending=1; - } - return 0; /* Block read, async in use */ -} /* _my_b_async_read */ -#endif - - -/* Read one byte when buffer is empty */ - -int _my_b_get(IO_CACHE *info) -{ - byte buff; - IO_CACHE_CALLBACK pre_read,post_read; - if ((pre_read = info->pre_read)) - (*pre_read)(info); - if ((*(info)->read_function)(info,&buff,1)) - return my_b_EOF; - if ((post_read = info->post_read)) - (*post_read)(info); - return (int) (uchar) buff; -} - - /* Returns != 0 if error on write */ - -int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) -{ - uint rest_length,length; - - rest_length=(uint) (info->rc_end - info->rc_pos); - memcpy(info->rc_pos,Buffer,(size_t) rest_length); - Buffer+=rest_length; - Count-=rest_length; - info->rc_pos+=rest_length; - if (info->pos_in_file+info->buffer_length > info->end_of_file) - { - my_errno=errno=EFBIG; - return info->error = -1; - } - if (flush_io_cache(info)) - return 1; - if (Count >= IO_SIZE) - { /* Fill first intern buffer */ - length=Count & (uint) ~(IO_SIZE-1); - if (info->seek_not_done) - { /* File touched, do seek */ - VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); - info->seek_not_done=0; - } - if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) - return info->error= -1; - Count-=length; - Buffer+=length; - info->pos_in_file+=length; - } - memcpy(info->rc_pos,Buffer,(size_t) Count); - info->rc_pos+=Count; - return 0; -} - - -/* - Write a block to disk where part of the data may be inside the record - buffer. As all write calls to the data goes through the cache, - we will never get a seek over the end of the buffer -*/ - -int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, - my_off_t pos) -{ - uint length; - int error=0; - - if (pos < info->pos_in_file) - { - /* Of no overlap, write everything without buffering */ - if (pos + Count <= info->pos_in_file) - return my_pwrite(info->file, Buffer, Count, pos, - info->myflags | MY_NABP); - /* Write the part of the block that is before buffer */ - length= (uint) (info->pos_in_file - pos); - if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP)) - info->error=error=-1; - Buffer+=length; - pos+= length; - Count-= length; - } - - /* Check if we want to write inside the used part of the buffer.*/ - length= (uint) (info->rc_end - info->buffer); - if (pos < info->pos_in_file + length) - { - uint offset= (uint) (pos - info->pos_in_file); - length-=offset; - if (length > Count) - length=Count; - memcpy(info->buffer+offset, Buffer, length); - Buffer+=length; - Count-= length; - /* Fix length of buffer if the new data was larger */ - if (info->buffer+length > info->rc_pos) - info->rc_pos=info->buffer+length; - if (!Count) - return (error); - } - /* Write at the end of the current buffer; This is the normal case */ - if (_my_b_write(info, Buffer, Count)) - error= -1; - return error; -} - - /* Flush write cache */ - -int flush_io_cache(IO_CACHE *info) -{ - uint length; - DBUG_ENTER("flush_io_cache"); - - if (info->type == WRITE_CACHE) - { - if (info->file == -1) - { - if (real_open_cached_file(info)) - DBUG_RETURN((info->error= -1)); - } - if (info->rc_pos != info->buffer) - { - length=(uint) (info->rc_pos - info->buffer); - if (info->seek_not_done) - { /* File touched, do seek */ - if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) == - MY_FILEPOS_ERROR) - DBUG_RETURN((info->error= -1)); - info->seek_not_done=0; - } - info->rc_pos=info->buffer; - info->pos_in_file+=length; - info->rc_end=(info->buffer+info->buffer_length- - (info->pos_in_file & (IO_SIZE-1))); - if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) - DBUG_RETURN((info->error= -1)); - DBUG_RETURN(0); - } - } -#ifdef HAVE_AIOWAIT - else if (info->type != READ_NET) - { - my_aiowait(&info->aio_result); /* Wait for outstanding req */ - info->inited=0; - } -#endif - DBUG_RETURN(0); -} - - -int end_io_cache(IO_CACHE *info) -{ - int error=0; - IO_CACHE_CALLBACK pre_close; - DBUG_ENTER("end_io_cache"); - if((pre_close=info->pre_close)) - (*pre_close)(info); - if (info->buffer) - { - if (info->file != -1) /* File doesn't exist */ - error=flush_io_cache(info); - my_free((gptr) info->buffer,MYF(MY_WME)); - info->buffer=info->rc_pos=(byte*) 0; - } - DBUG_RETURN(error); -} /* end_io_cache */ - } /* extern "C" */ diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 40eb3b8bb7c..ece8e11064b 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -18,6 +18,10 @@ #include "mysql_priv.h" #include "repl_failsafe.h" +#include "sql_repl.h" +#include "slave.h" +#include "mini_client.h" +#include <mysql.h> RPL_STATUS rpl_status=RPL_NULL; pthread_mutex_t LOCK_rpl_status; @@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE", TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"", rpl_status_type}; +static int init_failsafe_rpl_thread(THD* thd) +{ + DBUG_ENTER("init_failsafe_rpl_thread"); + thd->system_thread = thd->bootstrap = 1; + thd->client_capabilities = 0; + my_net_init(&thd->net, 0); + thd->net.timeout = slave_net_timeout; + thd->max_packet_length=thd->net.max_packet; + thd->master_access= ~0; + thd->priv_user = 0; + thd->system_thread = 1; + pthread_mutex_lock(&LOCK_thread_count); + thd->thread_id = thread_id++; + pthread_mutex_unlock(&LOCK_thread_count); + + if (init_thr_lock() || + my_pthread_setspecific_ptr(THR_THD, thd) || + my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) || + my_pthread_setspecific_ptr(THR_NET, &thd->net)) + { + close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed? + end_thread(thd,0); + DBUG_RETURN(-1); + } + + thd->mysys_var=my_thread_var; + thd->dbug_thread_id=my_thread_id(); +#if !defined(__WIN__) && !defined(OS2) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + thd->mem_root.free=thd->mem_root.used=0; + if (thd->max_join_size == (ulong) ~0L) + thd->options |= OPTION_BIG_SELECTS; + + thd->proc_info="Thread initialized"; + thd->version=refresh_version; + thd->set_time(); + DBUG_RETURN(0); +} + void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status) { pthread_mutex_lock(&LOCK_rpl_status); if (rpl_status == from_status || rpl_status == RPL_ANY) rpl_status = to_status; + pthread_cond_signal(&COND_rpl_status); + pthread_mutex_unlock(&LOCK_rpl_status); +} + +int update_slave_list(MYSQL* mysql) +{ + MYSQL_RES* res=0; + MYSQL_ROW row; + const char* error=0; + bool have_auth_info; + int port_ind; + + if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) || + !(res = mc_mysql_store_result(mysql))) + { + error = "Query error"; + goto err; + } + + switch (mc_mysql_num_fields(res)) + { + case 5: + have_auth_info = 0; + port_ind=2; + break; + case 7: + have_auth_info = 1; + port_ind=4; + break; + default: + error = "Invalid number of fields in SHOW SLAVE HOSTS"; + goto err; + } + + pthread_mutex_lock(&LOCK_slave_list); + + while ((row = mc_mysql_fetch_row(res))) + { + uint32 server_id; + SLAVE_INFO* si, *old_si; + server_id = atoi(row[0]); + if ((old_si = (SLAVE_INFO*)hash_search(&slave_list, + (byte*)&server_id,4))) + si = old_si; + else + { + if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) + { + error = "Out of memory"; + pthread_mutex_unlock(&LOCK_slave_list); + goto err; + } + si->server_id = server_id; + } + strnmov(si->host, row[1], sizeof(si->host)); + si->port = atoi(row[port_ind]); + si->rpl_recovery_rank = atoi(row[port_ind+1]); + si->master_id = atoi(row[port_ind+2]); + if (have_auth_info) + { + strnmov(si->user, row[2], sizeof(si->user)); + strnmov(si->password, row[3], sizeof(si->password)); + } + } + pthread_mutex_unlock(&LOCK_slave_list); +err: + if (res) + mc_mysql_free_result(res); + if (error) + { + sql_print_error("Error updating slave list:",error); + return 1; + } + return 0; +} + +int find_recovery_captain(THD* thd, MYSQL* mysql) +{ + + return 0; +} + +pthread_handler_decl(handle_failsafe_rpl,arg) +{ + DBUG_ENTER("handle_failsafe_rpl"); + THD *thd = new THD; + thd->thread_stack = (char*)&thd; + MYSQL* recovery_captain = 0; + pthread_detach_this_thread(); + if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0))) + { + sql_print_error("Could not initialize failsafe replication thread"); + goto err; + } + pthread_mutex_lock(&LOCK_rpl_status); + while (!thd->killed && !abort_loop) + { + bool break_req_chain = 0; + const char* msg = thd->enter_cond(&COND_rpl_status, + &LOCK_rpl_status, "Waiting for request"); + pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status); + thd->proc_info="Processling request"; + while (!break_req_chain) + { + switch (rpl_status) + { + case RPL_LOST_SOLDIER: + if (find_recovery_captain(thd, recovery_captain)) + rpl_status=RPL_TROOP_SOLDIER; + else + rpl_status=RPL_RECOVERY_CAPTAIN; + break_req_chain=1; /* for now until other states are implemented */ + break; + default: + break_req_chain=1; + break; + } + } + thd->exit_cond(msg); + } pthread_mutex_unlock(&LOCK_rpl_status); +err: + if (recovery_captain) + mc_mysql_close(recovery_captain); + delete thd; + my_thread_end(); + pthread_exit(0); + DBUG_RETURN(0); } + + + diff --git a/sql/repl_failsafe.h b/sql/repl_failsafe.h index 42b386e6255..b71dde1dc10 100644 --- a/sql/repl_failsafe.h +++ b/sql/repl_failsafe.h @@ -1,6 +1,8 @@ #ifndef REPL_FAILSAFE_H #define REPL_FAILSAFE_H +#include "mysql.h" + typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE, RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER, RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */, @@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status; extern pthread_mutex_t LOCK_rpl_status; extern pthread_cond_t COND_rpl_status; extern TYPELIB rpl_role_typelib, rpl_status_typelib; +extern uint rpl_recovery_rank; extern const char* rpl_role_type[], *rpl_status_type[]; +pthread_handler_decl(handle_failsafe_rpl,arg); void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status); +int find_recovery_captain(THD* thd, MYSQL* mysql); +int update_slave_list(MYSQL* mysql); #endif diff --git a/sql/slave.cc b/sql/slave.cc index 11224421ccc..8075b5ad75b 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd); static int init_slave_thread(THD* thd); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); +static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, + bool reconnect); static int safe_sleep(THD* thd, int sec); static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, @@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql) int2store(buf, (uint16)report_port); packet.append(buf, 2); + int4store(buf, rpl_recovery_rank); + packet.append(buf, 4); + int4store(buf, 0); /* tell the master will fill in master_id */ + packet.append(buf, 4); if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), packet.length(), 0)) @@ -868,7 +874,7 @@ command"); } -static uint read_event(MYSQL* mysql, MASTER_INFO *mi) +static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) { ulong len = packet_error; // for convinience lets think we start by @@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ ! - thd->set_time(); DBUG_ENTER("handle_slave"); pthread_detach_this_thread(); @@ -1067,6 +1072,7 @@ connected: // on with life thd->proc_info = "Registering slave on master"; register_slave_on_master(mysql); + update_slave_list(mysql); while (!slave_killed(thd)) { @@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME, while(!slave_killed(thd)) { thd->proc_info = "Reading master update"; - uint event_len = read_event(mysql, &glob_mi); + ulong event_len = read_event(mysql, &glob_mi); if(slave_killed(thd)) { sql_print_error("Slave thread killed while reading event"); @@ -1244,30 +1250,7 @@ position %s", static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) { - int slave_was_killed; -#ifndef DBUG_OFF - events_till_disconnect = disconnect_slave_event_count; -#endif - while(!(slave_was_killed = slave_killed(thd)) && - !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, - mi->port, 0, 0)) - { - sql_print_error("Slave thread: error connecting to master: %s (%d),\ - retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry); - safe_sleep(thd, mi->connect_retry); - } - - if(!slave_was_killed) - { - change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); - mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d", - mi->user, mi->host, mi->port); -#ifdef SIGNAL_WITH_VIO_CLOSE - thd->set_active_vio(mysql->net.vio); -#endif - } - - return slave_was_killed; + return connect_to_master(thd, mysql, mi, 0); } /* @@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) master_retry_count times */ -static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) +static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, + bool reconnect) { int slave_was_killed; int last_errno= -2; // impossible error @@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) #ifndef DBUG_OFF events_till_disconnect = disconnect_slave_event_count; #endif - while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql)) + while (!(slave_was_killed = slave_killed(thd)) && + (reconnect ? mc_mysql_reconnect(mysql) : + !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0))) { /* Don't repeat last error */ if (mc_mysql_errno(mysql) != last_errno) { - sql_print_error("Slave thread: error re-connecting to master: \ + sql_print_error("Slave thread: error connecting to master: \ %s, last_errno=%d, retry in %d sec", mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql), mi->connect_retry); @@ -1309,18 +1296,26 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) if (master_retry_count && err_count++ == master_retry_count) { slave_was_killed=1; - change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER); + if (reconnect) + change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER); break; } } if (!slave_was_killed) { - sql_print_error("Slave: reconnected to master '%s@%s:%d',\ + if (reconnect) + sql_print_error("Slave: connected to master '%s@%s:%d',\ replication resumed in log '%s' at position %s", glob_mi.user, glob_mi.host, glob_mi.port, RPL_LOG_NAME, llstr(glob_mi.pos,llbuff)); + else + { + change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); + mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d", + mi->user, mi->host, mi->port); + } #ifdef SIGNAL_WITH_VIO_CLOSE thd->set_active_vio(mysql->net.vio); #endif @@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user, return slave_was_killed; } +/* + Try to connect until successful or slave killed or we have retried + master_retry_count times +*/ + +static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) +{ + return connect_to_master(thd, mysql, mi, 1); +} + + #ifdef __GNUC__ template class I_List_iterator<i_string>; template class I_List_iterator<i_string_pair>; diff --git a/sql/sql_load.cc b/sql/sql_load.cc index c2793da78f3..28140121491 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -278,8 +278,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ha_autocommit_or_rollback(thd,error); if (!opt_old_rpl_compat && mysql_bin_log.is_open()) { - Delete_file_log_event d(thd); - mysql_bin_log.write(&d); + if (lf_info.wrote_create_file) + { + Delete_file_log_event d(thd); + mysql_bin_log.write(&d); + } } DBUG_RETURN(-1); // Error on read } @@ -303,8 +306,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, if (!opt_old_rpl_compat) { read_info.end_io_cache(); // make sure last block gets logged - Execute_load_log_event e(thd); - mysql_bin_log.write(&e); + if (lf_info.wrote_create_file) + { + Execute_load_log_event e(thd); + mysql_bin_log.write(&e); + } } } if (using_transactions) @@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, } else { + /* init_io_cache() will not initialize read_function member + if the cache is READ_NET. The reason is explained in + mysys/mf_iocache.c. So we work around the problem with a + manual assignment + */ + if (get_it_from_net) + cache.read_function = _my_b_net_read; + need_end_io_cache = 1; if (!opt_old_rpl_compat && mysql_bin_log.is_open()) cache.pre_read = cache.pre_close = diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index e05b17bedef..b5300813410 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) get_object(p,si->user); get_object(p,si->password); si->port = uint2korr(p); + p += 2; + si->rpl_recovery_rank = uint4korr(p); + p += 4; + if (!(si->master_id = uint4korr(p))) + si->master_id = server_id; si->thd = thd; pthread_mutex_lock(&LOCK_slave_list); @@ -534,6 +539,7 @@ impossible position"; DBUG_PRINT("wait",("waiting for data on binary log")); if (!thd->killed) pthread_cond_wait(&COND_binlog_update, log_lock); + DBUG_PRINT("wait",("binary log received update")); break; default: @@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd) field_list.push_back(new Item_empty_string("Password",20)); } field_list.push_back(new Item_empty_string("Port",20)); + field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20)); + field_list.push_back(new Item_empty_string("Master_id", 20)); if (send_fields(thd, field_list, 1)) DBUG_RETURN(-1); @@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd) net_store_data(packet, si->password); } net_store_data(packet, (uint32) si->port); + net_store_data(packet, si->rpl_recovery_rank); + net_store_data(packet, si->master_id); if (my_net_write(net, (char*)packet->ptr(), packet->length())) { pthread_mutex_unlock(&LOCK_slave_list); @@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file) { LOAD_FILE_INFO* lf_info; uint block_len ; - if (!(block_len = file->rc_end - file->buffer)) + char* buffer = (char*)file->buffer; + if (!(block_len = file->rc_end - buffer)) return 0; lf_info = (LOAD_FILE_INFO*)file->arg; if (lf_info->last_pos_in_file != HA_POS_ERROR && @@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file) lf_info->last_pos_in_file = file->pos_in_file; if (lf_info->wrote_create_file) { - Append_block_log_event a(lf_info->thd, (char*) file->buffer, block_len); + Append_block_log_event a(lf_info->thd, buffer, block_len); mysql_bin_log.write(&a); } else { Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db, lf_info->table_name, *lf_info->fields, - lf_info->handle_dup, (char*) file->buffer, + lf_info->handle_dup, buffer, block_len); mysql_bin_log.write(&c); lf_info->wrote_create_file = 1; diff --git a/sql/sql_repl.h b/sql/sql_repl.h index d6fafd31f21..c12e0536058 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -6,6 +6,7 @@ typedef struct st_slave_info { uint32 server_id; + uint32 rpl_recovery_rank, master_id; char host[HOSTNAME_LENGTH+1]; char user[USERNAME_LENGTH+1]; char password[HASH_PASSWORD_LENGTH+1]; |