diff options
Diffstat (limited to 'sql/semisync_master.cc')
-rw-r--r-- | sql/semisync_master.cc | 282 |
1 files changed, 141 insertions, 141 deletions
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc index 5b594d20c71..99d8b75ece8 100644 --- a/sql/semisync_master.cc +++ b/sql/semisync_master.cc @@ -74,29 +74,29 @@ static ulonglong timespec_to_usec(const struct timespec *ts) Active_tranx::Active_tranx(mysql_mutex_t *lock, ulong trace_level) - : Trace(trace_level), allocator_(max_connections), - num_entries_(max_connections << 1), /* Transaction hash table size + : Trace(trace_level), m_allocator(max_connections), + m_num_entries(max_connections << 1), /* Transaction hash table size * is set to double the size * of max_connections */ - lock_(lock) + m_lock(lock) { /* No transactions are in the list initially. */ - trx_front_ = NULL; - trx_rear_ = NULL; + m_trx_front = NULL; + m_trx_rear = NULL; /* Create the hash table to find a transaction's ending event. */ - trx_htb_ = new Tranx_node *[num_entries_]; - for (int idx = 0; idx < num_entries_; ++idx) - trx_htb_[idx] = NULL; + m_trx_htb = new Tranx_node *[m_num_entries]; + for (int idx = 0; idx < m_num_entries; ++idx) + m_trx_htb[idx] = NULL; sql_print_information("Semi-sync replication initialized for transactions."); } Active_tranx::~Active_tranx() { - delete [] trx_htb_; - trx_htb_ = NULL; - num_entries_ = 0; + delete [] m_trx_htb; + m_trx_htb = NULL; + m_num_entries = 0; } unsigned int Active_tranx::calc_hash(const unsigned char *key, @@ -121,7 +121,7 @@ unsigned int Active_tranx::get_hash_value(const char *log_file_name, unsigned int hash2 = calc_hash((const unsigned char *)(&log_file_pos), sizeof(log_file_pos)); - return (hash1 + hash2) % num_entries_; + return (hash1 + hash2) % m_num_entries; } int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1, @@ -148,7 +148,7 @@ int Active_tranx::insert_tranx_node(const char *log_file_name, DBUG_ENTER("Active_tranx:insert_tranx_node"); - ins_node = allocator_.allocate_node(); + ins_node = m_allocator.allocate_node(); if (!ins_node) { sql_print_error("%s: transaction node allocation failed for: (%s, %lu)", @@ -159,25 +159,25 @@ int Active_tranx::insert_tranx_node(const char *log_file_name, } /* insert the binlog position in the active transaction list. */ - strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1); - ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ - ins_node->log_pos_ = log_file_pos; + strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1); + ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + ins_node->log_pos = log_file_pos; - if (!trx_front_) + if (!m_trx_front) { /* The list is empty. */ - trx_front_ = trx_rear_ = ins_node; + m_trx_front = m_trx_rear = ins_node; } else { - int cmp = compare(ins_node, trx_rear_); + int cmp = compare(ins_node, m_trx_rear); if (cmp > 0) { /* Compare with the tail first. If the transaction happens later in * binlog, then make it the new tail. */ - trx_rear_->next_ = ins_node; - trx_rear_ = ins_node; + m_trx_rear->next = ins_node; + m_trx_rear = ins_node; } else { @@ -186,20 +186,20 @@ int Active_tranx::insert_tranx_node(const char *log_file_name, */ sql_print_error("%s: binlog write out-of-order, tail (%s, %lu), " "new node (%s, %lu)", "Active_tranx:insert_tranx_node", - trx_rear_->log_name_, (ulong)trx_rear_->log_pos_, - ins_node->log_name_, (ulong)ins_node->log_pos_); + m_trx_rear->log_name, (ulong)m_trx_rear->log_pos, + ins_node->log_name, (ulong)ins_node->log_pos); result = -1; goto l_end; } } - hash_val = get_hash_value(ins_node->log_name_, ins_node->log_pos_); - ins_node->hash_next_ = trx_htb_[hash_val]; - trx_htb_[hash_val] = ins_node; + hash_val = get_hash_value(ins_node->log_name, ins_node->log_pos); + ins_node->hash_next = m_trx_htb[hash_val]; + m_trx_htb[hash_val] = ins_node; DBUG_PRINT("semisync", ("%s: insert (%s, %lu) in entry(%u)", "Active_tranx:insert_tranx_node", - ins_node->log_name_, (ulong)ins_node->log_pos_, + ins_node->log_name, (ulong)ins_node->log_pos, hash_val)); l_end: @@ -212,14 +212,14 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name, DBUG_ENTER("Active_tranx::is_tranx_end_pos"); unsigned int hash_val = get_hash_value(log_file_name, log_file_pos); - Tranx_node *entry = trx_htb_[hash_val]; + Tranx_node *entry = m_trx_htb[hash_val]; while (entry != NULL) { if (compare(entry, log_file_name, log_file_pos) == 0) break; - entry = entry->hash_next_; + entry = entry->hash_next; } DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)", @@ -238,13 +238,13 @@ int Active_tranx::clear_active_tranx_nodes(const char *log_file_name, if (log_file_name != NULL) { - new_front = trx_front_; + new_front = m_trx_front; while (new_front) { if (compare(new_front, log_file_name, log_file_pos) > 0) break; - new_front = new_front->next_; + new_front = new_front->next; } } else @@ -258,54 +258,54 @@ int Active_tranx::clear_active_tranx_nodes(const char *log_file_name, /* No active transaction nodes after the call. */ /* Clear the hash table. */ - memset(trx_htb_, 0, num_entries_ * sizeof(Tranx_node *)); - allocator_.free_all_nodes(); + memset(m_trx_htb, 0, m_num_entries * sizeof(Tranx_node *)); + m_allocator.free_all_nodes(); /* Clear the active transaction list. */ - if (trx_front_ != NULL) + if (m_trx_front != NULL) { - trx_front_ = NULL; - trx_rear_ = NULL; + m_trx_front = NULL; + m_trx_rear = NULL; } DBUG_PRINT("semisync", ("%s: cleared all nodes", "Active_tranx::::clear_active_tranx_nodes")); } - else if (new_front != trx_front_) + else if (new_front != m_trx_front) { Tranx_node *curr_node, *next_node; /* Delete all transaction nodes before the confirmation point. */ int n_frees = 0; - curr_node = trx_front_; + curr_node = m_trx_front; while (curr_node != new_front) { - next_node = curr_node->next_; + next_node = curr_node->next; n_frees++; /* Remove the node from the hash table. */ - unsigned int hash_val = get_hash_value(curr_node->log_name_, curr_node->log_pos_); - Tranx_node **hash_ptr = &(trx_htb_[hash_val]); + unsigned int hash_val = get_hash_value(curr_node->log_name, curr_node->log_pos); + Tranx_node **hash_ptr = &(m_trx_htb[hash_val]); while ((*hash_ptr) != NULL) { if ((*hash_ptr) == curr_node) { - (*hash_ptr) = curr_node->hash_next_; + (*hash_ptr) = curr_node->hash_next; break; } - hash_ptr = &((*hash_ptr)->hash_next_); + hash_ptr = &((*hash_ptr)->hash_next); } curr_node = next_node; } - trx_front_ = new_front; - allocator_.free_nodes_before(trx_front_); + m_trx_front = new_front; + m_allocator.free_nodes_before(m_trx_front); DBUG_PRINT("semisync", ("%s: cleared %d nodes back until pos (%s, %lu)", "Active_tranx::::clear_active_tranx_nodes", n_frees, - trx_front_->log_name_, (ulong)trx_front_->log_pos_)); + m_trx_front->log_name, (ulong)m_trx_front->log_pos)); } DBUG_RETURN(0); @@ -336,26 +336,26 @@ int Active_tranx::clear_active_tranx_nodes(const char *log_file_name, ******************************************************************************/ Repl_semi_sync_master::Repl_semi_sync_master() - : active_tranxs_(NULL), - init_done_(false), - reply_file_name_inited_(false), - reply_file_pos_(0L), - wait_file_name_inited_(false), - wait_file_pos_(0), - master_enabled_(false), - wait_timeout_(0L), - state_(0), - wait_point_(0) + : m_active_tranxs(NULL), + m_init_done(false), + m_reply_file_name_inited(false), + m_reply_file_pos(0L), + m_wait_file_name_inited(false), + m_wait_file_pos(0), + m_master_enabled(false), + m_wait_timeout(0L), + m_state(0), + m_wait_point(0) { - strcpy(reply_file_name_, ""); - strcpy(wait_file_name_, ""); + strcpy(m_reply_file_name, ""); + strcpy(m_wait_file_name, ""); } int Repl_semi_sync_master::init_object() { int result; - init_done_ = true; + m_init_done = true; /* References to the parameter works after set_options(). */ set_wait_timeout(rpl_semi_sync_master_timeout); @@ -398,15 +398,15 @@ int Repl_semi_sync_master::enable_master() if (!get_master_enabled()) { - active_tranxs_ = new Active_tranx(&LOCK_binlog, trace_level_); - if (active_tranxs_ != NULL) + m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level); + if (m_active_tranxs != NULL) { - commit_file_name_inited_ = false; - reply_file_name_inited_ = false; - wait_file_name_inited_ = false; + m_commit_file_name_inited = false; + m_reply_file_name_inited = false; + m_wait_file_name_inited = false; set_master_enabled(true); - state_ = true; + m_state = true; sql_print_information("Semi-sync replication enabled on the master."); } else @@ -433,13 +433,13 @@ int Repl_semi_sync_master::disable_master() */ switch_off(); - assert(active_tranxs_ != NULL); - delete active_tranxs_; - active_tranxs_ = NULL; + assert(m_active_tranxs != NULL); + delete m_active_tranxs; + m_active_tranxs = NULL; - reply_file_name_inited_ = false; - wait_file_name_inited_ = false; - commit_file_name_inited_ = false; + m_reply_file_name_inited = false; + m_wait_file_name_inited = false; + m_commit_file_name_inited = false; set_master_enabled(false); sql_print_information("Semi-sync replication disabled on the master."); @@ -452,14 +452,14 @@ int Repl_semi_sync_master::disable_master() void Repl_semi_sync_master::cleanup() { - if (init_done_) + if (m_init_done) { mysql_mutex_destroy(&LOCK_binlog); mysql_cond_destroy(&COND_binlog_send); - init_done_= 0; + m_init_done= 0; } - delete active_tranxs_; + delete m_active_tranxs; } void Repl_semi_sync_master::lock() @@ -592,10 +592,10 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, * sync replication slaves. So, if any one of them get the transaction, * the transaction session in the primary can move forward. */ - if (reply_file_name_inited_) + if (m_reply_file_name_inited) { cmp = Active_tranx::compare(log_file_name, log_file_pos, - reply_file_name_, reply_file_pos_); + m_reply_file_name, m_reply_file_pos); /* If the requested position is behind the sending binlog position, * would not adjust sending binlog position. @@ -614,13 +614,13 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, if (need_copy_send_pos) { - strmake_buf(reply_file_name_, log_file_name); - reply_file_pos_ = log_file_pos; - reply_file_name_inited_ = true; + strmake_buf(m_reply_file_name, log_file_name); + m_reply_file_pos = log_file_pos; + m_reply_file_name_inited = true; /* Remove all active transaction nodes before this point. */ - assert(active_tranxs_ != NULL); - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos); + assert(m_active_tranxs != NULL); + m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos); DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)", "Repl_semi_sync_master::report_reply_binlog", @@ -632,15 +632,15 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id, /* Let us check if some of the waiting threads doing a trx * commit can now proceed. */ - cmp = Active_tranx::compare(reply_file_name_, reply_file_pos_, - wait_file_name_, wait_file_pos_); + cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, + m_wait_file_name, m_wait_file_pos); if (cmp >= 0) { /* Yes, at least one waiting thread can now proceed: * let us release all waiting threads with a broadcast */ can_release_threads = true; - wait_file_name_inited_ = false; + m_wait_file_name_inited = false; } } @@ -811,9 +811,9 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, while (is_on() && !thd_killed(current_thd)) { - if (reply_file_name_inited_) + if (m_reply_file_name_inited) { - int cmp = Active_tranx::compare(reply_file_name_, reply_file_pos_, + int cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos, trx_wait_binlog_name, trx_wait_binlog_pos); if (cmp >= 0) @@ -823,8 +823,8 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, */ DBUG_PRINT("semisync", ("%s: Binlog reply is ahead (%s, %lu),", "Repl_semi_sync_master::commit_trx", - reply_file_name_, - (ulong)reply_file_pos_)); + m_reply_file_name, + (ulong)m_reply_file_pos)); break; } } @@ -832,37 +832,37 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, /* Let us update the info about the minimum binlog position of waiting * threads. */ - if (wait_file_name_inited_) + if (m_wait_file_name_inited) { int cmp = Active_tranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos, - wait_file_name_, wait_file_pos_); + m_wait_file_name, m_wait_file_pos); if (cmp <= 0) { /* This thd has a lower position, let's update the minimum info. */ - strmake_buf(wait_file_name_, trx_wait_binlog_name); - wait_file_pos_ = trx_wait_binlog_pos; + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; rpl_semi_sync_master_wait_pos_backtraverse++; DBUG_PRINT("semisync", ("%s: move back wait position (%s, %lu),", "Repl_semi_sync_master::commit_trx", - wait_file_name_, (ulong)wait_file_pos_)); + m_wait_file_name, (ulong)m_wait_file_pos)); } } else { - strmake_buf(wait_file_name_, trx_wait_binlog_name); - wait_file_pos_ = trx_wait_binlog_pos; - wait_file_name_inited_ = true; + strmake_buf(m_wait_file_name, trx_wait_binlog_name); + m_wait_file_pos = trx_wait_binlog_pos; + m_wait_file_name_inited = true; DBUG_PRINT("semisync", ("%s: init wait position (%s, %lu),", "Repl_semi_sync_master::commit_trx", - wait_file_name_, (ulong)wait_file_pos_)); + m_wait_file_name, (ulong)m_wait_file_pos)); } /* Calcuate the waiting period. */ - long diff_secs = (long) (wait_timeout_ / TIME_THOUSAND); - long diff_nsecs = (long) ((wait_timeout_ % TIME_THOUSAND) * TIME_MILLION); + long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND); + long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION); long nsecs = start_ts.tv_nsec + diff_nsecs; abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION; abstime.tv_nsec = nsecs % TIME_BILLION; @@ -879,8 +879,8 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)", "Repl_semi_sync_master::commit_trx", - wait_timeout_, - wait_file_name_, (ulong)wait_file_pos_)); + m_wait_timeout, + m_wait_file_name, (ulong)m_wait_file_pos)); wait_result = cond_timewait(&abstime); rpl_semi_sync_master_wait_sessions--; @@ -891,7 +891,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), " "semi-sync up to file %s, position %lu.", trx_wait_binlog_name, (ulong)trx_wait_binlog_pos, - reply_file_name_, (ulong)reply_file_pos_); + m_reply_file_name, (ulong)m_reply_file_pos); rpl_semi_sync_master_wait_timeouts++; /* switch semi-sync off */ @@ -921,11 +921,11 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name, /* At this point, the binlog file and position of this transaction must have been removed from Active_tranx. - active_tranxs_ may be NULL if someone disabled semi sync during + m_active_tranxs may be NULL if someone disabled semi sync during cond_timewait() */ - assert(thd_killed(current_thd) || !active_tranxs_ || - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name, + assert(thd_killed(current_thd) || !m_active_tranxs || + !m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name, trx_wait_binlog_pos)); l_end: @@ -967,15 +967,15 @@ int Repl_semi_sync_master::switch_off() DBUG_ENTER("Repl_semi_sync_master::switch_off"); - state_ = false; + m_state = false; /* Clear the active transaction list. */ - assert(active_tranxs_ != NULL); - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0); + assert(m_active_tranxs != NULL); + result = m_active_tranxs->clear_active_tranx_nodes(NULL, 0); rpl_semi_sync_master_off_times++; - wait_file_name_inited_ = false; - reply_file_name_inited_ = false; + m_wait_file_name_inited = false; + m_reply_file_name_inited = false; sql_print_information("Semi-sync replication switched OFF."); cond_broadcast(); /* wake up all waiting threads */ @@ -993,13 +993,13 @@ int Repl_semi_sync_master::try_switch_on(int server_id, /* If the current sending event's position is larger than or equal to the * 'largest' commit transaction binlog position, the slave is already * catching up now and we can switch semi-sync on here. - * If commit_file_name_inited_ indicates there are no recent transactions, + * If m_commit_file_name_inited indicates there are no recent transactions, * we can enable semi-sync immediately. */ - if (commit_file_name_inited_) + if (m_commit_file_name_inited) { int cmp = Active_tranx::compare(log_file_name, log_file_pos, - commit_file_name_, commit_file_pos_); + m_commit_file_name, m_commit_file_pos); semi_sync_on = (cmp >= 0); } else @@ -1010,7 +1010,7 @@ int Repl_semi_sync_master::try_switch_on(int server_id, if (semi_sync_on) { /* Switch semi-sync replication on. */ - state_ = true; + m_state = true; sql_print_information("Semi-sync replication switched ON with slave (server_id: %d) " "at (%s, %lu)", @@ -1066,10 +1066,10 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, /* semi-sync is ON */ sync = false; /* No sync unless a transaction is involved. */ - if (reply_file_name_inited_) + if (m_reply_file_name_inited) { cmp = Active_tranx::compare(log_file_name, log_file_pos, - reply_file_name_, reply_file_pos_); + m_reply_file_name, m_reply_file_pos); if (cmp <= 0) { /* If we have already got the reply for the event, then we do @@ -1079,10 +1079,10 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, } } - if (wait_file_name_inited_) + if (m_wait_file_name_inited) { cmp = Active_tranx::compare(log_file_name, log_file_pos, - wait_file_name_, wait_file_pos_); + m_wait_file_name, m_wait_file_pos); } else { @@ -1097,17 +1097,17 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet, /* * We only wait if the event is a transaction's ending event. */ - assert(active_tranxs_ != NULL); - sync = active_tranxs_->is_tranx_end_pos(log_file_name, + assert(m_active_tranxs != NULL); + sync = m_active_tranxs->is_tranx_end_pos(log_file_name, log_file_pos); } } else { - if (commit_file_name_inited_) + if (m_commit_file_name_inited) { int cmp = Active_tranx::compare(log_file_name, log_file_pos, - commit_file_name_, commit_file_pos_); + m_commit_file_name, m_commit_file_pos); sync = (cmp >= 0); } else @@ -1151,35 +1151,35 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name, /* Update the 'largest' transaction commit position seen so far even * though semi-sync is switched off. - * It is much better that we update commit_file_* here, instead of + * It is much better that we update m_commit_file* here, instead of * inside commit_trx(). This is mostly because update_sync_header() - * will watch for commit_file_* to decide whether to switch semi-sync + * will watch for m_commit_file* to decide whether to switch semi-sync * on. The detailed reason is explained in function update_sync_header(). */ - if (commit_file_name_inited_) + if (m_commit_file_name_inited) { int cmp = Active_tranx::compare(log_file_name, log_file_pos, - commit_file_name_, commit_file_pos_); + m_commit_file_name, m_commit_file_pos); if (cmp > 0) { /* This is a larger position, let's update the maximum info. */ - strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); - commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ - commit_file_pos_ = log_file_pos; + strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1); + m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + m_commit_file_pos = log_file_pos; } } else { - strncpy(commit_file_name_, log_file_name, FN_REFLEN-1); - commit_file_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */ - commit_file_pos_ = log_file_pos; - commit_file_name_inited_ = true; + strncpy(m_commit_file_name, log_file_name, FN_REFLEN-1); + m_commit_file_name[FN_REFLEN-1] = 0; /* make sure it ends properly */ + m_commit_file_pos = log_file_pos; + m_commit_file_name_inited = true; } if (is_on()) { - assert(active_tranxs_ != NULL); - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos)) + assert(m_active_tranxs != NULL); + if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos)) { /* if insert tranx_node failed, print a warning message @@ -1254,13 +1254,13 @@ int Repl_semi_sync_master::after_reset_master() if (rpl_semi_sync_master_clients == 0 && !rpl_semi_sync_master_wait_no_slave) - state_ = 0; + m_state = 0; else - state_ = get_master_enabled()? 1 : 0; + m_state = get_master_enabled()? 1 : 0; - wait_file_name_inited_ = false; - reply_file_name_inited_ = false; - commit_file_name_inited_ = false; + m_wait_file_name_inited = false; + m_reply_file_name_inited = false; + m_commit_file_name_inited = false; rpl_semi_sync_master_yes_transactions = 0; rpl_semi_sync_master_no_transactions = 0; @@ -1306,7 +1306,7 @@ void Repl_semi_sync_master::set_export_stats() { lock(); - rpl_semi_sync_master_status = state_; + rpl_semi_sync_master_status = m_state; rpl_semi_sync_master_avg_trx_wait_time= ((rpl_semi_sync_master_trx_wait_num) ? (ulong)((double)rpl_semi_sync_master_trx_wait_time / |