summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-05-15 15:52:08 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-05-15 15:52:08 +0200
commit787c470cef54574e744eb5dfd9153d837fe67e45 (patch)
tree1168c4a1f2bd4371f56e7b231da7f0f18dcdb1f9 /sql/rpl_parallel.cc
parentd60915692cd02cc70b7eb8245c9ac6eab5df3d0c (diff)
downloadmariadb-git-787c470cef54574e744eb5dfd9153d837fe67e45.tar.gz
MDEV-5262: Missing retry after temp error in parallel replication
Handle retry of event groups that span multiple relay log files. - If retry reaches the end of one relay log file, move on to the next. - Handle refcounting of relay log files, and avoid purging relay log files until all event groups have completed that might have needed them for transaction retry.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc154
1 files changed, 111 insertions, 43 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 0b35e3c9fdc..67d61b7cf11 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -204,20 +204,14 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
}
#endif
-static int
-retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
-{
- /* ToDo */
- return 0;
-}
-
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
{
IO_CACHE rlog;
- File fd;
+ LOG_INFO linfo;
+ File fd= (File)-1;
const char *errmsg= NULL;
inuse_relaylog *ir= rgi->relay_log;
uint64 event_count;
@@ -241,7 +235,10 @@ do_retry:
strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
- return 1;
+ {
+ err= 1;
+ goto err;
+ }
cur_offset= rgi->retry_start_offset;
my_b_seek(&rlog, cur_offset);
@@ -249,43 +246,85 @@ do_retry:
{
Log_event_type event_type;
Log_event *ev;
+ rpl_parallel_thread::queued_event *qev;
- old_offset= cur_offset;
- ev= Log_event::read_log_event(&rlog, 0,
- rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
- opt_slave_sql_verify_checksum);
- cur_offset= my_b_tell(&rlog);
-
- if (!ev)
- {
- err= 1;
- goto err;
- }
- ev->thd= thd;
- event_type= ev->get_type_code();
- if (Log_event::is_group_event(event_type))
+ /* The loop is here so we can try again the next relay log file on EOF. */
+ for (;;)
{
- rpl_parallel_thread::queued_event *qev;
+ old_offset= cur_offset;
+ ev= Log_event::read_log_event(&rlog, 0,
+ rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+ opt_slave_sql_verify_checksum);
+ cur_offset= my_b_tell(&rlog);
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
- cur_offset - old_offset);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- if (!qev)
+ if (ev)
+ break;
+ if (rlog.error < 0)
{
- delete ev;
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ errmsg= "slave SQL thread aborted because of I/O error";
err= 1;
goto err;
}
- err= rpt_handle_event(qev, rpt);
- ++event_count;
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- rpt->free_qev(qev);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (rlog.error > 0)
+ {
+ sql_print_error("Slave SQL thread: I/O error reading "
+ "event(errno: %d cur_log->error: %d)",
+ my_errno, rlog.error);
+ errmsg= "Aborting slave SQL thread because of partial event read";
+ err= 1;
+ goto err;
+ }
+ /* EOF. Move to the next relay log. */
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
+
+ /* Find the next relay log file. */
+ if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
+ (err= rli->relay_log.find_next_log(&linfo, 1)))
+ {
+ char buff[22];
+ sql_print_error("next log error: %d offset: %s log: %s",
+ err,
+ llstr(linfo.index_file_offset, buff),
+ log_name);
+ goto err;
+ }
+ strmake_buf(log_name ,linfo.log_file_name);
+
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ {
+ err= 1;
+ goto err;
+ }
+ /* Loop to try again on the new log file. */
}
- else
- err= retry_handle_relay_log_rotate(ev, &rlog);
+
+ event_type= ev->get_type_code();
+ if (!Log_event::is_group_event(event_type))
+ {
+ delete ev;
+ continue;
+ }
+ ev->thd= thd;
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+ cur_offset - old_offset);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (!qev)
+ {
+ delete ev;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= 1;
+ goto err;
+ }
+ err= rpt_handle_event(qev, rpt);
+ ++event_count;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_qev(qev);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
delete_or_keep_event_post_apply(rgi, event_type, ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
@@ -300,6 +339,7 @@ do_retry:
{
end_io_cache(&rlog);
mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
goto do_retry;
}
sql_print_error("Slave worker thread retried transaction %lu time(s) "
@@ -309,15 +349,17 @@ do_retry:
}
goto err;
}
-
- // ToDo: handle too many retries.
-
} while (event_count < events_to_execute);
err:
- end_io_cache(&rlog);
- mysql_file_close(fd, MYF(MY_WME));
+ if (fd >= 0)
+ {
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ }
+ if (errmsg)
+ sql_print_error("Error reading relay log event: %s", errmsg);
return err;
}
@@ -340,6 +382,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
+ inuse_relaylog *last_ir;
+ uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -683,12 +727,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
+ last_ir= NULL;
+ accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ inuse_relaylog *ir= qevs_to_free->ir;
+ /* Batch up refcount update to reduce use of synchronised operations. */
+ if (last_ir != ir)
+ {
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+ accumulated_ir_count= 0;
+ }
+ last_ir= ir;
+ }
+ ++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&rli->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&rli->inuse_relaylog_atomic_lock);
+ }
if ((events= rpt->event_queue) != NULL)
{
@@ -1711,6 +1777,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing.
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
+ qev->ir= rli->last_inuse_relaylog;
+ ++qev->ir->queued_count;
cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage);