summaryrefslogtreecommitdiff
path: root/sql/wsrep_thd.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r--sql/wsrep_thd.cc115
1 files changed, 36 insertions, 79 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 9d70875c027..7f1818def73 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -136,99 +136,60 @@ void wsrep_create_appliers(long threads)
}
}
-static void wsrep_rollback_streaming_aborted_by_toi(THD *thd)
+static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
{
- WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi");
- /* Set thd->event_scheduler.data temporarily to NULL to avoid
- callbacks to threadpool wait_begin() during rollback. */
- auto saved_esd= thd->event_scheduler.data;
- thd->event_scheduler.data= 0;
- if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
- {
- DBUG_ASSERT(!saved_esd);
- DBUG_ASSERT(thd->wsrep_applier_service);
- thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
- wsrep::ws_meta());
- thd->wsrep_applier_service->after_apply();
- /* Will free THD */
- Wsrep_server_state::instance().server_service().
- release_high_priority_service(thd->wsrep_applier_service);
- }
- else
- {
- mysql_mutex_lock(&thd->LOCK_thd_data);
- /* prepare THD for rollback processing */
- thd->reset_for_next_command(true);
- thd->lex->sql_command= SQLCOM_ROLLBACK;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- /* Perform a client rollback, restore globals and signal
- the victim only when all the resources have been
- released */
- thd->wsrep_cs().client_service().bf_rollback();
- wsrep_reset_threadvars(thd);
- /* Assign saved event_scheduler.data back before letting
- client to continue. */
- thd->event_scheduler.data= saved_esd;
- thd->wsrep_cs().sync_rollback_complete();
- }
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx);
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service()
+ .release_storage_service(storage_service);
+ wsrep_store_threadvars(thd);
}
-static void wsrep_rollback_high_priority(THD *thd)
+static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
{
- WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)",
- thd->thread_id, (long long)thd->real_id);
+ WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
+ thd->thread_id, thd->real_id);
+ char* orig_thread_stack= thd->thread_stack;
+ thd->thread_stack= rollbacker->thread_stack;
DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
/* Must be streaming and must have been removed from the
server state streaming appliers map. */
DBUG_ASSERT(thd->wsrep_trx().is_streaming());
DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
- thd->wsrep_trx().server_id(),
- thd->wsrep_trx().id()));
+ thd->wsrep_trx().server_id(),
+ thd->wsrep_trx().id()));
DBUG_ASSERT(thd->wsrep_applier_service);
/* Fragment removal should happen before rollback to make
the transaction non-observable in SR table after the rollback
completes. For correctness the order does not matter here,
but currently it is mandated by checks in some MTR tests. */
- wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().storage_service(
- *thd->wsrep_applier_service));
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
- wsrep_store_threadvars(thd);
+ wsrep_remove_streaming_fragments(thd, "high priority");
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
wsrep::ws_meta());
thd->wsrep_applier_service->after_apply();
+ thd->thread_stack= orig_thread_stack;
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
+ thd->thread_id, thd->real_id);
/* Will free THD */
Wsrep_server_state::instance().server_service()
.release_high_priority_service(thd->wsrep_applier_service);
}
-static void wsrep_rollback_local(THD *thd)
+static void wsrep_rollback_local(THD *thd, THD *rollbacker)
{
- WSREP_INFO("Wsrep_rollback_local");
+ WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
+ thd->thread_id, thd->real_id);
+ char* orig_thread_stack= thd->thread_stack;
+ thd->thread_stack= rollbacker->thread_stack;
if (thd->wsrep_trx().is_streaming())
{
- wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().
- storage_service(thd->wsrep_cs().client_service()));
-
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().
- release_storage_service(storage_service);
- wsrep_store_threadvars(thd);
+ wsrep_remove_streaming_fragments(thd, "local");
}
/* Set thd->event_scheduler.data temporarily to NULL to avoid
callbacks to threadpool wait_begin() during rollback. */
@@ -247,9 +208,10 @@ static void wsrep_rollback_local(THD *thd)
/* Assign saved event_scheduler.data back before letting
client to continue. */
thd->event_scheduler.data= saved_esd;
+ thd->thread_stack= orig_thread_stack;
thd->wsrep_cs().sync_rollback_complete();
- WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
- thd->thread_id, (long long)thd->real_id);
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
+ thd->thread_id, thd->real_id);
}
static void wsrep_rollback_process(THD *rollbacker,
@@ -286,18 +248,13 @@ static void wsrep_rollback_process(THD *rollbacker,
/* Rollback methods below may free thd pointer. Do not try
to access it after method returns. */
- if (thd->wsrep_trx().is_streaming() &&
- thd->wsrep_trx().bf_aborted_in_total_order())
- {
- wsrep_rollback_streaming_aborted_by_toi(thd);
- }
- else if (wsrep_thd_is_applying(thd))
+ if (wsrep_thd_is_applying(thd))
{
- wsrep_rollback_high_priority(thd);
+ wsrep_rollback_high_priority(thd, rollbacker);
}
else
{
- wsrep_rollback_local(thd);
+ wsrep_rollback_local(thd, rollbacker);
}
wsrep_store_threadvars(rollbacker);
thd_proc_info(rollbacker, "wsrep aborter idle");
@@ -345,7 +302,7 @@ void wsrep_fire_rollbacker(THD *thd)
}
-int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
+int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal)
{
DBUG_ENTER("wsrep_abort_thd");
THD *victim_thd= (THD *) victim_thd_ptr;
@@ -373,7 +330,7 @@ int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
bool wsrep_bf_abort(const THD* bf_thd, THD* victim_thd)
{
- WSREP_LOG_THD((THD*)bf_thd, "BF aborter before");
+ WSREP_LOG_THD(bf_thd, "BF aborter before");
WSREP_LOG_THD(victim_thd, "victim before");
wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno());