summaryrefslogtreecommitdiff
path: root/sql/wsrep_server_service.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_server_service.cc')
-rw-r--r--sql/wsrep_server_service.cc51
1 files changed, 41 insertions, 10 deletions
diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc
index 42856862db3..bfb85e3d0ab 100644
--- a/sql/wsrep_server_service.cc
+++ b/sql/wsrep_server_service.cc
@@ -26,6 +26,7 @@
#include "wsrep_mysqld.h"
#include "wsrep_schema.h"
#include "wsrep_utils.h"
+#include "wsrep_thd.h"
#include "log.h" /* sql_print_xxx() */
#include "sql_class.h" /* system variables */
@@ -50,6 +51,10 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
init_service_thd(thd, cs.m_thd->thread_stack);
WSREP_DEBUG("Created storage service with thread id %llu",
thd->thread_id);
+ /* Use variables from the current thd attached to client_service.
+ This is because we need to be able to BF abort storage access
+ operations. */
+ wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
}
@@ -62,6 +67,7 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
init_service_thd(thd, hps.m_thd->thread_stack);
WSREP_DEBUG("Created high priority storage service with thread id %llu",
thd->thread_id);
+ wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
}
@@ -71,21 +77,48 @@ void Wsrep_server_service::release_storage_service(
Wsrep_storage_service* ss=
static_cast<Wsrep_storage_service*>(storage_service);
THD* thd= ss->m_thd;
+ wsrep_reset_threadvars(thd);
delete ss;
delete thd;
}
+Wsrep_applier_service*
+wsrep_create_streaming_applier(THD *orig_thd, const char *ctx)
+{
+ /* Reset variables to allow creating new variables in thread local
+ storage for new THD if needed. Note that reset must be done for
+ current_thd, as orig_thd may not be in effect. This may be the case when
+ streaming transaction is BF aborted and streaming applier
+ is created from BF aborter context. */
+ Wsrep_threadvars saved_threadvars(wsrep_save_threadvars());
+ wsrep_reset_threadvars(saved_threadvars.cur_thd);
+ THD *thd= 0;
+ Wsrep_applier_service *ret= 0;
+ if (!wsrep_create_threadvars() &&
+ (thd= new THD(next_thread_id(), true)))
+ {
+ init_service_thd(thd, orig_thd->thread_stack);
+ wsrep_assign_from_threadvars(thd);
+ WSREP_DEBUG("Created streaming applier service in %s context with "
+ "thread id %llu", ctx, thd->thread_id);
+ if (!(ret= new (std::nothrow) Wsrep_applier_service(thd)))
+ {
+ delete thd;
+ }
+ }
+ /* Restore original thread local storage state before returning. */
+ wsrep_restore_threadvars(saved_threadvars);
+ wsrep_store_threadvars(saved_threadvars.cur_thd);
+ return ret;
+}
+
wsrep::high_priority_service*
Wsrep_server_service::streaming_applier_service(
wsrep::client_service& orig_client_service)
{
Wsrep_client_service& orig_cs=
static_cast<Wsrep_client_service&>(orig_client_service);
- THD* thd= new THD(next_thread_id(), true);
- init_service_thd(thd, orig_cs.m_thd->thread_stack);
- WSREP_DEBUG("Created streaming applier service in local context with "
- "thread id %llu", thd->thread_id);
- return new Wsrep_applier_service(thd);
+ return wsrep_create_streaming_applier(orig_cs.m_thd, "local");
}
wsrep::high_priority_service*
@@ -94,11 +127,7 @@ Wsrep_server_service::streaming_applier_service(
{
Wsrep_high_priority_service&
orig_hps(static_cast<Wsrep_high_priority_service&>(orig_high_priority_service));
- THD* thd= new THD(next_thread_id(), true);
- init_service_thd(thd, orig_hps.m_thd->thread_stack);
- WSREP_DEBUG("Created streaming applier service in high priority "
- "context with thread id %llu", thd->thread_id);
- return new Wsrep_applier_service(thd);
+ return wsrep_create_streaming_applier(orig_hps.m_thd, "high priority");
}
void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_service* high_priority_service)
@@ -107,7 +136,9 @@ void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_se
static_cast<Wsrep_high_priority_service*>(high_priority_service);
THD* thd= hps->m_thd;
delete hps;
+ wsrep_store_threadvars(thd);
delete thd;
+ wsrep_delete_threadvars();
}
void Wsrep_server_service::background_rollback(wsrep::client_state& client_state)