summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2001-10-23 13:28:03 -0600
committerunknown <sasha@mysql.sashanet.com>2001-10-23 13:28:03 -0600
commit8fc78e08b0e68f82121b0bba8d930bc5ec57a29f (patch)
treec5c654aca3970830687c68cb22289835e1719233 /sql
parent74f49f9f34b7a4cc1b87aa1cb657b72f8c6c856e (diff)
downloadmariadb-git-8fc78e08b0e68f82121b0bba8d930bc5ec57a29f.tar.gz
cleanup
removal of duplicate code in mf_iocache.cc work on failsafe replication work on SEQ_READ_APPEND io cache include/my_sys.h: updates for SEQ_READ_APPEND libmysql/Makefile.am: fix for mysys/mf_iocache.c libmysql/libmysql.c: updates for new format of SHOW SLAVE HOSTS mysql-test/r/rpl000001.result: test replication of LOAD DATA LOCAL INFILE mysql-test/r/rpl000002.result: updated test result mysql-test/t/rpl000001.test: test LOAD DATA LOCAL INFILE mysys/mf_iocache.c: cleanup to remove duplicate functionality some work on SEQ_READ_APPEND sql/mf_iocache.cc: cleanup to remove duplicate functionality sql/repl_failsafe.cc: more work on failsafe replication sql/repl_failsafe.h: more work on failsafe replication sql/slave.cc: cleanup more work on failsafe replication sql/sql_load.cc: fixed bug on replicating empty file loads got LOAD DATA LOCAL INFILE to work again, and to be replicated sql/sql_repl.cc: cleanup more work on failsafe replication sql/sql_repl.h: more work on failsafe replication
Diffstat (limited to 'sql')
-rw-r--r--sql/mf_iocache.cc624
-rw-r--r--sql/repl_failsafe.cc177
-rw-r--r--sql/repl_failsafe.h6
-rw-r--r--sql/slave.cc70
-rw-r--r--sql/sql_load.cc22
-rw-r--r--sql/sql_repl.cc17
-rw-r--r--sql/sql_repl.h1
7 files changed, 255 insertions, 662 deletions
diff --git a/sql/mf_iocache.cc b/sql/mf_iocache.cc
index 40b98983291..55e953687a3 100644
--- a/sql/mf_iocache.cc
+++ b/sql/mf_iocache.cc
@@ -42,295 +42,6 @@ static void my_aiowait(my_aio_result *result);
extern "C" {
/*
- ** if cachesize == 0 then use default cachesize (from s-file)
- ** if file == -1 then real_open_cached_file() will be called.
- ** returns 0 if ok
- */
-
-int init_io_cache(IO_CACHE *info, File file, uint cachesize,
- enum cache_type type, my_off_t seek_offset,
- pbool use_async_io, myf cache_myflags)
-{
- uint min_cache;
- DBUG_ENTER("init_io_cache");
- DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
-
- /* There is no file in net_reading */
- info->file= file;
- info->pre_close = info->pre_read = info->post_read = 0;
- info->arg = 0;
- if (!cachesize)
- if (! (cachesize= my_default_record_cache_size))
- DBUG_RETURN(1); /* No cache requested */
- min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
- if (type == READ_CACHE)
- { /* Assume file isn't growing */
- if (cache_myflags & MY_DONT_CHECK_FILESIZE)
- {
- cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
- }
- else
- {
- my_off_t file_pos,end_of_file;
- if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
- DBUG_RETURN(1);
- end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
- if (end_of_file < seek_offset)
- end_of_file=seek_offset;
- VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
- if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
- {
- cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
- use_async_io=0; /* No need to use async */
- }
- }
- }
- if ((int) type < (int) READ_NET)
- {
- for (;;)
- {
- cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
- (ulong) ~(min_cache-1));
- if (cachesize < min_cache)
- cachesize = min_cache;
- if ((info->buffer=
- (byte*) my_malloc(cachesize,
- MYF((cache_myflags & ~ MY_WME) |
- (cachesize == min_cache ? MY_WME : 0)))) != 0)
- break; /* Enough memory found */
- if (cachesize == min_cache)
- DBUG_RETURN(2); /* Can't alloc cache */
- cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
- }
- }
- else
- info->buffer=0;
- DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
- info->pos_in_file= seek_offset;
- info->read_length=info->buffer_length=cachesize;
- info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
- type != READ_NET);
- info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
- info->rc_request_pos=info->rc_pos=info->buffer;
-
- if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
- {
- info->rc_end=info->buffer; /* Nothing in cache */
- }
- else /* type == WRITE_CACHE */
- {
- info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
- }
- /* end_of_file may be changed by user later */
- info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
- : ~(my_off_t) 0);
- info->type=type;
- info->error=0;
- info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; /* net | file */
-#ifdef HAVE_AIOWAIT
- if (use_async_io && ! my_disable_async_io)
- {
- DBUG_PRINT("info",("Using async io"));
- info->read_length/=2;
- info->read_function=_my_b_async_read;
- }
- info->inited=info->aio_result.pending=0;
-#endif
- DBUG_RETURN(0);
-} /* init_io_cache */
-
-
- /* Wait until current request is ready */
-
-#ifdef HAVE_AIOWAIT
-static void my_aiowait(my_aio_result *result)
-{
- if (result->pending)
- {
- struct aio_result_t *tmp;
- for (;;)
- {
- if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
- {
- if (errno == EINTR)
- continue;
- DBUG_PRINT("error",("No aio request, error: %d",errno));
- result->pending=0; /* Assume everythings is ok */
- break;
- }
- ((my_aio_result*) tmp)->pending=0;
- if ((my_aio_result*) tmp == result)
- break;
- }
- }
- return;
-}
-#endif
-
- /* Use this to reset cache to start or other type */
- /* Some simple optimizing is done when reinit in current buffer */
-
-my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
- my_off_t seek_offset,
- pbool use_async_io __attribute__((unused)),
- pbool clear_cache)
-{
- DBUG_ENTER("reinit_io_cache");
-
- info->seek_not_done= test(info->file >= 0); /* Seek not done */
-
- /* If the whole file is in memory, avoid flushing to disk */
- if (! clear_cache &&
- seek_offset >= info->pos_in_file &&
- seek_offset <= info->pos_in_file +
- (uint) (info->rc_end - info->rc_request_pos))
- { /* use current buffer */
- if (info->type == WRITE_CACHE && type == READ_CACHE)
- {
- info->rc_end=info->rc_pos;
- info->end_of_file=my_b_tell(info);
- }
- else if (type == WRITE_CACHE)
- {
- if (info->type == READ_CACHE)
- info->rc_end=info->buffer+info->buffer_length;
- info->end_of_file = ~(my_off_t) 0;
- }
- info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
-#ifdef HAVE_AIOWAIT
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
-#endif
- }
- else
- {
- /*
- If we change from WRITE_CACHE to READ_CACHE, assume that everything
- after the current positions should be ignored
- */
- if (info->type == WRITE_CACHE && type == READ_CACHE)
- info->end_of_file=my_b_tell(info);
- /* No need to flush cache if we want to reuse it */
- if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
- DBUG_RETURN(1);
- if (info->pos_in_file != seek_offset)
- {
- info->pos_in_file=seek_offset;
- info->seek_not_done=1;
- }
- info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
- {
- info->rc_end=info->buffer; /* Nothing in cache */
- }
- else
- {
- info->rc_end=info->buffer+info->buffer_length-
- (seek_offset & (IO_SIZE-1));
- info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
- ~(my_off_t) 0);
- }
- }
- info->type=type;
- info->error=0;
- info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read;
-#ifdef HAVE_AIOWAIT
- if (type != READ_NET)
- {
- if (use_async_io && ! my_disable_async_io &&
- ((ulong) info->buffer_length <
- (ulong) (info->end_of_file - seek_offset)))
- {
- info->read_length=info->buffer_length/2;
- info->read_function=_my_b_async_read;
- }
- }
- info->inited=0;
-#endif
- DBUG_RETURN(0);
-} /* init_io_cache */
-
-
-
- /*
- Read buffered. Returns 1 if can't read requested characters
- This function is only called from the my_b_read() macro
- when there isn't enough characters in the buffer to
- satisfy the request.
- Returns 0 we succeeded in reading all data
- */
-
-int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
-{
- uint length,diff_length,left_length;
- my_off_t max_length, pos_in_file;
-
- if ((left_length=(uint) (info->rc_end-info->rc_pos)))
- {
- dbug_assert(Count >= left_length); /* User is not using my_b_read() */
- memcpy(Buffer,info->rc_pos, (size_t) (left_length));
- Buffer+=left_length;
- Count-=left_length;
- }
- /* pos_in_file always point on where info->buffer was read */
- pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
- if (info->seek_not_done)
- { /* File touched, do seek */
- VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
- info->seek_not_done=0;
- }
- diff_length=(uint) (pos_in_file & (IO_SIZE-1));
- if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
- { /* Fill first intern buffer */
- uint read_length;
- if (info->end_of_file == pos_in_file)
- { /* End of file */
- info->error=(int) left_length;
- return 1;
- }
- length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
- if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
- != (uint) length)
- {
- info->error= read_length == (uint) -1 ? -1 :
- (int) (read_length+left_length);
- return 1;
- }
- Count-=length;
- Buffer+=length;
- pos_in_file+=length;
- left_length+=length;
- diff_length=0;
- }
- max_length=info->read_length-diff_length;
- if (info->type != READ_FIFO &&
- (info->end_of_file - pos_in_file) < max_length)
- max_length = info->end_of_file - pos_in_file;
- if (!max_length)
- {
- if (Count)
- {
- info->error= left_length; /* We only got this many char */
- return 1;
- }
- length=0; /* Didn't read any chars */
- }
- else if ((length=my_read(info->file,info->buffer,(uint) max_length,
- info->myflags)) < Count ||
- length == (uint) -1)
- {
- if (length != (uint) -1)
- memcpy(Buffer,info->buffer,(size_t) length);
- info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
- return 1;
- }
- info->rc_pos=info->buffer+Count;
- info->rc_end=info->buffer+length;
- info->pos_in_file=pos_in_file;
- memcpy(Buffer,info->buffer,(size_t) Count);
- return 0;
-}
-
- /*
** Read buffered from the net.
** Returns 1 if can't read requested characters
** Returns 0 if record read
@@ -359,341 +70,8 @@ int _my_b_net_read(register IO_CACHE *info, byte *Buffer,
info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length;
Buffer[0] = info->rc_pos[0]; /* length is always 1 */
info->rc_pos++;
+ info->buffer = info->rc_pos;
return 0;
}
-#ifdef HAVE_AIOWAIT
-
-int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
-{
- uint length,read_length,diff_length,left_length,use_length,org_Count;
- my_off_t max_length;
- my_off_t next_pos_in_file;
- byte *read_buffer;
-
- memcpy(Buffer,info->rc_pos,
- (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
- Buffer+=left_length;
- org_Count=Count;
- Count-=left_length;
-
- if (info->inited)
- { /* wait for read block */
- info->inited=0; /* No more block to read */
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
- if (info->aio_result.result.aio_errno)
- {
- if (info->myflags & MY_WME)
- my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
- my_filename(info->file),
- info->aio_result.result.aio_errno);
- my_errno=info->aio_result.result.aio_errno;
- info->error= -1;
- return(1);
- }
- if (! (read_length = (uint) info->aio_result.result.aio_return) ||
- read_length == (uint) -1)
- {
- my_errno=0; /* For testing */
- info->error= (read_length == (uint) -1 ? -1 :
- (int) (read_length+left_length));
- return(1);
- }
- info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos);
-
- if (info->rc_request_pos != info->buffer)
- info->rc_request_pos=info->buffer;
- else
- info->rc_request_pos=info->buffer+info->read_length;
- info->rc_pos=info->rc_request_pos;
- next_pos_in_file=info->aio_read_pos+read_length;
-
- /* Check if pos_in_file is changed
- (_ni_read_cache may have skipped some bytes) */
-
- if (info->aio_read_pos < info->pos_in_file)
- { /* Fix if skipped bytes */
- if (info->aio_read_pos + read_length < info->pos_in_file)
- {
- read_length=0; /* Skipp block */
- next_pos_in_file=info->pos_in_file;
- }
- else
- {
- my_off_t offset= (info->pos_in_file - info->aio_read_pos);
- info->pos_in_file=info->aio_read_pos; /* Whe are here */
- info->rc_pos=info->rc_request_pos+offset;
- read_length-=offset; /* Bytes left from rc_pos */
- }
- }
-#ifndef DBUG_OFF
- if (info->aio_read_pos > info->pos_in_file)
- {
- my_errno=EINVAL;
- return(info->read_length= -1);
- }
-#endif
- /* Copy found bytes to buffer */
- length=min(Count,read_length);
- memcpy(Buffer,info->rc_pos,(size_t) length);
- Buffer+=length;
- Count-=length;
- left_length+=length;
- info->rc_end=info->rc_pos+read_length;
- info->rc_pos+=length;
- }
- else
- next_pos_in_file=(info->pos_in_file+ (uint)
- (info->rc_end - info->rc_request_pos));
-
- /* If reading large blocks, or first read or read with skipp */
- if (Count)
- {
- if (next_pos_in_file == info->end_of_file)
- {
- info->error=(int) (read_length+left_length);
- return 1;
- }
- VOID(my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0)));
- read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1));
- if (Count < read_length)
- { /* Small block, read to cache */
- if ((read_length=my_read(info->file,info->rc_request_pos,
- read_length, info->myflags)) == (uint) -1)
- return info->error= -1;
- use_length=min(Count,read_length);
- memcpy(Buffer,info->rc_request_pos,(size_t) use_length);
- info->rc_pos=info->rc_request_pos+Count;
- info->rc_end=info->rc_request_pos+read_length;
- info->pos_in_file=next_pos_in_file; /* Start of block in cache */
- next_pos_in_file+=read_length;
-
- if (Count != use_length)
- { /* Didn't find hole block */
- if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
- my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
- my_filename(info->file),my_errno);
- info->error=(int) (read_length+left_length);
- return 1;
- }
- }
- else
- { /* Big block, don't cache it */
- if ((read_length=my_read(info->file,Buffer,(uint) Count,info->myflags))
- != Count)
- {
- info->error= read_length == (uint) -1 ? -1 : read_length+left_length;
- return 1;
- }
- info->rc_pos=info->rc_end=info->rc_request_pos;
- info->pos_in_file=(next_pos_in_file+=Count);
- }
- }
-
- /* Read next block with asyncronic io */
- max_length=info->end_of_file - next_pos_in_file;
- diff_length=(next_pos_in_file & (IO_SIZE-1));
-
- if (max_length > (my_off_t) info->read_length - diff_length)
- max_length= (my_off_t) info->read_length - diff_length;
- if (info->rc_request_pos != info->buffer)
- read_buffer=info->buffer;
- else
- read_buffer=info->buffer+info->read_length;
- info->aio_read_pos=next_pos_in_file;
- if (max_length)
- {
- info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
- DBUG_PRINT("aioread",("filepos: %ld length: %ld",
- (ulong) next_pos_in_file,(ulong) max_length));
- if (aioread(info->file,read_buffer,(int) max_length,
- (my_off_t) next_pos_in_file,MY_SEEK_SET,
- &info->aio_result.result))
- { /* Skipp async io */
- my_errno=errno;
- DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
- errno, info->aio_result.result.aio_errno));
- if (info->rc_request_pos != info->buffer)
- {
- bmove(info->buffer,info->rc_request_pos,
- (uint) (info->rc_end - info->rc_pos));
- info->rc_request_pos=info->buffer;
- info->rc_pos-=info->read_length;
- info->rc_end-=info->read_length;
- }
- info->read_length=info->buffer_length; /* Use hole buffer */
- info->read_function=_my_b_read; /* Use normal IO_READ next */
- }
- else
- info->inited=info->aio_result.pending=1;
- }
- return 0; /* Block read, async in use */
-} /* _my_b_async_read */
-#endif
-
-
-/* Read one byte when buffer is empty */
-
-int _my_b_get(IO_CACHE *info)
-{
- byte buff;
- IO_CACHE_CALLBACK pre_read,post_read;
- if ((pre_read = info->pre_read))
- (*pre_read)(info);
- if ((*(info)->read_function)(info,&buff,1))
- return my_b_EOF;
- if ((post_read = info->post_read))
- (*post_read)(info);
- return (int) (uchar) buff;
-}
-
- /* Returns != 0 if error on write */
-
-int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
-{
- uint rest_length,length;
-
- rest_length=(uint) (info->rc_end - info->rc_pos);
- memcpy(info->rc_pos,Buffer,(size_t) rest_length);
- Buffer+=rest_length;
- Count-=rest_length;
- info->rc_pos+=rest_length;
- if (info->pos_in_file+info->buffer_length > info->end_of_file)
- {
- my_errno=errno=EFBIG;
- return info->error = -1;
- }
- if (flush_io_cache(info))
- return 1;
- if (Count >= IO_SIZE)
- { /* Fill first intern buffer */
- length=Count & (uint) ~(IO_SIZE-1);
- if (info->seek_not_done)
- { /* File touched, do seek */
- VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)));
- info->seek_not_done=0;
- }
- if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
- return info->error= -1;
- Count-=length;
- Buffer+=length;
- info->pos_in_file+=length;
- }
- memcpy(info->rc_pos,Buffer,(size_t) Count);
- info->rc_pos+=Count;
- return 0;
-}
-
-
-/*
- Write a block to disk where part of the data may be inside the record
- buffer. As all write calls to the data goes through the cache,
- we will never get a seek over the end of the buffer
-*/
-
-int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
- my_off_t pos)
-{
- uint length;
- int error=0;
-
- if (pos < info->pos_in_file)
- {
- /* Of no overlap, write everything without buffering */
- if (pos + Count <= info->pos_in_file)
- return my_pwrite(info->file, Buffer, Count, pos,
- info->myflags | MY_NABP);
- /* Write the part of the block that is before buffer */
- length= (uint) (info->pos_in_file - pos);
- if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
- info->error=error=-1;
- Buffer+=length;
- pos+= length;
- Count-= length;
- }
-
- /* Check if we want to write inside the used part of the buffer.*/
- length= (uint) (info->rc_end - info->buffer);
- if (pos < info->pos_in_file + length)
- {
- uint offset= (uint) (pos - info->pos_in_file);
- length-=offset;
- if (length > Count)
- length=Count;
- memcpy(info->buffer+offset, Buffer, length);
- Buffer+=length;
- Count-= length;
- /* Fix length of buffer if the new data was larger */
- if (info->buffer+length > info->rc_pos)
- info->rc_pos=info->buffer+length;
- if (!Count)
- return (error);
- }
- /* Write at the end of the current buffer; This is the normal case */
- if (_my_b_write(info, Buffer, Count))
- error= -1;
- return error;
-}
-
- /* Flush write cache */
-
-int flush_io_cache(IO_CACHE *info)
-{
- uint length;
- DBUG_ENTER("flush_io_cache");
-
- if (info->type == WRITE_CACHE)
- {
- if (info->file == -1)
- {
- if (real_open_cached_file(info))
- DBUG_RETURN((info->error= -1));
- }
- if (info->rc_pos != info->buffer)
- {
- length=(uint) (info->rc_pos - info->buffer);
- if (info->seek_not_done)
- { /* File touched, do seek */
- if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
- MY_FILEPOS_ERROR)
- DBUG_RETURN((info->error= -1));
- info->seek_not_done=0;
- }
- info->rc_pos=info->buffer;
- info->pos_in_file+=length;
- info->rc_end=(info->buffer+info->buffer_length-
- (info->pos_in_file & (IO_SIZE-1)));
- if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
- DBUG_RETURN((info->error= -1));
- DBUG_RETURN(0);
- }
- }
-#ifdef HAVE_AIOWAIT
- else if (info->type != READ_NET)
- {
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
- info->inited=0;
- }
-#endif
- DBUG_RETURN(0);
-}
-
-
-int end_io_cache(IO_CACHE *info)
-{
- int error=0;
- IO_CACHE_CALLBACK pre_close;
- DBUG_ENTER("end_io_cache");
- if((pre_close=info->pre_close))
- (*pre_close)(info);
- if (info->buffer)
- {
- if (info->file != -1) /* File doesn't exist */
- error=flush_io_cache(info);
- my_free((gptr) info->buffer,MYF(MY_WME));
- info->buffer=info->rc_pos=(byte*) 0;
- }
- DBUG_RETURN(error);
-} /* end_io_cache */
-
} /* extern "C" */
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 40eb3b8bb7c..ece8e11064b 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -18,6 +18,10 @@
#include "mysql_priv.h"
#include "repl_failsafe.h"
+#include "sql_repl.h"
+#include "slave.h"
+#include "mini_client.h"
+#include <mysql.h>
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
+static int init_failsafe_rpl_thread(THD* thd)
+{
+ DBUG_ENTER("init_failsafe_rpl_thread");
+ thd->system_thread = thd->bootstrap = 1;
+ thd->client_capabilities = 0;
+ my_net_init(&thd->net, 0);
+ thd->net.timeout = slave_net_timeout;
+ thd->max_packet_length=thd->net.max_packet;
+ thd->master_access= ~0;
+ thd->priv_user = 0;
+ thd->system_thread = 1;
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->thread_id = thread_id++;
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ if (init_thr_lock() ||
+ my_pthread_setspecific_ptr(THR_THD, thd) ||
+ my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
+ my_pthread_setspecific_ptr(THR_NET, &thd->net))
+ {
+ close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
+ end_thread(thd,0);
+ DBUG_RETURN(-1);
+ }
+
+ thd->mysys_var=my_thread_var;
+ thd->dbug_thread_id=my_thread_id();
+#if !defined(__WIN__) && !defined(OS2)
+ sigset_t set;
+ VOID(sigemptyset(&set)); // Get mask in use
+ VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
+#endif
+
+ thd->mem_root.free=thd->mem_root.used=0;
+ if (thd->max_join_size == (ulong) ~0L)
+ thd->options |= OPTION_BIG_SELECTS;
+
+ thd->proc_info="Thread initialized";
+ thd->version=refresh_version;
+ thd->set_time();
+ DBUG_RETURN(0);
+}
+
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
pthread_mutex_lock(&LOCK_rpl_status);
if (rpl_status == from_status || rpl_status == RPL_ANY)
rpl_status = to_status;
+ pthread_cond_signal(&COND_rpl_status);
+ pthread_mutex_unlock(&LOCK_rpl_status);
+}
+
+int update_slave_list(MYSQL* mysql)
+{
+ MYSQL_RES* res=0;
+ MYSQL_ROW row;
+ const char* error=0;
+ bool have_auth_info;
+ int port_ind;
+
+ if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) ||
+ !(res = mc_mysql_store_result(mysql)))
+ {
+ error = "Query error";
+ goto err;
+ }
+
+ switch (mc_mysql_num_fields(res))
+ {
+ case 5:
+ have_auth_info = 0;
+ port_ind=2;
+ break;
+ case 7:
+ have_auth_info = 1;
+ port_ind=4;
+ break;
+ default:
+ error = "Invalid number of fields in SHOW SLAVE HOSTS";
+ goto err;
+ }
+
+ pthread_mutex_lock(&LOCK_slave_list);
+
+ while ((row = mc_mysql_fetch_row(res)))
+ {
+ uint32 server_id;
+ SLAVE_INFO* si, *old_si;
+ server_id = atoi(row[0]);
+ if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
+ (byte*)&server_id,4)))
+ si = old_si;
+ else
+ {
+ if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
+ {
+ error = "Out of memory";
+ pthread_mutex_unlock(&LOCK_slave_list);
+ goto err;
+ }
+ si->server_id = server_id;
+ }
+ strnmov(si->host, row[1], sizeof(si->host));
+ si->port = atoi(row[port_ind]);
+ si->rpl_recovery_rank = atoi(row[port_ind+1]);
+ si->master_id = atoi(row[port_ind+2]);
+ if (have_auth_info)
+ {
+ strnmov(si->user, row[2], sizeof(si->user));
+ strnmov(si->password, row[3], sizeof(si->password));
+ }
+ }
+ pthread_mutex_unlock(&LOCK_slave_list);
+err:
+ if (res)
+ mc_mysql_free_result(res);
+ if (error)
+ {
+ sql_print_error("Error updating slave list:",error);
+ return 1;
+ }
+ return 0;
+}
+
+int find_recovery_captain(THD* thd, MYSQL* mysql)
+{
+
+ return 0;
+}
+
+pthread_handler_decl(handle_failsafe_rpl,arg)
+{
+ DBUG_ENTER("handle_failsafe_rpl");
+ THD *thd = new THD;
+ thd->thread_stack = (char*)&thd;
+ MYSQL* recovery_captain = 0;
+ pthread_detach_this_thread();
+ if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0)))
+ {
+ sql_print_error("Could not initialize failsafe replication thread");
+ goto err;
+ }
+ pthread_mutex_lock(&LOCK_rpl_status);
+ while (!thd->killed && !abort_loop)
+ {
+ bool break_req_chain = 0;
+ const char* msg = thd->enter_cond(&COND_rpl_status,
+ &LOCK_rpl_status, "Waiting for request");
+ pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
+ thd->proc_info="Processling request";
+ while (!break_req_chain)
+ {
+ switch (rpl_status)
+ {
+ case RPL_LOST_SOLDIER:
+ if (find_recovery_captain(thd, recovery_captain))
+ rpl_status=RPL_TROOP_SOLDIER;
+ else
+ rpl_status=RPL_RECOVERY_CAPTAIN;
+ break_req_chain=1; /* for now until other states are implemented */
+ break;
+ default:
+ break_req_chain=1;
+ break;
+ }
+ }
+ thd->exit_cond(msg);
+ }
pthread_mutex_unlock(&LOCK_rpl_status);
+err:
+ if (recovery_captain)
+ mc_mysql_close(recovery_captain);
+ delete thd;
+ my_thread_end();
+ pthread_exit(0);
+ DBUG_RETURN(0);
}
+
+
+
diff --git a/sql/repl_failsafe.h b/sql/repl_failsafe.h
index 42b386e6255..b71dde1dc10 100644
--- a/sql/repl_failsafe.h
+++ b/sql/repl_failsafe.h
@@ -1,6 +1,8 @@
#ifndef REPL_FAILSAFE_H
#define REPL_FAILSAFE_H
+#include "mysql.h"
+
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */,
@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status;
extern pthread_mutex_t LOCK_rpl_status;
extern pthread_cond_t COND_rpl_status;
extern TYPELIB rpl_role_typelib, rpl_status_typelib;
+extern uint rpl_recovery_rank;
extern const char* rpl_role_type[], *rpl_status_type[];
+pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
+int find_recovery_captain(THD* thd, MYSQL* mysql);
+int update_slave_list(MYSQL* mysql);
#endif
diff --git a/sql/slave.cc b/sql/slave.cc
index 11224421ccc..8075b5ad75b 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
+static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
+ bool reconnect);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql)
int2store(buf, (uint16)report_port);
packet.append(buf, 2);
+ int4store(buf, rpl_recovery_rank);
+ packet.append(buf, 4);
+ int4store(buf, 0); /* tell the master will fill in master_id */
+ packet.append(buf, 4);
if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
@@ -868,7 +874,7 @@ command");
}
-static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
+static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
// for convinience lets think we start by
@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
- thd->set_time();
DBUG_ENTER("handle_slave");
pthread_detach_this_thread();
@@ -1067,6 +1072,7 @@ connected:
// on with life
thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql);
+ update_slave_list(mysql);
while (!slave_killed(thd))
{
@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
while(!slave_killed(thd))
{
thd->proc_info = "Reading master update";
- uint event_len = read_event(mysql, &glob_mi);
+ ulong event_len = read_event(mysql, &glob_mi);
if(slave_killed(thd))
{
sql_print_error("Slave thread killed while reading event");
@@ -1244,30 +1250,7 @@ position %s",
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
- int slave_was_killed;
-#ifndef DBUG_OFF
- events_till_disconnect = disconnect_slave_event_count;
-#endif
- while(!(slave_was_killed = slave_killed(thd)) &&
- !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
- mi->port, 0, 0))
- {
- sql_print_error("Slave thread: error connecting to master: %s (%d),\
- retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
- safe_sleep(thd, mi->connect_retry);
- }
-
- if(!slave_was_killed)
- {
- change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
- mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
- mi->user, mi->host, mi->port);
-#ifdef SIGNAL_WITH_VIO_CLOSE
- thd->set_active_vio(mysql->net.vio);
-#endif
- }
-
- return slave_was_killed;
+ return connect_to_master(thd, mysql, mi, 0);
}
/*
@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
master_retry_count times
*/
-static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
+static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
+ bool reconnect)
{
int slave_was_killed;
int last_errno= -2; // impossible error
@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
- while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
+ while (!(slave_was_killed = slave_killed(thd)) &&
+ (reconnect ? mc_mysql_reconnect(mysql) :
+ !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0)))
{
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
- sql_print_error("Slave thread: error re-connecting to master: \
+ sql_print_error("Slave thread: error connecting to master: \
%s, last_errno=%d, retry in %d sec",
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
@@ -1309,18 +1296,26 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if (master_retry_count && err_count++ == master_retry_count)
{
slave_was_killed=1;
- change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
+ if (reconnect)
+ change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
break;
}
}
if (!slave_was_killed)
{
- sql_print_error("Slave: reconnected to master '%s@%s:%d',\
+ if (reconnect)
+ sql_print_error("Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s", glob_mi.user,
glob_mi.host, glob_mi.port,
RPL_LOG_NAME,
llstr(glob_mi.pos,llbuff));
+ else
+ {
+ change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
+ mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
+ mi->user, mi->host, mi->port);
+ }
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user,
return slave_was_killed;
}
+/*
+ Try to connect until successful or slave killed or we have retried
+ master_retry_count times
+*/
+
+static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
+{
+ return connect_to_master(thd, mysql, mi, 1);
+}
+
+
#ifdef __GNUC__
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index c2793da78f3..28140121491 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -278,8 +278,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
ha_autocommit_or_rollback(thd,error);
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
- Delete_file_log_event d(thd);
- mysql_bin_log.write(&d);
+ if (lf_info.wrote_create_file)
+ {
+ Delete_file_log_event d(thd);
+ mysql_bin_log.write(&d);
+ }
}
DBUG_RETURN(-1); // Error on read
}
@@ -303,8 +306,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!opt_old_rpl_compat)
{
read_info.end_io_cache(); // make sure last block gets logged
- Execute_load_log_event e(thd);
- mysql_bin_log.write(&e);
+ if (lf_info.wrote_create_file)
+ {
+ Execute_load_log_event e(thd);
+ mysql_bin_log.write(&e);
+ }
}
}
if (using_transactions)
@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
}
else
{
+ /* init_io_cache() will not initialize read_function member
+ if the cache is READ_NET. The reason is explained in
+ mysys/mf_iocache.c. So we work around the problem with a
+ manual assignment
+ */
+ if (get_it_from_net)
+ cache.read_function = _my_b_net_read;
+
need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index e05b17bedef..b5300813410 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
+ p += 2;
+ si->rpl_recovery_rank = uint4korr(p);
+ p += 4;
+ if (!(si->master_id = uint4korr(p)))
+ si->master_id = server_id;
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
@@ -534,6 +539,7 @@ impossible position";
DBUG_PRINT("wait",("waiting for data on binary log"));
if (!thd->killed)
pthread_cond_wait(&COND_binlog_update, log_lock);
+ DBUG_PRINT("wait",("binary log received update"));
break;
default:
@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd)
field_list.push_back(new Item_empty_string("Password",20));
}
field_list.push_back(new Item_empty_string("Port",20));
+ field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
+ field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd)
net_store_data(packet, si->password);
}
net_store_data(packet, (uint32) si->port);
+ net_store_data(packet, si->rpl_recovery_rank);
+ net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
uint block_len ;
- if (!(block_len = file->rc_end - file->buffer))
+ char* buffer = (char*)file->buffer;
+ if (!(block_len = file->rc_end - buffer))
return 0;
lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file)
lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file)
{
- Append_block_log_event a(lf_info->thd, (char*) file->buffer, block_len);
+ Append_block_log_event a(lf_info->thd, buffer, block_len);
mysql_bin_log.write(&a);
}
else
{
Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
lf_info->table_name, *lf_info->fields,
- lf_info->handle_dup, (char*) file->buffer,
+ lf_info->handle_dup, buffer,
block_len);
mysql_bin_log.write(&c);
lf_info->wrote_create_file = 1;
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index d6fafd31f21..c12e0536058 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -6,6 +6,7 @@
typedef struct st_slave_info
{
uint32 server_id;
+ uint32 rpl_recovery_rank, master_id;
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];