diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 474 |
1 files changed, 316 insertions, 158 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 0330c151fd2..88a80029bba 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -40,7 +40,8 @@ #include <my_dir.h> #include <sql_common.h> #include <errmsg.h> -#include <mysqld_error.h> +#include <ssl_compat.h> +#include "unireg.h" #include <mysys_err.h> #include "rpl_handler.h" #include <signal.h> @@ -60,7 +61,6 @@ #include "debug_sync.h" #include "rpl_parallel.h" - #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") #define MAX_SLAVE_RETRY_PAUSE 5 @@ -77,6 +77,7 @@ Master_info *active_mi= 0; Master_info_index *master_info_index; my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; +ulonglong opt_read_binlog_speed_limit = 0; const char *relay_log_index= 0; const char *relay_log_basename= 0; @@ -217,7 +218,7 @@ static void set_slave_max_allowed_packet(THD *thd, MYSQL *mysql) void init_thread_mask(int* mask,Master_info* mi,bool inverse) { bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; - register int tmp_mask=0; + int tmp_mask=0; DBUG_ENTER("init_thread_mask"); if (set_io) @@ -296,11 +297,8 @@ handle_slave_background(void *arg __attribute__((unused))) bool stop; my_thread_init(); - thd= new THD; + thd= new THD(next_thread_id()); thd->thread_stack= (char*) &thd; /* Set approximate stack start */ - mysql_mutex_lock(&LOCK_thread_count); - thd->thread_id= thread_id++; - mysql_mutex_unlock(&LOCK_thread_count); thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; thread_safe_increment32(&service_thread_count); thd->store_globals(); @@ -365,8 +363,8 @@ handle_slave_background(void *arg __attribute__((unused))) delete thd; thread_safe_decrement32(&service_thread_count); signal_thd_deleted(); - my_thread_end(); + my_thread_end(); return 0; } @@ -521,7 +519,7 @@ int init_slave() if (active_mi->host[0] && !opt_skip_slave_start) { int error; - THD *thd= new THD; + THD *thd= new THD(next_thread_id()); thd->thread_stack= (char*) &thd; thd->store_globals(); @@ -741,8 +739,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_PRINT("info",("Flushing relay-log info file.")); if (current_thd) THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file); - if (flush_relay_log_info(&mi->rli) || - my_sync(mi->rli.info_fd, MYF(MY_WME))) + if (mi->rli.flush() || my_sync(mi->rli.info_fd, MYF(MY_WME))) retval= ER_ERROR_DURING_FLUSH_LOGS; mysql_mutex_unlock(log_lock); @@ -1080,6 +1077,7 @@ void slave_prepare_for_shutdown() mysql_mutex_lock(&LOCK_active_mi); master_info_index->free_connections(); mysql_mutex_unlock(&LOCK_active_mi); + stop_slave_background_thread(); } /* @@ -1642,8 +1640,10 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { + mysql_mutex_lock(&mi->data_lock); mi->clock_diff_with_master= (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10)); + mysql_mutex_unlock(&mi->data_lock); } else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; @@ -1655,7 +1655,9 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) } else { + mysql_mutex_lock(&mi->data_lock); mi->clock_diff_with_master= 0; /* The "most sensible" value */ + mysql_mutex_unlock(&mi->data_lock); sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, " "do not trust column Seconds_Behind_Master of SHOW " "SLAVE STATUS. Error: %s (%d)", @@ -2806,6 +2808,15 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list, Item_empty_string(thd, "Parallel_Mode", sizeof("conservative")-1), mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "SQL_Delay", 10, + MYSQL_TYPE_LONG)); + field_list->push_back(new (mem_root) + Item_return_int(thd, "SQL_Remaining_Delay", 8, + MYSQL_TYPE_LONG)); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_SQL_Running_State", + 20)); if (full) { field_list->push_back(new (mem_root) @@ -2997,6 +3008,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, prot_store_ids(thd, &mi->ignore_server_ids); // Master_Server_id protocol->store((uint32) mi->master_id); + // SQL_Delay // Master_Ssl_Crl protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath @@ -3019,6 +3031,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store(mode_name, strlen(mode_name), &my_charset_bin); } + protocol->store((uint32) mi->rli.get_sql_delay()); + // SQL_Remaining_Delay + // THD::proc_info is not protected by any lock, so we read it once + // to ensure that we use the same value throughout this function. + const char *slave_sql_running_state= + mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : ""; + if (slave_sql_running_state == Relay_log_info::state_delaying_string) + { + time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end(); + protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0)); + } + else + protocol->store_null(); + // Slave_SQL_Running_State + protocol->store(slave_sql_running_state, &my_charset_bin); + if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3143,13 +3171,10 @@ void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi) { DBUG_ENTER("set_slave_thread_default_charset"); - thd->variables.character_set_client= - global_system_variables.character_set_client; - thd->variables.collation_connection= - global_system_variables.collation_connection; thd->variables.collation_server= global_system_variables.collation_server; - thd->update_charset(); + thd->update_charset(global_system_variables.character_set_client, + global_system_variables.collation_connection); thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); DBUG_VOID_RETURN; @@ -3188,9 +3213,6 @@ static int init_slave_thread(THD* thd, Master_info *mi, thd->variables.sql_log_slow= opt_log_slow_slave_statements; thd->variables.log_slow_filter= global_system_variables.log_slow_filter; set_slave_thread_options(thd); - mysql_mutex_lock(&LOCK_thread_count); - thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; - mysql_mutex_unlock(&LOCK_thread_count); if (thd_type == SLAVE_THD_SQL) THD_STAGE_INFO(thd, stage_waiting_for_the_next_event_in_relay_log); @@ -3296,13 +3318,15 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, try a reconnect. We do not want to print anything to the error log in this case because this a anormal event in an idle server. + network_read_len get the real network read length in VIO, especially using compressed protocol RETURN VALUES 'packet_error' Error number Length of packet */ -static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) +static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, + ulong* network_read_len) { ulong len; DBUG_ENTER("read_event"); @@ -3317,7 +3341,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) DBUG_RETURN(packet_error); #endif - len = cli_safe_read(mysql); + len = cli_safe_read_reallen(mysql, network_read_len); if (len == packet_error || (long) len < 1) { if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) @@ -3392,6 +3416,73 @@ has_temporary_error(THD *thd) } +/** + If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock. + + Design note: this is the place to unlock rli->data_lock. The lock + must be held when reading delay info from rli, but it should not be + held while sleeping. + + @param ev Event that is about to be executed. + + @param thd The sql thread's THD object. + + @param rli The sql thread's Relay_log_info structure. + + @retval 0 If the delay timed out and the event shall be executed. + + @retval nonzero If the delay was interrupted and the event shall be skipped. +*/ +int +sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi) +{ + Relay_log_info* rli= rgi->rli; + long sql_delay= rli->get_sql_delay(); + + DBUG_ENTER("sql_delay_event"); + mysql_mutex_assert_owner(&rli->data_lock); + DBUG_ASSERT(!rli->belongs_to_client()); + + int type= ev->get_type_code(); + if (sql_delay && type != ROTATE_EVENT && + type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3) + { + // The time when we should execute the event. + time_t sql_delay_end= + ev->when + rli->mi->clock_diff_with_master + sql_delay; + // The current time. + time_t now= my_time(0); + // The time we will have to sleep before executing the event. + unsigned long nap_time= 0; + if (sql_delay_end > now) + nap_time= (ulong)(sql_delay_end - now); + + DBUG_PRINT("info", ("sql_delay= %lu " + "ev->when= %lu " + "rli->mi->clock_diff_with_master= %lu " + "now= %ld " + "sql_delay_end= %llu " + "nap_time= %ld", + sql_delay, (long)ev->when, + rli->mi->clock_diff_with_master, + (long)now, (ulonglong)sql_delay_end, (long)nap_time)); + + if (sql_delay_end > now) + { + DBUG_PRINT("info", ("delaying replication event %lu secs", + nap_time)); + rli->start_sql_delay(sql_delay_end); + mysql_mutex_unlock(&rli->data_lock); + DBUG_RETURN(slave_sleep(thd, nap_time, sql_slave_killed, rgi)); + } + } + + mysql_mutex_unlock(&rli->data_lock); + + DBUG_RETURN(0); +} + + /* First half of apply_event_and_update_pos(), see below. Setup some THD variables for applying the event. @@ -3441,12 +3532,6 @@ apply_event_and_update_pos_setup(Log_event* ev, THD* thd, rpl_group_info *rgi) thd->variables.server_id = ev->server_id; thd->set_time(); // time the query thd->lex->current_select= 0; - if (!ev->when) - { - my_hrtime_t hrtime= my_hrtime(); - ev->when= hrtime_to_my_time(hrtime); - ev->when_sec_part= hrtime_sec_part(hrtime); - } thd->variables.option_bits= (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); @@ -3515,16 +3600,16 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, if (exec_res == 0) { int error= ev->update_pos(rgi); -#ifdef HAVE_valgrind - if (!rli->is_fake) -#endif + #ifndef DBUG_OFF + DBUG_PRINT("info", ("update_pos error = %d", error)); + if (!rli->belongs_to_client()) { - DBUG_PRINT("info", ("update_pos error = %d", error)); DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos, rli->group_relay_log_name)); DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos, rli->event_relay_log_name)); } +#endif /* The update should not fail, so print an error message and return an error code. @@ -3559,21 +3644,39 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, /** Applies the given event and advances the relay log position. - In essence, this function does: + This is needed by the sql thread to execute events from the binlog, + and by clients executing BINLOG statements. Conceptually, this + function does: @code ev->apply_event(rli); ev->update_pos(rli); @endcode - But it also does some maintainance, such as skipping events if - needed and reporting errors. + It also does the following maintainance: - If the @c skip flag is set, then it is tested whether the event - should be skipped, by looking at the slave_skip_counter and the - server id. The skip flag should be set when calling this from a - replication thread but not set when executing an explicit BINLOG - statement. + - Initializes the thread's server_id and time; and the event's + thread. + + - If !rli->belongs_to_client() (i.e., if it belongs to the slave + sql thread instead of being used for executing BINLOG + statements), it does the following things: (1) skips events if it + is needed according to the server id or slave_skip_counter; (2) + unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER + TO MASTER_DELAY=X'; (4) maintains the running state of the sql + thread (rli->thread_state). + + - Reports errors as needed. + + @param ev The event to apply. + + @param thd The client thread that executes the event (i.e., the + slave sql thread if called from a replication slave, or the client + thread if called to execute a BINLOG statement). + + @param rli The relay log info (i.e., the slave's rli if called from + a replication slave, or the client's thd->rli_fake if called to + execute a BINLOG statement). @retval 0 OK. @@ -3596,7 +3699,16 @@ apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi) DBUG_ASSERT(rli->slave_skip_counter > 0); rli->slave_skip_counter--; } - mysql_mutex_unlock(&rli->data_lock); + + if (reason == Log_event::EVENT_SKIP_NOT) + { + // Sleeps if needed, and unlocks rli->data_lock. + if (sql_delay_event(ev, thd, rgi)) + return 0; + } + else + mysql_mutex_unlock(&rli->data_lock); + return apply_event_and_update_pos_apply(ev, thd, rgi, reason); } @@ -3620,6 +3732,10 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, driver thread, so 23 should never see EVENT_SKIP_COUNT here. */ DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT); + /* + Calling sql_delay_event() was handled in the SQL driver thread when + doing parallel replication. + */ return apply_event_and_update_pos_apply(ev, thd, rgi, reason); } @@ -3667,7 +3783,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) } /* Check for an event that starts or stops a transaction */ - if (typ == QUERY_EVENT) + if (LOG_EVENT_IS_QUERY(typ)) { Query_log_event *qev= (Query_log_event*) ev; /* @@ -3699,7 +3815,8 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) /** - Top-level function for executing the next event from the relay log. + Top-level function for executing the next event in the relay log. + This is called from the SQL thread. This function reads the event from the relay log, executes it, and advances the relay log position. It also handles errors, etc. @@ -3806,7 +3923,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", if ((typ == XID_EVENT) || - ((typ == QUERY_EVENT) && + (LOG_EVENT_IS_QUERY(typ) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); @@ -3837,6 +3954,19 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, This is the case for pre-10.0 events without GTID, and for handling slave_skip_counter. */ + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + { + /* + Ignore FD's timestamp as it does not reflect the slave execution + state but likely to reflect a deep past. Consequently when the first + data modification event execution last long all this time + Seconds_Behind_Master is zero. + */ + if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; + + DBUG_ASSERT(rli->last_master_timestamp >= 0); + } } if (typ == GTID_EVENT) @@ -4128,7 +4258,7 @@ pthread_handler_t handle_slave_io(void *arg) mysql= NULL ; retry_count= 0; - thd= new THD; // note that contructor of THD uses DBUG_ ! + thd= new THD(next_thread_id()); // note that contructor of THD uses DBUG_ ! mysql_mutex_lock(&mi->run_lock); /* Inform waiting threads that slave has started */ @@ -4151,9 +4281,7 @@ pthread_handler_t handle_slave_io(void *arg) goto err_during_init; } thd->system_thread_info.rpl_io_info= &io_info; - mysql_mutex_lock(&LOCK_thread_count); - threads.append(thd); - mysql_mutex_unlock(&LOCK_thread_count); + add_to_active_threads(thd); mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT; mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); @@ -4254,8 +4382,10 @@ connected: };); #endif - // TODO: the assignment below should be under mutex (5.0) + mysql_mutex_lock(&mi->run_lock); mi->slave_running= MYSQL_SLAVE_RUN_CONNECT; + mysql_mutex_unlock(&mi->run_lock); + thd->slave_net = &mysql->net; THD_STAGE_INFO(thd, stage_checking_master_version); ret= get_master_version_and_clock(mysql, mi); @@ -4302,6 +4432,7 @@ connected: } DBUG_PRINT("info",("Starting reading binary log from master")); + thd->set_command(COM_SLAVE_IO); while (!io_slave_killed(mi)) { THD_STAGE_INFO(thd, stage_requesting_binlog_dump); @@ -4319,9 +4450,11 @@ connected: mi->slave_running= MYSQL_SLAVE_RUN_READING; DBUG_ASSERT(mi->last_error().number == 0); + ulonglong lastchecktime = my_hrtime().val; + ulonglong tokenamount = opt_read_binlog_speed_limit*1024; while (!io_slave_killed(mi)) { - ulong event_len; + ulong event_len, network_read_len = 0; /* We say "waiting" because read_event() will wait if there's nothing to read. But if there's something to read, it will not wait. The @@ -4329,7 +4462,7 @@ connected: we're in fact receiving nothing. */ THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); - event_len= read_event(mysql, mi, &suppress_warnings); + event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len); if (check_io_slave_killed(mi, NullS)) goto err; @@ -4377,6 +4510,47 @@ Stopping slave I/O thread due to out-of-memory error from master"); goto err; } + /* Control the binlog read speed of master + when read_binlog_speed_limit is non-zero + */ + ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024; + if (speed_limit_in_bytes) + { + /* Prevent the tokenamount become a large value, + for example, the IO thread doesn't work for a long time + */ + if (tokenamount > speed_limit_in_bytes * 2) + { + lastchecktime = my_hrtime().val; + tokenamount = speed_limit_in_bytes * 2; + } + + do + { + ulonglong currenttime = my_hrtime().val; + tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000*1000); + lastchecktime = currenttime; + if(tokenamount < network_read_len) + { + ulonglong duration =1000ULL*1000 * (network_read_len - tokenamount) / speed_limit_in_bytes; + time_t second_time = (time_t)(duration / (1000 * 1000)); + uint micro_time = duration % (1000 * 1000); + + // at least sleep 1000 micro second + my_sleep(MY_MAX(micro_time,1000)); + + /* + If it sleep more than one second, + it should use slave_sleep() to avoid the STOP SLAVE hang. + */ + if (second_time) + slave_sleep(thd, second_time, io_slave_killed, mi); + + } + }while(tokenamount < network_read_len); + tokenamount -= network_read_len; + } + /* XXX: 'synced' should be updated by queue_event to indicate whether event has been synced to disk */ bool synced= 0; @@ -4476,6 +4650,7 @@ err: flush_master_info(mi, TRUE, TRUE); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); + unlink_not_visible_thd(thd); mysql_mutex_lock(&mi->run_lock); err_during_init: @@ -4506,9 +4681,7 @@ err_during_init: DBUG_LEAVE; // Must match DBUG_ENTER() my_thread_end(); -#ifdef HAVE_OPENSSL ERR_remove_state(0); -#endif pthread_exit(0); return 0; // Avoid compiler warnings } @@ -4535,7 +4708,8 @@ int check_temp_dir(char* tmp_file) size_t tmp_dir_size; DBUG_ENTER("check_temp_dir"); - mysql_mutex_lock(&LOCK_thread_count); + /* This look is safe to use as this function is only called once */ + mysql_mutex_lock(&LOCK_start_thread); if (check_temp_dir_run) { if ((result= check_temp_dir_result)) @@ -4574,7 +4748,7 @@ int check_temp_dir(char* tmp_file) mysql_file_delete(key_file_misc, tmp_file, MYF(0)); end: - mysql_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&LOCK_start_thread); DBUG_RETURN(result); } @@ -4700,7 +4874,7 @@ pthread_handler_t handle_slave_sql(void *arg) #endif serial_rgi= new rpl_group_info(rli); - thd = new THD; // note that contructor of THD uses DBUG_ ! + thd = new THD(next_thread_id()); // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is thd->system_thread_info.rpl_sql_info= &sql_info; @@ -4762,9 +4936,7 @@ pthread_handler_t handle_slave_sql(void *arg) applied. In all other cases it must be FALSE. */ thd->variables.binlog_annotate_row_events= 0; - mysql_mutex_lock(&LOCK_thread_count); - threads.append(thd); - mysql_mutex_unlock(&LOCK_thread_count); + add_to_active_threads(thd); /* We are going to set slave_running to 1. Assuming slave I/O thread is alive and connected, this is going to make Seconds_Behind_Master be 0 @@ -4948,6 +5120,7 @@ pthread_handler_t handle_slave_sql(void *arg) /* Read queries from the IO/THREAD until this thread is killed */ + thd->set_command(COM_SLAVE_SQL); while (!sql_slave_killed(serial_rgi)) { THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log); @@ -5042,11 +5215,11 @@ pthread_handler_t handle_slave_sql(void *arg) my_bool save_log_all_errors= thd->log_all_errors; /* - We don't need to check return value for flush_relay_log_info() + We don't need to check return value for rli->flush() as any errors should be logged to stderr */ thd->log_all_errors= 1; - flush_relay_log_info(rli); + rli->flush(); thd->log_all_errors= save_log_all_errors; if (mi->using_parallel()) { @@ -5090,7 +5263,9 @@ pthread_handler_t handle_slave_sql(void *arg) } THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); + unlink_not_visible_thd(thd); mysql_mutex_lock(&rli->run_lock); + err_during_init: /* We need data_lock, at least to wake up any waiting master_pos_wait() */ mysql_mutex_lock(&rli->data_lock); @@ -5112,14 +5287,13 @@ err_during_init: /* TODO: see if we can do this conditionally in next_event() instead to avoid unneeded position re-init + + We only reset THD::temporary_tables to 0 here and not free it, as this + could be used by slave through Relay_log_info::save_temporary_tables. */ - thd->temporary_tables = 0; // remove tempation from destructor to close them - THD_CHECK_SENTRY(thd); + thd->temporary_tables= 0; rli->sql_driver_thd= 0; - mysql_mutex_lock(&LOCK_thread_count); thd->rgi_fake= thd->rgi_slave= NULL; - delete serial_rgi; - mysql_mutex_unlock(&LOCK_thread_count); #ifdef WITH_WSREP /* @@ -5148,28 +5322,29 @@ err_during_init: #endif /* WITH_WSREP */ /* - Note: the order of the broadcast and unlock calls below (first broadcast, then unlock) - is important. Otherwise a killer_thread can execute between the calls and - delete the mi structure leading to a crash! (see BUG#25306 for details) - */ + Note: the order of the broadcast and unlock calls below (first + broadcast, then unlock) is important. Otherwise a killer_thread can + execute between the calls and delete the mi structure leading to a + crash! (see BUG#25306 for details) + */ mysql_cond_broadcast(&rli->stop_cond); DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5);); mysql_mutex_unlock(&rli->run_lock); // tell the world we are done rpl_parallel_resize_pool_if_no_slaves(); + /* TODO: Check if this lock is needed */ mysql_mutex_lock(&LOCK_thread_count); - thd->unlink(); + delete serial_rgi; mysql_mutex_unlock(&LOCK_thread_count); + delete thd; thread_safe_decrement32(&service_thread_count); signal_thd_deleted(); DBUG_LEAVE; // Must match DBUG_ENTER() my_thread_end(); -#ifdef HAVE_OPENSSL ERR_remove_state(0); -#endif pthread_exit(0); return 0; // Avoid compiler warnings } @@ -5584,9 +5759,12 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) bool gtid_skip_enqueue= false; bool got_gtid_event= false; rpl_gtid event_gtid; -#ifndef DBUG_OFF - static uint dbug_rows_event_count= 0; -#endif + static uint dbug_rows_event_count __attribute__((unused))= 0; + bool is_compress_event = false; + char* new_buf = NULL; + char new_buf_arr[4096]; + bool is_malloc = false; + /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -5632,7 +5810,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) // Emulate the network corruption DBUG_EXECUTE_IF("corrupt_queue_event", - if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) { char *debug_event_buf_c = (char*) buf; int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); @@ -5935,9 +6113,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) TODO: handling `when' for SHOW SLAVE STATUS' snds behind */ - if (memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) - || mi->master_log_pos > hb.log_pos) - { + if (memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) || + mi->master_log_pos > hb.log_pos) { /* missed events of heartbeat from the past */ error= ER_SLAVE_HEARTBEAT_FAILURE; error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); @@ -6085,6 +6262,51 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) inc_pos= event_len; } break; + /* + Binlog compressed event should uncompress in IO thread + */ + case QUERY_COMPRESSED_EVENT: + inc_pos= event_len; + if (query_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + buf = new_buf; + is_compress_event = true; + goto default_action; + + case WRITE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + inc_pos = event_len; + { + if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, + checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, event_len, new_buf_arr, sizeof(new_buf_arr), + &is_malloc, (char **)&new_buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + } + buf = new_buf; + is_compress_event = true; + goto default_action; #ifndef DBUG_OFF case XID_EVENT: @@ -6102,7 +6324,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_after_2_events", { if (mi->dbug_do_disconnect && - (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) || + (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) || ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) && (--mi->dbug_event_counter == 0)) { @@ -6115,7 +6337,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_before_commit", { if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg))) { @@ -6135,7 +6357,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ++mi->events_queued_since_last_gtid; } - inc_pos= event_len; + if (!is_compress_event) + inc_pos= event_len; + break; } @@ -6226,8 +6450,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) /* everything is filtered out from non-master */ (s_id != mi->master_id || /* for the master meta information is necessary */ - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || /* Check whether it needs to be filtered based on domain_id @@ -6256,9 +6480,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ if (!(s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); @@ -6299,7 +6523,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) buf[EVENT_TYPE_OFFSET])) || (!mi->last_queued_gtid_standalone && ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg)))))) { @@ -6329,6 +6553,9 @@ err: mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), error_msg.ptr()); + if(is_malloc) + my_free((void *)new_buf); + DBUG_RETURN(error); } @@ -6646,75 +6873,6 @@ MYSQL *rpl_connect_master(MYSQL *mysql) } #endif -/* - Store the file and position where the execute-slave thread are in the - relay log. - - SYNOPSIS - flush_relay_log_info() - rli Relay log information - - NOTES - - As this is only called by the slave thread or on STOP SLAVE, with the - log_lock grabbed and the slave thread stopped, we don't need to have - a lock here. - - If there is an active transaction, then we don't update the position - in the relay log. This is to ensure that we re-execute statements - if we die in the middle of an transaction that was rolled back. - - As a transaction never spans binary logs, we don't have to handle the - case where we do a relay-log-rotation in the middle of the transaction. - If this would not be the case, we would have to ensure that we - don't delete the relay log file where the transaction started when - we switch to a new relay log file. - - TODO - - Change the log file information to a binary format to avoid calling - longlong2str. - - RETURN VALUES - 0 ok - 1 write error -*/ - -bool flush_relay_log_info(Relay_log_info* rli) -{ - bool error=0; - DBUG_ENTER("flush_relay_log_info"); - - if (unlikely(rli->no_storage)) - DBUG_RETURN(0); - - IO_CACHE *file = &rli->info_file; - char buff[FN_REFLEN*2+22*2+4], *pos; - - my_b_seek(file, 0L); - pos=strmov(buff, rli->group_relay_log_name); - *pos++='\n'; - pos= longlong10_to_str(rli->group_relay_log_pos, pos, 10); - *pos++='\n'; - pos=strmov(pos, rli->group_master_log_name); - *pos++='\n'; - pos=longlong10_to_str(rli->group_master_log_pos, pos, 10); - *pos='\n'; - if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1)) - error=1; - if (flush_io_cache(file)) - error=1; - if (sync_relayloginfo_period && - !error && - ++(rli->sync_counter) >= sync_relayloginfo_period) - { - if (my_sync(rli->info_fd, MYF(MY_WME))) - error=1; - rli->sync_counter= 0; - } - /* - Flushing the relay log is done by the slave I/O thread - or by the user on STOP SLAVE. - */ - DBUG_RETURN(error); -} - /* Called when we notice that the current "hot" log got rotated under our feet. @@ -7074,7 +7232,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) } rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE; strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name); - if (flush_relay_log_info(rli)) + if (rli->flush()) { errmsg= "error flushing relay log"; goto err; |