diff options
-rw-r--r-- | sql/event_data_objects.cc | 76 | ||||
-rw-r--r-- | sql/event_data_objects.h | 25 | ||||
-rw-r--r-- | sql/event_queue.cc | 247 | ||||
-rw-r--r-- | sql/event_queue.h | 31 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 147 | ||||
-rw-r--r-- | sql/event_scheduler.h | 29 | ||||
-rw-r--r-- | sql/events.cc | 198 | ||||
-rw-r--r-- | sql/events.h | 16 | ||||
-rw-r--r-- | sql/sql_parse.cc | 3 |
9 files changed, 404 insertions, 368 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc index 54b043bd916..198f6518184 100644 --- a/sql/event_data_objects.cc +++ b/sql/event_data_objects.cc @@ -20,6 +20,8 @@ #include "event_db_repository.h" #include "sp_head.h" +/* That's a provisional solution */ +extern Event_db_repository events_event_db_repository; #define EVEX_MAX_INTERVAL_VALUE 1000000000L @@ -30,6 +32,47 @@ event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, static void event_restore_security_context(THD *thd, Security_context *backup); + +/* + Initiliazes dbname and name of an Event_queue_element_for_exec + object + + SYNOPSIS + Event_queue_element_for_exec::init() + + RETURN VALUE + FALSE OK + TRUE Error (OOM) +*/ + +bool +Event_queue_element_for_exec::init(LEX_STRING db, LEX_STRING n) +{ + if (!(dbname.str= my_strndup(db.str, dbname.length= db.length, MYF(MY_WME)))) + return TRUE; + if (!(name.str= my_strndup(n.str, name.length= n.length, MYF(MY_WME)))) + { + my_free((gptr) dbname.str, MYF(0)); + return TRUE; + } + return FALSE; +} + + +/* + Destructor + + SYNOPSIS + Event_queue_element_for_exec::~Event_queue_element_for_exec() +*/ + +Event_queue_element_for_exec::~Event_queue_element_for_exec() +{ + my_free((gptr) dbname.str, MYF(0)); + my_free((gptr) name.str, MYF(0)); +} + + /* Returns a new instance @@ -743,7 +786,7 @@ Event_timed::~Event_timed() */ Event_job_data::Event_job_data() - :thd(NULL), sphead(NULL), sql_mode(0) + :sphead(NULL), sql_mode(0) { } @@ -1239,6 +1282,7 @@ Event_queue_element::compute_next_execution_time() DBUG_PRINT("info", ("Dropped: %d", dropped)); status= Event_queue_element::DISABLED; status_changed= TRUE; + dropped= TRUE; goto ret; } @@ -1447,32 +1491,6 @@ Event_queue_element::mark_last_executed(THD *thd) /* - Drops the event - - SYNOPSIS - Event_queue_element::drop() - thd thread context - - RETURN VALUE - 0 OK - -1 Cannot open mysql.event - -2 Cannot find the event in mysql.event (already deleted?) - - others return code from SE in case deletion of the event row - failed. -*/ - -int -Event_queue_element::drop(THD *thd) -{ - DBUG_ENTER("Event_queue_element::drop"); - - DBUG_RETURN(Events::get_instance()-> - drop_event(thd, dbname, name, FALSE, TRUE)); -} - - -/* Saves status and last_executed_at to the disk if changed. SYNOPSIS @@ -1503,13 +1521,13 @@ Event_queue_element::update_timing_fields(THD *thd) thd->reset_n_backup_open_tables_state(&backup); - if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table)) + if (events_event_db_repository.open_event_table(thd, TL_WRITE, &table)) { ret= TRUE; goto done; } fields= table->field; - if ((ret= Events::get_instance()->db_repository-> + if ((ret= events_event_db_repository. find_named_event(thd, dbname, name, table))) goto done; diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h index e00b0b94eaf..4346b0eb5b8 100644 --- a/sql/event_data_objects.h +++ b/sql/event_data_objects.h @@ -27,6 +27,27 @@ class sp_head; class Sql_alloc; +class Event_queue_element_for_exec +{ +public: + Event_queue_element_for_exec(){}; + ~Event_queue_element_for_exec(); + + bool + init(LEX_STRING dbname, LEX_STRING name); + + LEX_STRING dbname; + LEX_STRING name; + bool dropped; + THD *thd; + +private: + /* Prevent use of these */ + Event_queue_element_for_exec(const Event_queue_element_for_exec &); + void operator=(Event_queue_element_for_exec &); +}; + + class Event_basic { protected: @@ -96,9 +117,6 @@ public: bool compute_next_execution_time(); - int - drop(THD *thd); - void mark_last_executed(THD *thd); @@ -160,7 +178,6 @@ public: class Event_job_data : public Event_basic { public: - THD *thd; sp_head *sphead; LEX_STRING body; diff --git a/sql/event_queue.cc b/sql/event_queue.cc index 45d354ea9b6..c4e6a518974 100644 --- a/sql/event_queue.cc +++ b/sql/event_queue.cc @@ -16,7 +16,6 @@ #include "mysql_priv.h" #include "event_queue.h" #include "event_data_objects.h" -#include "event_db_repository.h" #define EVENT_QUEUE_INITIAL_SIZE 30 @@ -136,16 +135,14 @@ Event_queue::deinit_mutexes() */ bool -Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) +Event_queue::init_queue(THD *thd) { - bool res; struct event_queue_param *event_queue_param_value= NULL; DBUG_ENTER("Event_queue::init_queue"); DBUG_PRINT("enter", ("this: 0x%lx", (long) this)); LOCK_QUEUE_DATA(); - db_repository= db_repo; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, @@ -162,12 +159,8 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) goto err; } - res= load_events_from_db(thd); UNLOCK_QUEUE_DATA(); - if (res) - deinit_queue(); - - DBUG_RETURN(res); + DBUG_RETURN(FALSE); err: UNLOCK_QUEUE_DATA(); @@ -204,37 +197,29 @@ Event_queue::deinit_queue() Event_queue::create_event() dbname The schema of the new event name The name of the new event - - RETURN VALUE - OP_OK OK or scheduler not working - OP_LOAD_ERROR Error during loading from disk */ -int -Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) +void +Event_queue::create_event(THD *thd, Event_queue_element *new_element) { - int res; - Event_queue_element *new_element; DBUG_ENTER("Event_queue::create_event"); - DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd, dbname.str, name.str)); + DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, + new_element->dbname.str, new_element->name.str)); - new_element= new Event_queue_element(); - res= db_repository->load_named_event(thd, dbname, name, new_element); - if (res || new_element->status == Event_queue_element::DISABLED) + if (new_element->status == Event_queue_element::DISABLED) delete new_element; else { new_element->compute_next_execution_time(); + DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); LOCK_QUEUE_DATA(); - DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); } - - DBUG_RETURN(res); + DBUG_VOID_RETURN; } @@ -248,32 +233,16 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) name Name of the event new_schema New schema, in case of RENAME TO, otherwise NULL new_name New name, in case of RENAME TO, otherwise NULL - - RETURN VALUE - OP_OK OK or scheduler not working - OP_LOAD_ERROR Error during loading from disk */ -int +void Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, - LEX_STRING *new_schema, LEX_STRING *new_name) + Event_queue_element *new_element) { - int res; - Event_queue_element *new_element; - DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str)); - new_element= new Event_queue_element(); - - res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname, - new_name ? *new_name:name, new_element); - if (res) - { - delete new_element; - goto end; - } - else if (new_element->status == Event_queue_element::DISABLED) + if (new_element->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("The event is disabled.")); /* @@ -300,9 +269,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); -end: - DBUG_PRINT("info", ("res=%d", res)); - DBUG_RETURN(res); + DBUG_VOID_RETURN; } @@ -454,133 +421,6 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name) /* - Loads all ENABLED events from mysql.event into the prioritized - queue. Called during scheduler main thread initialization. Compiles - the events. Creates Event_queue_element instances for every ENABLED event - from mysql.event. - - SYNOPSIS - Event_queue::load_events_from_db() - thd - Thread context. Used for memory allocation in some cases. - - RETURN VALUE - 0 OK - !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, - EVEX_COMPILE_ERROR) - in all these cases mysql.event was - tampered. - - NOTES - Reports the error to the console -*/ - -int -Event_queue::load_events_from_db(THD *thd) -{ - TABLE *table; - READ_RECORD read_record_info; - int ret= -1; - uint count= 0; - bool clean_the_queue= TRUE; - - DBUG_ENTER("Event_queue::load_events_from_db"); - DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd)); - - if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) - { - sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open"); - DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); - } - - init_read_record(&read_record_info, thd, table ,NULL,1,0); - while (!(read_record_info.read_record(&read_record_info))) - { - Event_queue_element *et; - if (!(et= new Event_queue_element)) - { - DBUG_PRINT("info", ("Out of memory")); - break; - } - DBUG_PRINT("info", ("Loading event from row.")); - - if ((ret= et->load_from_row(table))) - { - sql_print_error("SCHEDULER: Error while loading from mysql.event. " - "Table probably corrupted"); - break; - } - if (et->status != Event_queue_element::ENABLED) - { - DBUG_PRINT("info",("%s is disabled",et->name.str)); - delete et; - continue; - } - - /* let's find when to be executed */ - if (et->compute_next_execution_time()) - { - sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." - " Skipping", et->dbname.str, et->name.str); - continue; - } - - { - Event_job_data temp_job_data; - DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); - - temp_job_data.load_from_row(table); - - /* - We load only on scheduler root just to check whether the body - compiles. - */ - switch (ret= temp_job_data.compile(thd, thd->mem_root)) { - case EVEX_MICROSECOND_UNSUP: - sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " - "supported but found in mysql.event"); - break; - case EVEX_COMPILE_ERROR: - sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load", - et->dbname.str, et->name.str); - break; - default: - break; - } - thd->end_statement(); - thd->cleanup_after_query(); - } - if (ret) - { - delete et; - goto end; - } - - queue_insert_safe(&queue, (byte *) et); - count++; - } - clean_the_queue= FALSE; -end: - end_read_record(&read_record_info); - - if (clean_the_queue) - { - empty_queue(); - ret= -1; - } - else - { - ret= 0; - sql_print_information("SCHEDULER: Loaded %d event%s", count, - (count == 1)?"":"s"); - } - - close_thread_tables(thd); - - DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); - DBUG_RETURN(ret); -} - - -/* Recalculates activation times in the queue. There is one reason for that. Because the values (execute_at) by which the queue is ordered are changed by calls to compute_next_execution_time() on a request from the @@ -629,7 +469,7 @@ Event_queue::empty_queue() { uint i; DBUG_ENTER("Event_queue::empty_queue"); - DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); + DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements)); sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements); /* empty the queue */ for (i= 0; i < queue.elements; ++i) @@ -690,31 +530,27 @@ static const char *queue_wait_msg= "Waiting for next activation"; SYNOPSIS Event_queue::get_top_for_execution_if_time() - thd [in] Thread - job_data [out] The object to execute + thd [in] Thread + event_name [out] The object to execute RETURN VALUE - FALSE No error. If *job_data==NULL then top not elligible for execution. - Could be that there is no top. - TRUE Error - + FALSE No error. event_name != NULL + TRUE Serious error */ bool -Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) +Event_queue::get_top_for_execution_if_time(THD *thd, + Event_queue_element_for_exec **event_name) { bool ret= FALSE; struct timespec top_time; - Event_queue_element *top= NULL; - bool to_free= FALSE; - bool to_drop= FALSE; - *job_data= NULL; + *event_name= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); LOCK_QUEUE_DATA(); for (;;) { - int res; + Event_queue_element *top= NULL; /* Break loop if thd has been killed */ if (thd->killed) @@ -753,39 +589,30 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) continue; } - DBUG_PRINT("info", ("Ready for execution")); - if (!(*job_data= new Event_job_data())) - { - ret= TRUE; - break; - } - if ((res= db_repository->load_named_event(thd, top->dbname, top->name, - *job_data))) + if (!(*event_name= new Event_queue_element_for_exec()) || + (*event_name)->init(top->dbname, top->name)) { - DBUG_PRINT("error", ("Got %d from load_named_event", res)); - delete *job_data; - *job_data= NULL; ret= TRUE; break; } + DBUG_PRINT("info", ("Ready for execution")); top->mark_last_executed(thd); if (top->compute_next_execution_time()) top->status= Event_queue_element::DISABLED; DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status)); - (*job_data)->execution_count= top->execution_count; + top->execution_count++; + (*event_name)->dropped= top->dropped; top->update_timing_fields(thd); - if (((top->execute_at.year && !top->expression) || top->execute_at_null) || - (top->status == Event_queue_element::DISABLED)) + if (top->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("removing from the queue")); sql_print_information("SCHEDULER: Last execution of %s.%s. %s", top->dbname.str, top->name.str, top->dropped? "Dropping.":""); - to_free= TRUE; - to_drop= top->dropped; + delete top; queue_remove(&queue, 0); } else @@ -796,19 +623,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) } end: UNLOCK_QUEUE_DATA(); - if (to_drop) - { - DBUG_PRINT("info", ("Dropping from disk")); - top->drop(thd); - } - if (to_free) - delete top; - DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *job_data)); + DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", + ret, (long) *event_name)); - if (*job_data) - DBUG_PRINT("info", ("db: %s name: %s definer=%s", (*job_data)->dbname.str, - (*job_data)->name.str, (*job_data)->definer.str)); + if (*event_name) + DBUG_PRINT("info", ("db: %s name: %s", + (*event_name)->dbname.str, (*event_name)->name.str)); DBUG_RETURN(ret); } diff --git a/sql/event_queue.h b/sql/event_queue.h index 9f48da4914f..a1237e1b52c 100644 --- a/sql/event_queue.h +++ b/sql/event_queue.h @@ -16,12 +16,10 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ class Event_basic; -class Event_db_repository; -class Event_job_data; class Event_queue_element; +class Event_queue_element_for_exec; class THD; -class Event_scheduler; class Event_queue { @@ -35,19 +33,19 @@ public: deinit_mutexes(); bool - init_queue(THD *thd, Event_db_repository *db_repo); + init_queue(THD *thd); void deinit_queue(); /* Methods for queue management follow */ - int - create_event(THD *thd, LEX_STRING dbname, LEX_STRING name); + void + create_event(THD *thd, Event_queue_element *new_element); - int + void update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, - LEX_STRING *new_schema, LEX_STRING *new_name); + Event_queue_element *new_element); void drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name); @@ -59,14 +57,15 @@ public: recalculate_activation_times(THD *thd); bool - get_top_for_execution_if_time(THD *thd, Event_job_data **job_data); + get_top_for_execution_if_time(THD *thd, + Event_queue_element_for_exec **event_name); + void dump_internal_status(); - int - load_events_from_db(THD *thd); - + void + empty_queue(); protected: void find_n_remove_event(LEX_STRING db, LEX_STRING name); @@ -76,8 +75,6 @@ protected: drop_matching_events(THD *thd, LEX_STRING pattern, bool (*)(LEX_STRING, Event_basic *)); - void - empty_queue(); void dbug_dump_queue(time_t now); @@ -86,11 +83,7 @@ protected: pthread_mutex_t LOCK_event_queue; pthread_cond_t COND_queue_state; - Event_db_repository *db_repository; - - Event_scheduler *scheduler; - - /* The sorted queue with the Event_job_data objects */ + /* The sorted queue with the Event_queue_element objects */ QUEUE queue; TIME next_activation_at; diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index a47576cf0c0..1013f5af3a8 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -18,6 +18,7 @@ #include "event_data_objects.h" #include "event_scheduler.h" #include "event_queue.h" +#include "event_db_repository.h" #ifdef __GNUC__ #if __GNUC__ >= 2 @@ -34,6 +35,11 @@ extern pthread_attr_t connection_attrib; + +Event_db_repository *Event_worker_thread::db_repository; +Events *Event_worker_thread::events_facade; + + static const LEX_STRING scheduler_states_names[] = { @@ -60,8 +66,8 @@ struct scheduler_param { et The event itself */ -static void -evex_print_warnings(THD *thd, Event_job_data *et) +void +Event_worker_thread::print_warnings(THD *thd, Event_job_data *et) { MYSQL_ERROR *err; DBUG_ENTER("evex_print_warnings"); @@ -253,49 +259,97 @@ event_worker_thread(void *arg) { /* needs to be first for thread_stack */ THD *thd; - Event_job_data *event= (Event_job_data *)arg; - int ret; + Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg; thd= event->thd; - thd->thread_stack= (char *) &thd; // remember where our stack is - DBUG_ENTER("event_worker_thread"); - if (!post_init_event_thread(thd)) + Event_worker_thread worker_thread; + worker_thread.run(thd, (Event_queue_element_for_exec *)arg); + + deinit_event_thread(thd); + + return 0; // Can't return anything here +} + + +/* + Function that executes an event in a child thread. Setups the + environment for the event execution and cleans after that. + + SYNOPSIS + Event_worker_thread::run() + thd Thread context + event The Event_queue_element_for_exec object to be processed +*/ + +void +Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event) +{ + int ret; + Event_job_data *job_data= NULL; + DBUG_ENTER("Event_worker_thread::run"); + DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." + "THD=0x%lx", time(NULL), thd)); + + if (post_init_event_thread(thd)) + goto end; + + if (!(job_data= new Event_job_data())) + goto end; + else if ((ret= db_repository-> + load_named_event(thd, event->dbname, event->name, job_data))) { - DBUG_PRINT("info", ("Baikonur, time is %ld, BURAN reporting and operational." - "THD: 0x%lx", - (long) time(NULL), (long) thd)); - - sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. " - "Execution %u", - event->dbname.str, event->name.str, - event->definer.str, thd->thread_id, - event->execution_count); - - thd->enable_slow_log= TRUE; - - ret= event->execute(thd); - - evex_print_warnings(thd, event); - - sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. " - "RetCode=%d", event->dbname.str, event->name.str, - event->definer.str, thd->thread_id, ret); - if (ret == EVEX_COMPILE_ERROR) - sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", - event->dbname.str, event->name.str, - event->definer.str); - else if (ret == EVEX_MICROSECOND_UNSUP) - sql_print_information("SCHEDULER: MICROSECOND is not supported"); + DBUG_PRINT("error", ("Got %d from load_named_event", ret)); + goto end; + } + + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. ", + job_data->dbname.str, job_data->name.str, + job_data->definer.str, thd->thread_id); + + thd->enable_slow_log= TRUE; + + ret= job_data->execute(thd); + + print_warnings(thd, job_data); + + sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. " + "RetCode=%d", job_data->dbname.str, job_data->name.str, + job_data->definer.str, thd->thread_id, ret); + if (ret == EVEX_COMPILE_ERROR) + sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + job_data->dbname.str, job_data->name.str, + job_data->definer.str); + else if (ret == EVEX_MICROSECOND_UNSUP) +end: + delete job_data; + + if (event->dropped) + { + sql_print_information("SCHEDULER: Dropping %s.%s", event->dbname.str, + event->name.str); + /* + Using db_repository can lead to a race condition because we access + the table without holding LOCK_metadata. + Scenario: + 1. CREATE EVENT xyz AT ... (conn thread) + 2. execute xyz (worker) + 3. CREATE EVENT XYZ EVERY ... (conn thread) + 4. drop xyz (worker) + 5. XYZ was just created on disk but `drop xyz` of the worker dropped it. + A consequent load to create Event_queue_element will fail. + + If all operations are performed under LOCK_metadata there is no such + problem. However, this comes at the price of introduction bi-directional + association between class Events and class Event_worker_thread. + */ + events_facade->drop_event(thd, event->dbname, event->name, FALSE); } DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, event->name.str)); - delete event; - deinit_event_thread(thd); - - DBUG_RETURN(0); // Can't return anything here + delete event; } @@ -441,7 +495,6 @@ bool Event_scheduler::run(THD *thd) { int res= FALSE; - Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); sql_print_information("SCHEDULER: Manager thread started with id %lu", @@ -454,18 +507,20 @@ Event_scheduler::run(THD *thd) while (is_running()) { + Event_queue_element_for_exec *event_name; + /* Gets a minimized version */ - if (queue->get_top_for_execution_if_time(thd, &job_data)) + if (queue->get_top_for_execution_if_time(thd, &event_name)) { sql_print_information("SCHEDULER: Serious error during getting next " "event to execute. Stopping"); break; } - DBUG_PRINT("info", ("get_top returned job_data: 0x%lx", (long) job_data)); - if (job_data) + DBUG_PRINT("info", ("get_top returned job_data=0x%lx", event_name)); + if (event_name) { - if ((res= execute_top(thd, job_data))) + if ((res= execute_top(thd, event_name))) break; } else @@ -499,7 +554,7 @@ Event_scheduler::run(THD *thd) */ bool -Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) +Event_scheduler::execute_top(THD *thd, Event_queue_element_for_exec *event_name) { THD *new_thd; pthread_t th; @@ -510,13 +565,13 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) pre_init_event_thread(new_thd); new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; - job_data->thd= new_thd; + event_name->thd= new_thd; DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition", - job_data->dbname.str, job_data->name.str)); + event_name->dbname.str, event_name->name.str)); /* Major failure */ if ((res= pthread_create(&th, &connection_attrib, event_worker_thread, - job_data))) + event_name))) goto error; ++started_events; @@ -537,7 +592,7 @@ error: delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } - delete job_data; + delete event_name; DBUG_RETURN(TRUE); } diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h index 18625ef35f3..2ab21464057 100644 --- a/sql/event_scheduler.h +++ b/sql/event_scheduler.h @@ -15,8 +15,11 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + class Event_queue; class Event_job_data; +class Event_db_repository; +class Events; void pre_init_event_thread(THD* thd); @@ -27,6 +30,29 @@ post_init_event_thread(THD* thd); void deinit_event_thread(THD *thd); + +class Event_worker_thread +{ +public: + static void + init(Events *events, Event_db_repository *db_repo) + { + db_repository= db_repo; + events_facade= events; + } + + void + run(THD *thd, Event_queue_element_for_exec *event); + +private: + void + print_warnings(THD *thd, Event_job_data *et); + + static Event_db_repository *db_repository; + static Events *events_facade; +}; + + class Event_scheduler { public: @@ -71,10 +97,9 @@ private: uint workers_count(); - /* helper functions */ bool - execute_top(THD *thd, Event_job_data *job_data); + execute_top(THD *thd, Event_queue_element_for_exec *event_name); /* helper functions for working with mutexes & conditionals */ void diff --git a/sql/events.cc b/sql/events.cc index e6224915d6b..425e288dfb7 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -97,7 +97,7 @@ Event_queue events_event_queue; static Event_scheduler events_event_scheduler; -static + Event_db_repository events_event_db_repository; Events Events::singleton; @@ -296,29 +296,6 @@ Events::Events() /* - Opens mysql.event table with specified lock - - SYNOPSIS - Events::open_event_table() - thd Thread context - lock_type How to lock the table - table We will store the open table here - - RETURN VALUE - 1 Cannot lock table - 2 The table is corrupted - different number of fields - 0 OK -*/ - -int -Events::open_event_table(THD *thd, enum thr_lock_type lock_type, - TABLE **table) -{ - return db_repository->open_event_table(thd, lock_type, table); -} - - -/* The function exported to the world for creating of events. SYNOPSIS @@ -351,16 +328,24 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists) /* On error conditions my_error() is called so no need to handle here */ if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists))) { - if ((ret= event_queue->create_event(thd, parse_data->dbname, - parse_data->name))) + Event_queue_element *new_element; + + if (!(new_element= new Event_queue_element())) + ret= TRUE; // OOM + else if ((ret= db_repository->load_named_event(thd, parse_data->dbname, + parse_data->name, + new_element))) { DBUG_ASSERT(ret == OP_LOAD_ERROR); - my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0)); + delete new_element; } + else + event_queue->create_event(thd, new_element); } pthread_mutex_unlock(&LOCK_event_metadata); DBUG_RETURN(ret); + } @@ -387,6 +372,7 @@ bool Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to) { int ret; + Event_queue_element *new_element; DBUG_ENTER("Events::update_event"); LEX_STRING *new_dbname= rename_to ? &rename_to->m_db : NULL; LEX_STRING *new_name= rename_to ? &rename_to->m_name : NULL; @@ -400,12 +386,20 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to) /* On error conditions my_error() is called so no need to handle here */ if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name))) { - if ((ret= event_queue->update_event(thd, parse_data->dbname, - parse_data->name, new_dbname, new_name))) + LEX_STRING dbname= new_dbname ? *new_dbname : parse_data->dbname; + LEX_STRING name= new_name ? *new_name : parse_data->name; + + if (!(new_element= new Event_queue_element())) + ret= TRUE; // OOM + else if ((ret= db_repository->load_named_event(thd, dbname, name, + new_element))) { DBUG_ASSERT(ret == OP_LOAD_ERROR); - my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0)); + delete new_element; } + else + event_queue->update_event(thd, parse_data->dbname, parse_data->name, + new_element); } pthread_mutex_unlock(&LOCK_event_metadata); @@ -423,10 +417,6 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to) name [in] Event's name if_exists [in] When set and the event does not exist => warning onto the stack - only_from_disk [in] Whether to remove the event from the queue too. - In case of Event_job_data::drop() it's needed to - do only disk drop because Event_queue will handle - removal from memory queue. RETURN VALUE FALSE OK @@ -434,8 +424,7 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to) */ bool -Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists, - bool only_from_disk) +Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists) { int ret; DBUG_ENTER("Events::drop_event"); @@ -448,10 +437,7 @@ Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists, pthread_mutex_lock(&LOCK_event_metadata); /* On error conditions my_error() is called so no need to handle here */ if (!(ret= db_repository->drop_event(thd, dbname, name, if_exists))) - { - if (!only_from_disk) - event_queue->drop_event(thd, dbname, name); - } + event_queue->drop_event(thd, dbname, name); pthread_mutex_unlock(&LOCK_event_metadata); DBUG_RETURN(ret); } @@ -655,11 +641,12 @@ Events::init() } check_system_tables_error= FALSE; - if (event_queue->init_queue(thd, db_repository)) + if (event_queue->init_queue(thd) || load_events_from_db(thd)) { sql_print_error("SCHEDULER: Error while loading from disk."); goto end; } + scheduler->init_scheduler(event_queue); DBUG_ASSERT(opt_event_scheduler == Events::EVENTS_ON || @@ -667,6 +654,7 @@ Events::init() if (opt_event_scheduler == Events::EVENTS_ON) res= scheduler->start(); + Event_worker_thread::init(this, db_repository); end: delete thd; /* Remember that we don't have a THD */ @@ -903,3 +891,131 @@ Events::check_system_tables(THD *thd) DBUG_RETURN(ret); } + + +/* + Loads all ENABLED events from mysql.event into the prioritized + queue. Called during scheduler main thread initialization. Compiles + the events. Creates Event_queue_element instances for every ENABLED event + from mysql.event. + + SYNOPSIS + Events::load_events_from_db() + thd Thread context. Used for memory allocation in some cases. + + RETURN VALUE + 0 OK + !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, + EVEX_COMPILE_ERROR) - in all these cases mysql.event was + tampered. + + NOTES + Reports the error to the console +*/ + +int +Events::load_events_from_db(THD *thd) +{ + TABLE *table; + READ_RECORD read_record_info; + int ret= -1; + uint count= 0; + bool clean_the_queue= TRUE; + + DBUG_ENTER("Events::load_events_from_db"); + DBUG_PRINT("enter", ("thd=0x%lx", thd)); + + if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) + { + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open"); + DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); + } + + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info))) + { + Event_queue_element *et; + if (!(et= new Event_queue_element)) + { + DBUG_PRINT("info", ("Out of memory")); + break; + } + DBUG_PRINT("info", ("Loading event from row.")); + + if ((ret= et->load_from_row(table))) + { + sql_print_error("SCHEDULER: Error while loading from mysql.event. " + "Table probably corrupted"); + break; + } + if (et->status != Event_queue_element::ENABLED) + { + DBUG_PRINT("info",("%s is disabled",et->name.str)); + delete et; + continue; + } + + /* let's find when to be executed */ + if (et->compute_next_execution_time()) + { + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); + continue; + } + + { + Event_job_data temp_job_data; + DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); + + temp_job_data.load_from_row(table); + + /* + We load only on scheduler root just to check whether the body + compiles. + */ + switch (ret= temp_job_data.compile(thd, thd->mem_root)) { + case EVEX_MICROSECOND_UNSUP: + sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " + "supported but found in mysql.event"); + break; + case EVEX_COMPILE_ERROR: + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load", + et->dbname.str, et->name.str); + break; + default: + break; + } + thd->end_statement(); + thd->cleanup_after_query(); + } + if (ret) + { + delete et; + goto end; + } + + DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list.")); + event_queue->create_event(thd, et); + count++; + } + clean_the_queue= FALSE; +end: + end_read_record(&read_record_info); + + if (clean_the_queue) + { + event_queue->empty_queue(); + ret= -1; + } + else + { + ret= 0; + sql_print_information("SCHEDULER: Loaded %d event%s", count, + (count == 1)?"":"s"); + } + + close_thread_tables(thd); + + DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); + DBUG_RETURN(ret); +} diff --git a/sql/events.h b/sql/events.h index 621ab0ffca5..35ee3c569d0 100644 --- a/sql/events.h +++ b/sql/events.h @@ -42,13 +42,6 @@ sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); class Events { public: - /* - Quite NOT the best practice and will be removed once - Event_timed::drop() and Event_timed is fixed not do drop directly - or other scheme will be found. - */ - friend class Event_queue_element; - /* The order should match the order in opt_typelib */ enum enum_opt_event_scheduler { @@ -92,15 +85,11 @@ public: update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to); bool - drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists, - bool only_from_disk); + drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists); void drop_schema_events(THD *thd, char *db); - int - open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); - bool show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name); @@ -119,6 +108,9 @@ private: bool check_system_tables(THD *thd); + int + load_events_from_db(THD *thd); + /* Singleton DP is used */ Events(); ~Events(){} diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 1b7cf2a342d..9307836731a 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4047,8 +4047,7 @@ end_with_restore_list: if (!(res= Events::get_instance()->drop_event(thd, lex->spname->m_db, lex->spname->m_name, - lex->drop_if_exists, - FALSE))) + lex->drop_if_exists))) send_ok(thd); } break; |