diff options
author | serg@serg.mylan <> | 2005-02-14 21:50:09 +0100 |
---|---|---|
committer | serg@serg.mylan <> | 2005-02-14 21:50:09 +0100 |
commit | fd828e5b4d7f20a64bccfc25a096ca94760bcec9 (patch) | |
tree | ea9dd08a255018554104b4f137dea7cfe36ece0a /sql/log.cc | |
parent | 2b49bea59a594a0f4a87200adca21fb0cc3b2893 (diff) | |
parent | db13afd89db4a415e2a77b0c200a1391ea2f03fe (diff) | |
download | mariadb-git-fd828e5b4d7f20a64bccfc25a096ca94760bcec9.tar.gz |
manually merged
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 1431 |
1 files changed, 1050 insertions, 381 deletions
diff --git a/sql/log.cc b/sql/log.cc index d21979a707c..ca9cb6e3238 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -24,7 +24,6 @@ #include "mysql_priv.h" #include "sql_repl.h" -#include "ha_innodb.h" // necessary to cut the binlog when crash recovery #include <my_dir.h> #include <stdarg.h> @@ -39,23 +38,243 @@ ulong sync_binlog_counter= 0; static bool test_if_number(const char *str, long *res, bool allow_wildcards); +static int binlog_close_connection(THD *thd); +static int binlog_savepoint_set(THD *thd, void *sv); +static int binlog_savepoint_rollback(THD *thd, void *sv); +static int binlog_commit(THD *thd, bool all); +static int binlog_rollback(THD *thd, bool all); +static int binlog_prepare(THD *thd, bool all); + +static handlerton binlog_hton = { + 0, + sizeof(my_off_t), /* savepoint size = binlog offset */ + binlog_close_connection, + binlog_savepoint_set, + binlog_savepoint_rollback, + NULL, /* savepoint_release */ + binlog_commit, + binlog_rollback, + binlog_prepare, + NULL, /* recover */ + NULL, /* commit_by_xid */ + NULL /* rollback_by_xid */ +}; + +/* + this function is mostly a placeholder. + conceptually, binlog initialization (now mostly done in MYSQL_LOG::open) + should be moved here. + + for now, we fail if binlog is closed (mysql_bin_log.open() failed for some + reason) - it'll make mysqld to shutdown. 
+*/ + +handlerton *binlog_init() +{ + return mysql_bin_log.is_open() ? &binlog_hton : 0; +} + +static int binlog_close_connection(THD *thd) +{ + IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + DBUG_ASSERT(mysql_bin_log.is_open() && !my_b_tell(trans_log)); + close_cached_file(trans_log); + my_free((gptr)trans_log, MYF(0)); + return 0; +} + +static inline void binlog_cleanup_trans(IO_CACHE *trans_log) +{ + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } + reinit_io_cache(trans_log, WRITE_CACHE, (my_off_t) 0, 0, 1); // cannot fail + trans_log->end_of_file= max_binlog_cache_size; +} + +static int binlog_prepare(THD *thd, bool all) +{ + /* + do nothing. + just pretend we can do 2pc, so that MySQL won't + switch to 1pc. + real work will be done in MYSQL_LOG::log() + */ + return 0; +} + +static int binlog_commit(THD *thd, bool all) +{ + int error; + IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + DBUG_ENTER("binlog_commit"); + DBUG_ASSERT(mysql_bin_log.is_open() && + (all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))); + + if (!my_b_tell(trans_log)) + { + // we're here because trans_log was flushed in MYSQL_LOG::log() + DBUG_RETURN(0); + } + + /* Update the binary log as we have cached some queries */ + error= mysql_bin_log.write(thd, trans_log); + binlog_cleanup_trans(trans_log); + DBUG_RETURN(error); +} + +static int binlog_rollback(THD *thd, bool all) +{ + int error=0; + IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + DBUG_ENTER("binlog_rollback"); + /* + first two conditions here are guaranteed - see trans_register_ha() + call below. The third one must be true. If it is not, we're registering + unnecessary, doing extra work. 
The cause should be found and eliminated + */ + DBUG_ASSERT(all && mysql_bin_log.is_open() && my_b_tell(trans_log)); + /* + Update the binary log with a BEGIN/ROLLBACK block if we have + cached some queries and we updated some non-transactional + table. Such cases should be rare (updating a + non-transactional table inside a transaction...) + */ + if (unlikely(thd->options & OPTION_STATUS_NO_TRANS_UPDATE)) + { + Query_log_event qev(thd, "ROLLBACK", 8, TRUE, FALSE); + qev.write(trans_log); + error= mysql_bin_log.write(thd, trans_log); + } + binlog_cleanup_trans(trans_log); + DBUG_RETURN(error); +} + +/* + NOTE: how do we handle this (unlikely but legal) case: + [transaction] + [update to non-trans table] + [rollback to savepoint] ? + The problem occurs when a savepoint is before the update to the + non-transactional table. Then when there's a rollback to the savepoint, if we + simply truncate the binlog cache, we lose the part of the binlog cache where + the update is. If we want to not lose it, we need to write the SAVEPOINT + command and the ROLLBACK TO SAVEPOINT command to the binlog cache. The latter + is easy: it's just write at the end of the binlog cache, but the former + should be *inserted* to the place where the user called SAVEPOINT. The + solution is that when the user calls SAVEPOINT, we write it to the binlog + cache (so no need to later insert it). As transactions are never intermixed + in the binary log (i.e. they are serialized), we won't have conflicts with + savepoint names when using mysqlbinlog or in the slave SQL thread. + Then when ROLLBACK TO SAVEPOINT is called, if we updated some + non-transactional table, we don't truncate the binlog cache but instead write + ROLLBACK TO SAVEPOINT to it; otherwise we truncate the binlog cache (which + will chop the SAVEPOINT command from the binlog cache, which is good as in + that case there is no need to have it in the binlog). 
+*/ + +static int binlog_savepoint_set(THD *thd, void *sv) +{ + IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + DBUG_ENTER("binlog_savepoint_set"); + DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); + + *(my_off_t *)sv= my_b_tell(trans_log); + /* Write it to the binary log */ + Query_log_event qinfo(thd, thd->query, thd->query_length, TRUE, FALSE); + DBUG_RETURN(mysql_bin_log.write(&qinfo)); +} + +static int binlog_savepoint_rollback(THD *thd, void *sv) +{ + IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + DBUG_ENTER("binlog_savepoint_rollback"); + DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); + + /* + Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some + non-transactional table. Otherwise, truncate the binlog cache starting + from the SAVEPOINT command. + */ + if (unlikely(thd->options & OPTION_STATUS_NO_TRANS_UPDATE)) + { + Query_log_event qinfo(thd, thd->query, thd->query_length, TRUE, FALSE); + DBUG_RETURN(mysql_bin_log.write(&qinfo)); + } + reinit_io_cache(trans_log, WRITE_CACHE, *(my_off_t *)sv, 0, 0); + DBUG_RETURN(0); +} + +int check_binlog_magic(IO_CACHE* log, const char** errmsg) +{ + char magic[4]; + DBUG_ASSERT(my_b_tell(log) == 0); + + if (my_b_read(log, (byte*) magic, sizeof(magic))) + { + *errmsg = "I/O error reading the header from the binary log"; + sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno, + log->error); + return 1; + } + if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) + { + *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL"; + return 1; + } + return 0; +} + +File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) +{ + File file; + DBUG_ENTER("open_binlog"); + + if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0) + { + sql_print_error("Failed to open log (file '%s', errno %d)", + log_file_name, my_errno); + *errmsg = "Could not open 
log file"; + goto err; + } + if (init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0, + MYF(MY_WME|MY_DONT_CHECK_FILESIZE))) + { + sql_print_error("Failed to create a cache on log (file '%s')", + log_file_name); + *errmsg = "Could not open log file"; + goto err; + } + if (check_binlog_magic(log,errmsg)) + goto err; + DBUG_RETURN(file); + +err: + if (file >= 0) + { + my_close(file,MYF(0)); + end_io_cache(log); + } + DBUG_RETURN(-1); +} #ifdef __NT__ static int eventSource = 0; -void setup_windows_event_source() +void setup_windows_event_source() { - HKEY hRegKey= NULL; + HKEY hRegKey= NULL; DWORD dwError= 0; TCHAR szPath[MAX_PATH]; DWORD dwTypes; - + if (eventSource) // Ensure that we are only called once return; eventSource= 1; // Create the event source registry key - dwError= RegCreateKey(HKEY_LOCAL_MACHINE, + dwError= RegCreateKey(HKEY_LOCAL_MACHINE, "SYSTEM\\CurrentControlSet\\Services\\EventLog\\Application\\MySQL", &hRegKey); @@ -63,9 +282,8 @@ void setup_windows_event_source() GetModuleFileName(NULL, szPath, MAX_PATH); /* Register EventMessageFile */ - dwError = RegSetValueEx(hRegKey, "EventMessageFile", 0, REG_EXPAND_SZ, + dwError = RegSetValueEx(hRegKey, "EventMessageFile", 0, REG_EXPAND_SZ, (PBYTE) szPath, strlen(szPath)+1); - /* Register supported event types */ dwTypes= (EVENTLOG_ERROR_TYPE | EVENTLOG_WARNING_TYPE | @@ -128,14 +346,14 @@ static int find_uniq_filename(char *name) MYSQL_LOG::MYSQL_LOG() :bytes_written(0), last_time(0), query_start(0), name(0), file_id(1), open_count(1), log_type(LOG_CLOSED), write_error(0), inited(0), - need_start_event(1), description_event_for_exec(0), + need_start_event(1), prepared_xids(0), description_event_for_exec(0), description_event_for_queue(0) { /* We don't want to initialize LOCK_Log here as such initialization depends on safe_mutex (when using safe_mutex) which depends on MY_INIT(), which is called only in main(). Doing initialization here would make it happen - before main(). + before main(). 
*/ index_file_name[0] = 0; bzero((char*) &log_file,sizeof(log_file)); @@ -156,7 +374,7 @@ void MYSQL_LOG::cleanup() if (inited) { inited= 0; - close(LOG_CLOSE_INDEX); + close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); delete description_event_for_queue; delete description_event_for_exec; (void) pthread_mutex_destroy(&LOCK_log); @@ -168,7 +386,7 @@ void MYSQL_LOG::cleanup() int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name) -{ +{ fn_format(new_name,log_name,mysql_data_home,"",4); if (log_type != LOG_NORMAL) { @@ -209,6 +427,66 @@ void MYSQL_LOG::init_pthread_objects() (void) pthread_cond_init(&update_cond, 0); } +const char *MYSQL_LOG::generate_name(const char *log_name, + const char *suffix, + bool strip_ext, char *buff) +{ + if (!log_name || !log_name[0]) + { + /* + TODO: The following should be using fn_format(); We just need to + first change fn_format() to cut the file name if it's too long. + */ + strmake(buff,glob_hostname,FN_REFLEN-5); + strmov(fn_ext(buff),suffix); + return (const char *)buff; + } + // get rid of extension if the log is binary to avoid problems + if (strip_ext) + { + char *p = fn_ext(log_name); + uint length=(uint) (p-log_name); + strmake(buff,log_name,min(length,FN_REFLEN)); + return (const char*)buff; + } + return log_name; +} + +bool MYSQL_LOG::open_index_file(const char *index_file_name_arg, + const char *log_name) +{ + File index_file_nr= -1; + DBUG_ASSERT(!my_b_inited(&index_file)); + + /* + First open of this class instance + Create an index file that will hold all file names uses for logging. + Add new entries to the end of it. 
+ */ + myf opt= MY_UNPACK_FILENAME; + if (!index_file_name_arg) + { + index_file_name_arg= log_name; // Use same basename for index file + opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT; + } + fn_format(index_file_name, index_file_name_arg, mysql_data_home, + ".index", opt); + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_BINARY , + MYF(MY_WME))) < 0 || + my_sync(index_file_nr, MYF(MY_WME)) || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, WRITE_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) + { + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); + return TRUE; + } + return FALSE; +} + /* Open a (new) log file. @@ -224,35 +502,39 @@ void MYSQL_LOG::init_pthread_objects() 1 error */ -bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, - const char *new_name, const char *index_file_name_arg, - enum cache_type io_cache_type_arg, - bool no_auto_events_arg, +bool MYSQL_LOG::open(const char *log_name, + enum_log_type log_type_arg, + const char *new_name, + enum cache_type io_cache_type_arg, + bool no_auto_events_arg, ulong max_size_arg, bool null_created_arg) { - char buff[512]; - File file= -1, index_file_nr= -1; - int open_flags = O_CREAT | O_APPEND | O_BINARY; + char buff[FN_REFLEN]; + File file= -1; + int open_flags = O_CREAT | O_BINARY; DBUG_ENTER("MYSQL_LOG::open"); - DBUG_PRINT("enter",("log_type: %d",(int) log_type)); + DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg)); last_time=query_start=0; write_error=0; init(log_type_arg,io_cache_type_arg,no_auto_events_arg,max_size_arg); - + if (!(name=my_strdup(log_name,MYF(MY_WME)))) + { + name= (char *)log_name; // for the error message goto err; + } if (new_name) strmov(log_file_name,new_name); else if (generate_new_name(log_file_name, name)) goto err; - + if (io_cache_type == SEQ_READ_APPEND) - open_flags |= O_RDWR; + open_flags |= O_RDWR | O_APPEND; else - open_flags |= O_WRONLY; + open_flags |= O_WRONLY | (log_type 
== LOG_BIN ? 0 : O_APPEND); db[0]=0; open_count++; @@ -311,13 +593,6 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, { bool write_file_name_to_index_file=0; - myf opt= MY_UNPACK_FILENAME; - if (!index_file_name_arg) - { - index_file_name_arg= name; // Use same basename for index file - opt= MY_UNPACK_FILENAME | MY_REPLACE_EXT; - } - if (!my_b_filelength(&log_file)) { /* @@ -333,33 +608,9 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, write_file_name_to_index_file= 1; } - if (!my_b_inited(&index_file)) - { - /* - First open of this class instance - Create an index file that will hold all file names uses for logging. - Add new entries to the end of it. - Index file (and binlog) are so critical for recovery/replication - that we create them with MY_WAIT_IF_FULL. - */ - fn_format(index_file_name, index_file_name_arg, mysql_data_home, - ".index", opt); - if ((index_file_nr= my_open(index_file_name, - O_RDWR | O_CREAT | O_BINARY , - MYF(MY_WME))) < 0 || - my_sync(index_file_nr, MYF(MY_WME)) || - init_io_cache(&index_file, index_file_nr, - IO_SIZE, WRITE_CACHE, - my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), - 0, MYF(MY_WME | MY_WAIT_IF_FULL))) - goto err; - } - else - { - safe_mutex_assert_owner(&LOCK_index); - reinit_io_cache(&index_file, WRITE_CACHE, my_b_filelength(&index_file), - 0, 0); - } + DBUG_ASSERT(my_b_inited(&index_file)); + reinit_io_cache(&index_file, WRITE_CACHE, + my_b_filelength(&index_file), 0, 0); if (need_start_event && !no_auto_events) { /* @@ -367,6 +618,7 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, even if this is not the very first binlog. 
*/ Format_description_log_event s(BINLOG_VERSION); + s.flags|= LOG_EVENT_BINLOG_IN_USE_F; if (!s.is_valid()) goto err; if (null_created_arg) @@ -401,7 +653,7 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, description_event_for_queue->created= 0; /* Don't set log_pos in event header */ description_event_for_queue->artificial_event=1; - + if (description_event_for_queue->write(&log_file)) goto err; bytes_written+= description_event_for_queue->data_written; @@ -436,11 +688,9 @@ err: sql_print_error("Could not use %s for logging (error %d). \ Turning logging off for the whole duration of the MySQL server process. \ To turn it on again: fix the cause, \ -shutdown the MySQL server and restart it.", log_name, errno); +shutdown the MySQL server and restart it.", name, errno); if (file >= 0) my_close(file,MYF(0)); - if (index_file_nr >= 0) - my_close(index_file_nr,MYF(0)); end_io_cache(&log_file); end_io_cache(&index_file); safeFree(name); @@ -546,8 +796,8 @@ int MYSQL_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, DBUG_PRINT("enter",("log_name: %s", log_name ? log_name : "NULL")); /* - Mutex needed because we need to make sure the file pointer does not move - from under our feet + Mutex needed because we need to make sure the file pointer does not + move from under our feet */ if (need_lock) pthread_mutex_lock(&LOCK_index); @@ -630,7 +880,7 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) error = !index_file.error ? 
LOG_INFO_EOF : LOG_INFO_IO; goto err; } - fname[length-1]=0; // kill /n + fname[length-1]=0; // kill \n linfo->index_file_offset = my_b_tell(&index_file); err: @@ -686,7 +936,7 @@ bool MYSQL_LOG::reset_logs(THD* thd) error=1; goto err; } - + for (;;) { my_delete(linfo.log_file_name, MYF(MY_WME)); @@ -699,11 +949,12 @@ bool MYSQL_LOG::reset_logs(THD* thd) my_delete(index_file_name, MYF(MY_WME)); // Reset (open will update) if (!thd->slave_thread) need_start_event=1; - open(save_name, save_log_type, 0, index_file_name, + open_index_file(index_file_name, 0); + open(save_name, save_log_type, 0, io_cache_type, no_auto_events, max_size, 0); my_free((gptr) save_name, MYF(0)); -err: +err: pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); @@ -722,7 +973,7 @@ err: rli->group_relay_log_name are deleted ; if true, the latter is deleted too (i.e. all relay logs read by the SQL slave thread are deleted). - + NOTE - This is only called from the slave-execute thread when it has read all commands from a relay log and want to switch to a new relay log. @@ -1040,10 +1291,28 @@ void MYSQL_LOG::new_file(bool need_lock) { pthread_mutex_lock(&LOCK_log); pthread_mutex_lock(&LOCK_index); - } + } safe_mutex_assert_owner(&LOCK_log); safe_mutex_assert_owner(&LOCK_index); + /* + if binlog is used as tc log, be sure all xids are "unlogged", + so that on recover we only need to scan one - latest - binlog file + for prepared xids. As this is expected to be a rare event, + simple wait strategy is enough. We're locking LOCK_log to be sure no + new Xid_log_event's are added to the log (and prepared_xids is not + increased), and waiting on COND_prep_xids for late threads to + catch up. 
+ */ + if (prepared_xids) + { + tc_log_page_waits++; + pthread_mutex_lock(&LOCK_prep_xids); + while (prepared_xids) + pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids); + pthread_mutex_unlock(&LOCK_prep_xids); + } + /* Reuse old name if not binlog and not update log */ new_name_ptr= name; @@ -1055,7 +1324,7 @@ void MYSQL_LOG::new_file(bool need_lock) if (generate_new_name(new_name, name)) goto end; new_name_ptr=new_name; - + if (log_type == LOG_BIN) { if (!no_auto_events) @@ -1074,30 +1343,28 @@ void MYSQL_LOG::new_file(bool need_lock) log rotation should give the waiting thread a signal to discover EOF and move on to the next log. */ - signal_update(); + signal_update(); } old_name=name; save_log_type=log_type; name=0; // Don't free name close(LOG_CLOSE_TO_BE_OPENED); - /* + /* Note that at this point, log_type != LOG_CLOSED (important for is_open()). */ - /* + /* new_file() is only used for rotation (in FLUSH LOGS or because size > - max_binlog_size or max_relay_log_size). + max_binlog_size or max_relay_log_size). If this is a binary log, the Format_description_log_event at the beginning of the new file should have created=0 (to distinguish with the Format_description_log_event written at server startup, which should trigger temp tables deletion on slaves. 
- */ + */ - open(old_name, save_log_type, new_name_ptr, index_file_name, io_cache_type, - no_auto_events, max_size, 1); - if (this == &mysql_bin_log) - report_pos_in_innodb(); + open(old_name, save_log_type, new_name_ptr, + io_cache_type, no_auto_events, max_size, 1); my_free(old_name,MYF(0)); end: @@ -1286,8 +1553,7 @@ inline bool sync_binlog(IO_CACHE *cache) bool MYSQL_LOG::write(Log_event* event_info) { THD *thd=event_info->thd; - bool called_handler_commit=0; - bool error=0; + bool error=1; bool should_rotate = 0; DBUG_ENTER("MYSQL_LOG::write(event)"); @@ -1298,26 +1564,10 @@ bool MYSQL_LOG::write(Log_event* event_info) mostly called if is_open() *was* true a few instructions before, but it could have changed since. */ - if (is_open()) + if (likely(is_open())) { const char *local_db= event_info->get_db(); IO_CACHE *file= &log_file; -#ifdef USING_TRANSACTIONS - /* - Should we write to the binlog cache or to the binlog on disk? - Write to the binlog cache if: - - it is already not empty (meaning we're in a transaction; note that the - present event could be about a non-transactional table, but still we need - to write to the binlog cache in that case to handle updates to mixed - trans/non-trans table types the best possible in binlogging) - - or if the event asks for it (cache_stmt == true). - */ - if (opt_using_transactions && - (event_info->get_cache_stmt() || - (thd && my_b_tell(&thd->transaction.trans_log)))) - file= &thd->transaction.trans_log; -#endif - DBUG_PRINT("info",("event type=%d",event_info->get_type_code())); #ifdef HAVE_REPLICATION /* In the future we need to add to the following if tests like @@ -1333,7 +1583,50 @@ bool MYSQL_LOG::write(Log_event* event_info) } #endif /* HAVE_REPLICATION */ - error=1; +#ifdef USING_TRANSACTIONS + /* + Should we write to the binlog cache or to the binlog on disk? 
+ Write to the binlog cache if: + - it is already not empty (meaning we're in a transaction; note that the + present event could be about a non-transactional table, but still we need + to write to the binlog cache in that case to handle updates to mixed + trans/non-trans table types the best possible in binlogging) + - or if the event asks for it (cache_stmt == true). + */ + if (opt_using_transactions && thd) + { + IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot]; + + if (event_info->get_cache_stmt()) + { + if (!trans_log) + { + thd->ha_data[binlog_hton.slot]= trans_log= (IO_CACHE *) + my_malloc(sizeof(IO_CACHE), MYF(MY_ZEROFILL)); + if (!trans_log || open_cached_file(trans_log, mysql_tmpdir, LOG_PREFIX, + binlog_cache_size, MYF(MY_WME))) + { + my_free((gptr)trans_log, MYF(MY_ALLOW_ZERO_PTR)); + thd->ha_data[binlog_hton.slot]= trans_log= 0; + goto err; + } + trans_log->end_of_file= max_binlog_cache_size; + trans_register_ha(thd, + thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN), + &binlog_hton); + } + else if (!my_b_tell(trans_log)) + trans_register_ha(thd, + thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN), + &binlog_hton); + file= trans_log; + } + else if (trans_log && my_b_tell(trans_log)) + file= trans_log; + } +#endif + DBUG_PRINT("info",("event type=%d",event_info->get_type_code())); + /* No check for auto events flag here - this write method should never be called if auto-events are enabled @@ -1432,17 +1725,6 @@ COLLATION_CONNECTION=%u,COLLATION_DATABASE=%u,COLLATION_SERVER=%u", goto err; } } -#ifdef TO_BE_REMOVED - if (thd->variables.convert_set) - { - char buf[256], *p; - p= strmov(strmov(buf, "SET CHARACTER SET "), - thd->variables.convert_set->name); - Query_log_event e(thd, buf, (ulong) (p - buf), 0, FALSE); - if (e.write(file)) - goto err; - } -#endif } /* Write the SQL command */ @@ -1450,71 +1732,12 @@ COLLATION_CONNECTION=%u,COLLATION_DATABASE=%u,COLLATION_SERVER=%u", if (event_info->write(file)) goto err; - /* - Tell for 
transactional table handlers up to which position in the - binlog file we wrote. The table handler can store this info, and - after crash recovery print for the user the offset of the last - transactions which were recovered. Actually, we must also call - the table handler commit here, protected by the LOCK_log mutex, - because otherwise the transactions may end up in a different order - in the table handler log! - - Note that we will NOT call ha_report_binlog_offset_and_commit() if - there are binlog events cached in the transaction cache. That is - because then the log event which we write to the binlog here is - not a transactional event. In versions < 4.0.13 before this fix this - caused an InnoDB transaction to be committed if in the middle there - was a MyISAM event! - */ - if (file == &log_file) // we are writing to the real log (disk) { if (flush_io_cache(file) || sync_binlog(file)) goto err; - if (opt_using_transactions && - !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) - { - /* - LOAD DATA INFILE in AUTOCOMMIT=1 mode writes to the binlog - chunks also before it is successfully completed. We only report - the binlog write and do the commit inside the transactional table - handler if the log event type is appropriate. - */ - - if (event_info->get_type_code() == QUERY_EVENT || - event_info->get_type_code() == EXEC_LOAD_EVENT) - { -#ifndef DBUG_OFF - if (unlikely(opt_crash_binlog_innodb)) - { - /* - This option is for use in rpl_crash_binlog_innodb.test. - 1st we want to verify that Binlog_dump thread cannot send the - event now (because of LOCK_log): we here tell the Binlog_dump - thread to wake up, sleep for the slave to have time to possibly - receive data from the master (it should not), and then crash. - 2nd we want to verify that at crash recovery the rolled back - event is cut from the binlog. 
- */ - if (!(--opt_crash_binlog_innodb)) - { - signal_update(); - sleep(2); - fprintf(stderr,"This is a normal crash because of" - " --crash-binlog-innodb\n"); - assert(0); - } - DBUG_PRINT("info",("opt_crash_binlog_innodb: %d", - opt_crash_binlog_innodb)); - } -#endif - error = ha_report_binlog_offset_and_commit(thd, log_file_name, - file->pos_in_file); - called_handler_commit=1; - } - } - /* We wrote to the real log, check automatic rotation; */ + /* check automatic rotation; */ DBUG_PRINT("info",("max_size: %lu",max_size)); should_rotate= (my_b_tell(file) >= (my_off_t) max_size); } @@ -1533,7 +1756,7 @@ err: signal_update(); if (should_rotate) { - pthread_mutex_lock(&LOCK_index); + pthread_mutex_lock(&LOCK_index); new_file(0); // inside mutex pthread_mutex_unlock(&LOCK_index); } @@ -1541,15 +1764,6 @@ err: pthread_mutex_unlock(&LOCK_log); - /* - Flush the transactional handler log file now that we have released - LOCK_log; the flush is placed here to eliminate the bottleneck on the - group commit - */ - - if (called_handler_commit) - ha_commit_complete(thd); - #ifdef HAVE_REPLICATION if (should_rotate && expire_logs_days) { @@ -1577,16 +1791,18 @@ uint MYSQL_LOG::next_file_id() SYNOPSIS write() - thd + thd cache The cache to copy to the binlog - commit_or_rollback If true, will write "COMMIT" in the end, if false will - write "ROLLBACK". NOTE - We only come here if there is something in the cache. - The thing in the cache is always a complete transaction - 'cache' needs to be reinitialized after this functions returns. + TODO + fix it to become atomic - either the complete cache is added to binlog + or nothing (other storage engines rely on this, doing a ROLLBACK) + IMPLEMENTATION - To support transaction over replication, we wrap the transaction with BEGIN/COMMIT or BEGIN/ROLLBACK in the binary log. @@ -1595,29 +1811,21 @@ uint MYSQL_LOG::next_file_id() same updates are run on the slave. 
*/ -bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache, bool commit_or_rollback) +bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache) { bool should_rotate= 0, error= 0; VOID(pthread_mutex_lock(&LOCK_log)); DBUG_ENTER("MYSQL_LOG::write(cache"); - if (is_open()) // Should always be true + if (likely(is_open())) // Should always be true { uint length; /* - Add the "BEGIN" and "COMMIT" in the binlog around transactions - which may contain more than 1 SQL statement. If we run with - AUTOCOMMIT=1, then MySQL immediately writes each SQL statement to - the binlog when the statement has been completed. No need to add - "BEGIN" ... "COMMIT" around such statements. Otherwise, MySQL uses - thd->transaction.trans_log to cache the SQL statements until the - explicit commit, and at the commit writes the contents in .trans_log - to the binlog. - - We write the "BEGIN" mark first in the buffer (.trans_log) where we - store the SQL statements for a transaction. At the transaction commit - we will add the "COMMIT mark and write the buffer to the binlog. + Log "BEGIN" at the beginning of the transaction. + which may contain more than 1 SQL statement. 
+ There is no need to append "COMMIT", as it's already in the 'cache' + (in fact, Xid_log_event is there which does the commit on slaves) */ { Query_log_event qinfo(thd, "BEGIN", 5, TRUE, FALSE); @@ -1643,6 +1851,7 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache, bool commit_or_rollback) if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) goto err; length=my_b_bytes_in_cache(cache); + DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); do { /* Write data to the binary log file */ @@ -1651,46 +1860,15 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache, bool commit_or_rollback) cache->read_pos=cache->read_end; // Mark buffer used up } while ((length=my_b_fill(cache))); - /* - We write the command "COMMIT" as the last SQL command in the - binlog segment cached for this transaction - */ - - { - Query_log_event qinfo(thd, - commit_or_rollback ? "COMMIT" : "ROLLBACK", - commit_or_rollback ? 6 : 8, - TRUE, FALSE); - qinfo.error_code= 0; - if (qinfo.write(&log_file) || flush_io_cache(&log_file) || - sync_binlog(&log_file)) + if (flush_io_cache(&log_file) || sync_binlog(&log_file)) goto err; - } + DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); if (cache->error) // Error on read { sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); write_error=1; // Don't give more errors goto err; } -#ifndef DBUG_OFF - if (unlikely(opt_crash_binlog_innodb)) - { - /* see the previous MYSQL_LOG::write() method for a comment */ - if (!(--opt_crash_binlog_innodb)) - { - signal_update(); - sleep(2); - fprintf(stderr, "This is a normal crash because of" - " --crash-binlog-innodb\n"); - assert(0); - } - DBUG_PRINT("info",("opt_crash_binlog_innodb: %d", - opt_crash_binlog_innodb)); - } -#endif - if ((ha_report_binlog_offset_and_commit(thd, log_file_name, - log_file.pos_in_file))) - goto err; signal_update(); DBUG_PRINT("info",("max_size: %lu",max_size)); if (should_rotate= (my_b_tell(&log_file) >= (my_off_t) max_size)) @@ -1703,12 +1881,6 @@ bool MYSQL_LOG::write(THD 
*thd, IO_CACHE *cache, bool commit_or_rollback) } VOID(pthread_mutex_unlock(&LOCK_log)); - /* Flush the transactional handler log file now that we have released - LOCK_log; the flush is placed here to eliminate the bottleneck on the - group commit */ - - ha_commit_complete(thd); - #ifdef HAVE_REPLICATION if (should_rotate && expire_logs_days) { @@ -1894,11 +2066,11 @@ void MYSQL_LOG::wait_for_update(THD* thd, bool master_or_slave) SYNOPSIS close() - exiting Bitmask for one or more of the following bits: - LOG_CLOSE_INDEX if we should close the index file - LOG_CLOSE_TO_BE_OPENED if we intend to call open - at once after close. - LOG_CLOSE_STOP_EVENT write a 'stop' event to the log + exiting Bitmask for one or more of the following bits: + LOG_CLOSE_INDEX if we should close the index file + LOG_CLOSE_TO_BE_OPENED if we intend to call open + at once after close. + LOG_CLOSE_STOP_EVENT write a 'stop' event to the log NOTES One can do an open on the object at once after doing a close. @@ -1922,6 +2094,15 @@ void MYSQL_LOG::close(uint exiting) } #endif /* HAVE_REPLICATION */ end_io_cache(&log_file); + + /* don't pwrite in a file opened with O_APPEND - it doesn't work */ + if (log_file.type == WRITE_CACHE && log_type == LOG_BIN) + { + my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET; + char flags=0; // clearing LOG_EVENT_BINLOG_IN_USE_F + my_pwrite(log_file.file, &flags, 1, offset, MYF(0)); + } + if (my_close(log_file.file,MYF(0)) < 0 && ! write_error) { write_error=1; @@ -2106,145 +2287,6 @@ bool flush_error_log() return result; } - -/* - If the server has InnoDB on, and InnoDB has published the position of the - last committed transaction (which happens only if a crash recovery occured at - this startup) then truncate the previous binary log at the position given by - InnoDB. If binlog is shorter than the position, print a message to the error - log. 
- - SYNOPSIS - cut_spurious_tail() - - RETURN VALUES - 1 Error - 0 Ok -*/ - -bool MYSQL_LOG::cut_spurious_tail() -{ - int error= 0; - DBUG_ENTER("cut_spurious_tail"); - -#ifdef HAVE_INNOBASE_DB - if (have_innodb != SHOW_OPTION_YES) - DBUG_RETURN(0); - /* - This is the place where we use information from InnoDB to cut the - binlog. - */ - char *name= ha_innobase::get_mysql_bin_log_name(); - ulonglong pos= ha_innobase::get_mysql_bin_log_pos(); - ulonglong actual_size; - char llbuf1[22], llbuf2[22]; - - if (name[0] == 0 || pos == ULONGLONG_MAX) - { - DBUG_PRINT("info", ("InnoDB has not set binlog info")); - DBUG_RETURN(0); - } - /* The binlog given by InnoDB normally is never an active binlog */ - if (is_open() && is_active(name)) - { - sql_print_error("Warning: after InnoDB crash recovery, InnoDB says that " - "the binary log of the previous run has the same name " - "'%s' as the current one; this is likely to be abnormal.", - name); - DBUG_RETURN(1); - } - sql_print_error("After InnoDB crash recovery, checking if the binary log " - "'%s' contains rolled back transactions which must be " - "removed from it...", name); - /* If we have a too long binlog, cut. If too short, print error */ - int fd= my_open(name, O_EXCL | O_APPEND | O_BINARY | O_WRONLY, MYF(MY_WME)); - if (fd < 0) - { - int save_errno= my_errno; - sql_print_error("Could not open the binary log '%s' for truncation.", - name); - if (save_errno != ENOENT) - sql_print_error("The binary log '%s' should not be used for " - "replication.", name); - DBUG_RETURN(1); - } - - if (pos > (actual_size= my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME)))) - { - /* - Note that when we have MyISAM rollback this error message should be - reconsidered. - */ - sql_print_error("The binary log '%s' is shorter than its expected size " - "(actual: %s, expected: %s) so it misses at least one " - "committed transaction; so it should not be used for " - "replication or point-in-time recovery. 
You would need " - "to restart slaves from a fresh master's data " - "snapshot ", - name, llstr(actual_size, llbuf1), - llstr(pos, llbuf2)); - error= 1; - goto err; - } - if (pos < actual_size) - { - sql_print_error("The binary log '%s' is bigger than its expected size " - "(actual: %s, expected: %s) so it contains a rolled back " - "transaction; now truncating that.", name, - llstr(actual_size, llbuf1), llstr(pos, llbuf2)); - /* - As on some OS, my_chsize() can only pad with 0s instead of really - truncating. Then mysqlbinlog (and Binlog_dump thread) will error on - these zeroes. This is annoying, but not more (you just need to manually - switch replication to the next binlog). Fortunately, in my_chsize.c, it - says that all modern machines support real ftruncate(). - - */ - if ((error= my_chsize(fd, pos, 0, MYF(MY_WME)))) - goto err; - } -err: - if (my_close(fd, MYF(MY_WME))) - error= 1; -#endif - DBUG_RETURN(error); -} - - -/* - If the server has InnoDB on, store the binlog name and position into - InnoDB. This function is used every time we create a new binlog. - - SYNOPSIS - report_pos_in_innodb() - - NOTES - This cannot simply be done in MYSQL_LOG::open(), because when we create - the first binlog at startup, we have not called ha_init() yet so we cannot - write into InnoDB yet. 
- - RETURN VALUES - 1 Error - 0 Ok -*/ - -void MYSQL_LOG::report_pos_in_innodb() -{ - DBUG_ENTER("report_pos_in_innodb"); -#ifdef HAVE_INNOBASE_DB - if (is_open() && have_innodb == SHOW_OPTION_YES) - { - DBUG_PRINT("info", ("Reporting binlog info into InnoDB - " - "name: '%s' position: %d", - log_file_name, my_b_tell(&log_file))); - innobase_store_binlog_offset_and_flush_log(log_file_name, - my_b_tell(&log_file)); - } -#endif - DBUG_VOID_RETURN; -} - - void MYSQL_LOG::signal_update() { DBUG_ENTER("MYSQL_LOG::signal_update"); @@ -2309,7 +2351,7 @@ void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, vprint_msg_to_log() event_type Type of event to write (Error, Warning, or Info) format Printf style format of message - args va_list list of arguments for the message + args va_list list of arguments for the message NOTE @@ -2375,3 +2417,630 @@ void sql_print_information(const char *format, ...) DBUG_VOID_RETURN; } + +/********* transaction coordinator log for 2pc - mmap() based solution *******/ + +/* + the log consists of a file, mmapped to a memory. + file is divided on pages of tc_log_page_size size. + (usable size of the first page is smaller because of log header) + there's PAGE control structure for each page + each page (or rather PAGE control structure) can be in one of three + states - active, syncing, pool. + there could be only one page in active or syncing states, + but many in pool - pool is fifo queue. + usual lifecycle of a page is pool->active->syncing->pool + "active" page - is a page where new xid's are logged. + the page stays active as long as syncing slot is taken. + "syncing" page is being synced to disk. no new xid can be added to it. + when the sync is done the page is moved to a pool and an active page + becomes "syncing". + + the result of such an architecture is a natural "commit grouping" - + If commits are coming faster than the system can sync, they do not + stall. 
Instead, all commit that came since the last sync are + logged to the same page, and they all are synced with the next - + one - sync. Thus, thought individual commits are delayed, throughput + is not decreasing. + + when a xid is added to an active page, the thread of this xid waits + for a page's condition until the page is synced. when syncing slot + becomes vacant one of these waiters is awaken to take care of syncing. + it syncs the page and signals all waiters that the page is synced. + PAGE::waiters is used to count these waiters, and a page may never + become active again until waiters==0 (that is all waiters from the + previous sync have noticed the sync was completed) + + note, that the page becomes "dirty" and has to be synced only when a + new xid is added into it. Removing a xid from a page does not make it + dirty - we don't sync removals to disk. +*/ +#define TC_LOG_HEADER_SIZE (sizeof(tc_log_magic)+1) + +static const char tc_log_magic[]={254, 0x23, 0x05, 0x74}; + +uint opt_tc_log_size=TC_LOG_MIN_SIZE; +uint tc_log_max_pages_used=0, tc_log_page_size=0, + tc_log_page_waits=0, tc_log_cur_pages_used=0; + +TC_LOG *tc_log; +TC_LOG_MMAP tc_log_mmap; +TC_LOG_DUMMY tc_log_dummy; + +int TC_LOG_MMAP::open(const char *opt_name) +{ + uint i; + bool crashed=FALSE; + PAGE *pg; + + DBUG_ASSERT(total_ha_2pc > 1); + DBUG_ASSERT(opt_name && opt_name[0]); + +#ifdef HAVE_GETPAGESIZE + tc_log_page_size=getpagesize(); + DBUG_ASSERT(TC_LOG_PAGE_SIZE % tc_log_page_size == 0); +#else + tc_log_page_size=TC_LOG_PAGE_SIZE; +#endif + + fn_format(logname,opt_name,mysql_data_home,"",MY_UNPACK_FILENAME); + fd= my_open(logname, O_RDWR, MYF(0)); + if (fd == -1) + { + if (my_errno != ENOENT) + goto err; + if (using_heuristic_recover()) + return 1; + fd= my_create(logname, O_RDWR, 0, MYF(MY_WME)); + if (fd == -1) + goto err; + inited=1; + file_length= opt_tc_log_size; + if (my_chsize(fd, file_length, 0, MYF(MY_WME))) + goto err; + } + else + { + inited= 1; + crashed= TRUE; + 
sql_print_information("Recovering after a crash"); + if (tc_heuristic_recover) + { + sql_print_error("Cannot perform automatic crash recovery when " + "--tc-heuristic-recover is used"); + goto err; + } + file_length= my_seek(fd, 0L, MY_SEEK_END, MYF(MY_WME+MY_FAE)); + if (file_length == MY_FILEPOS_ERROR || file_length % tc_log_page_size) + goto err; + } + + data= (uchar *)my_mmap(0, file_length, PROT_READ|PROT_WRITE, + MAP_NOSYNC|MAP_SHARED, fd, 0); + if (data == MAP_FAILED) + { + my_errno=errno; + goto err; + } + inited=2; + + npages=file_length/tc_log_page_size; + DBUG_ASSERT(npages >= 3); // to guarantee non-empty pool + if (!(pages=(PAGE *)my_malloc(npages*sizeof(PAGE), MYF(MY_WME|MY_ZEROFILL)))) + goto err; + inited=3; + for (pg=pages, i=0; i < npages; i++, pg++) + { + pg->next=pg+1; + pg->waiters=0; + pg->state=POOL; + pthread_mutex_init(&pg->lock, MY_MUTEX_INIT_FAST); + pthread_cond_init (&pg->cond, 0); + pg->start=(my_xid *)(data + i*tc_log_page_size); + pg->end=(my_xid *)(pg->start + tc_log_page_size); + pg->size=pg->free=tc_log_page_size/sizeof(my_xid); + } + pages[0].size=pages[0].free= + (tc_log_page_size-TC_LOG_HEADER_SIZE)/sizeof(my_xid); + pages[0].start=pages[0].end-pages[0].size; + pages[npages-1].next=0; + inited=4; + + if (crashed && recover()) + goto err; + + memcpy(data, tc_log_magic, sizeof(tc_log_magic)); + data[sizeof(tc_log_magic)]= total_ha_2pc; + my_msync(fd, data, tc_log_page_size, MS_SYNC); + inited=5; + + pthread_mutex_init(&LOCK_sync, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_active, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_active, 0); + pthread_cond_init(&COND_pool, 0); + + inited=6; + + syncing= 0; + active=pages; + pool=pages+1; + pool_last=pages+npages-1; + + return 0; + +err: + close(); + return 1; +} + +/* + there is no active page, let's got one from the pool + + two strategies here: + 1. take the first from the pool + 2. 
if there're waiters - take the one with the most free space + + TODO page merging. try to allocate adjacent page first, + so that they can be flushed both in one sync +*/ +void TC_LOG_MMAP::get_active_from_pool() +{ + PAGE **p, **best_p=0; + int best_free; + + if (syncing) + pthread_mutex_lock(&LOCK_pool); + + do + { + best_p= p= &pool; + if ((*p)->waiters == 0) // can the first page be used ? + break; // yes - take it. + + best_free=0; // no - trying second strategy + for (p=&(*p)->next; *p; p=&(*p)->next) + { + if ((*p)->waiters == 0 && (*p)->free > best_free) + { + best_free=(*p)->free; + best_p=p; + } + } + } + while ((*best_p == 0 || best_free == 0) && overflow()); + + active=*best_p; + if (active->free == active->size) // we've chosen an empty page + { + tc_log_cur_pages_used++; + set_if_bigger(tc_log_max_pages_used, tc_log_cur_pages_used); + } + + if ((*best_p)->next) // unlink the page from the pool + *best_p=(*best_p)->next; + else + pool_last=*best_p; + + if (syncing) + pthread_mutex_unlock(&LOCK_pool); +} + +int TC_LOG_MMAP::overflow() +{ + /* + simple overflow handling - just wait + TODO perhaps, increase log size ? + let's check the behaviour of tc_log_page_waits first + */ + tc_log_page_waits++; + pthread_cond_wait(&COND_pool, &LOCK_pool); + return 1; // always return 1 +} + +/* + all access to active page is serialized but it's not a problem, as + we're assuming that fsync() will be a main bottleneck. + That is, parallelizing writes to log pages we'll decrease number of + threads waiting for a page, but then all these threads will be waiting + for a fsync() anyway + + RETURN + 0 - error + otherwise - "cookie", a number that will be passed as an argument + to unlog() call. tc_log can define it any way it wants, + and use for whatever purposes. TC_LOG_MMAP sets it + to the position in memory where xid was logged to. 
+*/ + +int TC_LOG_MMAP::log(THD *thd, my_xid xid) +{ + int err; + PAGE *p; + ulong cookie; + + pthread_mutex_lock(&LOCK_active); + + /* + if active page is full - just wait... + frankly speaking, active->free here accessed outside of mutex + protection, but it's safe, because it only means we may miss an + unlog() for the active page, and we're not waiting for it here - + unlog() does not signal COND_active. + */ + while (unlikely(active && active->free == 0)) + pthread_cond_wait(&COND_active, &LOCK_active); + + /* no active page ? take one from the pool */ + if (active == 0) + get_active_from_pool(); + + p=active; + pthread_mutex_lock(&p->lock); + + /* searching for an empty slot */ + while (*p->ptr) + { + p->ptr++; + DBUG_ASSERT(p->ptr < p->end); // because p->free > 0 + } + + /* found! store xid there and mark the page dirty */ + cookie= (ulong)((uchar *)p->ptr - data); // can never be zero + *p->ptr++= xid; + p->free--; + p->state= DIRTY; + + /* to sync or not to sync - this is the question */ + pthread_mutex_unlock(&LOCK_active); + pthread_mutex_lock(&LOCK_sync); + pthread_mutex_unlock(&p->lock); + + if (syncing) + { // somebody's syncing. let's wait + p->waiters++; + /* + note - it must be while(), not do ... while() here + as p->state may be not DIRTY when we come here + */ + while (p->state == DIRTY && syncing) + pthread_cond_wait(&p->cond, &LOCK_sync); + p->waiters--; + err= p->state == ERROR; + if (p->state != DIRTY) // page was synced + { + if (p->waiters == 0) + pthread_cond_signal(&COND_pool); // in case somebody's waiting + pthread_mutex_unlock(&LOCK_sync); + goto done; // we're done + } + } // page was not synced! 
do it now + DBUG_ASSERT(active == p && syncing == 0); + pthread_mutex_lock(&LOCK_active); + syncing=p; // place is vacant - take it + active=0; // page is not active anymore + pthread_cond_broadcast(&COND_active); // in case somebody's waiting + pthread_mutex_unlock(&LOCK_active); + pthread_mutex_unlock(&LOCK_sync); + err= sync(); + +done: + return err ? 0 : cookie; +} + +int TC_LOG_MMAP::sync() +{ + int err; + + DBUG_ASSERT(syncing != active); + + /* + sit down and relax - this can take a while... + note - no locks are held at this point + */ + err= my_msync(fd, syncing->start, 1, MS_SYNC); + + /* page is synced. let's move it to the pool */ + pthread_mutex_lock(&LOCK_pool); + pool_last->next=syncing; + pool_last=syncing; + syncing->next=0; + syncing->state= err ? ERROR : POOL; + pthread_cond_broadcast(&syncing->cond); // signal "sync done" + pthread_cond_signal(&COND_pool); // in case somebody's waiting + pthread_mutex_unlock(&LOCK_pool); + + /* marking 'syncing' slot free */ + pthread_mutex_lock(&LOCK_sync); + syncing=0; + pthread_cond_signal(&active->cond); // wake up a new syncer + pthread_mutex_unlock(&LOCK_sync); + return err; +} + +/* + erase xid from the page, update page free space counters/pointers. + cookie points directly to the memory where xid was logged +*/ +void TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) +{ + PAGE *p=pages+(cookie/tc_log_page_size); + my_xid *x=(my_xid *)(data+cookie); + + DBUG_ASSERT(*x == xid); + DBUG_ASSERT(x >= p->start && x < p->end); + *x=0; + + pthread_mutex_lock(&p->lock); + p->free++; + DBUG_ASSERT(p->free <= p->size); + set_if_smaller(p->ptr, x); + if (p->free == p->size) // the page is completely empty + statistic_decrement(tc_log_cur_pages_used, &LOCK_status); + if (p->waiters == 0) // the page is in pool and ready to rock + pthread_cond_signal(&COND_pool); // ping ... 
for overflow() + pthread_mutex_unlock(&p->lock); +} + +void TC_LOG_MMAP::close() +{ + switch (inited) { + case 6: + pthread_mutex_destroy(&LOCK_sync); + pthread_mutex_destroy(&LOCK_active); + pthread_mutex_destroy(&LOCK_pool); + pthread_cond_destroy(&COND_pool); + case 5: + data[0]='A'; // garble the first (signature) byte, in case my_delete fails + case 4: + for (uint i=0; i < npages; i++) + { + if (pages[i].ptr == 0) + break; + pthread_mutex_destroy(&pages[i].lock); + pthread_cond_destroy(&pages[i].cond); + } + case 3: + my_free((gptr)pages, MYF(0)); + case 2: + my_munmap(data, file_length); + case 1: + my_close(fd, MYF(0)); + } + if (inited>=5) // cannot do in the switch because of Windows + my_delete(logname, MYF(MY_WME)); + inited=0; +} + +int TC_LOG_MMAP::recover() +{ + HASH xids; + PAGE *p=pages, *end_p=pages+npages; + + if (memcmp(data, tc_log_magic, sizeof(tc_log_magic))) + { + sql_print_error("Bad magic header in tc log"); + goto err1; + } + + /* + the first byte after magic signature is set to current + number of storage engines on startup + */ + if (data[sizeof(tc_log_magic)] != total_ha_2pc) + { + sql_print_error("Recovery failed! You must have enabled " + "exactly %d storage engines that support " + "two-phase commit protocol", + data[sizeof(tc_log_magic)]); + goto err1; + } + + if (hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0, + sizeof(my_xid), 0, 0, MYF(0))) + goto err1; + + for ( ; p < end_p ; p++) + { + for (my_xid *x=p->start; x < p->end; x++) + if (*x && my_hash_insert(&xids, (byte *)x)) + goto err2; // OOM + } + + if (ha_recover(&xids)) + goto err2; + + hash_free(&xids); + bzero(data, file_length); + return 0; + +err2: + hash_free(&xids); +err1: + sql_print_error("Crash recovery failed. 
Either correct the problem " + "(if it's, for example, out of memory error) and restart, " + "or delete tc log and start mysqld with " + "--tc-heuristic-recover={commit|rollback}"); + return 1; +} + +/* + Perform heuristic recovery, if --tc-heuristic-recover was used + + RETURN VALUE + 0 no heuristic recovery was requested + 1 heuristic recovery was performed + + NOTE + no matter whether heuristic recovery was successful or not + mysqld must exit. So, return value is the same in both cases. +*/ + +int TC_LOG::using_heuristic_recover() +{ + if (!tc_heuristic_recover) + return 0; + + sql_print_information("Heuristic crash recovery mode"); + if (ha_recover(0)) + sql_print_error("Heuristic crash recovery failed"); + sql_print_information("Please restart mysqld without --tc-heuristic-recover"); + return 1; +} + +/****** transaction coordinator log for 2pc - binlog() based solution ******/ +#define TC_LOG_BINLOG MYSQL_LOG + +/* + TODO keep in-memory list of prepared transactions + (add to list in log(), remove on unlog()) + and copy it to the new binlog if rotated + but let's check the behaviour of tc_log_page_waits first! +*/ + +int TC_LOG_BINLOG::open(const char *opt_name) +{ + LOG_INFO log_info; + int error= 1; + + DBUG_ASSERT(total_ha_2pc > 1); + DBUG_ASSERT(opt_name && opt_name[0]); + + pthread_mutex_init(&LOCK_prep_xids, MY_MUTEX_INIT_FAST); + pthread_cond_init (&COND_prep_xids, 0); + + if (using_heuristic_recover()) + return 1; + + if ((error= find_log_pos(&log_info, NullS, 1))) + { + if (error != LOG_INFO_EOF) + sql_print_error("find_log_pos() failed (error: %d)", error); + else + error= 0; + goto err; + } + + { + const char *errmsg; + char last_event_type=UNKNOWN_EVENT; + IO_CACHE log; + File file; + Log_event *ev=0; + Format_description_log_event fdle(BINLOG_VERSION); + char log_name[FN_REFLEN]; + + if (! 
fdle.is_valid()) + goto err; + + for (error= 0; !error ;) + { + strnmov(log_name, log_info.log_file_name, sizeof(log_name)); + if ((error= find_next_log(&log_info, 1)) != LOG_INFO_EOF) + { + sql_print_error("find_log_pos() failed (error: %d)", error); + goto err; + } + } + + if ((file= open_binlog(&log, log_name, &errmsg)) < 0) + { + sql_print_error("%s", errmsg); + goto err; + } + + if ((ev= Log_event::read_log_event(&log, 0, &fdle)) && + ev->get_type_code() == FORMAT_DESCRIPTION_EVENT && + ev->flags & LOG_EVENT_BINLOG_IN_USE_F) + error= recover(&log, (Format_description_log_event *)ev); + else + error=0; + + delete ev; + end_io_cache(&log); + my_close(file, MYF(MY_WME)); + + if (error) + goto err; + } + +err: + return error; +} + +/* this is called on shutdown, after ha_panic */ +void TC_LOG_BINLOG::close() +{ + DBUG_ASSERT(prepared_xids==0); + pthread_mutex_destroy(&LOCK_prep_xids); + pthread_cond_destroy (&COND_prep_xids); +} + +/* + TODO group commit + + RETURN + 0 - error + 1 - success +*/ +int TC_LOG_BINLOG::log(THD *thd, my_xid xid) +{ + Xid_log_event xle(thd, xid); + if (xle.write((IO_CACHE*)thd->ha_data[binlog_hton.slot])) + return 0; + thread_safe_increment(prepared_xids, &LOCK_prep_xids); + return !binlog_commit(thd,1); // invert return value +} + +void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + if (thread_safe_dec_and_test(prepared_xids, &LOCK_prep_xids)) + pthread_cond_signal(&COND_prep_xids); +} + +int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) +{ + Log_event *ev; + HASH xids; + MEM_ROOT mem_root; + + if (! 
fdle->is_valid() || + hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0, + sizeof(my_xid), 0, 0, MYF(0))) + goto err1; + + init_alloc_root(&mem_root, tc_log_page_size, tc_log_page_size); + + fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error + + while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid()) + { + if (ev->get_type_code() == XID_EVENT) + { + Xid_log_event *xev=(Xid_log_event *)ev; + byte *x=memdup_root(&mem_root, (char *)& xev->xid, sizeof(xev->xid)); + if (! x) + goto err2; + my_hash_insert(&xids, x); + } + delete ev; + } + + if (ha_recover(&xids)) + goto err2; + + free_root(&mem_root, MYF(0)); + hash_free(&xids); + return 0; + +err2: + free_root(&mem_root, MYF(0)); + hash_free(&xids); +err1: + sql_print_error("Crash recovery failed. Either correct the problem " + "(if it's, for example, out of memory error) and restart, " + "or delete (or rename) binary log and start mysqld with " + "--tc-heuristic-recover={commit|rollback}"); + return 1; +} + |