diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/Makefile.am | 6 | ||||
-rw-r--r-- | sql/event_data_objects.cc | 324 | ||||
-rw-r--r-- | sql/event_queue.cc | 40 | ||||
-rw-r--r-- | sql/event_queue.h | 4 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 462 | ||||
-rw-r--r-- | sql/event_scheduler.h | 2 |
6 files changed, 420 insertions, 418 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am index 3518903d149..d0512f3d9de 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -65,8 +65,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \ parse_file.h sql_view.h sql_trigger.h \ sql_array.h sql_cursor.h events.h \ + event_db_repository.h event_queue.h \ sql_plugin.h authors.h sql_partition.h event_data_objects.h \ - event_queue.h event_db_repository.h \ partition_info.h partition_element.h event_scheduler.h \ contributors.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ @@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ gstream.cc spatial.cc sql_help.cc sql_cursor.cc \ tztime.cc my_time.c my_user.c my_decimal.cc\ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ - sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\ - events.cc event_data_objects.cc \ + sp_cache.cc parse_file.cc sql_trigger.cc \ + event_scheduler.cc events.cc event_data_objects.cc \ event_queue.cc event_db_repository.cc \ sql_plugin.cc sql_binlog.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc index 8d990def32d..5f879c6bea0 100644 --- a/sql/event_data_objects.cc +++ b/sql/event_data_objects.cc @@ -24,65 +24,12 @@ #define EVEX_MAX_INTERVAL_VALUE 1000000000L - -/* - Switches the security context - SYNOPSIS - event_change_security_context() - thd Thread - user The user - host The host of the user - db The schema for which the security_ctx will be loaded - backup Where to store the old context - - RETURN VALUE - FALSE OK - TRUE Error (generates error too) -*/ - static bool event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, - LEX_STRING db, Security_context *backup) -{ - DBUG_ENTER("event_change_security_context"); - DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str)); -#ifndef NO_EMBEDDED_ACCESS_CHECKS - - *backup= thd->main_security_ctx; - if (acl_getroot_no_password(&thd->main_security_ctx, user.str, host.str, - host.str, db.str)) - { - my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str); - DBUG_RETURN(TRUE); - } - thd->security_ctx= &thd->main_security_ctx; -#endif - DBUG_RETURN(FALSE); -} - - -/* - Restores the security context - SYNOPSIS - event_restore_security_context() - thd Thread - backup Context to switch to -*/ + LEX_STRING db, Security_context *backup); static void -event_restore_security_context(THD *thd, Security_context *backup) -{ - DBUG_ENTER("event_restore_security_context"); -#ifndef NO_EMBEDDED_ACCESS_CHECKS - if (backup) - { - thd->main_security_ctx= *backup; - thd->security_ctx= &thd->main_security_ctx; - } -#endif - DBUG_VOID_RETURN; -} - +event_restore_security_context(THD *thd, Security_context *backup); /* Returns a new instance @@ -237,47 +184,6 @@ Event_parse_data::init_body(THD *thd) /* - Inits definer (definer_user and definer_host) during parsing. - - SYNOPSIS - Event_parse_data::init_definer() - thd Thread -*/ - -void -Event_parse_data::init_definer(THD *thd) -{ - int definer_user_len; - int definer_host_len; - DBUG_ENTER("Event_parse_data::init_definer"); - - DBUG_PRINT("info",("init definer_user thd->mem_root=0x%lx " - "thd->sec_ctx->priv_user=0x%lx", thd->mem_root, - thd->security_ctx->priv_user)); - - definer_user_len= strlen(thd->security_ctx->priv_user); - definer_host_len= strlen(thd->security_ctx->priv_host); - - /* + 1 for @ */ - DBUG_PRINT("info",("init definer as whole")); - definer.length= definer_user_len + definer_host_len + 1; - definer.str= thd->alloc(definer.length + 1); - - DBUG_PRINT("info",("copy the user")); - memcpy(definer.str, thd->security_ctx->priv_user, definer_user_len); - definer.str[definer_user_len]= '@'; - - DBUG_PRINT("info",("copy the host")); - memcpy(definer.str + definer_user_len + 1, thd->security_ctx->priv_host, - definer_host_len); - definer.str[definer.length]= '\0'; - DBUG_PRINT("info",("definer [%s] initted", definer.str)); - - DBUG_VOID_RETURN; -} - - -/* Sets time for execution for one-time event. SYNOPSIS @@ -646,6 +552,47 @@ Event_parse_data::check_parse_data(THD *thd) /* + Inits definer (definer_user and definer_host) during parsing. + + SYNOPSIS + Event_parse_data::init_definer() + thd Thread +*/ + +void +Event_parse_data::init_definer(THD *thd) +{ + int definer_user_len; + int definer_host_len; + DBUG_ENTER("Event_parse_data::init_definer"); + + DBUG_PRINT("info",("init definer_user thd->mem_root=0x%lx " + "thd->sec_ctx->priv_user=0x%lx", thd->mem_root, + thd->security_ctx->priv_user)); + + definer_user_len= strlen(thd->security_ctx->priv_user); + definer_host_len= strlen(thd->security_ctx->priv_host); + + /* + 1 for @ */ + DBUG_PRINT("info",("init definer as whole")); + definer.length= definer_user_len + definer_host_len + 1; + definer.str= thd->alloc(definer.length + 1); + + DBUG_PRINT("info",("copy the user")); + memcpy(definer.str, thd->security_ctx->priv_user, definer_user_len); + definer.str[definer_user_len]= '@'; + + DBUG_PRINT("info",("copy the host")); + memcpy(definer.str + definer_user_len + 1, thd->security_ctx->priv_host, + definer_host_len); + definer.str[definer.length]= '\0'; + DBUG_PRINT("info",("definer [%s] initted", definer.str)); + + DBUG_VOID_RETURN; +} + + +/* Constructor SYNOPSIS @@ -1668,6 +1615,69 @@ Event_job_data::get_fake_create_event(THD *thd, String *buf) /* + Executes the event (the underlying sp_head object); + + SYNOPSIS + Event_job_data::execute() + thd THD + + RETURN VALUE + 0 success + -99 No rights on this.dbname.str + others retcodes of sp_head::execute_procedure() +*/ + +int +Event_job_data::execute(THD *thd) +{ + Security_context save_ctx; + /* this one is local and not needed after exec */ + int ret= 0; + + DBUG_ENTER("Event_job_data::execute"); + DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str)); + + if ((ret= compile(thd, NULL))) + goto done; + + event_change_security_context(thd, definer_user, definer_host, dbname, + &save_ctx); + /* + THD::~THD will clean this or if there is DROP DATABASE in the SP then + it will be free there. It should not point to our buffer which is allocated + on a mem_root. + */ + thd->db= my_strdup(dbname.str, MYF(0)); + thd->db_length= dbname.length; + if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str))) + { + List<Item> empty_item_list; + empty_item_list.empty(); + if (thd->enable_slow_log) + sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS; + sphead->m_flags|= sp_head::LOG_GENERAL_LOG; + + ret= sphead->execute_procedure(thd, &empty_item_list); + } + else + { + DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str, + definer_host.str, dbname.str)); + ret= -99; + } + + event_restore_security_context(thd, &save_ctx); +done: + thd->end_statement(); + thd->cleanup_after_query(); + + DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret)); + + DBUG_RETURN(ret); +} + + +/* Compiles an event before it's execution. Compiles the anonymous sp_head object held by the event @@ -1800,69 +1810,6 @@ done: /* - Executes the event (the underlying sp_head object); - - SYNOPSIS - Event_job_data::execute() - thd THD - - RETURN VALUE - 0 success - -99 No rights on this.dbname.str - others retcodes of sp_head::execute_procedure() -*/ - -int -Event_job_data::execute(THD *thd) -{ - Security_context save_ctx; - /* this one is local and not needed after exec */ - int ret= 0; - - DBUG_ENTER("Event_job_data::execute"); - DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str)); - - if ((ret= compile(thd, NULL))) - goto done; - - event_change_security_context(thd, definer_user, definer_host, dbname, - &save_ctx); - /* - THD::~THD will clean this or if there is DROP DATABASE in the SP then - it will be free there. It should not point to our buffer which is allocated - on a mem_root. - */ - thd->db= my_strdup(dbname.str, MYF(0)); - thd->db_length= dbname.length; - if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str))) - { - List<Item> empty_item_list; - empty_item_list.empty(); - if (thd->enable_slow_log) - sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS; - sphead->m_flags|= sp_head::LOG_GENERAL_LOG; - - ret= sphead->execute_procedure(thd, &empty_item_list); - } - else - { - DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str, - definer_host.str, dbname.str)); - ret= -99; - } - - event_restore_security_context(thd, &save_ctx); -done: - thd->end_statement(); - thd->cleanup_after_query(); - - DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret)); - - DBUG_RETURN(ret); -} - - -/* Checks whether two events are in the same schema SYNOPSIS @@ -1899,3 +1846,62 @@ event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b) return !sortcmp_lex_string(name, b->name, system_charset_info) && !sortcmp_lex_string(db, b->dbname, system_charset_info); } + + +/* + Switches the security context + SYNOPSIS + event_change_security_context() + thd Thread + user The user + host The host of the user + db The schema for which the security_ctx will be loaded + backup Where to store the old context + + RETURN VALUE + FALSE OK + TRUE Error (generates error too) +*/ + +static bool +event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, + LEX_STRING db, Security_context *backup) +{ + DBUG_ENTER("event_change_security_context"); + DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str)); +#ifndef NO_EMBEDDED_ACCESS_CHECKS + + *backup= thd->main_security_ctx; + if (acl_getroot_no_password(&thd->main_security_ctx, user.str, host.str, + host.str, db.str)) + { + my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str); + DBUG_RETURN(TRUE); + } + thd->security_ctx= &thd->main_security_ctx; +#endif + DBUG_RETURN(FALSE); +} + + +/* + Restores the security context + SYNOPSIS + event_restore_security_context() + thd Thread + backup Context to switch to +*/ + +static void +event_restore_security_context(THD *thd, Security_context *backup) +{ + DBUG_ENTER("event_restore_security_context"); +#ifndef NO_EMBEDDED_ACCESS_CHECKS + if (backup) + { + thd->main_security_ctx= *backup; + thd->security_ctx= &thd->main_security_ctx; + } +#endif + DBUG_VOID_RETURN; +} diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 69114c53118..c6c7d7f14ac 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -41,6 +41,7 @@ struct event_queue_param Event_queue *queue; pthread_mutex_t LOCK_loaded; pthread_cond_t COND_loaded; + bool loading_finished; }; @@ -85,9 +86,14 @@ event_queue_loader_thread(void *arg) DBUG_ENTER("event_queue_loader_thread"); + pthread_mutex_lock(¶m->LOCK_loaded); + param->queue->check_system_tables(thd); param->queue->load_events_from_db(thd); + + param->loading_finished= TRUE; pthread_cond_signal(¶m->COND_loaded); + pthread_mutex_unlock(¶m->LOCK_loaded); end: @@ -113,8 +119,6 @@ Event_queue::Event_queue() mutex_last_attempted_lock_in_func= ""; mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE; - - queue_loaded= FALSE; } @@ -195,8 +199,10 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) event_queue_param_value= (struct event_queue_param *) my_malloc(sizeof(struct event_queue_param), MYF(0)); + event_queue_param_value->thd= new_thd; event_queue_param_value->queue= this; + event_queue_param_value->loading_finished= FALSE; pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST); pthread_cond_init(&event_queue_param_value->COND_loaded, NULL); @@ -207,8 +213,8 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched) { do { pthread_cond_wait(&event_queue_param_value->COND_loaded, - &event_queue_param_value->LOCK_loaded); - } while (queue_loaded == FALSE); + &event_queue_param_value->LOCK_loaded); + } while (event_queue_param_value->loading_finished == FALSE); } pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded); @@ -662,8 +668,6 @@ end: close_thread_tables(thd); - queue_loaded= TRUE; - DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); } @@ -683,7 +687,7 @@ end: TRUE Error */ -bool +void Event_queue::check_system_tables(THD *thd) { TABLE_LIST tables; @@ -702,39 +706,35 @@ Event_queue::check_system_tables(THD *thd) tables.lock_type= TL_READ; if ((ret= simple_open_n_lock_tables(thd, &tables))) - sql_print_error("Cannot open mysql.db"); - else { - ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, - mysql_db_table_fields, &mysql_db_table_last_check, - ER_CANNOT_LOAD_FROM_TABLE); - close_thread_tables(thd); + sql_print_error("Cannot open mysql.db"); + goto end; } - if (ret) - DBUG_RETURN(TRUE); + ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, + mysql_db_table_fields, &mysql_db_table_last_check, + ER_CANNOT_LOAD_FROM_TABLE); + close_thread_tables(thd); bzero((char*) &tables, sizeof(tables)); tables.db= (char*) "mysql"; tables.table_name= tables.alias= (char*) "user"; tables.lock_type= TL_READ; - if ((ret= simple_open_n_lock_tables(thd, &tables))) + if (simple_open_n_lock_tables(thd, &tables)) sql_print_error("Cannot open mysql.db"); else { if (tables.table->s->fields < 29 || strncmp(tables.table->field[29]->field_name, STRING_WITH_LEN("Event_priv"))) - { sql_print_error("mysql.user has no `Event_priv` column at position 29"); - ret= TRUE; - } close_thread_tables(thd); } +end: thd->restore_backup_open_tables_state(&backup); - DBUG_RETURN(ret); + DBUG_VOID_RETURN; } diff --git a/sql/event_queue.h b/sql/event_queue.h index 3270938e881..e8b46abde92 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -56,7 +56,7 @@ public: void drop_schema_events(THD *thd, LEX_STRING schema); - static bool + void check_system_tables(THD *thd); void @@ -99,8 +99,6 @@ protected: /* The sorted queue with the Event_job_data objects */ QUEUE queue; - bool queue_loaded; - uint mutex_last_locked_at_line; uint mutex_last_unlocked_at_line; uint mutex_last_attempted_lock_at_line; diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index 5e6ffb090cb..0c39c1a512b 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -28,19 +28,12 @@ #define SCHED_FUNC "<unknown>" #endif -#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) -#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) +#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__) +#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__) #define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__) extern pthread_attr_t connection_attrib; -struct scheduler_param -{ - THD *thd; - Event_scheduler *scheduler; -}; - - static LEX_STRING scheduler_states_names[] = { @@ -49,6 +42,11 @@ LEX_STRING scheduler_states_names[] = { C_STRING_WITH_LEN("STOPPING")} }; +struct scheduler_param { + THD *thd; + Event_scheduler *scheduler; +}; + /* Prints the stack of infos, warnings, errors from thd to @@ -100,54 +98,6 @@ evex_print_warnings(THD *thd, Event_job_data *et) /* - Performs pre- pthread_create() initialisation of THD. Do this - in the thread that will pass THD to the child thread. In the - child thread call post_init_event_thread(). - - SYNOPSIS - pre_init_event_thread() - thd The THD of the thread. Has to be allocated by the caller. - - NOTES - 1. The host of the thead is my_localhost - 2. thd->net is initted with NULL - no communication. -*/ - -void -pre_init_event_thread(THD* thd) -{ - DBUG_ENTER("pre_init_event_thread"); - thd->client_capabilities= 0; - thd->security_ctx->master_access= 0; - thd->security_ctx->db_access= 0; - thd->security_ctx->host_or_ip= (char*)my_localhost; - thd->security_ctx->set_user((char*)"event_scheduler"); - my_net_init(&thd->net, NULL); - thd->net.read_timeout= slave_net_timeout; - thd->slave_thread= 0; - thd->options|= OPTION_AUTO_IS_NULL; - thd->client_capabilities|= CLIENT_MULTI_RESULTS; - pthread_mutex_lock(&LOCK_thread_count); - thd->thread_id= thread_id++; - threads.append(thd); - thread_count++; - thread_running++; - pthread_mutex_unlock(&LOCK_thread_count); - - /* - Guarantees that we will see the thread in SHOW PROCESSLIST though its - vio is NULL. - */ - - thd->proc_info= "Initialized"; - thd->version= refresh_version; - thd->set_time(); - - DBUG_VOID_RETURN; -} - - -/* Performs post initialization of structures in a new thread. SYNOPSIS @@ -202,6 +152,54 @@ deinit_event_thread(THD *thd) /* + Performs pre- pthread_create() initialisation of THD. Do this + in the thread that will pass THD to the child thread. In the + child thread call post_init_event_thread(). + + SYNOPSIS + pre_init_event_thread() + thd The THD of the thread. Has to be allocated by the caller. + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. +*/ + +void +pre_init_event_thread(THD* thd) +{ + DBUG_ENTER("pre_init_event_thread"); + thd->client_capabilities= 0; + thd->security_ctx->master_access= 0; + thd->security_ctx->db_access= 0; + thd->security_ctx->host_or_ip= (char*)my_localhost; + my_net_init(&thd->net, NULL); + thd->security_ctx->set_user((char*)"event_scheduler"); + thd->net.read_timeout= slave_net_timeout; + thd->slave_thread= 0; + thd->options|= OPTION_AUTO_IS_NULL; + thd->client_capabilities|= CLIENT_MULTI_RESULTS; + pthread_mutex_lock(&LOCK_thread_count); + thd->thread_id= thread_id++; + threads.append(thd); + thread_count++; + thread_running++; + pthread_mutex_unlock(&LOCK_thread_count); + + /* + Guarantees that we will see the thread in SHOW PROCESSLIST though its + vio is NULL. + */ + + thd->proc_info= "Initialized"; + thd->version= refresh_version; + thd->set_time(); + + DBUG_VOID_RETURN; +} + + +/* Function that executes the scheduler, SYNOPSIS @@ -259,33 +257,32 @@ event_worker_thread(void *arg) thd->thread_stack= (char *) &thd; // remember where our stack is DBUG_ENTER("event_worker_thread"); - if (post_init_event_thread(thd)) - goto end; - - DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." - "THD=0x%lx", time(NULL), thd)); + if (!post_init_event_thread(thd)) + { + DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." + "THD=0x%lx", time(NULL), thd)); - sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", - event->dbname.str, event->name.str, - event->definer.str, thd->thread_id); + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", + event->dbname.str, event->name.str, + event->definer.str, thd->thread_id); - thd->enable_slow_log= TRUE; + thd->enable_slow_log= TRUE; - ret= event->execute(thd); + ret= event->execute(thd); - evex_print_warnings(thd, event); + evex_print_warnings(thd, event); - sql_print_information("SCHEDULER: [%s.%s of %s] executed " - " in thread thread %lu. RetCode=%d", - event->dbname.str, event->name.str, - event->definer.str, thd->thread_id, ret); - if (ret == EVEX_COMPILE_ERROR) - sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + sql_print_information("SCHEDULER: [%s.%s of %s] executed " + " in thread thread %lu. RetCode=%d", event->dbname.str, event->name.str, - event->definer.str); - else if (ret == EVEX_MICROSECOND_UNSUP) - sql_print_information("SCHEDULER: MICROSECOND is not supported"); - + event->definer.str, thd->thread_id, ret); + if (ret == EVEX_COMPILE_ERROR) + sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + event->dbname.str, event->name.str, + event->definer.str); + else if (ret == EVEX_MICROSECOND_UNSUP) + sql_print_information("SCHEDULER: MICROSECOND is not supported"); + } end: DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, event->name.str)); @@ -293,7 +290,7 @@ end: deinit_event_thread(thd); - DBUG_RETURN(0); // Against gcc warnings + DBUG_RETURN(0); // Can't return anything here } @@ -305,17 +302,15 @@ end: Event_scheduler::init_scheduler() */ -bool +void Event_scheduler::init_scheduler(Event_queue *q) { - LOCK_SCHEDULER_DATA(); - thread_id= 0; - state= INITIALIZED; + LOCK_DATA(); queue= q; started_events= 0; - UNLOCK_SCHEDULER_DATA(); - - return FALSE; + thread_id= 0; + state= INITIALIZED; + UNLOCK_DATA(); } @@ -377,7 +372,7 @@ Event_scheduler::start() struct scheduler_param *scheduler_param_value; DBUG_ENTER("Event_scheduler::start"); - LOCK_SCHEDULER_DATA(); + LOCK_DATA(); DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); if (state > INITIALIZED) goto end; @@ -408,7 +403,7 @@ Event_scheduler::start() DBUG_PRINT("info", ("Setting state go RUNNING")); state= RUNNING; end: - UNLOCK_SCHEDULER_DATA(); + UNLOCK_DATA(); if (ret && new_thd) { @@ -427,56 +422,6 @@ end: /* - Stops the scheduler (again). Waits for acknowledgement from the - scheduler that it has stopped - synchronous stopping. - - SYNOPSIS - Event_scheduler::stop() - - RETURN VALUE - FALSE OK - TRUE Error (not reported) -*/ - -bool -Event_scheduler::stop() -{ - THD *thd= current_thd; - DBUG_ENTER("Event_scheduler::stop"); - DBUG_PRINT("enter", ("thd=0x%lx", current_thd)); - - LOCK_SCHEDULER_DATA(); - DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); - if (state != RUNNING) - goto end; - - state= STOPPING; - - DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); - sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); - - pthread_cond_signal(&COND_state); - - /* Guarantee we don't catch spurious signals */ - sql_print_information("SCHEDULER: Waiting the manager thread to reply"); - do { - DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " - "thread. Current value of state is %s . " - "workers count=%d", scheduler_states_names[state].str, - workers_count())); - /* thd could be 0x0, when shutting down */ - COND_STATE_WAIT(NULL); - } while (state == STOPPING); - DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); - - thread_id= 0; -end: - UNLOCK_SCHEDULER_DATA(); - DBUG_RETURN(FALSE); -} - - -/* The main loop of the scheduler. SYNOPSIS @@ -496,7 +441,7 @@ Event_scheduler::run(THD *thd) Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); - LOCK_SCHEDULER_DATA(); + LOCK_DATA(); thread_id= thd->thread_id; sql_print_information("SCHEDULER: Manager thread started with id %lu", @@ -529,7 +474,7 @@ Event_scheduler::run(THD *thd) COND_STATE_WAIT(NULL); thd->exit_cond(""); DBUG_PRINT("info", ("Woke up. Got COND_state")); - LOCK_SCHEDULER_DATA(); + LOCK_DATA(); } else if (abstime.tv_sec) { @@ -545,16 +490,16 @@ Event_scheduler::run(THD *thd) 1. Spurious wake-up 2. The top of the queue was changed (new one becase of create/update) */ - /* This will do implicit UNLOCK_SCHEDULER_DATA() */ + /* This will do implicit UNLOCK_DATA() */ thd->exit_cond(""); DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); - LOCK_SCHEDULER_DATA(); + LOCK_DATA(); } else { - UNLOCK_SCHEDULER_DATA(); + UNLOCK_DATA(); res= execute_top(thd, job_data); - LOCK_SCHEDULER_DATA(); + LOCK_DATA(); if (res) break; ++started_events; @@ -565,7 +510,7 @@ Event_scheduler::run(THD *thd) pthread_cond_signal(&COND_state); error: state= INITIALIZED; - UNLOCK_SCHEDULER_DATA(); + UNLOCK_DATA(); sql_print_information("SCHEDULER: Stopped"); DBUG_RETURN(res); @@ -627,23 +572,52 @@ error: /* - Returns the current state of the scheduler + Stops the scheduler (again). Waits for acknowledgement from the + scheduler that it has stopped - synchronous stopping. SYNOPSIS - Event_scheduler::get_state() + Event_scheduler::stop() RETURN VALUE - The state of the scheduler (INITIALIZED | RUNNING | STOPPING) + FALSE OK + TRUE Error (not reported) */ -enum Event_scheduler::enum_state -Event_scheduler::get_state() +bool +Event_scheduler::stop() { - enum Event_scheduler::enum_state ret; - LOCK_SCHEDULER_DATA(); - ret= state; - UNLOCK_SCHEDULER_DATA(); - return ret; + THD *thd= current_thd; + DBUG_ENTER("Event_scheduler::stop"); + DBUG_PRINT("enter", ("thd=0x%lx", current_thd)); + + LOCK_DATA(); + DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); + if (state != RUNNING) + goto end; + + state= STOPPING; + + DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); + sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); + + pthread_cond_signal(&COND_state); + + /* Guarantee we don't catch spurious signals */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); + do { + DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " + "thread. Current value of state is %s . " + "workers count=%d", scheduler_states_names[state].str, + workers_count())); + /* thd could be 0x0, when shutting down */ + COND_STATE_WAIT(NULL); + } while (state == STOPPING); + DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); + + thread_id= 0; +end: + UNLOCK_DATA(); + DBUG_RETURN(FALSE); } @@ -697,7 +671,7 @@ Event_scheduler::queue_changed() /* Auxiliary function for locking LOCK_scheduler_state. Used - by the LOCK_SCHEDULER_DATA macro. + by the LOCK_DATA macro. SYNOPSIS Event_scheduler::lock_data() @@ -720,7 +694,7 @@ Event_scheduler::lock_data(const char *func, uint line) /* Auxiliary function for unlocking LOCK_scheduler_state. Used - by the UNLOCK_SCHEDULER_DATA macro. + by the UNLOCK_DATA macro. SYNOPSIS Event_scheduler::unlock_data() @@ -754,30 +728,52 @@ Event_scheduler::unlock_data(const char *func, uint line) */ void -Event_scheduler::cond_wait(struct timespec *abstime, - const char *func, uint line) +Event_scheduler::cond_wait(struct timespec *abstime, const char *func, + uint line) { - DBUG_ENTER("Event_scheduler::cond_wait"); waiting_on_cond= TRUE; mutex_last_unlocked_at_line= line; mutex_scheduler_data_locked= FALSE; mutex_last_unlocked_in_func= func; - - if (abstime) - pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime); - else + if (!abstime) pthread_cond_wait(&COND_state, &LOCK_scheduler_state); - + else + pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime); mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; mutex_scheduler_data_locked= TRUE; waiting_on_cond= FALSE; +} - DBUG_VOID_RETURN; + +/* + Returns the current state of the scheduler + + SYNOPSIS + Event_scheduler::get_state() + + RETURN VALUE + The state of the scheduler (INITIALIZED | RUNNING | STOPPING) +*/ + +enum Event_scheduler::enum_state +Event_scheduler::get_state() +{ + enum Event_scheduler::enum_state ret; + DBUG_ENTER("Event_scheduler::get_state"); + LOCK_DATA(); + ret= state; + UNLOCK_DATA(); + DBUG_RETURN(ret); } /* + REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF + Returns whether the scheduler was initialized. +*/ + +/* Dumps the internal status of the scheduler SYNOPSIS @@ -805,80 +801,82 @@ Event_scheduler::dump_internal_status(THD *thd) tmp_string.length(0); int_string.length(0); - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler state"), scs); - protocol->store(scheduler_states_names[state].str, - scheduler_states_names[state].length, scs); + do + { + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler state"), scs); + protocol->store(scheduler_states_names[state].str, + scheduler_states_names[state].length, scs); - if ((ret= protocol->write())) - goto end; + if ((ret= protocol->write())) + break; - /* thread_id */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("thread_id"), scs); - if (thread_id) - { - int_string.set((longlong) thread_id, scs); - protocol->store(&int_string); - } - else - protocol->store_null(); - if ((ret= protocol->write())) - goto end; + /* thread_id */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("thread_id"), scs); + if (thread_id) + { + int_string.set((longlong) thread_id, scs); + protocol->store(&int_string); + } + else + protocol->store_null(); + if ((ret= protocol->write())) + break; - /* last locked at*/ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs); - tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s::%d", - mutex_last_locked_in_func, - mutex_last_locked_at_line)); - protocol->store(&tmp_string); - if ((ret= protocol->write())) - goto end; + /* last locked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_locked_in_func, + mutex_last_locked_at_line)); + protocol->store(&tmp_string); + if ((ret= protocol->write())) + break; - /* last unlocked at*/ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs); - tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), - tmp_string.alloced_length(), "%s::%d", - mutex_last_unlocked_in_func, - mutex_last_unlocked_at_line)); - protocol->store(&tmp_string); - if ((ret= protocol->write())) - goto end; + /* last unlocked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + mutex_last_unlocked_in_func, + mutex_last_unlocked_at_line)); + protocol->store(&tmp_string); + if ((ret= protocol->write())) + break; - /* waiting on */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs); - int_string.set((longlong) waiting_on_cond, scs); - protocol->store(&int_string); - if ((ret= protocol->write())) - goto end; + /* waiting on */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs); + int_string.set((longlong) waiting_on_cond, scs); + protocol->store(&int_string); + if ((ret= protocol->write())) + break; - /* workers_count */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler workers count"), scs); - int_string.set((longlong) workers_count(), scs); - protocol->store(&int_string); - if ((ret= protocol->write())) - goto end; + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler workers count"), scs); + int_string.set((longlong) workers_count(), scs); + protocol->store(&int_string); + if ((ret= protocol->write())) + break; - /* workers_count */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler executed events"), scs); - int_string.set((longlong) started_events, scs); - protocol->store(&int_string); - if ((ret= protocol->write())) - goto end; + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler executed events"), scs); + int_string.set((longlong) started_events, scs); + protocol->store(&int_string); + if ((ret= protocol->write())) + break; - /* scheduler_data_locked */ - protocol->prepare_for_resend(); - protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); - int_string.set((longlong) mutex_scheduler_data_locked, scs); - protocol->store(&int_string); - ret= protocol->write(); -end: + /* scheduler_data_locked */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); + int_string.set((longlong) mutex_scheduler_data_locked, scs); + protocol->store(&int_string); + ret= protocol->write(); + } while (0); #endif DBUG_RETURN(ret); diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index 8e00b7b767f..b5c3dae49f8 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -56,7 +56,7 @@ public: bool run(THD *thd); - bool + void init_scheduler(Event_queue *queue); void |