diff options
Diffstat (limited to 'sql/event_scheduler.cc')
-rw-r--r-- | sql/event_scheduler.cc | 2423 |
1 files changed, 2423 insertions, 0 deletions
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc new file mode 100644 index 00000000000..a19f14d3726 --- /dev/null +++ b/sql/event_scheduler.cc @@ -0,0 +1,2423 @@ +/* Copyright (C) 2004-2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "event_priv.h" +#include "event.h" +#include "event_scheduler.h" +#include "sp_head.h" + +/* + ToDo: + 1. Talk to Alik to get a check for configure.in for my_time_t and time_t + 2. Look at guardian.h|cc to see its life cycle, has similarities. +*/ + + +/* + The scheduler is implemented as class Event_scheduler. Only one instance is + kept during the runtime of the server, by implementing the Singleton DP. + Object instance is always there because the memory is allocated statically + and initialized when the OS loader loads mysqld. This initialization is + bare. Extended initialization is done during the call to + Event_scheduler::init() in Events::init(). The reason for that late initialization + is that some subsystems needed to boot the Scheduler are not available at + earlier stages of the mysqld boot procedure. Events::init() is called in + mysqld.cc . If the mysqld is started with --event-scheduler=0 then + no initialization takes place and the scheduler is unavailable during this + server run. The server should be started with --event-scheduler=1 to have + the scheduler initialized and able to execute jobs. This starting alwa + s implies that the jobs execution will start immediately. If the server + is started with --event-scheduler=2 then the scheduler is started in suspended + state. Default state, if --event-scheduler is not specified is 2. + + The scheduler only manages execution of the events. Their creation, + alteration and deletion is delegated to other routines found in event.cc . + These routines interact with the scheduler : + - CREATE EVENT -> Event_scheduler::add_event() + - ALTER EVENT -> Event_scheduler::replace_event() + - DROP EVENT -> Event_scheduler::drop_event() + + There is one mutex in the single Event_scheduler object which controls + the simultaneous access to the objects invariants. Using one lock makes + it easy to follow the workflow. This mutex is LOCK_scheduler_data. It is + initialized in Event_scheduler::init(). Which in turn is called by the + Facade class Events in event.cc, coming from init_thread_environment() from + mysqld.cc -> no concurrency at this point. It's destroyed in + Events::destroy_mutexes() called from clean_up_mutexes() in mysqld.cc . + + The full initialization is done in Event_scheduler::init() called from + Events::init(). It's done before any requests coming in, so this is a + guarantee for not having concurrency. + + The scheduler is started with Event_scheduler::start() and stopped with + Event_scheduler::stop(). When the scheduler starts it loads all events + from mysql.event table. Unfortunately, there is a race condition between + the event disk management functions and the scheduler ones + (add/replace/drop_event & load_events_from_db()), because the operations + do not happen under one global lock but the disk operations are guarded + by the MYISAM lock on mysql.event. In the same time, the queue operations + are guarded by LOCK_scheduler_data. If the scheduler is start()-ed during + server startup and stopped()-ed during server shutdown (in Events::shutdown() + called by kill_server() in mysqld.cc) these races does not exist. + + Since the user may want to temporarily inhibit execution of events the + scheduler can be suspended and then it can be forced to resume its + operations. The API call to perform these is + Event_scheduler::suspend_or_resume(enum enum_suspend_or_resume) . + When the scheduler is suspended the main scheduler thread, which ATM + happens to have thread_id 1, locks on a condition COND_suspend_or_resume. + When this is signal is sent for the reverse operation the main scheduler + loops continues to roll and execute events. + + When the scheduler is suspended all add/replace/drop_event() operations + work as expected and the modify the queue but no events execution takes + place. + + In contrast to the previous scheduler implementation, found in + event_executor.cc, the start, shutdown, suspend and resume are synchronous + operations. As a whole all operations are synchronized and no busy waits + are used except in stop_all_running_events(), which waits until all + running event worker threads have finished. It would have been nice to + use a conditional on which this method will wait and the last thread to + finish would signal it but this implies subclassing THD. + + The scheduler does not keep a counter of how many event worker threads are + running, at any specific moment, because this will copy functionality + already existing in the server. Namely, all THDs are registered in the + global `threads` array. THD has member variable system_thread which + identifies the type of thread. Connection threads being NON_SYSTEM_THREAD, + all other have their enum value. Important for the scheduler are + SYSTEM_THREAD_EVENT_SCHEDULER and SYSTEM_THREAD_EVENT_WORKER. + + Class THD subclasses class ilink, which is the linked list of all threads. + When a THD instance is destroyed it's being removed from threads, thus + no manual intervention is needed. On the contrary registering is manual + with threads.append() . Traversing the threads array every time a subclass + of THD, for instance if we would have had THD_scheduler_worker to see + how many events we have and whether the scheduler is shutting down will + take much time and lead to a deadlock. stop_all_running_events() is called + under LOCK_scheduler_data. If the THD_scheduler_worker was aware of + the single Event_scheduler instance it will try to check + Event_scheduler::state but for this it would need to acquire + LOCK_scheduler_data => deadlock. Thus stop_all_running_events() uses a + busy wait. + + DROP DATABASE DDL should drop all events defined in a specific schema. + DROP USER also should drop all events who has as definer the user being + dropped (this one is not addressed at the moment but a hook exists). For + this specific needs Event_scheduler::drop_matching_events() is + implemented. Which expects a callback to be applied on every object in + the queue. Thus events that match specific schema or user, will be + removed from the queue. The exposed interface is : + - Event_scheduler::drop_schema_events() + - Event_scheduler::drop_user_events() + + This bulk dropping happens under LOCK_scheduler_data, thus no two or + more threads can execute it in parallel. However, DROP DATABASE is also + synchronized, currently, in the server thus this does not impact the + overall performance. In addition, DROP DATABASE is not that often + executed DDL. + + Though the interface to the scheduler is only through the public methods + of class Event_scheduler, there are currently few functions which are + used during its operations. Namely : + - static evex_print_warnings() + After every event execution all errors/warnings are dumped, so the user + can see in case of a problem what the problem was. + + - static init_event_thread() + This function is both used by event_scheduler_thread() and + event_worker_thread(). It initializes the THD structure. The + initialization looks pretty similar to the one in slave.cc done for the + replication threads. However, though the similarities it cannot be + factored out to have one routine. + + - static event_scheduler_thread() + Because our way to register functions to be used by the threading library + does not allow usage of static methods this function is used to start the + scheduler in it. It does THD initialization and then calls + Event_scheduler::run(). + + - static event_worker_thread() + With already stated the reason for not being able to use methods, this + function executes the worker threads. + + The execution of events is, to some extent, synchronized to inhibit race + conditions when Event_timed::thread_id is being updated with the thread_id of + the THD in which the event is being executed. The thread_id is in the + Event_timed object because we need to be able to kill quickly a specific + event during ALTER/DROP EVENT without traversing the global `threads` array. + However, this makes the scheduler's code more complicated. The event worker + thread is started by Event_timed::spawn_now(), which in turn calls + pthread_create(). The thread_id which will be associated in init_event_thread + is not known in advance thus the registering takes place in + event_worker_thread(). This registering has to be synchronized under + LOCK_scheduler_data, so no kill_event() on a object in + replace_event/drop_event/drop_matching_events() could take place. + + This synchronization is done through class Worker_thread_param that is + local to this file. Event_scheduler::execute_top() is called under + LOCK_scheduler_data. This method : + 1. Creates an instance of Worker_thread_param on the stack + 2. Locks Worker_thread_param::LOCK_started + 3. Calls Event_timed::spawn_now() which in turn creates a new thread. + 4. Locks on Worker_thread_param::COND_started_or_stopped and waits till the + worker thread send signal. The code is spurious wake-up safe because + Worker_thread_param::started is checked. + 5. The worker thread initializes its THD, then sets Event_timed::thread_id, + sets Worker_thread_param::started to TRUE and sends back + Worker_thread_param::COND_started. From this moment on, the event + is being executed and could be killed by using Event_timed::thread_id. + When Event_timed::spawn_thread_finish() is called in the worker thread, + it sets thread_id to 0. From this moment on, the worker thread should not + touch the Event_timed instance. + + + The life-cycle of the server is a FSA. + enum enum_state Event_scheduler::state keeps the state of the scheduler. + + The states are: + + |---UNINITIALIZED + | + | |------------------> IN_SHUTDOWN + --> INITIALIZED -> COMMENCING ---> RUNNING ----------| + ^ ^ | | ^ | + | |- CANTSTART <--| | |- SUSPENDED <-| + |______________________________| + + - UNINITIALIZED :The object is created and only the mutex is initialized + - INITIALIZED :All member variables are initialized + - COMMENCING :The scheduler is starting, no other attempt to start + should succeed before the state is back to INITIALIZED. + - CANTSTART :Set by the ::run() method in case it can't start for some + reason. In this case the connection thread that tries to + start the scheduler sees that some error has occurred and + returns an error to the user. Finally, the connection + thread sets the state to INITIALIZED, so further attempts + to start the scheduler could be made. + - RUNNING :The scheduler is running. New events could be added, + dropped, altered. The scheduler could be stopped. + - SUSPENDED :Like RUNNING but execution of events does not take place. + Operations on the memory queue are possible. + - IN_SHUTDOWN :The scheduler is shutting down, due to request by setting + the global event_scheduler to 0/FALSE, or because of a + KILL command sent by a user to the master thread. + + In every method the macros LOCK_SCHEDULER_DATA() and UNLOCK_SCHEDULER_DATA() + are used for (un)locking purposes. They are used to save the programmer + from typing everytime + lock_data(__FUNCTION__, __LINE__); + All locking goes through Event_scheduler::lock_data() and ::unlock_data(). + These two functions then record in variables where for last time + LOCK_scheduler_data was locked and unlocked (two different variables). In + multithreaded environment, in some cases they make no sense but are useful for + inspecting deadlocks without having the server debug log turned on and the + server is still running. + + The same strategy is used for conditional variables. + Event_scheduler::cond_wait() is invoked from all places with parameter + an enum enum_cond_vars. In this manner, it's possible to inspect the last + on which condition the last call to cond_wait() was waiting. If the server + was started with debug trace switched on, the trace file also holds information + about conditional variables used. +*/ + + +#define LOCK_SCHEDULER_DATA() lock_data(__FUNCTION__,__LINE__) +#define UNLOCK_SCHEDULER_DATA() unlock_data(__FUNCTION__,__LINE__) + + +#ifndef DBUG_OFF +static +LEX_STRING states_names[] = +{ + {(char*) STRING_WITH_LEN("UNINITIALIZED")}, + {(char*) STRING_WITH_LEN("INITIALIZED")}, + {(char*) STRING_WITH_LEN("COMMENCING")}, + {(char*) STRING_WITH_LEN("CANTSTART")}, + {(char*) STRING_WITH_LEN("RUNNING")}, + {(char*) STRING_WITH_LEN("SUSPENDED")}, + {(char*) STRING_WITH_LEN("IN_SHUTDOWN")} +}; +#endif + + +Event_scheduler +Event_scheduler::singleton; + + +const char * const +Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] = +{ + "new work", + "started or stopped", + "suspend or resume" +}; + + +class Worker_thread_param +{ +public: + Event_timed *et; + pthread_mutex_t LOCK_started; + pthread_cond_t COND_started; + bool started; + + Worker_thread_param(Event_timed *etn):et(etn), started(FALSE) + { + pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_started, NULL); + } + + ~Worker_thread_param() + { + pthread_mutex_destroy(&LOCK_started); + pthread_cond_destroy(&COND_started); + } +}; + + +/* + Prints the stack of infos, warnings, errors from thd to + the console so it can be fetched by the logs-into-tables and + checked later. + + SYNOPSIS + evex_print_warnings + thd - thread used during the execution of the event + et - the event itself +*/ + +static void +evex_print_warnings(THD *thd, Event_timed *et) +{ + MYSQL_ERROR *err; + DBUG_ENTER("evex_print_warnings"); + if (!thd->warn_list.elements) + DBUG_VOID_RETURN; + + char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; + char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; + String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); + prefix.length(0); + prefix.append("SCHEDULER: ["); + + append_identifier(thd, &prefix, et->definer_user.str, et->definer_user.length); + prefix.append('@'); + append_identifier(thd, &prefix, et->definer_host.str, et->definer_host.length); + prefix.append("][", 2); + append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); + prefix.append('.'); + append_identifier(thd,&prefix, et->name.str, et->name.length); + prefix.append("] ", 2); + + List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + while ((err= it++)) + { + String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); + /* set it to 0 or we start adding at the end. That's the trick ;) */ + err_msg.length(0); + err_msg.append(prefix); + err_msg.append(err->msg, strlen(err->msg), system_charset_info); + err_msg.append("]"); + DBUG_ASSERT(err->level < 3); + (sql_print_message_handlers[err->level])("%*s", err_msg.length(), + err_msg.c_ptr()); + } + DBUG_VOID_RETURN; +} + + +/* + Inits an scheduler thread handler, both the main and a worker + + SYNOPSIS + init_event_thread() + thd - the THD of the thread. Has to be allocated by the caller. + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. + + RETURN VALUE + 0 OK + -1 Error +*/ + +static int +init_event_thread(THD** t, enum enum_thread_type thread_type) +{ + THD *thd= *t; + thd->thread_stack= (char*)t; // remember where our stack is + DBUG_ENTER("init_event_thread"); + thd->client_capabilities= 0; + thd->security_ctx->master_access= 0; + thd->security_ctx->db_access= 0; + thd->security_ctx->host_or_ip= (char*)my_localhost; + my_net_init(&thd->net, 0); + thd->net.read_timeout= slave_net_timeout; + thd->slave_thread= 0; + thd->options|= OPTION_AUTO_IS_NULL; + thd->client_capabilities|= CLIENT_MULTI_RESULTS; + thd->real_id=pthread_self(); + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->thread_id= thread_id++; + threads.append(thd); + thread_count++; + thread_running++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + DBUG_RETURN(-1); + } + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + /* + Guarantees that we will see the thread in SHOW PROCESSLIST though its + vio is NULL. + */ + thd->system_thread= thread_type; + + thd->proc_info= "Initialized"; + thd->version= refresh_version; + thd->set_time(); + + DBUG_RETURN(0); +} + + +/* + Inits the main scheduler thread and then calls Event_scheduler::run() + of arg. + + SYNOPSIS + event_scheduler_thread() + arg void* ptr to Event_scheduler + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. + 3. The reason to have a proxy function is that it's not possible to + use a method as function to be executed in a spawned thread: + - our pthread_hander_t macro uses extern "C" + - separating thread setup from the real execution loop is also to be + considered good. + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_scheduler_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd= NULL; + Event_scheduler *scheduler= (Event_scheduler *) arg; + + DBUG_ENTER("event_scheduler_thread"); + + my_thread_init(); + pthread_detach_this_thread(); + + /* note that constructor of THD uses DBUG_ ! */ + if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_SCHEDULER)) + { + sql_print_error("SCHEDULER: Cannot init manager event thread."); + scheduler->report_error_during_start(); + } + else + { + thd->security_ctx->set_user((char*)"event_scheduler"); + + sql_print_information("SCHEDULER: Manager thread booting"); + if (Event_scheduler::check_system_tables(thd)) + scheduler->report_error_during_start(); + else + scheduler->run(thd); + + /* + NOTE: Don't touch `scheduler` after this point because we have notified + the + thread which shuts us down that we have finished cleaning. In this + very moment a new scheduler thread could be started and a crash is + not welcome. + */ + } + + /* + If we cannot create THD then don't decrease because we haven't touched + thread_count and thread_running in init_event_thread() which was never + called. In init_event_thread() thread_count and thread_running are + always increased even in the case the method returns an error. + */ + if (thd) + { + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + } + my_thread_end(); + DBUG_RETURN(0); // Can't return anything here +} + + +/* + Function that executes an event in a child thread. Setups the + environment for the event execution and cleans after that. + + SYNOPSIS + event_worker_thread() + arg The Event_timed object to be processed + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_worker_thread(void *arg) +{ + THD *thd; /* needs to be first for thread_stack */ + Worker_thread_param *param= (Worker_thread_param *) arg; + Event_timed *event= param->et; + MEM_ROOT worker_mem_root; + int ret; + bool startup_error= FALSE; + Security_context *save_ctx; + /* this one is local and not needed after exec */ + Security_context security_ctx; + + DBUG_ENTER("event_worker_thread"); + DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str)); + + my_thread_init(); + pthread_detach_this_thread(); + + if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_WORKER)) + { + sql_print_error("SCHEDULER: Startup failure."); + startup_error= TRUE; + event->spawn_thread_finish(thd); + } + else + event->set_thread_id(thd->thread_id); + + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + /* + If we don't change it before we send the signal back, then an intermittent + DROP EVENT will take LOCK_scheduler_data and try to kill this thread, because + event->thread_id is already real. However, because thd->security_ctx->user + is not initialized then a crash occurs in kill_one_thread(). Thus, we have + to change the context before sending the signal. We are under + LOCK_scheduler_data being held by Event_scheduler::run() -> ::execute_top(). + */ + change_security_context(thd, event->definer_user, event->definer_host, + event->dbname, &security_ctx, &save_ctx); + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + + /* Signal the scheduler thread that we have started successfully */ + pthread_mutex_lock(¶m->LOCK_started); + param->started= TRUE; + pthread_cond_signal(¶m->COND_started); + pthread_mutex_unlock(¶m->LOCK_started); + + if (!startup_error) + { + uint flags; + + thd->init_for_queries(); + thd->enable_slow_log= TRUE; + + event->set_thread_id(thd->thread_id); + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", + event->dbname.str, event->name.str, + event->definer.str, thd->thread_id); + + ret= event->execute(thd, thd->mem_root); + evex_print_warnings(thd, event); + sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d", + event->dbname.str, event->name.str, + event->definer.str, ret); + if (ret == EVEX_COMPILE_ERROR) + sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + event->dbname.str, event->name.str, + event->definer.str); + else if (ret == EVEX_MICROSECOND_UNSUP) + sql_print_information("SCHEDULER: MICROSECOND is not supported"); + + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + + /* If true is returned, we are expected to free it */ + if (event->spawn_thread_finish(thd)) + { + DBUG_PRINT("info", ("Freeing object pointer")); + delete event; + } + } + + if (thd) + { + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + /* + Free it here because net.vio is NULL for us => THD::~THD will check it + and won't call net_end(&net); See also replication code. + */ + net_end(&thd->net); + DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id)); + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thread_count--; + thread_running--; + delete thd; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + } + + my_thread_end(); + DBUG_RETURN(0); // Can't return anything here +} + + +/* + Constructor of class Event_scheduler. + + SYNOPSIS + Event_scheduler::Event_scheduler() +*/ + +Event_scheduler::Event_scheduler() + :state(UNINITIALIZED), start_scheduler_suspended(FALSE), + thread_id(0), mutex_last_locked_at_line(0), + mutex_last_unlocked_at_line(0), mutex_last_locked_in_func(""), + mutex_last_unlocked_in_func(""), cond_waiting_on(COND_NONE), + mutex_scheduler_data_locked(FALSE) +{ +} + + +/* + Returns the singleton instance of the class. + + SYNOPSIS + Event_scheduler::get_instance() + + RETURN VALUE + address +*/ + +Event_scheduler* +Event_scheduler::get_instance() +{ + DBUG_ENTER("Event_scheduler::get_instance"); + DBUG_RETURN(&singleton); +} + + +/* + The implementation of full-fledged initialization. + + SYNOPSIS + Event_scheduler::init() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_scheduler::init() +{ + int i= 0; + bool ret= FALSE; + DBUG_ENTER("Event_scheduler::init"); + DBUG_PRINT("enter", ("this=%p", this)); + + LOCK_SCHEDULER_DATA(); + for (;i < COND_LAST; i++) + if (pthread_cond_init(&cond_vars[i], NULL)) + { + sql_print_error("SCHEDULER: Unable to initalize conditions"); + ret= TRUE; + goto end; + } + + /* init memory root */ + init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + + if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, + event_timed_compare_q, NULL, 30 /*auto_extent*/)) + { + sql_print_error("SCHEDULER: Can't initialize the execution queue"); + ret= TRUE; + goto end; + } + + if (sizeof(my_time_t) != sizeof(time_t)) + { + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." + "The scheduler may not work correctly. Stopping."); + DBUG_ASSERT(0); + ret= TRUE; + goto end; + } + + state= INITIALIZED; +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(ret); +} + + +/* + Frees all memory allocated by the scheduler object. + + SYNOPSIS + Event_scheduler::destroy() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +void +Event_scheduler::destroy() +{ + DBUG_ENTER("Event_scheduler"); + + LOCK_SCHEDULER_DATA(); + switch (state) { + case UNINITIALIZED: + break; + case INITIALIZED: + delete_queue(&queue); + free_root(&scheduler_root, MYF(0)); + int i; + for (i= 0; i < COND_LAST; i++) + pthread_cond_destroy(&cond_vars[i]); + state= UNINITIALIZED; + break; + default: + sql_print_error("SCHEDULER: Destroying while state is %d", state); + /* I trust my code but ::safe() > ::sorry() */ + DBUG_ASSERT(0); + break; + } + UNLOCK_SCHEDULER_DATA(); + + DBUG_VOID_RETURN; +} + + +/* + Adds an event to the scheduler queue + + SYNOPSIS + Event_scheduler::add_event() + et The event to add + check_existence Whether to check if already loaded. + + RETURN VALUE + OP_OK OK or scheduler not working + OP_LOAD_ERROR Error during loading from disk +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::add_event(THD *thd, Event_timed *et, bool check_existence) +{ + enum enum_error_code res; + Event_timed *et_new; + DBUG_ENTER("Event_scheduler::add_event"); + DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); + } + if (check_existence && find_event(et, FALSE)) + { + res= OP_ALREADY_EXISTS; + goto end; + } + + /* We need to load the event on scheduler_root */ + if (!(res= load_event(thd, et, &et_new))) + { + queue_insert_safe(&queue, (byte *) et_new); + DBUG_PRINT("info", ("Sending COND_new_work")); + pthread_cond_signal(&cond_vars[COND_new_work]); + } + else if (res == OP_DISABLED_EVENT) + res= OP_OK; +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(res); +} + + +/* + Drops an event from the scheduler queue + + SYNOPSIS + Event_scheduler::drop_event() + etn The event to drop + state Wait the event or kill&drop + + RETURN VALUE + FALSE OK (replaced or scheduler not working) + TRUE Failure +*/ + +bool +Event_scheduler::drop_event(THD *thd, Event_timed *et) +{ + int res; + Event_timed *et_old; + DBUG_ENTER("Event_scheduler::drop_event"); + DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); + } + + if (!(et_old= find_event(et, TRUE))) + DBUG_PRINT("info", ("No such event found, probably DISABLED")); + + UNLOCK_SCHEDULER_DATA(); + + /* See comments in ::replace_event() why this is split in two parts. */ + if (et_old) + { + switch ((res= et_old->kill_thread(thd))) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et_old->flags |= EVENT_FREE_WHEN_FINISHED; + break; + case 0: + /* + kill_thread() waits till the spawned thread finishes after it's + killed. Hence, we delete here memory which is no more referenced from + a running thread. + */ + delete et_old; + /* + We don't signal COND_new_work here because: + 1. Even if the dropped event is on top of the queue this will not + move another one to be executed before the time the one on the + top (but could be at the same second as the dropped one) + 2. If this was the last event on the queue, then pthread_cond_timedwait + in ::run() will finish and then see that the queue is empty and + call cond_wait(). Hence, no need to interrupt the blocked + ::run() thread. + */ + break; + default: + sql_print_error("SCHEDULER: Got unexpected error %d", res); + DBUG_ASSERT(0); + } + } + + DBUG_RETURN(FALSE); +} + + +/* + Replaces an event in the scheduler queue + + SYNOPSIS + Event_scheduler::replace_event() + et The event to replace(add) into the queue + state Async or sync stopping + + RETURN VALUE + OP_OK OK or scheduler not working + OP_LOAD_ERROR Error during loading from disk + OP_ALREADY_EXISTS Event already in the queue +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::replace_event(THD *thd, Event_timed *et, LEX_STRING *new_schema, + LEX_STRING *new_name) +{ + enum enum_error_code res; + Event_timed *et_old, *et_new= NULL; + LEX_STRING old_schema, old_name; + + DBUG_ENTER("Event_scheduler::replace_event"); + DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", + thd, et, et->dbname.str, et->name.str, &LOCK_scheduler_data)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); + } + + if (!(et_old= find_event(et, TRUE))) + DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", + et->dbname.str, et->name.str)); + + if (new_schema && new_name) + { + old_schema= et->dbname; + old_name= et->name; + et->dbname= *new_schema; + et->name= *new_name; + } + /* + We need to load the event (it's strings but on the object itself) + on scheduler_root. et_new could be NULL : + 1. Error occured + 2. If the replace is DISABLED, we don't load it into the queue. + */ + if (!(res= load_event(thd, et, &et_new))) + { + queue_insert_safe(&queue, (byte *) et_new); + DBUG_PRINT("info", ("Sending COND_new_work")); + pthread_cond_signal(&cond_vars[COND_new_work]); + } + else if (res == OP_DISABLED_EVENT) + res= OP_OK; + + if (new_schema && new_name) + { + et->dbname= old_schema; + et->name= old_name; + } + + UNLOCK_SCHEDULER_DATA(); + /* + Andrey: Is this comment still truthful ??? + + We don't move this code above because a potential kill_thread will call + THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, + which is LOCK_scheduler_data on which the COND_new_work in ::run() locks. + Hence, we try to acquire a lock which we have already acquired and we run + into an assert. Holding LOCK_scheduler_data however is not needed because + we don't touch any invariant of the scheduler anymore. ::drop_event() does + the same. + */ + if (et_old) + { + switch (et_old->kill_thread(thd)) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et_old->flags |= EVENT_FREE_WHEN_FINISHED; + break; + case 0: + /* + kill_thread() waits till the spawned thread finishes after it's + killed. Hence, we delete here memory which is no more referenced from + a running thread. + */ + delete et_old; + /* + We don't signal COND_new_work here because: + 1. Even if the dropped event is on top of the queue this will not + move another one to be executed before the time the one on the + top (but could be at the same second as the dropped one) + 2. If this was the last event on the queue, then pthread_cond_timedwait + in ::run() will finish and then see that the queue is empty and + call cond_wait(). Hence, no need to interrupt the blocked + ::run() thread. + */ + break; + default: + DBUG_ASSERT(0); + } + } + + DBUG_RETURN(res); +} + + +/* + Searches for an event in the scheduler queue + + SYNOPSIS + Event_scheduler::find_event() + etn The event to find + comparator The function to use for comparing + remove_from_q If found whether to remove from the Q + + RETURN VALUE + NULL Not found + otherwise Address + + NOTE + The caller should do the locking also the caller is responsible for + actual signalling in case an event is removed from the queue + (signalling COND_new_work for instance). +*/ + +Event_timed * +Event_scheduler::find_event(Event_timed *etn, bool remove_from_q) +{ + uint i; + DBUG_ENTER("Event_scheduler::find_event"); + + for (i= 0; i < queue.elements; ++i) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", etn->dbname.str, etn->name.str, + et->dbname.str, et->name.str)); + if (event_timed_identifier_equal(etn, et)) + { + if (remove_from_q) + queue_remove(&queue, i); + DBUG_RETURN(et); + } + } + + DBUG_RETURN(NULL); +} + + +/* + Drops all events from the in-memory queue and disk that match + certain pattern evaluated by a comparator function + + SYNOPSIS + Event_scheduler::drop_matching_events() + thd THD + pattern A pattern string + comparator The function to use for comparing + + RETURN VALUE + -1 Scheduler not working + >=0 Number of dropped events + + NOTE + Expected is the caller to acquire lock on LOCK_scheduler_data +*/ + +void +Event_scheduler::drop_matching_events(THD *thd, LEX_STRING *pattern, + bool (*comparator)(Event_timed *,LEX_STRING *)) +{ + DBUG_ENTER("Event_scheduler::drop_matching_events"); + DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern->length, pattern->str, + state)); + if (is_running_or_suspended()) + { + uint i= 0, dropped= 0; + while (i < queue.elements) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); + if (comparator(et, pattern)) + { + /* + The queue is ordered. If we remove an element, then all elements after + it will shift one position to the left, if we imagine it as an array + from left to the right. In this case we should not increment the + counter and the (i < queue.elements) condition is ok. + */ + queue_remove(&queue, i); + + /* See replace_event() */ + switch (et->kill_thread(thd)) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et->flags |= EVENT_FREE_WHEN_FINISHED; + ++dropped; + break; + case 0: + delete et; + ++dropped; + break; + default: + DBUG_ASSERT(0); + } + } + else + i++; + } + DBUG_PRINT("info", ("Dropped %lu", dropped)); + } + /* + Don't send COND_new_work because no need to wake up the scheduler thread. + When it wakes next time up it will recalculate how much more it should + sleep if the top of the queue has been changed by this method. + */ + + DBUG_VOID_RETURN; +} + + +/* + Drops all events from the in-memory queue and disk that are from + certain schema. + + SYNOPSIS + Event_scheduler::drop_schema_events() + thd THD + db The schema name + + RETURN VALUE + -1 Scheduler not working + >=0 Number of dropped events +*/ + +int +Event_scheduler::drop_schema_events(THD *thd, LEX_STRING *schema) +{ + int ret; + DBUG_ENTER("Event_scheduler::drop_schema_events"); + LOCK_SCHEDULER_DATA(); + if (is_running_or_suspended()) + drop_matching_events(thd, schema, event_timed_db_equal); + + ret= db_drop_events_from_table(thd, schema); + UNLOCK_SCHEDULER_DATA(); + + DBUG_RETURN(ret); +} + + +extern pthread_attr_t connection_attrib; + + +/* + Starts the event scheduler + + SYNOPSIS + Event_scheduler::start() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_scheduler::start() +{ + bool ret= FALSE; + pthread_t th; + DBUG_ENTER("Event_scheduler::start"); + + LOCK_SCHEDULER_DATA(); + /* If already working or starting don't make another attempt */ + DBUG_ASSERT(state == INITIALIZED); + if (state > INITIALIZED) + { + DBUG_PRINT("info", ("scheduler is already running or starting")); + ret= TRUE; + goto end; + } + + /* + Now if another thread calls start it will bail-out because the branch + above will be executed. Thus no two or more child threads will be forked. + If the child thread cannot start for some reason then `state` is set + to CANTSTART and COND_started is also signaled. In this case we + set `state` back to INITIALIZED so another attempt to start the scheduler + can be made. + */ + state= COMMENCING; + /* Fork */ + if (pthread_create(&th, &connection_attrib, event_scheduler_thread, + (void*)this)) + { + DBUG_PRINT("error", ("cannot create a new thread")); + state= INITIALIZED; + ret= TRUE; + goto end; + } + + /* Wait till the child thread has booted (w/ or wo success) */ + while (!is_running_or_suspended() && state != CANTSTART) + cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); + + /* + If we cannot start for some reason then don't prohibit further attempts. + Set back to INITIALIZED. + */ + if (state == CANTSTART) + { + state= INITIALIZED; + ret= TRUE; + goto end; + } + +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(ret); +} + + +/* + Starts the event scheduler in suspended mode. + + SYNOPSIS + Event_scheduler::start_suspended() + + RETURN VALUE + TRUE OK + FALSE Error +*/ + +bool +Event_scheduler::start_suspended() +{ + DBUG_ENTER("Event_scheduler::start_suspended"); + start_scheduler_suspended= TRUE; + DBUG_RETURN(start()); +} + + + +/* + Report back that we cannot start. Used for ocasions where + we can't go into ::run() and have to report externally. + + SYNOPSIS + Event_scheduler::report_error_during_start() +*/ + +inline void +Event_scheduler::report_error_during_start() +{ + DBUG_ENTER("Event_scheduler::report_error_during_start"); + + LOCK_SCHEDULER_DATA(); + state= CANTSTART; + DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); + pthread_cond_signal(&cond_vars[COND_started_or_stopped]); + UNLOCK_SCHEDULER_DATA(); + + DBUG_VOID_RETURN; +} + + +/* + The internal loop of the event scheduler + + SYNOPSIS + Event_scheduler::run() + thd Thread + + RETURN VALUE + FALSE OK + TRUE Failure +*/ + +bool +Event_scheduler::run(THD *thd) +{ + int ret; + struct timespec abstime; + DBUG_ENTER("Event_scheduler::run"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + LOCK_SCHEDULER_DATA(); + ret= load_events_from_db(thd); + + if (!ret) + { + thread_id= thd->thread_id; + state= start_scheduler_suspended? SUSPENDED:RUNNING; + start_scheduler_suspended= FALSE; + } + else + state= CANTSTART; + + DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); + pthread_cond_signal(&cond_vars[COND_started_or_stopped]); + if (ret) + { + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(TRUE); + } + if (!check_n_suspend_if_needed(thd)) + UNLOCK_SCHEDULER_DATA(); + + sql_print_information("SCHEDULER: Manager thread started with id %lu", + thd->thread_id); + abstime.tv_nsec= 0; + while (is_running_or_suspended()) + { + TIME time_now_utc; + Event_timed *et; + my_bool tmp; + time_t now_utc; + + LOCK_SCHEDULER_DATA(); + if (check_n_wait_for_non_empty_queue(thd)) + continue; + + /* On TRUE data is unlocked, go back to the beginning */ + if (check_n_suspend_if_needed(thd)) + continue; + + /* Guaranteed locked here */ + if (state == IN_SHUTDOWN || shutdown_in_progress) + { + UNLOCK_SCHEDULER_DATA(); + break; + } + DBUG_ASSERT(state == RUNNING); + + et= (Event_timed *)queue_top(&queue); + + /* Skip disabled events */ + if (et->status != Event_timed::ENABLED) + { + sql_print_error("SCHEDULER: Found a disabled event %*s.%*s in the queue", + et->dbname.length, et->dbname.str, et->name.length, + et->name.str); + queue_remove(&queue, 0); + /* ToDo: check this again */ + delete et; + UNLOCK_SCHEDULER_DATA(); + continue; + } + thd->proc_info= (char *)"Computing"; + DBUG_PRINT("evex manager",("computing time to sleep till next exec")); + /* Timestamp is in UTC */ + abstime.tv_sec= sec_since_epoch_TIME(&et->execute_at); + + thd->end_time(); + if (abstime.tv_sec > thd->query_start()) + { + /* Event trigger time is in the future */ + thd->proc_info= (char *)"Sleep"; + DBUG_PRINT("info", ("Going to sleep. Should wakeup after approx %d secs", + abstime.tv_sec - thd->query_start())); + DBUG_PRINT("info", ("Entering condition because waiting for activation")); + /* + Use THD::enter_cond()/exit_cond() or we won't be able to kill a + sleeping thread. Though ::stop() can do it by sending COND_new_work + an user can't by just issuing 'KILL x'; . In the latter case + pthread_cond_timedwait() will wait till `abstime`. + "Sleeping until next time" + */ + thd->enter_cond(&cond_vars[COND_new_work],&LOCK_scheduler_data,"Sleeping"); + + pthread_cond_timedwait(&cond_vars[COND_new_work], &LOCK_scheduler_data, + &abstime); + + DBUG_PRINT("info", ("Manager woke up. state is %d", state)); + /* + If we get signal we should recalculate the whether it's the right time + because there could be : + 1. Spurious wake-up + 2. The top of the queue was changed (new one becase of add/drop/replace) + */ + /* This will do implicit UNLOCK_SCHEDULER_DATA() */ + thd->exit_cond(""); + } + else + { + thd->proc_info= (char *)"Executing"; + /* + Execute the event. An error may occur if a thread cannot be forked. + In this case stop the manager. + We should enter ::execute_top() with locked LOCK_scheduler_data. + */ + int ret= execute_top(thd); + UNLOCK_SCHEDULER_DATA(); + if (ret) + break; + } + } +end_loop: + thd->proc_info= (char *)"Cleaning"; + + LOCK_SCHEDULER_DATA(); + /* + It's possible that a user has used (SQL)COM_KILL. Hence set the appropriate + state because it is only set by ::stop(). + */ + if (state != IN_SHUTDOWN) + { + DBUG_PRINT("info", ("We got KILL but the but not from ::stop()")); + state= IN_SHUTDOWN; + } + UNLOCK_SCHEDULER_DATA(); + + sql_print_information("SCHEDULER: Shutting down"); + + thd->proc_info= (char *)"Cleaning queue"; + clean_queue(thd); + THD_CHECK_SENTRY(thd); + + /* free mamager_root memory but don't destroy the root */ + thd->proc_info= (char *)"Cleaning memory root"; + free_root(&scheduler_root, MYF(0)); + THD_CHECK_SENTRY(thd); + + /* + We notify the waiting thread which shutdowns us that we have cleaned. + There are few more instructions to be executed in this pthread but + they don't affect manager structures thus it's safe to signal already + at this point. + */ + LOCK_SCHEDULER_DATA(); + thd->proc_info= (char *)"Sending shutdown signal"; + DBUG_PRINT("info", ("Sending COND_started_or_stopped")); + if (state == IN_SHUTDOWN) + pthread_cond_signal(&cond_vars[COND_started_or_stopped]); + + state= INITIALIZED; + /* + We set it here because ::run() can stop not only because of ::stop() + call but also because of `KILL x` + */ + thread_id= 0; + sql_print_information("SCHEDULER: Stopped"); + UNLOCK_SCHEDULER_DATA(); + + /* We have modified, we set back */ + thd->query= NULL; + thd->query_length= 0; + + DBUG_RETURN(FALSE); +} + + +/* + Executes the top element of the queue. Auxiliary method for ::run(). + + SYNOPSIS + Event_scheduler::execute_top() + + RETURN VALUE + FALSE OK + TRUE Failure + + NOTE + NO locking is done. EXPECTED is that the caller should have locked + the queue (w/ LOCK_scheduler_data). +*/ + +bool +Event_scheduler::execute_top(THD *thd) +{ + int spawn_ret_code; + bool ret= FALSE; + DBUG_ENTER("Event_scheduler::execute_top"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + Event_timed *et= (Event_timed *)queue_top(&queue); + + /* Is it good idea to pass a stack address ?*/ + Worker_thread_param param(et); + + pthread_mutex_lock(¶m.LOCK_started); + /* + We don't lock LOCK_scheduler_data fpr workers_increment() because it's a + pre-requisite for calling the current_method. + */ + switch ((spawn_ret_code= et->spawn_now(event_worker_thread, ¶m))) { + case EVENT_EXEC_CANT_FORK: + /* + We don't lock LOCK_scheduler_data here because it's a pre-requisite + for calling the current_method. + */ + sql_print_error("SCHEDULER: Problem while trying to create a thread"); + ret= TRUE; + break; + case EVENT_EXEC_ALREADY_EXEC: + /* + We don't lock LOCK_scheduler_data here because it's a pre-requisite + for calling the current_method. + */ + sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", + et->dbname.str, et->name.str); + if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) + queue_remove(&queue, 0);// 0 is top, internally 1 + else + queue_replaced(&queue); + break; + default: + DBUG_ASSERT(!spawn_ret_code); + if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) + queue_remove(&queue, 0);// 0 is top, internally 1 + else + queue_replaced(&queue); + /* + We don't lock LOCK_scheduler_data here because it's a pre-requisite + for calling the current_method. + */ + if (likely(!spawn_ret_code)) + { + /* Wait the forked thread to start */ + do { + pthread_cond_wait(¶m.COND_started, ¶m.LOCK_started); + } while (!param.started); + } + /* + param was allocated on the stack so no explicit delete as well as + in this moment it's no more used in the spawned thread so it's safe + to be deleted. + */ + break; + } + pthread_mutex_unlock(¶m.LOCK_started); + /* `param` is on the stack and will be destructed by the compiler */ + + DBUG_RETURN(ret); +} + + +/* + Cleans the scheduler's queue. Auxiliary method for ::run(). + + SYNOPSIS + Event_scheduler::clean_queue() + thd Thread +*/ + +void +Event_scheduler::clean_queue(THD *thd) +{ + CHARSET_INFO *scs= system_charset_info; + uint i; + int ret; + DBUG_ENTER("Event_scheduler::clean_queue"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + LOCK_SCHEDULER_DATA(); + stop_all_running_events(thd); + UNLOCK_SCHEDULER_DATA(); + + sql_print_information("SCHEDULER: Emptying the queue"); + + /* empty the queue */ + for (i= 0; i < queue.elements; ++i) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + et->free_sp(); + delete et; + } + resize_queue(&queue, 0); + + DBUG_VOID_RETURN; +} + + +/* + Stops all running events + + SYNOPSIS + Event_scheduler::stop_all_running_events() + thd Thread + + NOTE + LOCK_scheduler data must be acquired prior to call to this method +*/ + +void +Event_scheduler::stop_all_running_events(THD *thd) +{ + CHARSET_INFO *scs= system_charset_info; + uint i; + DYNAMIC_ARRAY running_threads; + THD *tmp; + DBUG_ENTER("Event_scheduler::stop_all_running_events"); + DBUG_PRINT("enter", ("workers_count=%d", workers_count())); + + my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10); + + bool had_super= FALSE; + VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + push_dynamic(&running_threads, (gptr) &tmp->thread_id); + } + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + /* We need temporarily SUPER_ACL to be able to kill our offsprings */ + if (!(thd->security_ctx->master_access & SUPER_ACL)) + thd->security_ctx->master_access|= SUPER_ACL; + else + had_super= TRUE; + + char tmp_buff[10*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + + for (i= 0; i < running_threads.elements; ++i) + { + int ret; + ulong thd_id= *dynamic_element(&running_threads, i, ulong*); + + int_string.set((longlong) thd_id,scs); + tmp_string.append(int_string); + if (i < running_threads.elements - 1) + tmp_string.append(' '); + + if ((ret= kill_one_thread(thd, thd_id, FALSE))) + { + sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret); + break; + } + } + if (running_threads.elements) + sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr()); + + if (!had_super) + thd->security_ctx->master_access &= ~SUPER_ACL; + + delete_dynamic(&running_threads); + + sql_print_information("SCHEDULER: Waiting for worker threads to finish"); + + while (workers_count()) + my_sleep(100000); + + DBUG_VOID_RETURN; +} + + +/* + Stops the event scheduler + + SYNOPSIS + Event_scheduler::stop() + + RETURN VALUE + OP_OK OK + OP_CANT_KILL Error during stopping of manager thread + OP_NOT_RUNNING Manager not working + + NOTE + The caller must have acquited LOCK_scheduler_data. +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::stop() +{ + int ret; + THD *thd= current_thd; + DBUG_ENTER("Event_scheduler::stop"); + DBUG_PRINT("enter", ("thd=%p", current_thd)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + /* + One situation to be here is if there was a start that forked a new + thread but the new thread did not acquire yet LOCK_scheduler_data. + Hence, in this case return an error. + */ + DBUG_PRINT("info", ("manager not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_NOT_RUNNING); + } + state= IN_SHUTDOWN; + + DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); + sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); + + /* + Sending the COND_new_work to ::run() is a way to get this working without + race conditions. If we use kill_one_thread() it will call THD::awake() and + because in ::run() both THD::enter_cond()/::exit_cond() are used, + THD::awake() will try to lock LOCK_scheduler_data. If we UNLOCK it before, + then the pthread_cond_signal(COND_started_or_stopped) could be signaled in + ::run() and we can miss the signal before we relock. A way is to use + another mutex for this shutdown procedure but better not. + */ + pthread_cond_signal(&cond_vars[COND_new_work]); + /* Or we are suspended - then we should wake up */ + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + + /* Guarantee we don't catch spurious signals */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); + while (state != INITIALIZED) + { + DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " + "thread. Current value of state is %d . " + "workers count=%d", state, workers_count())); + cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); + } + DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); + UNLOCK_SCHEDULER_DATA(); + + DBUG_RETURN(OP_OK); +} + + +/* + Suspends or resumes the scheduler. + SUSPEND - it won't execute any event till resumed. + RESUME - it will resume if suspended. + + SYNOPSIS + Event_scheduler::suspend_or_resume() + + RETURN VALUE + OP_OK OK +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::suspend_or_resume( + enum Event_scheduler::enum_suspend_or_resume action) +{ + enum enum_error_code ret; + DBUG_ENTER("Event_scheduler::suspend_or_resume"); + DBUG_PRINT("enter", ("action=%d", action)); + + LOCK_SCHEDULER_DATA(); + + if ((action == SUSPEND && state == SUSPENDED) || + (action == RESUME && state == RUNNING)) + { + DBUG_PRINT("info", ("Either trying to suspend suspended or resume " + "running scheduler. Doing nothing.")); + } + else + { + /* Wake the main thread up if he is asleep */ + DBUG_PRINT("info", ("Sending signal")); + if (action==SUSPEND) + { + state= SUSPENDED; + pthread_cond_signal(&cond_vars[COND_new_work]); + } + else + { + state= RUNNING; + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + } + DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume")); + cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); + DBUG_PRINT("info", ("Got response")); + } + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); +} + + +/* + Returns the number of executing events. + + SYNOPSIS + Event_scheduler::workers_count() +*/ + +uint +Event_scheduler::workers_count() +{ + THD *tmp; + uint count= 0; + + DBUG_ENTER("Event_scheduler::workers_count"); + VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + ++count; + } + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + DBUG_PRINT("exit", ("%d", count)); + DBUG_RETURN(count); +} + + +/* + Checks and suspends if needed + + SYNOPSIS + Event_scheduler::check_n_suspend_if_needed() + thd Thread + + RETURN VALUE + FALSE Not suspended, we haven't slept + TRUE We were suspended. LOCK_scheduler_data is unlocked. + + NOTE + The caller should have locked LOCK_scheduler_data! + The mutex will be unlocked in case this function returns TRUE +*/ + +bool +Event_scheduler::check_n_suspend_if_needed(THD *thd) +{ + bool was_suspended= FALSE; + DBUG_ENTER("Event_scheduler::check_n_suspend_if_needed"); + if (thd->killed && !shutdown_in_progress) + { + state= SUSPENDED; + thd->killed= THD::NOT_KILLED; + } + if (state == SUSPENDED) + { + thd->enter_cond(&cond_vars[COND_suspend_or_resume], &LOCK_scheduler_data, + "Suspended"); + /* Send back signal to the thread that asked us to suspend operations */ + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + sql_print_information("SCHEDULER: Suspending operations"); + was_suspended= TRUE; + } + while (state == SUSPENDED) + { + cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); + DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume")); + if (state != SUSPENDED) + { + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + sql_print_information("SCHEDULER: Resuming operations"); + } + } + if (was_suspended) + { + if (queue.elements) + { + uint i; + DBUG_PRINT("info", ("We have to recompute the execution times")); + + for (i= 0; i < queue.elements; i++) + ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); + queue_fix(&queue); + } + /* This will implicitly unlock LOCK_scheduler_data */ + thd->exit_cond(""); + } + DBUG_RETURN(was_suspended); +} + + +/* + Checks for empty queue and waits till new element gets in + + SYNOPSIS + Event_scheduler::check_n_wait_for_non_empty_queue() + thd Thread + + RETURN VALUE + FALSE Did not wait - LOCK_scheduler_data still locked. + TRUE Waited - LOCK_scheduler_data unlocked. + + NOTE + The caller should have locked LOCK_scheduler_data! +*/ + +bool +Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) +{ + bool slept= FALSE; + DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); + DBUG_PRINT("enter", ("q.elements=%lu state=%s", + queue.elements, states_names[state])); + + if (!queue.elements) + thd->enter_cond(&cond_vars[COND_new_work], &LOCK_scheduler_data, + "Empty queue, sleeping"); + + /* Wait in a loop protecting against catching spurious signals */ + while (!queue.elements && state == RUNNING) + { + slept= TRUE; + DBUG_PRINT("info", ("Entering condition because of empty queue")); + cond_wait(COND_new_work, &LOCK_scheduler_data); + DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d", + state)); + /* + exit_cond does implicit mutex_UNLOCK, we needed it locked if + 1. we loop again + 2. end the current loop and start doing calculations + */ + } + if (slept) + thd->exit_cond(""); + + DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", + queue.elements, states_names[state], thd->killed)); + + DBUG_RETURN(slept); +} + + +/* + Wrapper for pthread_mutex_lock + + SYNOPSIS + Event_scheduler::lock_data() + mutex Mutex to lock + line The line number on which the lock is done + + RETURN VALUE + Error code of pthread_mutex_lock() +*/ + +inline int +Event_scheduler::lock_data(const char *func, uint line) +{ + int ret; + DBUG_ENTER("Event_scheduler::lock_mutex"); + DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", + &LOCK_scheduler_data, func, line)); + ret= pthread_mutex_lock(&LOCK_scheduler_data); + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_scheduler_data_locked= TRUE; + DBUG_RETURN(ret); +} + + +/* + Wrapper for pthread_mutex_unlock + + SYNOPSIS + Event_scheduler::unlock_data() + mutex Mutex to unlock + line The line number on which the unlock is done + + RETURN VALUE + Error code of pthread_mutex_lock() +*/ + +inline int +Event_scheduler::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); + DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", + &LOCK_scheduler_data, func, line)); + mutex_last_unlocked_at_line= line; + mutex_scheduler_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + DBUG_RETURN(pthread_mutex_unlock(&LOCK_scheduler_data)); +} + + +/* + Wrapper for pthread_cond_wait + + SYNOPSIS + Event_scheduler::cond_wait() + cond Conditional to wait for + mutex Mutex of the conditional + + RETURN VALUE + Error code of pthread_cond_wait() +*/ + +inline int +Event_scheduler::cond_wait(enum Event_scheduler::enum_cond_vars cond, + pthread_mutex_t *mutex) +{ + int ret; + DBUG_ENTER("Event_scheduler::cond_wait"); + DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex)); + ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex); + cond_waiting_on= COND_NONE; + DBUG_RETURN(ret); +} + + +/* + Checks whether the scheduler is in a running or suspended state. + + SYNOPSIS + Event_scheduler::is_running_or_suspended() + + RETURN VALUE + TRUE Either running or suspended + FALSE IN_SHUTDOWN, not started, etc. +*/ + +inline bool +Event_scheduler::is_running_or_suspended() +{ + return (state == SUSPENDED || state == RUNNING); +} + + +/* + Returns the current state of the scheduler + + SYNOPSIS + Event_scheduler::get_state() +*/ + +enum Event_scheduler::enum_state +Event_scheduler::get_state() +{ + enum Event_scheduler::enum_state ret; + DBUG_ENTER("Event_scheduler::get_state"); + /* lock_data & unlock_data are not static */ + pthread_mutex_lock(&singleton.LOCK_scheduler_data); + ret= singleton.state; + pthread_mutex_unlock(&singleton.LOCK_scheduler_data); + DBUG_RETURN(ret); +} + + +/* + Returns whether the scheduler was initialized. + + SYNOPSIS + Event_scheduler::initialized() + + RETURN VALUE + FALSE Was not initialized so far + TRUE Was initialized +*/ + +bool +Event_scheduler::initialized() +{ + DBUG_ENTER("Event_scheduler::initialized"); + DBUG_RETURN(Event_scheduler::get_state() != UNINITIALIZED); +} + + +/* + Returns the number of elements in the queue + + SYNOPSIS + Event_scheduler::events_count() + + RETURN VALUE + 0 Number of Event_timed objects in the queue +*/ + +uint +Event_scheduler::events_count() +{ + uint n; + DBUG_ENTER("Event_scheduler::events_count"); + LOCK_SCHEDULER_DATA(); + n= queue.elements; + UNLOCK_SCHEDULER_DATA(); + + DBUG_RETURN(n); +} + + +/* + Looks for a named event in mysql.event and then loads it from + the table, compiles and inserts it into the cache. + + SYNOPSIS + Event_scheduler::load_event() + thd THD + etn The name of the event to load and compile on scheduler's root + etn_new The loaded event + + RETURN VALUE + NULL Error during compile or the event is non-enabled. + otherwise Address +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::load_event(THD *thd, Event_timed *etn, Event_timed **etn_new) +{ + int ret= 0; + MEM_ROOT *tmp_mem_root; + Event_timed *et_loaded= NULL; + Open_tables_state backup; + + DBUG_ENTER("Event_scheduler::load_and_compile_event"); + DBUG_PRINT("enter",("thd=%p name:%*s",thd, etn->name.length, etn->name.str)); + + thd->reset_n_backup_open_tables_state(&backup); + /* No need to use my_error() here because db_find_event() has done it */ + { + sp_name spn(etn->dbname, etn->name); + ret= db_find_event(thd, &spn, &etn->definer, &et_loaded, NULL, + &scheduler_root); + } + thd->restore_backup_open_tables_state(&backup); + /* In this case no memory was allocated so we don't need to clean */ + if (ret) + DBUG_RETURN(OP_LOAD_ERROR); + + if (et_loaded->status != Event_timed::ENABLED) + { + /* + We don't load non-enabled events. + In db_find_event() `et_new` was allocated on the heap and not on + scheduler_root therefore we delete it here. + */ + delete et_loaded; + DBUG_RETURN(OP_DISABLED_EVENT); + } + + et_loaded->compute_next_execution_time(); + *etn_new= et_loaded; + + DBUG_RETURN(OP_OK); +} + + +/* + Loads all ENABLED events from mysql.event into the prioritized + queue. Called during scheduler main thread initialization. Compiles + the events. Creates Event_timed instances for every ENABLED event + from mysql.event. + + SYNOPSIS + Event_scheduler::load_events_from_db() + thd - Thread context. Used for memory allocation in some cases. + + RETURN VALUE + 0 OK + !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, + EVEX_COMPILE_ERROR) - in all these cases mysql.event was + tampered. + + NOTES + Reports the error to the console +*/ + +int +Event_scheduler::load_events_from_db(THD *thd) +{ + TABLE *table; + READ_RECORD read_record_info; + int ret= -1; + uint count= 0; + bool clean_the_queue= FALSE; + /* Compile the events on this root but only for syntax check, then discard */ + MEM_ROOT boot_root; + + DBUG_ENTER("Event_scheduler::load_events_from_db"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + if (state > COMMENCING) + { + DBUG_ASSERT(0); + sql_print_error("SCHEDULER: Trying to load events while already running."); + DBUG_RETURN(EVEX_GENERAL_ERROR); + } + + if ((ret= Events::open_event_table(thd, TL_READ, &table))) + { + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); + DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); + } + + init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info))) + { + Event_timed *et; + if (!(et= new Event_timed)) + { + DBUG_PRINT("info", ("Out of memory")); + clean_the_queue= TRUE; + break; + } + DBUG_PRINT("info", ("Loading event from row.")); + + if ((ret= et->load_from_row(&scheduler_root, table))) + { + clean_the_queue= TRUE; + sql_print_error("SCHEDULER: Error while loading from mysql.event. " + "Table probably corrupted"); + break; + } + if (et->status != Event_timed::ENABLED) + { + DBUG_PRINT("info",("%s is disabled",et->name.str)); + delete et; + continue; + } + + DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); + + /* We load only on scheduler root just to check whether the body compiles */ + switch (ret= et->compile(thd, &boot_root)) { + case EVEX_MICROSECOND_UNSUP: + et->free_sp(); + sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " + "supported but found in mysql.event"); + goto end; + case EVEX_COMPILE_ERROR: + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", + et->dbname.str, et->name.str); + goto end; + default: + /* Free it, it will be compiled again on the worker thread */ + et->free_sp(); + break; + } + + /* let's find when to be executed */ + if (et->compute_next_execution_time()) + { + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); + continue; + } + + DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list.")); + queue_insert_safe(&queue, (byte *) et); + count++; + } +end: + end_read_record(&read_record_info); + free_root(&boot_root, MYF(0)); + + if (clean_the_queue) + { + for (count= 0; count < queue.elements; ++count) + queue_remove(&queue, 0); + ret= -1; + } + else + { + ret= 0; + sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); + } + + /* Force close to free memory */ + thd->version--; + + close_thread_tables(thd); + + DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); + DBUG_RETURN(ret); +} + + +/* + Opens mysql.db and mysql.user and checks whether: + 1. mysql.db has column Event_priv at column 20 (0 based); + 2. mysql.user has column Event_priv at column 29 (0 based); + + SYNOPSIS + Event_scheduler::check_system_tables() +*/ + +bool +Event_scheduler::check_system_tables(THD *thd) +{ + TABLE_LIST tables; + bool not_used; + Open_tables_state backup; + bool ret; + + DBUG_ENTER("Event_scheduler::check_system_tables"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + thd->reset_n_backup_open_tables_state(&backup); + + bzero((char*) &tables, sizeof(tables)); + tables.db= (char*) "mysql"; + tables.table_name= tables.alias= (char*) "db"; + tables.lock_type= TL_READ; + + if ((ret= simple_open_n_lock_tables(thd, &tables))) + sql_print_error("Cannot open mysql.db"); + else + { + ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, + mysql_db_table_fields, &mysql_db_table_last_check, + ER_CANNOT_LOAD_FROM_TABLE); + close_thread_tables(thd); + } + if (ret) + DBUG_RETURN(TRUE); + + bzero((char*) &tables, sizeof(tables)); + tables.db= (char*) "mysql"; + tables.table_name= tables.alias= (char*) "user"; + tables.lock_type= TL_READ; + + if ((ret= simple_open_n_lock_tables(thd, &tables))) + sql_print_error("Cannot open mysql.db"); + else + { + if (tables.table->s->fields < 29 || + strncmp(tables.table->field[29]->field_name, + STRING_WITH_LEN("Event_priv"))) + { + sql_print_error("mysql.user has no `Event_priv` column at position 29"); + ret= TRUE; + } + close_thread_tables(thd); + } + + thd->restore_backup_open_tables_state(&backup); + + DBUG_RETURN(ret); +} + + +/* + Inits mutexes. + + SYNOPSIS + Event_scheduler::init_mutexes() +*/ + +void +Event_scheduler::init_mutexes() +{ + pthread_mutex_init(&singleton.LOCK_scheduler_data, MY_MUTEX_INIT_FAST); +} + + +/* + Destroys mutexes. + + SYNOPSIS + Event_scheduler::destroy_mutexes() +*/ + +void +Event_scheduler::destroy_mutexes() +{ + pthread_mutex_destroy(&singleton.LOCK_scheduler_data); +} + + +/* + Dumps some data about the internal status of the scheduler. + + SYNOPSIS + Event_scheduler::dump_internal_status() + thd THD + + RETURN VALUE + 0 OK + 1 Error +*/ + +int +Event_scheduler::dump_internal_status(THD *thd) +{ + DBUG_ENTER("dump_internal_status"); +#ifndef DBUG_OFF + CHARSET_INFO *scs= system_charset_info; + Protocol *protocol= thd->protocol; + List<Item> field_list; + int ret; + char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + int_string.length(0); + + field_list.push_back(new Item_empty_string("Name", 20)); + field_list.push_back(new Item_empty_string("Value",20)); + if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) + DBUG_RETURN(1); + + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("state"), scs); + protocol->store(states_names[singleton.state].str, + states_names[singleton.state].length, + scs); + + ret= protocol->write(); + /* + If not initialized - don't show anything else. get_instance() + will otherwise implicitly initialize it. We don't want that. + */ + if (singleton.state >= INITIALIZED) + { + /* last locked at*/ + /* + The first thing to do, or get_instance() will overwrite the values. + mutex_last_locked_at_line / mutex_last_unlocked_at_line + */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("last locked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + singleton.mutex_last_locked_in_func, + singleton.mutex_last_locked_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); + + /* last unlocked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("last unlocked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + singleton.mutex_last_unlocked_in_func, + singleton.mutex_last_unlocked_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); + + /* waiting on */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("waiting on condition"), scs); + tmp_string.length(scs->cset-> + snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s", + (singleton.cond_waiting_on != COND_NONE) ? + cond_vars_names[singleton.cond_waiting_on]: + "NONE")); + protocol->store(&tmp_string); + ret= protocol->write(); + + Event_scheduler *scheduler= get_instance(); + + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("workers_count"), scs); + int_string.set((longlong) scheduler->workers_count(), scs); + protocol->store(&int_string); + ret= protocol->write(); + + /* queue.elements */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue.elements"), scs); + int_string.set((longlong) scheduler->queue.elements, scs); + protocol->store(&int_string); + ret= protocol->write(); + + /* scheduler_data_locked */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); + int_string.set((longlong) scheduler->mutex_scheduler_data_locked, scs); + protocol->store(&int_string); + ret= protocol->write(); + } + send_eof(thd); +#endif + DBUG_RETURN(0); +} |