diff options
-rw-r--r-- | .bzrignore | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 2423 | ||||
-rw-r--r-- | sql/log_event.h | 107 |
3 files changed, 1619 insertions, 913 deletions
diff --git a/.bzrignore b/.bzrignore index bacfe2ff975..0a6384dfccb 100644 --- a/.bzrignore +++ b/.bzrignore @@ -513,3 +513,5 @@ innobase/stamp-h1 myisam/rt_test.MYD myisam/rt_test.MYI stamp-h1 +scripts/fill_func_tables +scripts/fill_func_tables.sql diff --git a/sql/log_event.cc b/sql/log_event.cc index 50cfb835a5c..014cdcb4961 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -26,6 +26,11 @@ #include <assert.h> +/***************************************************************************** + + my_b_safe_write() + + ****************************************************************************/ inline int my_b_safe_write(IO_CACHE* file, const byte *buf, int len) { @@ -40,6 +45,11 @@ inline int my_b_safe_write(IO_CACHE* file, const byte *buf, return my_b_write(file, buf,len); } +/***************************************************************************** + + pretty_print_str() + + ****************************************************************************/ #ifdef MYSQL_CLIENT static void pretty_print_str(FILE* file, char* str, int len) { @@ -63,16 +73,26 @@ static void pretty_print_str(FILE* file, char* str, int len) } fputc('\'', file); } -#endif +#endif // MYSQL_CLIENT -#ifndef MYSQL_CLIENT +/***************************************************************************** + ignored_error_code() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT inline int ignored_error_code(int err_code) { return use_slave_mask && bitmap_is_set(&slave_error_mask, err_code); } +#endif // !MYSQL_CLIENT + +/***************************************************************************** + pretty_print_str() + ****************************************************************************/ +#ifndef MYSQL_CLIENT static void pretty_print_str(String* packet, char* str, int len) { char* end = str + len; @@ -95,8 +115,14 @@ static void pretty_print_str(String* packet, char* str, int len) } packet->append('\''); } +#endif // !MYSQL_CLIENT + +/***************************************************************************** + slave_load_file_stem() + ****************************************************************************/ +#ifndef MYSQL_CLIENT static inline char* slave_load_file_stem(char*buf, uint file_id, int event_server_id) { @@ -108,9 +134,81 @@ static inline char* slave_load_file_stem(char*buf, uint file_id, *buf++ = '-'; return int10_to_str(file_id, buf, 10); } +#endif // !MYSQL_CLIENT -#endif +/***************************************************************************** + + cleanup_load_tmpdir() + + Delete all temporary files used for SQL_LOAD. + + TODO + - When we get a 'server start' event, we should only remove + the files associated with the server id that just started. + Easily fixable by adding server_id as a prefix to the log files. + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +static void cleanup_load_tmpdir() +{ + MY_DIR *dirp; + FILEINFO *file; + uint i; + if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME)))) + return; + + for (i=0 ; i < (uint)dirp->number_off_files; i++) + { + file=dirp->dir_entry+i; + if (is_prefix(file->name,"SQL_LOAD-")) + my_delete(file->name, MYF(0)); + } + + my_dirend(dirp); +} +#endif // !MYSQL_CLIENT +/***************************************************************************** + + write_str() + + ****************************************************************************/ +static bool write_str(IO_CACHE *file, char *str, byte length) +{ + return (my_b_safe_write(file, &length, 1) || + my_b_safe_write(file, (byte*) str, (int) length)); +} + +/***************************************************************************** + + read_str() + + ****************************************************************************/ +static inline int read_str(char * &buf, char *buf_end, char * &str, + uint8 &len) +{ + if (buf + (uint) (uchar) *buf >= buf_end) + return 1; + len = (uint8) *buf; + str= buf+1; + buf+= (uint) len+1; + return 0; +} + + +/***************************************************************************** + ***************************************************************************** + + Log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Log_event::get_type_str() + + ****************************************************************************/ const char* Log_event::get_type_str() { switch(get_type_code()) { @@ -131,6 +229,11 @@ const char* Log_event::get_type_str() } } +/***************************************************************************** + + Log_event::Log_event() + + ****************************************************************************/ #ifndef MYSQL_CLIENT Log_event::Log_event(THD* thd_arg, uint16 flags_arg) :exec_time(0), flags(flags_arg), cached_event_len(0), @@ -139,46 +242,23 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg) if (thd) { server_id = thd->server_id; - when = thd->start_time; - log_pos = thd->log_pos; + when = thd->start_time; + log_pos = thd->log_pos; } else { server_id = ::server_id; - when = time(NULL); - log_pos=0; + when = time(NULL); + log_pos =0; } } +#endif // !MYSQL_CLIENT -/* - Delete all temporary files used for SQL_LOAD. - - TODO - - When we get a 'server start' event, we should only remove - the files associated with the server id that just started. - Easily fixable by adding server_id as a prefix to the log files. -*/ - -static void cleanup_load_tmpdir() -{ - MY_DIR *dirp; - FILEINFO *file; - uint i; - if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME)))) - return; - - for (i=0 ; i < (uint)dirp->number_off_files; i++) - { - file=dirp->dir_entry+i; - if (is_prefix(file->name,"SQL_LOAD-")) - my_delete(file->name, MYF(0)); - } - - my_dirend(dirp); -} +/***************************************************************************** -#endif + Log_event::Log_event() + ****************************************************************************/ Log_event::Log_event(const char* buf, bool old_format) :cached_event_len(0), temp_buf(0) { @@ -202,6 +282,11 @@ Log_event::Log_event(const char* buf, bool old_format) #ifndef MYSQL_CLIENT +/***************************************************************************** + + Log_event::exec_event() + + ****************************************************************************/ int Log_event::exec_event(struct st_relay_log_info* rli) { if (rli) // QQ When is this not true ? @@ -213,172 +298,21 @@ int Log_event::exec_event(struct st_relay_log_info* rli) return 0; } -void Log_event::pack_info(String* packet) -{ - net_store_data(packet, "", 0); -} - -void Query_log_event::pack_info(String* packet) -{ - char buf[256]; - String tmp(buf, sizeof(buf), system_charset_info); - tmp.length(0); - if (db && db_len) - { - tmp.append("use `", 5); - tmp.append(db, db_len); - tmp.append("`; ", 3); - } - - if (query && q_len) - tmp.append(query, q_len); - net_store_data(packet, (char*)tmp.ptr(), tmp.length()); -} - -void Start_log_event::pack_info(String* packet) -{ - char buf1[256]; - String tmp(buf1, sizeof(buf1), system_charset_info); - tmp.length(0); - char buf[22]; - - tmp.append("Server ver: "); - tmp.append(server_version); - tmp.append(", Binlog ver: "); - tmp.append(llstr(binlog_version, buf)); - net_store_data(packet, tmp.ptr(), tmp.length()); -} - -void Load_log_event::pack_info(String* packet) -{ - char buf[256]; - String tmp(buf, sizeof(buf), system_charset_info); - tmp.length(0); - if (db && db_len) - { - tmp.append("use "); - tmp.append(db, db_len); - tmp.append("; ", 2); - } - - tmp.append("LOAD DATA INFILE '"); - tmp.append(fname, fname_len); - tmp.append("' ", 2); - if (sql_ex.opt_flags && REPLACE_FLAG ) - tmp.append(" REPLACE "); - else if (sql_ex.opt_flags && IGNORE_FLAG ) - tmp.append(" IGNORE "); - - tmp.append("INTO TABLE "); - tmp.append(table_name); - if (sql_ex.field_term_len) - { - tmp.append(" FIELDS TERMINATED BY "); - pretty_print_str(&tmp, sql_ex.field_term, sql_ex.field_term_len); - } - - if (sql_ex.enclosed_len) - { - if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) - tmp.append(" OPTIONALLY "); - tmp.append( " ENCLOSED BY "); - pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len); - } - - if (sql_ex.escaped_len) - { - tmp.append( " ESCAPED BY "); - pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len); - } - - if (sql_ex.line_term_len) - { - tmp.append(" LINES TERMINATED BY "); - pretty_print_str(&tmp, sql_ex.line_term, sql_ex.line_term_len); - } - - if (sql_ex.line_start_len) - { - tmp.append(" LINES STARTING BY "); - pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len); - } - - if ((int)skip_lines > 0) - tmp.append( " IGNORE %ld LINES ", (long) skip_lines); - - if (num_fields) - { - uint i; - const char* field = fields; - tmp.append(" ("); - for (i = 0; i < num_fields; i++) - { - if (i) - tmp.append(" ,"); - tmp.append( field); - - field += field_lens[i] + 1; - } - tmp.append(')'); - } - - net_store_data(packet, tmp.ptr(), tmp.length()); -} - -void Rotate_log_event::pack_info(String* packet) -{ - char buf1[256], buf[22]; - String tmp(buf1, sizeof(buf1), system_charset_info); - tmp.length(0); - tmp.append(new_log_ident, ident_len); - tmp.append(";pos="); - tmp.append(llstr(pos,buf)); - if (flags & LOG_EVENT_FORCED_ROTATE_F) - tmp.append("; forced by master"); - net_store_data(packet, tmp.ptr(), tmp.length()); -} +/***************************************************************************** -void Intvar_log_event::pack_info(String* packet) -{ - char buf1[256], buf[22]; - String tmp(buf1, sizeof(buf1), system_charset_info); - tmp.length(0); - tmp.append(get_var_type_name()); - tmp.append('='); - tmp.append(llstr(val, buf)); - net_store_data(packet, tmp.ptr(), tmp.length()); -} + Log_event::pack_info() -void Rand_log_event::pack_info(String* packet) + ****************************************************************************/ +void Log_event::pack_info(String* packet) { - char buf1[256], buf[22]; - String tmp(buf1, sizeof(buf1), system_charset_info); - tmp.length(0); - tmp.append("randseed1="); - tmp.append(llstr(seed1, buf)); - tmp.append(",randseed2="); - tmp.append(llstr(seed2, buf)); - net_store_data(packet, tmp.ptr(), tmp.length()); + net_store_data(packet, "", 0); } -void Slave_log_event::pack_info(String* packet) -{ - char buf1[256], buf[22], *end; - String tmp(buf1, sizeof(buf1), system_charset_info); - tmp.length(0); - tmp.append("host="); - tmp.append(master_host); - tmp.append(",port="); - end= int10_to_str((long) master_port, buf, 10); - tmp.append(buf, (uint32) (end-buf)); - tmp.append(",log="); - tmp.append(master_log); - tmp.append(",pos="); - tmp.append(llstr(master_pos,buf)); - net_store_data(packet, tmp.ptr(), tmp.length()); -} +/***************************************************************************** + Log_event::init_show_field_list() + ****************************************************************************/ void Log_event::init_show_field_list(List<Item>* field_list) { field_list->push_back(new Item_empty_string("Log_name", 20)); @@ -389,9 +323,13 @@ void Log_event::init_show_field_list(List<Item>* field_list) field_list->push_back(new Item_empty_string("Info", 20)); } -/* - * only called by SHOW BINLOG EVENTS - */ +/***************************************************************************** + + Log_event::net_send() + + Only called by SHOW BINLOG EVENTS + + ****************************************************************************/ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos) { String* packet = &thd->packet; @@ -410,22 +348,23 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos) pack_info(packet); return my_net_write(&thd->net, (char*) packet->ptr(), packet->length()); } +#endif // !MYSQL_CLIENT -#endif /* MYSQL_CLIENT */ - - -int Query_log_event::write(IO_CACHE* file) -{ - return query ? Log_event::write(file) : -1; -} +/***************************************************************************** + Log_event::write() + ****************************************************************************/ int Log_event::write(IO_CACHE* file) { return (write_header(file) || write_data(file)) ? -1 : 0; } +/***************************************************************************** + Log_event::write_header() + + ****************************************************************************/ int Log_event::write_header(IO_CACHE* file) { char buf[LOG_EVENT_HEADER_LEN]; @@ -445,8 +384,13 @@ int Log_event::write_header(IO_CACHE* file) return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf))); } -#ifndef MYSQL_CLIENT +/***************************************************************************** + + Log_event::read_log_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT int Log_event::read_log_event(IO_CACHE* file, String* packet, pthread_mutex_t* log_lock) { @@ -501,8 +445,7 @@ end: pthread_mutex_unlock(log_lock); DBUG_RETURN(result); } - -#endif // MYSQL_CLIENT +#endif // !MYSQL_CLIENT #ifndef MYSQL_CLIENT #define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); @@ -513,7 +456,13 @@ end: #define LOCK_MUTEX #endif -// allocates memory - the caller is responsible for clean-up +/***************************************************************************** + + Log_event::read_log_event() + + Allocates memory--the caller is responsible for clean-up + + ****************************************************************************/ #ifndef MYSQL_CLIENT Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock, @@ -576,7 +525,11 @@ data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]); return res; } +/***************************************************************************** + + Log_event::read_log_event() + ****************************************************************************/ Log_event* Log_event::read_log_event(const char* buf, int event_len, const char **error, bool old_format) { @@ -642,8 +595,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, return ev; } - #ifdef MYSQL_CLIENT + +/***************************************************************************** + + Log_event::print_header() + + ****************************************************************************/ void Log_event::print_header(FILE* file) { char llbuff[22]; @@ -653,6 +611,11 @@ void Log_event::print_header(FILE* file) llstr(log_pos,llbuff)); } +/***************************************************************************** + + Log_event::print_timestamp() + + ****************************************************************************/ void Log_event::print_timestamp(FILE* file, time_t* ts) { struct tm *res; @@ -674,113 +637,91 @@ void Log_event::print_timestamp(FILE* file, time_t* ts) res->tm_sec); } +#endif // MYSQL_CLIENT -void Start_log_event::print(FILE* file, bool short_form, char* last_db) -{ - if (short_form) - return; +/***************************************************************************** - print_header(file); - fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, - server_version); - print_timestamp(file, (time_t*)&created); - fputc('\n', file); - fflush(file); -} + Log_event::set_log_pos() -void Stop_log_event::print(FILE* file, bool short_form, char* last_db) + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Log_event::set_log_pos(MYSQL_LOG* log) { - if (short_form) - return; - - print_header(file); - fprintf(file, "\tStop\n"); - fflush(file); + if (!log_pos) + log_pos = my_b_tell(&log->log_file); } +#endif // !MYSQL_CLIENT -void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) -{ - char buf[22]; - if (short_form) - return; - print_header(file); - fprintf(file, "\tRotate to "); - if (new_log_ident) - my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, - MYF(MY_NABP | MY_WME)); - fprintf(file, " pos: %s", llstr(pos, buf)); - if (flags & LOG_EVENT_FORCED_ROTATE_F) - fprintf(file," forced by master"); - fputc('\n', file); - fflush(file); -} -#endif /* #ifdef MYSQL_CLIENT */ +/***************************************************************************** + ***************************************************************************** + Query_log_event methods -Start_log_event::Start_log_event(const char* buf, - bool old_format) - :Log_event(buf, old_format) -{ - buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET); - memcpy(server_version, buf+ST_SERVER_VER_OFFSET, - ST_SERVER_VER_LEN); - created = uint4korr(buf+ST_CREATED_OFFSET); -} + ***************************************************************************** + ****************************************************************************/ -int Start_log_event::write_data(IO_CACHE* file) -{ - char buff[START_HEADER_LEN]; - int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); - memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); - int4store(buff + ST_CREATED_OFFSET,created); - return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); -} +#ifndef MYSQL_CLIENT +/***************************************************************************** + Query_log_event::pack_info() -Rotate_log_event::Rotate_log_event(const char* buf, int event_len, - bool old_format) - :Log_event(buf, old_format),new_log_ident(NULL),alloced(0) + ****************************************************************************/ +void Query_log_event::pack_info(String* packet) { - // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET - int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - uint ident_offset; - if (event_len < header_size) - return; - buf += header_size; - if (old_format) - { - ident_len = (uint)(event_len - OLD_HEADER_LEN); - pos = 4; - ident_offset = 0; - } - else + char buf[256]; + String tmp(buf, sizeof(buf), system_charset_info); + tmp.length(0); + if (db && db_len) { - ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD); - pos = uint8korr(buf + R_POS_OFFSET); - ident_offset = ROTATE_HEADER_LEN; + tmp.append("use `", 5); + tmp.append(db, db_len); + tmp.append("`; ", 3); } - set_if_smaller(ident_len,FN_REFLEN-1); - if (!(new_log_ident= my_strdup_with_length((byte*) buf + - ident_offset, - (uint) ident_len, - MYF(MY_WME)))) - return; - alloced = 1; + + if (query && q_len) + tmp.append(query, q_len); + net_store_data(packet, (char*)tmp.ptr(), tmp.length()); } +#endif // !MYSQL_CLIENT +/***************************************************************************** -int Rotate_log_event::write_data(IO_CACHE* file) + Query_log_event::write() + + ****************************************************************************/ +int Query_log_event::write(IO_CACHE* file) { - char buf[ROTATE_HEADER_LEN]; - int8store(buf, pos + R_POS_OFFSET); - return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || - my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); + return query ? Log_event::write(file) : -1; } +/***************************************************************************** + + Query_log_event::write_data() + ****************************************************************************/ +int Query_log_event::write_data(IO_CACHE* file) +{ + if (!query) + return -1; + + char buf[QUERY_HEADER_LEN]; + int4store(buf + Q_THREAD_ID_OFFSET, thread_id); + int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); + buf[Q_DB_LEN_OFFSET] = (char) db_len; + int2store(buf + Q_ERR_CODE_OFFSET, error_code); + + return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || + my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || + my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0; +} + +/***************************************************************************** + + Query_log_event::Query_log_event() + + ****************************************************************************/ #ifndef MYSQL_CLIENT Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans) @@ -796,8 +737,13 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, exec_time = (ulong) (end_time - thd->start_time); db_len = (db) ? (uint32) strlen(db) : 0; } -#endif +#endif // MYSQL_CLIENT + +/***************************************************************************** + + Query_log_event::Query_log_event() + ****************************************************************************/ Query_log_event::Query_log_event(const char* buf, int event_len, bool old_format) :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL) @@ -833,9 +779,12 @@ Query_log_event::Query_log_event(const char* buf, int event_len, *((char*)query+q_len) = 0; } +/***************************************************************************** -#ifdef MYSQL_CLIENT + Query_log_event::print() + ****************************************************************************/ +#ifdef MYSQL_CLIENT void Query_log_event::print(FILE* file, bool short_form, char* last_db) { char buff[40],*end; // Enough for SET TIMESTAMP @@ -863,113 +812,311 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); fprintf(file, ";\n"); } -#endif - +#endif // MYSQL_CLIENT -int Query_log_event::write_data(IO_CACHE* file) -{ - if (!query) - return -1; - - char buf[QUERY_HEADER_LEN]; - int4store(buf + Q_THREAD_ID_OFFSET, thread_id); - int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); - buf[Q_DB_LEN_OFFSET] = (char) db_len; - int2store(buf + Q_ERR_CODE_OFFSET, error_code); +/***************************************************************************** - return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || - my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || - my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0; -} + Query_log_event::exec_event() -Intvar_log_event::Intvar_log_event(const char* buf, bool old_format) - :Log_event(buf, old_format) + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Query_log_event::exec_event(struct st_relay_log_info* rli) { - buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - type = buf[I_TYPE_OFFSET]; - val = uint8korr(buf+I_VAL_OFFSET); -} + int expected_error,actual_error = 0; + init_sql_alloc(&thd->mem_root, 8192,0); + thd->db = rewrite_db((char*)db); -const char* Intvar_log_event::get_var_type_name() -{ - switch(type) { - case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID"; - case INSERT_ID_EVENT: return "INSERT_ID"; - default: /* impossible */ return "UNKNOWN"; + /* + InnoDB internally stores the master log position it has processed so far; + position to store is really pos + pending + event_len + since we must store the pos of the END of the current log event + */ + rli->event_len= get_event_len(); + + if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->query = (char*)query; + thd->set_time((time_t)when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + thd->query_error = 0; // clear error + thd->net.last_errno = 0; + thd->net.last_error[0] = 0; + thd->slave_proxy_id = thread_id; // for temp tables + + /* + Sanity check to make sure the master did not get a really bad + error on the query. + */ + if (ignored_error_code((expected_error = error_code)) || + !check_expected_error(thd,rli,expected_error)) + { + mysql_log.write(thd,COM_QUERY,"%s",thd->query); + DBUG_PRINT("query",("%s",thd->query)); + mysql_parse(thd, thd->query, q_len); + DBUG_PRINT("info",("expected_error: %d last_errno: %d", + expected_error, thd->net.last_errno)); + if ((expected_error != (actual_error= thd->net.last_errno)) && + expected_error && + !ignored_error_code(actual_error) && + !ignored_error_code(expected_error)) + { + const char* errmsg = "Slave: did not get the expected error\ + running query from master - expected: '%s' (%d), got '%s' (%d)"; + sql_print_error(errmsg, ER_SAFE(expected_error), + expected_error, + actual_error ? thd->net.last_error: "no error", + actual_error); + thd->query_error = 1; + } + else if (expected_error == actual_error || + ignored_error_code(actual_error)) + { + DBUG_PRINT("info",("error ignored")); + thd->query_error = 0; + *rli->last_slave_error = 0; + rli->last_slave_errno = 0; + } + } + else + { + // master could be inconsistent, abort and tell DBA to check/fix it + thd->db = thd->query = 0; + thd->variables.convert_set = 0; + close_thread_tables(thd); + free_root(&thd->mem_root,0); + return 1; + } + } + thd->db= 0; // prevent db from being freed + thd->query= 0; // just to be sure + // assume no convert for next query unless set explictly + thd->variables.convert_set = 0; + close_thread_tables(thd); + + if (thd->query_error || thd->fatal_error) + { + slave_print_error(rli,actual_error, "error '%s' on query '%s'", + actual_error ? thd->net.last_error : + "unexpected success or fatal error", query); + free_root(&thd->mem_root,0); + return 1; } + free_root(&thd->mem_root,0); + return Log_event::exec_event(rli); } +#endif // !MYSQL_CLIENT -int Intvar_log_event::write_data(IO_CACHE* file) + +/***************************************************************************** + ***************************************************************************** + + Start_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Start_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Start_log_event::pack_info(String* packet) { - char buf[9]; - buf[I_TYPE_OFFSET] = type; - int8store(buf + I_VAL_OFFSET, val); - return my_b_safe_write(file, (byte*) buf, sizeof(buf)); + char buf1[256]; + String tmp(buf1, sizeof(buf1), system_charset_info); + tmp.length(0); + char buf[22]; + + tmp.append("Server ver: "); + tmp.append(server_version); + tmp.append(", Binlog ver: "); + tmp.append(llstr(binlog_version, buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); } +#endif // !MYSQL_CLIENT +/***************************************************************************** + + Start_log_event::print() + + ****************************************************************************/ #ifdef MYSQL_CLIENT -void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) +void Start_log_event::print(FILE* file, bool short_form, char* last_db) { - char llbuff[22]; - const char *msg; - LINT_INIT(msg); - - if (!short_form) - { - print_header(file); - fprintf(file, "\tIntvar\n"); - } + if (short_form) + return; - fprintf(file, "SET "); - switch (type) { - case LAST_INSERT_ID_EVENT: - msg="LAST_INSERT_ID"; - break; - case INSERT_ID_EVENT: - msg="INSERT_ID"; - break; - } - fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); + print_header(file); + fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, + server_version); + print_timestamp(file, (time_t*)&created); + fputc('\n', file); fflush(file); } -#endif +#endif // MYSQL_CLIENT /***************************************************************************** - * - * Rand log event - * + + Start_log_event::Start_log_event() + ****************************************************************************/ -Rand_log_event::Rand_log_event(const char* buf, bool old_format) +Start_log_event::Start_log_event(const char* buf, + bool old_format) :Log_event(buf, old_format) { buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - seed1 = uint8korr(buf+RAND_SEED1_OFFSET); - seed2 = uint8korr(buf+RAND_SEED2_OFFSET); + binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET); + memcpy(server_version, buf+ST_SERVER_VER_OFFSET, + ST_SERVER_VER_LEN); + created = uint4korr(buf+ST_CREATED_OFFSET); } -int Rand_log_event::write_data(IO_CACHE* file) +/***************************************************************************** + + Start_log_event::write_data() + + ****************************************************************************/ +int Start_log_event::write_data(IO_CACHE* file) { - char buf[16]; - int8store(buf + RAND_SEED1_OFFSET, seed1); - int8store(buf + RAND_SEED2_OFFSET, seed2); - return my_b_safe_write(file, (byte*) buf, sizeof(buf)); + char buff[START_HEADER_LEN]; + int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); + memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); + int4store(buff + ST_CREATED_OFFSET,created); + return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); } -#ifdef MYSQL_CLIENT -void Rand_log_event::print(FILE* file, bool short_form, char* last_db) +/***************************************************************************** + + Start_log_event::exec_event() + + The master started + + IMPLEMENTATION + - To handle the case where the master died without a stop event, + we clean up all temporary tables + locks that we got. + + TODO + - Remove all active user locks + - If we have an active transaction at this point, the master died + in the middle while writing the transaction to the binary log. + In this case we should stop the slave. + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Start_log_event::exec_event(struct st_relay_log_info* rli) { - char llbuff[22]; - if (!short_form) + /* All temporary tables was deleted on the master */ + close_temporary_tables(thd); + /* + If we have old format, load_tmpdir is cleaned up by the I/O thread + */ + if (!rli->mi->old_format) + cleanup_load_tmpdir(); + return Log_event::exec_event(rli); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Load_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Load_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Load_log_event::pack_info(String* packet) +{ + char buf[256]; + String tmp(buf, sizeof(buf), system_charset_info); + tmp.length(0); + if (db && db_len) { - print_header(file); - fprintf(file, "\tRand\n"); + tmp.append("use "); + tmp.append(db, db_len); + tmp.append("; ", 2); } - fprintf(file, "SET RAND SEED1=%s;\n", llstr(seed1, llbuff)); - fprintf(file, "SET RAND SEED2=%s;\n", llstr(seed2, llbuff)); - fflush(file); + + tmp.append("LOAD DATA INFILE '"); + tmp.append(fname, fname_len); + tmp.append("' ", 2); + if (sql_ex.opt_flags && REPLACE_FLAG ) + tmp.append(" REPLACE "); + else if (sql_ex.opt_flags && IGNORE_FLAG ) + tmp.append(" IGNORE "); + + tmp.append("INTO TABLE "); + tmp.append(table_name); + if (sql_ex.field_term_len) + { + tmp.append(" FIELDS TERMINATED BY "); + pretty_print_str(&tmp, sql_ex.field_term, sql_ex.field_term_len); + } + + if (sql_ex.enclosed_len) + { + if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) + tmp.append(" OPTIONALLY "); + tmp.append( " ENCLOSED BY "); + pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len); + } + + if (sql_ex.escaped_len) + { + tmp.append( " ESCAPED BY "); + pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len); + } + + if (sql_ex.line_term_len) + { + tmp.append(" LINES TERMINATED BY "); + pretty_print_str(&tmp, sql_ex.line_term, sql_ex.line_term_len); + } + + if (sql_ex.line_start_len) + { + tmp.append(" LINES STARTING BY "); + pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len); + } + + if ((int)skip_lines > 0) + tmp.append( " IGNORE %ld LINES ", (long) skip_lines); + + if (num_fields) + { + uint i; + const char* field = fields; + tmp.append(" ("); + for (i = 0; i < num_fields; i++) + { + if (i) + tmp.append(" ,"); + tmp.append( field); + + field += field_lens[i] + 1; + } + tmp.append(')'); + } + + net_store_data(packet, tmp.ptr(), tmp.length()); } -#endif +#endif // !MYSQL_CLIENT +/***************************************************************************** + + Load_log_event::write_data_header() + + ****************************************************************************/ int Load_log_event::write_data_header(IO_CACHE* file) { char buf[LOAD_HEADER_LEN]; @@ -982,6 +1129,11 @@ int Load_log_event::write_data_header(IO_CACHE* file) return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN); } +/***************************************************************************** + + Load_log_event::write_data_body() + + ****************************************************************************/ int Load_log_event::write_data_body(IO_CACHE* file) { if (sql_ex.write_data(file)) @@ -997,99 +1149,11 @@ int Load_log_event::write_data_body(IO_CACHE* file) my_b_safe_write(file, (byte*)fname, fname_len)); } +/***************************************************************************** + Load_log_event::Load_log_event() -static bool write_str(IO_CACHE *file, char *str, byte length) -{ - return (my_b_safe_write(file, &length, 1) || - my_b_safe_write(file, (byte*) str, (int) length)); -} - - -int sql_ex_info::write_data(IO_CACHE* file) -{ - if (new_format()) - { - return (write_str(file, field_term, field_term_len) || - write_str(file, enclosed, enclosed_len) || - write_str(file, line_term, line_term_len) || - write_str(file, line_start, line_start_len) || - write_str(file, escaped, escaped_len) || - my_b_safe_write(file,(byte*) &opt_flags,1)); - } - else - { - old_sql_ex old_ex; - old_ex.field_term= *field_term; - old_ex.enclosed= *enclosed; - old_ex.line_term= *line_term; - old_ex.line_start= *line_start; - old_ex.escaped= *escaped; - old_ex.opt_flags= opt_flags; - old_ex.empty_flags=empty_flags; - return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)); - } -} - - -static inline int read_str(char * &buf, char *buf_end, char * &str, - uint8 &len) -{ - if (buf + (uint) (uchar) *buf >= buf_end) - return 1; - len = (uint8) *buf; - str= buf+1; - buf+= (uint) len+1; - return 0; -} - - -char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) -{ - cached_new_format = use_new_format; - if (use_new_format) - { - empty_flags=0; - /* - The code below assumes that buf will not disappear from - under our feet during the lifetime of the event. This assumption - holds true in the slave thread if the log is in new format, but is not - the case when we have old format because we will be reusing net buffer - to read the actual file before we write out the Create_file event. - */ - if (read_str(buf, buf_end, field_term, field_term_len) || - read_str(buf, buf_end, enclosed, enclosed_len) || - read_str(buf, buf_end, line_term, line_term_len) || - read_str(buf, buf_end, line_start, line_start_len) || - read_str(buf, buf_end, escaped, escaped_len)) - return 0; - opt_flags = *buf++; - } - else - { - field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1; - field_term = buf++; // Use first byte in string - enclosed= buf++; - line_term= buf++; - line_start= buf++; - escaped= buf++; - opt_flags = *buf++; - empty_flags= *buf++; - if (empty_flags & FIELD_TERM_EMPTY) - field_term_len=0; - if (empty_flags & ENCLOSED_EMPTY) - enclosed_len=0; - if (empty_flags & LINE_TERM_EMPTY) - line_term_len=0; - if (empty_flags & LINE_START_EMPTY) - line_start_len=0; - if (empty_flags & ESCAPED_EMPTY) - escaped_len=0; - } - return buf; -} - - + ****************************************************************************/ #ifndef MYSQL_CLIENT Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, const char* db_arg, const char* table_name_arg, @@ -1162,14 +1226,16 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, field_lens = (const uchar*)field_lens_buf.ptr(); fields = fields_buf.ptr(); } +#endif // !MYSQL_CLIENT -#endif +/***************************************************************************** + + Load_log_event::Load_log_event() -/* The caller must do buf[event_len] = 0 before he starts using the constructed event. -*/ + ****************************************************************************/ Load_log_event::Load_log_event(const char* buf, int event_len, bool old_format): Log_event(buf, old_format),num_fields(0),fields(0), @@ -1181,6 +1247,11 @@ Load_log_event::Load_log_event(const char* buf, int event_len, copy_log_event(buf, event_len, old_format); } +/***************************************************************************** + + Load_log_event::copy_log_event() + + ****************************************************************************/ int Load_log_event::copy_log_event(const char *buf, ulong event_len, bool old_format) { @@ -1225,8 +1296,12 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, return 0; } -#ifdef MYSQL_CLIENT +/***************************************************************************** + + Load_log_event::print() + ****************************************************************************/ +#ifdef MYSQL_CLIENT void Load_log_event::print(FILE* file, bool short_form, char* last_db) { if (!short_form) @@ -1307,18 +1382,14 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) fprintf(file, ";\n"); } - #endif /* #ifdef MYSQL_CLIENT */ -#ifndef MYSQL_CLIENT - -void Log_event::set_log_pos(MYSQL_LOG* log) -{ - if (!log_pos) - log_pos = my_b_tell(&log->log_file); -} +/***************************************************************************** + Load_log_event::set_fields() + ****************************************************************************/ +#ifndef MYSQL_CLIENT void Load_log_event::set_fields(List<Item> &fields) { uint i; @@ -1329,8 +1400,512 @@ void Load_log_event::set_fields(List<Item> &fields) field += field_lens[i] + 1; } } +#endif // !MYSQL_CLIENT + +/***************************************************************************** + + Load_log_event::exec_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) +{ + init_sql_alloc(&thd->mem_root, 8192,0); + thd->db = rewrite_db((char*)db); + thd->query = 0; + thd->query_error = 0; + + if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->set_time((time_t)when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + TABLE_LIST tables; + bzero((char*) &tables,sizeof(tables)); + tables.db = thd->db; + tables.alias = tables.real_name = (char*)table_name; + tables.lock_type = TL_WRITE; + // the table will be opened in mysql_load + if (table_rules_on && !tables_ok(thd, &tables)) + { + // TODO: this is a bug - this needs to be moved to the I/O thread + if (net) + skip_load_data_infile(net); + } + else + { + char llbuff[22]; + enum enum_duplicates handle_dup = DUP_IGNORE; + if (sql_ex.opt_flags && REPLACE_FLAG) + handle_dup = DUP_REPLACE; + sql_exchange ex((char*)fname, sql_ex.opt_flags && + DUMPFILE_FLAG ); + String field_term(sql_ex.field_term,sql_ex.field_term_len, + system_charset_info); + String enclosed(sql_ex.enclosed,sql_ex.enclosed_len, + system_charset_info); + String line_term(sql_ex.line_term,sql_ex.line_term_len, + system_charset_info); + String line_start(sql_ex.line_start,sql_ex.line_start_len, + system_charset_info); + String escaped(sql_ex.escaped,sql_ex.escaped_len, system_charset_info); + + ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); + if (sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); + + ex.skip_lines = skip_lines; + List<Item> fields; + set_fields(fields); + thd->slave_proxy_id = thd->thread_id; + if (net) + { + // mysql_load will use thd->net to read the file + thd->net.vio = net->vio; + /* + Make sure the client does not get confused about the packet sequence + */ + thd->net.pkt_nr = net->pkt_nr; + } + if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, + TL_WRITE)) + thd->query_error = 1; + if (thd->cuted_fields) + sql_print_error("Slave: load data infile at position %s in log \ +'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, + thd->cuted_fields ); + if (net) + net->pkt_nr= thd->net.pkt_nr; + } + } + else + { + /* + We will just ask the master to send us /dev/null if we do not + want to load the data. + TODO: this a bug - needs to be done in I/O thread + */ + if (net) + skip_load_data_infile(net); + } + + thd->net.vio = 0; + thd->db= 0; // prevent db from being freed + close_thread_tables(thd); + if (thd->query_error) + { + int sql_error = thd->net.last_errno; + if (!sql_error) + sql_error = ER_UNKNOWN_ERROR; + + slave_print_error(rli,sql_error, + "Slave: Error '%s' running load data infile ", + ER_SAFE(sql_error)); + free_root(&thd->mem_root,0); + return 1; + } + free_root(&thd->mem_root,0); + + if (thd->fatal_error) + { + sql_print_error("Slave: Fatal error running LOAD DATA INFILE "); + return 1; + } + + return Log_event::exec_event(rli); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Rotate_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Rotate_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Rotate_log_event::pack_info(String* packet) +{ + char buf1[256], buf[22]; + String tmp(buf1, sizeof(buf1), system_charset_info); + tmp.length(0); + tmp.append(new_log_ident, ident_len); + tmp.append(";pos="); + tmp.append(llstr(pos,buf)); + if (flags & LOG_EVENT_FORCED_ROTATE_F) + tmp.append("; forced by master"); + net_store_data(packet, tmp.ptr(), tmp.length()); +} +#endif // !MYSQL_CLIENT + +/***************************************************************************** + + Rotate_log_event::print() + + ****************************************************************************/ +#ifdef MYSQL_CLIENT +void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) +{ + char buf[22]; + if (short_form) + return; + + print_header(file); + fprintf(file, "\tRotate to "); + if (new_log_ident) + my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, + MYF(MY_NABP | MY_WME)); + fprintf(file, " pos: %s", llstr(pos, buf)); + if (flags & LOG_EVENT_FORCED_ROTATE_F) + fprintf(file," forced by master"); + fputc('\n', file); + fflush(file); +} +#endif // MYSQL_CLIENT + +/***************************************************************************** + + Rotate_log_event::Rotate_log_event() + + ****************************************************************************/ +Rotate_log_event::Rotate_log_event(const char* buf, int event_len, + bool old_format) + :Log_event(buf, old_format),new_log_ident(NULL),alloced(0) +{ + // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET + int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + uint ident_offset; + if (event_len < header_size) + return; + buf += header_size; + if (old_format) + { + ident_len = (uint)(event_len - OLD_HEADER_LEN); + pos = 4; + ident_offset = 0; + } + else + { + ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD); + pos = uint8korr(buf + R_POS_OFFSET); + ident_offset = ROTATE_HEADER_LEN; + } + set_if_smaller(ident_len,FN_REFLEN-1); + if (!(new_log_ident= my_strdup_with_length((byte*) buf + + ident_offset, + (uint) ident_len, + MYF(MY_WME)))) + return; + alloced = 1; +} + +/***************************************************************************** + + Rotate_log_event::write_data() + + ****************************************************************************/ +int Rotate_log_event::write_data(IO_CACHE* file) +{ + char buf[ROTATE_HEADER_LEN]; + int8store(buf, pos + R_POS_OFFSET); + return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || + my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); +} + +/***************************************************************************** + + Rotate_log_event::exec_event() + + Got a rotate log even from the master + + IMPLEMENTATION + This is mainly used so that we can later figure out the logname and + position for the master. + + We can't rotate the slave as this will cause infinitive rotations + in a A -> B -> A setup. + + RETURN VALUES + 0 ok + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Rotate_log_event::exec_event(struct st_relay_log_info* rli) +{ + char* log_name = rli->master_log_name; + DBUG_ENTER("Rotate_log_event::exec_event"); + + pthread_mutex_lock(&rli->data_lock); + memcpy(log_name, new_log_ident, ident_len+1); + rli->master_log_pos = pos; + rli->relay_log_pos += get_event_len(); + DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos)); + pthread_mutex_unlock(&rli->data_lock); + pthread_cond_broadcast(&rli->data_cond); + flush_relay_log_info(rli); + DBUG_RETURN(0); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Intvar_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Intvar_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Intvar_log_event::pack_info(String* packet) +{ + char buf1[256], buf[22]; + String tmp(buf1, sizeof(buf1), system_charset_info); + tmp.length(0); + tmp.append(get_var_type_name()); + tmp.append('='); + tmp.append(llstr(val, buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); +} +#endif // !MYSQL_CLIENT + +/***************************************************************************** + + Intvar_log_event::Intvar_log_event() + + ****************************************************************************/ +Intvar_log_event::Intvar_log_event(const char* buf, bool old_format) + :Log_event(buf, old_format) +{ + buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + type = buf[I_TYPE_OFFSET]; + val = uint8korr(buf+I_VAL_OFFSET); +} + +/***************************************************************************** + + Intvar_log_event::get_var_type_name() + + ****************************************************************************/ +const char* Intvar_log_event::get_var_type_name() +{ + switch(type) { + case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID"; + case INSERT_ID_EVENT: return "INSERT_ID"; + default: /* impossible */ return "UNKNOWN"; + } +} + +/***************************************************************************** + + Intvar_log_event::write_data() + + ****************************************************************************/ +int Intvar_log_event::write_data(IO_CACHE* file) +{ + char buf[9]; + buf[I_TYPE_OFFSET] = type; + int8store(buf + I_VAL_OFFSET, val); + return my_b_safe_write(file, (byte*) buf, sizeof(buf)); +} + +/***************************************************************************** + + Intvar_log_event::print() + + ****************************************************************************/ +#ifdef MYSQL_CLIENT +void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) +{ + char llbuff[22]; + const char *msg; + LINT_INIT(msg); + + if (!short_form) + { + print_header(file); + fprintf(file, "\tIntvar\n"); + } + + fprintf(file, "SET "); + switch (type) { + case LAST_INSERT_ID_EVENT: + msg="LAST_INSERT_ID"; + break; + case INSERT_ID_EVENT: + msg="INSERT_ID"; + break; + } + fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); + fflush(file); +} +#endif // MYSQL_CLIENT + +/***************************************************************************** + + Intvar_log_event::exec_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Intvar_log_event::exec_event(struct st_relay_log_info* rli) +{ + switch (type) { + case LAST_INSERT_ID_EVENT: + thd->last_insert_id_used = 1; + thd->last_insert_id = val; + break; + case INSERT_ID_EVENT: + thd->next_insert_id = val; + break; + } + rli->inc_pending(get_event_len()); + return 0; +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Rand_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Rand_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Rand_log_event::pack_info(String* packet) +{ + char buf1[256], buf[22]; + String tmp(buf1, sizeof(buf1), system_charset_info); + tmp.length(0); + tmp.append("randseed1="); + tmp.append(llstr(seed1, buf)); + tmp.append(",randseed2="); + tmp.append(llstr(seed2, buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); +} +#endif // !MYSQL_CLIENT + +/***************************************************************************** + + Rand_log_event::Rand_log_event() + + ****************************************************************************/ +Rand_log_event::Rand_log_event(const char* buf, bool old_format) + :Log_event(buf, old_format) +{ + buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + seed1 = uint8korr(buf+RAND_SEED1_OFFSET); + seed2 = uint8korr(buf+RAND_SEED2_OFFSET); +} + +/***************************************************************************** + + Rand_log_event::write_data() + + ****************************************************************************/ +int Rand_log_event::write_data(IO_CACHE* file) +{ + char buf[16]; + int8store(buf + RAND_SEED1_OFFSET, seed1); + int8store(buf + RAND_SEED2_OFFSET, seed2); + return my_b_safe_write(file, (byte*) buf, sizeof(buf)); +} + +/***************************************************************************** + + Rand_log_event::print() + + ****************************************************************************/ +#ifdef MYSQL_CLIENT +void Rand_log_event::print(FILE* file, bool short_form, char* last_db) +{ + char llbuff[22]; + if (!short_form) + { + print_header(file); + fprintf(file, "\tRand\n"); + } + fprintf(file, "SET RAND SEED1=%s;\n", llstr(seed1, llbuff)); + fprintf(file, "SET RAND SEED2=%s;\n", llstr(seed2, llbuff)); + fflush(file); +} +#endif // MYSQL_CLIENT + +/***************************************************************************** + + Rand_log_event::exec_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Rand_log_event::exec_event(struct st_relay_log_info* rli) +{ + thd->rand.seed1 = seed1; + thd->rand.seed2 = seed2; + rli->inc_pending(get_event_len()); + return 0; +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Slave_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Slave_log_event::pack_info() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +void Slave_log_event::pack_info(String* packet) +{ + char buf1[256], buf[22], *end; + String tmp(buf1, sizeof(buf1), system_charset_info); + tmp.length(0); + tmp.append("host="); + tmp.append(master_host); + tmp.append(",port="); + end= int10_to_str((long) master_port, buf, 10); + tmp.append(buf, (uint32) (end-buf)); + tmp.append(",log="); + tmp.append(master_log); + tmp.append(",pos="); + tmp.append(llstr(master_pos,buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); +} +#endif // !MYSQL_CLIENT + +/***************************************************************************** + Slave_log_event::Slave_log_event() + ****************************************************************************/ +#ifndef MYSQL_CLIENT Slave_log_event::Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli): Log_event(thd_arg),mem_pool(0),master_host(0) @@ -1364,17 +1939,24 @@ Slave_log_event::Slave_log_event(THD* thd_arg, pthread_mutex_unlock(&mi->data_lock); DBUG_VOID_RETURN; } +#endif // !MYSQL_CLIENT -#endif /* ! MYSQL_CLIENT */ +/***************************************************************************** + Slave_log_event dtor + ****************************************************************************/ Slave_log_event::~Slave_log_event() { my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR)); } -#ifdef MYSQL_CLIENT +/***************************************************************************** + Slave_log_event::print() + + ****************************************************************************/ +#ifdef MYSQL_CLIENT void Slave_log_event::print(FILE* file, bool short_form, char* last_db) { char llbuff[22]; @@ -1386,14 +1968,23 @@ void Slave_log_event::print(FILE* file, bool short_form, char* last_db) master_log: '%s' master_pos: %s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } +#endif // MYSQL_CLIENT -#endif /* MYSQL_CLIENT */ +/***************************************************************************** + + Slave_log_event::get_data_size() + ****************************************************************************/ int Slave_log_event::get_data_size() { return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET; } +/***************************************************************************** + + Slave_log_event::write_data() + + ****************************************************************************/ int Slave_log_event::write_data(IO_CACHE* file) { int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); @@ -1402,7 +1993,11 @@ int Slave_log_event::write_data(IO_CACHE* file) return my_b_safe_write(file, (byte*)mem_pool, get_data_size()); } +/***************************************************************************** + + Slave_log_event::init_from_mem_pool() + ****************************************************************************/ void Slave_log_event::init_from_mem_pool(int data_size) { master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET); @@ -1419,6 +2014,11 @@ void Slave_log_event::init_from_mem_pool(int data_size) master_log_len = strlen(master_log); } +/***************************************************************************** + + Slave_log_event::Slave_log_event() + + ****************************************************************************/ Slave_log_event::Slave_log_event(const char* buf, int event_len) :Log_event(buf,0),mem_pool(0),master_host(0) { @@ -1432,6 +2032,93 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len) init_from_mem_pool(event_len); } +/***************************************************************************** + + Slave_log_event::exec_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Slave_log_event::exec_event(struct st_relay_log_info* rli) +{ + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + return Log_event::exec_event(rli); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Stop_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Stop_log_event::print() + + ****************************************************************************/ +#ifdef MYSQL_CLIENT +void Stop_log_event::print(FILE* file, bool short_form, char* last_db) +{ + if (short_form) + return; + + print_header(file); + fprintf(file, "\tStop\n"); + fflush(file); +} +#endif // MYSQL_CLIENT + +/***************************************************************************** + + Stop_log_event::exec_event() + + The master stopped. Clean up all temporary tables + locks that the + master may have set. + + TODO + - Remove all active user locks + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Stop_log_event::exec_event(struct st_relay_log_info* rli) +{ + // do not clean up immediately after rotate event + if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) + { + close_temporary_tables(thd); + cleanup_load_tmpdir(); + } + /* + We do not want to update master_log pos because we get a rotate event + before stop, so by now master_log_name is set to the next log. + If we updated it, we will have incorrect master coordinates and this + could give false triggers in MASTER_POS_WAIT() that we have reached + the target position when in fact we have not. + */ + rli->inc_pos(get_event_len(), 0); + flush_relay_log_info(rli); + return 0; +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Create_file_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Create_file_log_event ctor + + ****************************************************************************/ #ifndef MYSQL_CLIENT Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex, const char* db_arg, const char* table_name_arg, @@ -1443,8 +2130,13 @@ Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex, { sql_ex.force_new_format(); } -#endif +#endif // !MYSQL_CLIENT +/***************************************************************************** + + Create_file_log_event::write_data_body() + + ****************************************************************************/ int Create_file_log_event::write_data_body(IO_CACHE* file) { int res; @@ -1454,6 +2146,11 @@ int Create_file_log_event::write_data_body(IO_CACHE* file) my_b_safe_write(file, (byte*) block, block_len)); } +/***************************************************************************** + + Create_file_log_event::write_data_header() + + ****************************************************************************/ int Create_file_log_event::write_data_header(IO_CACHE* file) { int res; @@ -1464,6 +2161,11 @@ int Create_file_log_event::write_data_header(IO_CACHE* file) return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN); } +/***************************************************************************** + + Create_file_log_event::write_base() + + ****************************************************************************/ int Create_file_log_event::write_base(IO_CACHE* file) { int res; @@ -1473,6 +2175,11 @@ int Create_file_log_event::write_base(IO_CACHE* file) return res; } +/***************************************************************************** + + Create_file_log_event ctor + + ****************************************************************************/ Create_file_log_event::Create_file_log_event(const char* buf, int len, bool old_format) :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0) @@ -1499,7 +2206,11 @@ Create_file_log_event::Create_file_log_event(const char* buf, int len, } } +/***************************************************************************** + + Create_file_log_event::print() + ****************************************************************************/ #ifdef MYSQL_CLIENT void Create_file_log_event::print(FILE* file, bool short_form, char* last_db) @@ -1509,8 +2220,13 @@ void Create_file_log_event::print(FILE* file, bool short_form, Load_log_event::print(file, 1, last_db); fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); } -#endif +#endif // MYSQL_CLIENT + +/***************************************************************************** + Create_file_log_event::pack_info() + + ****************************************************************************/ #ifndef MYSQL_CLIENT void Create_file_log_event::pack_info(String* packet) { @@ -1529,8 +2245,86 @@ void Create_file_log_event::pack_info(String* packet) tmp.append(buf, (uint32) (end-buf)); net_store_data(packet, (char*) tmp.ptr(), tmp.length()); } -#endif +#endif // !MYSQL_CLIENT +/***************************************************************************** + + Create_file_log_event::exec_event() + + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Create_file_log_event::exec_event(struct st_relay_log_info* rli) +{ + char fname_buf[FN_REFLEN+10]; + char *p; + int fd = -1; + IO_CACHE file; + int error = 1; + + bzero((char*)&file, sizeof(file)); + p = slave_load_file_stem(fname_buf, file_id, server_id); + strmov(p, ".info"); // strmov takes less code than memcpy + if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, + MYF(MY_WME))) < 0 || + init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, + MYF(MY_WME|MY_NABP))) + { + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); + goto err; + } + + // a trick to avoid allocating another buffer + strmov(p, ".data"); + fname = fname_buf; + fname_len = (uint)(p-fname) + 5; + if (write_base(&file)) + { + strmov(p, ".info"); // to have it right in the error message + slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf); + goto err; + } + end_io_cache(&file); + my_close(fd, MYF(0)); + + // fname_buf now already has .data, not .info, because we did our trick + if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, + MYF(MY_WME))) < 0) + { + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); + goto err; + } + if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) + { + slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf); + goto err; + } + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + error=0; // Everything is ok + +err: + if (error) + end_io_cache(&file); + if (fd >= 0) + my_close(fd, MYF(0)); + return error ? 1 : Log_event::exec_event(rli); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Append_block_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Append_block_log_event ctor + + ****************************************************************************/ #ifndef MYSQL_CLIENT Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, uint block_len_arg) @@ -1538,9 +2332,13 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, file_id(thd_arg->file_id) { } -#endif +#endif // !MYSQL_CLIENT - +/***************************************************************************** + + Append_block_log_event ctor + + ****************************************************************************/ Append_block_log_event::Append_block_log_event(const char* buf, int len) :Log_event(buf, 0),block(0) { @@ -1551,6 +2349,11 @@ Append_block_log_event::Append_block_log_event(const char* buf, int len) block_len = len - APPEND_BLOCK_EVENT_OVERHEAD; } +/***************************************************************************** + + Append_block_log_event::write_data() + + ****************************************************************************/ int Append_block_log_event::write_data(IO_CACHE* file) { byte buf[APPEND_BLOCK_HEADER_LEN]; @@ -1559,6 +2362,11 @@ int Append_block_log_event::write_data(IO_CACHE* file) my_b_safe_write(file, (byte*) block, block_len)); } +/***************************************************************************** + + Append_block_log_event::print() + + ****************************************************************************/ #ifdef MYSQL_CLIENT void Append_block_log_event::print(FILE* file, bool short_form, char* last_db) @@ -1570,8 +2378,13 @@ void Append_block_log_event::print(FILE* file, bool short_form, fprintf(file, "#Append_block: file_id: %d block_len: %d\n", file_id, block_len); } -#endif +#endif // MYSQL_CLIENT + +/***************************************************************************** + Append_block_log_event::pack_info() + + ****************************************************************************/ #ifndef MYSQL_CLIENT void Append_block_log_event::pack_info(String* packet) { @@ -1582,15 +2395,69 @@ void Append_block_log_event::pack_info(String* packet) block_len)); net_store_data(packet, buf, (int32) length); } +#endif // !MYSQL_CLIENT +/***************************************************************************** + + Append_block_log_event::exec_event() + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Append_block_log_event::exec_event(struct st_relay_log_info* rli) +{ + char fname[FN_REFLEN+10]; + char *p= slave_load_file_stem(fname, file_id, server_id); + int fd; + int error = 1; + + memcpy(p, ".data", 6); + if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) + { + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); + goto err; + } + if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) + { + slave_print_error(rli,my_errno, "Write to '%s' failed", fname); + goto err; + } + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + error=0; + +err: + if (fd >= 0) + my_close(fd, MYF(0)); + return error ? error : Log_event::exec_event(rli); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Delete_file_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Delete_file_log_event ctor + + ****************************************************************************/ +#ifndef MYSQL_CLIENT Delete_file_log_event::Delete_file_log_event(THD* thd_arg) :Log_event(thd_arg),file_id(thd_arg->file_id) { } -#endif +#endif // !MYSQL_CLIENT + +/***************************************************************************** + Delete_file_log_event ctor + ****************************************************************************/ Delete_file_log_event::Delete_file_log_event(const char* buf, int len) :Log_event(buf, 0),file_id(0) { @@ -1599,7 +2466,11 @@ Delete_file_log_event::Delete_file_log_event(const char* buf, int len) file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); } +/***************************************************************************** + + Delete_file_log_event::write_data() + ****************************************************************************/ int Delete_file_log_event::write_data(IO_CACHE* file) { byte buf[DELETE_FILE_HEADER_LEN]; @@ -1607,6 +2478,11 @@ int Delete_file_log_event::write_data(IO_CACHE* file) return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN); } +/***************************************************************************** + + Delete_file_log_event::print() + + ****************************************************************************/ #ifdef MYSQL_CLIENT void Delete_file_log_event::print(FILE* file, bool short_form, char* last_db) @@ -1617,8 +2493,13 @@ void Delete_file_log_event::print(FILE* file, bool short_form, fputc('\n', file); fprintf(file, "#Delete_file: file_id=%u\n", file_id); } -#endif +#endif // MYSQL_CLIENT + +/***************************************************************************** + Delete_file_log_event::pack_info() + + ****************************************************************************/ #ifndef MYSQL_CLIENT void Delete_file_log_event::pack_info(String* packet) { @@ -1627,17 +2508,54 @@ void Delete_file_log_event::pack_info(String* packet) length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id)); net_store_data(packet, buf, (int32) length); } -#endif +#endif // !MYSQL_CLIENT + +/***************************************************************************** + Delete_file_log_event::exec_event() + ****************************************************************************/ +#ifndef MYSQL_CLIENT +int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) +{ + char fname[FN_REFLEN+10]; + char *p= slave_load_file_stem(fname, file_id, server_id); + memcpy(p, ".data", 6); + (void) my_delete(fname, MYF(MY_WME)); + memcpy(p, ".info", 6); + (void) my_delete(fname, MYF(MY_WME)); + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + return Log_event::exec_event(rli); +} +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + Execute_load_log_event methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + Execute_load_log_event ctor + + ****************************************************************************/ #ifndef MYSQL_CLIENT Execute_load_log_event::Execute_load_log_event(THD* thd_arg) :Log_event(thd_arg),file_id(thd_arg->file_id) { } -#endif +#endif // !MYSQL_CLIENT +/***************************************************************************** + Execute_load_log_event ctor + + ****************************************************************************/ Execute_load_log_event::Execute_load_log_event(const char* buf,int len) :Log_event(buf, 0),file_id(0) { @@ -1646,7 +2564,11 @@ Execute_load_log_event::Execute_load_log_event(const char* buf,int len) file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET); } +/***************************************************************************** + Execute_load_log_event::write_data() + + ****************************************************************************/ int Execute_load_log_event::write_data(IO_CACHE* file) { byte buf[EXEC_LOAD_HEADER_LEN]; @@ -1654,6 +2576,11 @@ int Execute_load_log_event::write_data(IO_CACHE* file) return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN); } +/***************************************************************************** + + Execute_load_log_event::print() + + ****************************************************************************/ #ifdef MYSQL_CLIENT void Execute_load_log_event::print(FILE* file, bool short_form, char* last_db) @@ -1665,7 +2592,13 @@ void Execute_load_log_event::print(FILE* file, bool short_form, fprintf(file, "#Exec_load: file_id=%d\n", file_id); } -#endif +#endif // MYSQL_CLIENT + +/***************************************************************************** + + Execute_load_log_event::pack_info() + + ****************************************************************************/ #ifndef MYSQL_CLIENT void Execute_load_log_event::pack_info(String* packet) { @@ -1674,425 +2607,14 @@ void Execute_load_log_event::pack_info(String* packet) length= (uint) my_sprintf(buf, (buf, ";file_id=%u", (uint) file_id)); net_store_data(packet, buf, (int32) length); } -#endif - -#ifndef MYSQL_CLIENT -int Query_log_event::exec_event(struct st_relay_log_info* rli) -{ - int expected_error,actual_error = 0; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = rewrite_db((char*)db); - - /* - InnoDB internally stores the master log position it has processed so far; - position to store is really pos + pending + event_len - since we must store the pos of the END of the current log event - */ - rli->event_len= get_event_len(); - - if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->query = (char*)query; - thd->set_time((time_t)when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - thd->query_error = 0; // clear error - thd->net.last_errno = 0; - thd->net.last_error[0] = 0; - thd->slave_proxy_id = thread_id; // for temp tables - - /* - Sanity check to make sure the master did not get a really bad - error on the query. - */ - if (ignored_error_code((expected_error = error_code)) || - !check_expected_error(thd,rli,expected_error)) - { - mysql_log.write(thd,COM_QUERY,"%s",thd->query); - DBUG_PRINT("query",("%s",thd->query)); - mysql_parse(thd, thd->query, q_len); - DBUG_PRINT("info",("expected_error: %d last_errno: %d", - expected_error, thd->net.last_errno)); - if ((expected_error != (actual_error= thd->net.last_errno)) && - expected_error && - !ignored_error_code(actual_error) && - !ignored_error_code(expected_error)) - { - const char* errmsg = "Slave: did not get the expected error\ - running query from master - expected: '%s' (%d), got '%s' (%d)"; - sql_print_error(errmsg, ER_SAFE(expected_error), - expected_error, - actual_error ? thd->net.last_error: "no error", - actual_error); - thd->query_error = 1; - } - else if (expected_error == actual_error || - ignored_error_code(actual_error)) - { - DBUG_PRINT("info",("error ignored")); - thd->query_error = 0; - *rli->last_slave_error = 0; - rli->last_slave_errno = 0; - } - } - else - { - // master could be inconsistent, abort and tell DBA to check/fix it - thd->db = thd->query = 0; - thd->variables.convert_set = 0; - close_thread_tables(thd); - free_root(&thd->mem_root,0); - return 1; - } - } - thd->db= 0; // prevent db from being freed - thd->query= 0; // just to be sure - // assume no convert for next query unless set explictly - thd->variables.convert_set = 0; - close_thread_tables(thd); - - if (thd->query_error || thd->fatal_error) - { - slave_print_error(rli,actual_error, "error '%s' on query '%s'", - actual_error ? thd->net.last_error : - "unexpected success or fatal error", query); - free_root(&thd->mem_root,0); - return 1; - } - free_root(&thd->mem_root,0); - return Log_event::exec_event(rli); -} - - -int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) -{ - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = rewrite_db((char*)db); - thd->query = 0; - thd->query_error = 0; - - if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->set_time((time_t)when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - TABLE_LIST tables; - bzero((char*) &tables,sizeof(tables)); - tables.db = thd->db; - tables.alias = tables.real_name = (char*)table_name; - tables.lock_type = TL_WRITE; - // the table will be opened in mysql_load - if (table_rules_on && !tables_ok(thd, &tables)) - { - // TODO: this is a bug - this needs to be moved to the I/O thread - if (net) - skip_load_data_infile(net); - } - else - { - char llbuff[22]; - enum enum_duplicates handle_dup = DUP_IGNORE; - if (sql_ex.opt_flags && REPLACE_FLAG) - handle_dup = DUP_REPLACE; - sql_exchange ex((char*)fname, sql_ex.opt_flags && - DUMPFILE_FLAG ); - String field_term(sql_ex.field_term,sql_ex.field_term_len, - system_charset_info); - String enclosed(sql_ex.enclosed,sql_ex.enclosed_len, - system_charset_info); - String line_term(sql_ex.line_term,sql_ex.line_term_len, - system_charset_info); - String line_start(sql_ex.line_start,sql_ex.line_start_len, - system_charset_info); - String escaped(sql_ex.escaped,sql_ex.escaped_len, system_charset_info); - - ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); - if (sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); - - ex.skip_lines = skip_lines; - List<Item> fields; - set_fields(fields); - thd->slave_proxy_id = thd->thread_id; - if (net) - { - // mysql_load will use thd->net to read the file - thd->net.vio = net->vio; - /* - Make sure the client does not get confused about the packet sequence - */ - thd->net.pkt_nr = net->pkt_nr; - } - if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, - TL_WRITE)) - thd->query_error = 1; - if (thd->cuted_fields) - sql_print_error("Slave: load data infile at position %s in log \ -'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, - thd->cuted_fields ); - if (net) - net->pkt_nr= thd->net.pkt_nr; - } - } - else - { - /* - We will just ask the master to send us /dev/null if we do not - want to load the data. - TODO: this a bug - needs to be done in I/O thread - */ - if (net) - skip_load_data_infile(net); - } - - thd->net.vio = 0; - thd->db= 0; // prevent db from being freed - close_thread_tables(thd); - if (thd->query_error) - { - int sql_error = thd->net.last_errno; - if (!sql_error) - sql_error = ER_UNKNOWN_ERROR; - - slave_print_error(rli,sql_error, - "Slave: Error '%s' running load data infile ", - ER_SAFE(sql_error)); - free_root(&thd->mem_root,0); - return 1; - } - free_root(&thd->mem_root,0); - - if (thd->fatal_error) - { - sql_print_error("Slave: Fatal error running LOAD DATA INFILE "); - return 1; - } - - return Log_event::exec_event(rli); -} - - -/* - The master started - - IMPLEMENTATION - - To handle the case where the master died without a stop event, - we clean up all temporary tables + locks that we got. - - TODO - - Remove all active user locks - - If we have an active transaction at this point, the master died - in the middle while writing the transaction to the binary log. - In this case we should stop the slave. -*/ - -int Start_log_event::exec_event(struct st_relay_log_info* rli) -{ - /* All temporary tables was deleted on the master */ - close_temporary_tables(thd); - /* - If we have old format, load_tmpdir is cleaned up by the I/O thread - */ - if (!rli->mi->old_format) - cleanup_load_tmpdir(); - return Log_event::exec_event(rli); -} +#endif // !MYSQL_CLIENT +/***************************************************************************** -/* - The master stopped. Clean up all temporary tables + locks that the - master may have set. - - TODO - - Remove all active user locks -*/ - -int Stop_log_event::exec_event(struct st_relay_log_info* rli) -{ - // do not clean up immediately after rotate event - if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) - { - close_temporary_tables(thd); - cleanup_load_tmpdir(); - } - /* - We do not want to update master_log pos because we get a rotate event - before stop, so by now master_log_name is set to the next log. - If we updated it, we will have incorrect master coordinates and this - could give false triggers in MASTER_POS_WAIT() that we have reached - the target position when in fact we have not. - */ - rli->inc_pos(get_event_len(), 0); - flush_relay_log_info(rli); - return 0; -} - - -/* - Got a rotate log even from the master - - IMPLEMENTATION - This is mainly used so that we can later figure out the logname and - position for the master. - - We can't rotate the slave as this will cause infinitive rotations - in a A -> B -> A setup. - - RETURN VALUES - 0 ok - */ - - -int Rotate_log_event::exec_event(struct st_relay_log_info* rli) -{ - char* log_name = rli->master_log_name; - DBUG_ENTER("Rotate_log_event::exec_event"); - - pthread_mutex_lock(&rli->data_lock); - memcpy(log_name, new_log_ident, ident_len+1); - rli->master_log_pos = pos; - rli->relay_log_pos += get_event_len(); - DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos)); - pthread_mutex_unlock(&rli->data_lock); - pthread_cond_broadcast(&rli->data_cond); - flush_relay_log_info(rli); - DBUG_RETURN(0); -} - - -int Intvar_log_event::exec_event(struct st_relay_log_info* rli) -{ - switch (type) { - case LAST_INSERT_ID_EVENT: - thd->last_insert_id_used = 1; - thd->last_insert_id = val; - break; - case INSERT_ID_EVENT: - thd->next_insert_id = val; - break; - } - rli->inc_pending(get_event_len()); - return 0; -} - -int Rand_log_event::exec_event(struct st_relay_log_info* rli) -{ - thd->rand.seed1 = seed1; - thd->rand.seed2 = seed2; - rli->inc_pending(get_event_len()); - return 0; -} - -int Slave_log_event::exec_event(struct st_relay_log_info* rli) -{ - if (mysql_bin_log.is_open()) - mysql_bin_log.write(this); - return Log_event::exec_event(rli); -} - -int Create_file_log_event::exec_event(struct st_relay_log_info* rli) -{ - char fname_buf[FN_REFLEN+10]; - char *p; - int fd = -1; - IO_CACHE file; - int error = 1; - - bzero((char*)&file, sizeof(file)); - p = slave_load_file_stem(fname_buf, file_id, server_id); - strmov(p, ".info"); // strmov takes less code than memcpy - if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, - MYF(MY_WME))) < 0 || - init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, - MYF(MY_WME|MY_NABP))) - { - slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); - goto err; - } - - // a trick to avoid allocating another buffer - strmov(p, ".data"); - fname = fname_buf; - fname_len = (uint)(p-fname) + 5; - if (write_base(&file)) - { - strmov(p, ".info"); // to have it right in the error message - slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf); - goto err; - } - end_io_cache(&file); - my_close(fd, MYF(0)); - - // fname_buf now already has .data, not .info, because we did our trick - if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, - MYF(MY_WME))) < 0) - { - slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); - goto err; - } - if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) - { - slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf); - goto err; - } - if (mysql_bin_log.is_open()) - mysql_bin_log.write(this); - error=0; // Everything is ok - -err: - if (error) - end_io_cache(&file); - if (fd >= 0) - my_close(fd, MYF(0)); - return error ? 1 : Log_event::exec_event(rli); -} - -int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) -{ - char fname[FN_REFLEN+10]; - char *p= slave_load_file_stem(fname, file_id, server_id); - memcpy(p, ".data", 6); - (void) my_delete(fname, MYF(MY_WME)); - memcpy(p, ".info", 6); - (void) my_delete(fname, MYF(MY_WME)); - if (mysql_bin_log.is_open()) - mysql_bin_log.write(this); - return Log_event::exec_event(rli); -} - -int Append_block_log_event::exec_event(struct st_relay_log_info* rli) -{ - char fname[FN_REFLEN+10]; - char *p= slave_load_file_stem(fname, file_id, server_id); - int fd; - int error = 1; - - memcpy(p, ".data", 6); - if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) - { - slave_print_error(rli,my_errno, "Could not open file '%s'", fname); - goto err; - } - if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) - { - slave_print_error(rli,my_errno, "Write to '%s' failed", fname); - goto err; - } - if (mysql_bin_log.is_open()) - mysql_bin_log.write(this); - error=0; - -err: - if (fd >= 0) - my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(rli); -} + Execute_load_log_event::exec_event() + ****************************************************************************/ +#ifndef MYSQL_CLIENT int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; @@ -2151,5 +2673,100 @@ err: } return error ? error : Log_event::exec_event(rli); } +#endif // !MYSQL_CLIENT + + +/***************************************************************************** + ***************************************************************************** + + sql_ex_info methods + + ***************************************************************************** + ****************************************************************************/ + +/***************************************************************************** + + sql_ex_info::write_data() + + ****************************************************************************/ +int sql_ex_info::write_data(IO_CACHE* file) +{ + if (new_format()) + { + return (write_str(file, field_term, field_term_len) || + write_str(file, enclosed, enclosed_len) || + write_str(file, line_term, line_term_len) || + write_str(file, line_start, line_start_len) || + write_str(file, escaped, escaped_len) || + my_b_safe_write(file,(byte*) &opt_flags,1)); + } + else + { + old_sql_ex old_ex; + old_ex.field_term= *field_term; + old_ex.enclosed= *enclosed; + old_ex.line_term= *line_term; + old_ex.line_start= *line_start; + old_ex.escaped= *escaped; + old_ex.opt_flags= opt_flags; + old_ex.empty_flags=empty_flags; + return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)); + } +} + +/***************************************************************************** + + sql_ex_info::init() + + ****************************************************************************/ +char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) +{ + cached_new_format = use_new_format; + if (use_new_format) + { + empty_flags=0; + /* + The code below assumes that buf will not disappear from + under our feet during the lifetime of the event. This assumption + holds true in the slave thread if the log is in new format, but is not + the case when we have old format because we will be reusing net buffer + to read the actual file before we write out the Create_file event. + */ + if (read_str(buf, buf_end, field_term, field_term_len) || + read_str(buf, buf_end, enclosed, enclosed_len) || + read_str(buf, buf_end, line_term, line_term_len) || + read_str(buf, buf_end, line_start, line_start_len) || + read_str(buf, buf_end, escaped, escaped_len)) + return 0; + opt_flags = *buf++; + } + else + { + field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1; + field_term = buf++; // Use first byte in string + enclosed= buf++; + line_term= buf++; + line_start= buf++; + escaped= buf++; + opt_flags = *buf++; + empty_flags= *buf++; + if (empty_flags & FIELD_TERM_EMPTY) + field_term_len=0; + if (empty_flags & ENCLOSED_EMPTY) + enclosed_len=0; + if (empty_flags & LINE_TERM_EMPTY) + line_term_len=0; + if (empty_flags & LINE_START_EMPTY) + line_start_len=0; + if (empty_flags & ESCAPED_EMPTY) + escaped_len=0; + } + return buf; +} + + + + + + -#endif /* !MYSQL_CLIENT */ diff --git a/sql/log_event.h b/sql/log_event.h index c0a1345647f..bf04c480729 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -54,6 +54,11 @@ #define LINE_START_EMPTY 0x8 #define ESCAPED_EMPTY 0x10 +/***************************************************************************** + + old_sql_ex struct + + ****************************************************************************/ struct old_sql_ex { char field_term; @@ -67,6 +72,11 @@ struct old_sql_ex #define NUM_LOAD_DELIM_STRS 5 +/***************************************************************************** + + sql_ex_info struct + + ****************************************************************************/ struct sql_ex_info { char* field_term; @@ -99,13 +109,19 @@ struct sql_ex_info } }; -/* - Binary log consists of events. Each event has a fixed length header, - followed by possibly variable ( depending on the type of event) length - data body. The data body consists of an optional fixed length segment - (post-header), and an optional variable length segment. See #defines and - comments below for the format specifics -*/ +/***************************************************************************** + + MySQL Binary Log + + This log consists of events. Each event has a fixed-length header, + possibly followed by a variable length data body. + + The data body consists of an optional fixed length segment (post-header) + and an optional variable length segment. + + See the #defines below for the format specifics. + + ****************************************************************************/ /* event-specific post-header sizes */ #define LOG_EVENT_HEADER_LEN 19 @@ -221,6 +237,13 @@ class THD; struct st_relay_log_info; +/***************************************************************************** + + Log_event class + + This is the abstract base class for binary log events. + + ****************************************************************************/ class Log_event { public: @@ -303,6 +326,13 @@ public: }; +/***************************************************************************** + + Query Log Event class + + Logs SQL queries + + ****************************************************************************/ class Query_log_event: public Log_event { protected: @@ -355,6 +385,11 @@ public: }; +/***************************************************************************** + + Slave Log Event class + + ****************************************************************************/ class Slave_log_event: public Log_event { protected: @@ -384,6 +419,12 @@ public: int write_data(IO_CACHE* file ); }; + +/***************************************************************************** + + Load Log Event class + + ****************************************************************************/ class Load_log_event: public Log_event { protected: @@ -446,6 +487,11 @@ public: extern char server_version[SERVER_VERSION_LENGTH]; +/***************************************************************************** + + Start Log Event class + + ****************************************************************************/ class Start_log_event: public Log_event { public: @@ -477,6 +523,13 @@ public: }; +/***************************************************************************** + + Intvar Log Event class + + Logs special variables such as auto_increment values + + ****************************************************************************/ class Intvar_log_event: public Log_event { public: @@ -503,9 +556,11 @@ public: }; /***************************************************************************** - * - * Rand log event class - * + + Rand Log Event class + + Logs random seed used by the next RAND() + ****************************************************************************/ class Rand_log_event: public Log_event { @@ -531,6 +586,12 @@ class Rand_log_event: public Log_event bool is_valid() { return 1; } }; + +/***************************************************************************** + + Stop Log Event class + + ****************************************************************************/ class Stop_log_event: public Log_event { public: @@ -551,6 +612,13 @@ public: }; +/***************************************************************************** + + Rotate Log Event class + + This will be depricated when we move to using sequence ids. + + ****************************************************************************/ class Rotate_log_event: public Log_event { public: @@ -585,6 +653,11 @@ public: /* the classes below are for the new LOAD DATA INFILE logging */ +/***************************************************************************** + + Create File Log Event class + + ****************************************************************************/ class Create_file_log_event: public Load_log_event { protected: @@ -641,6 +714,11 @@ public: }; +/***************************************************************************** + + Append Block Log Event class + + ****************************************************************************/ class Append_block_log_event: public Log_event { public: @@ -665,7 +743,11 @@ public: int write_data(IO_CACHE* file); }; +/***************************************************************************** + Delete File Log Event class + + ****************************************************************************/ class Delete_file_log_event: public Log_event { public: @@ -687,6 +769,11 @@ public: int write_data(IO_CACHE* file); }; +/***************************************************************************** + + Execute Load Log Event class + + ****************************************************************************/ class Execute_load_log_event: public Log_event { public: |