summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/event_data_objects.cc76
-rw-r--r--sql/event_data_objects.h25
-rw-r--r--sql/event_queue.cc247
-rw-r--r--sql/event_queue.h31
-rw-r--r--sql/event_scheduler.cc147
-rw-r--r--sql/event_scheduler.h29
-rw-r--r--sql/events.cc198
-rw-r--r--sql/events.h16
-rw-r--r--sql/sql_parse.cc3
9 files changed, 404 insertions, 368 deletions
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc
index 54b043bd916..198f6518184 100644
--- a/sql/event_data_objects.cc
+++ b/sql/event_data_objects.cc
@@ -20,6 +20,8 @@
#include "event_db_repository.h"
#include "sp_head.h"
+/* That's a provisional solution */
+extern Event_db_repository events_event_db_repository;
#define EVEX_MAX_INTERVAL_VALUE 1000000000L
@@ -30,6 +32,47 @@ event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
static void
event_restore_security_context(THD *thd, Security_context *backup);
+
+/*
+ Initiliazes dbname and name of an Event_queue_element_for_exec
+ object
+
+ SYNOPSIS
+ Event_queue_element_for_exec::init()
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error (OOM)
+*/
+
+bool
+Event_queue_element_for_exec::init(LEX_STRING db, LEX_STRING n)
+{
+ if (!(dbname.str= my_strndup(db.str, dbname.length= db.length, MYF(MY_WME))))
+ return TRUE;
+ if (!(name.str= my_strndup(n.str, name.length= n.length, MYF(MY_WME))))
+ {
+ my_free((gptr) dbname.str, MYF(0));
+ return TRUE;
+ }
+ return FALSE;
+}
+
+
+/*
+ Destructor
+
+ SYNOPSIS
+ Event_queue_element_for_exec::~Event_queue_element_for_exec()
+*/
+
+Event_queue_element_for_exec::~Event_queue_element_for_exec()
+{
+ my_free((gptr) dbname.str, MYF(0));
+ my_free((gptr) name.str, MYF(0));
+}
+
+
/*
Returns a new instance
@@ -743,7 +786,7 @@ Event_timed::~Event_timed()
*/
Event_job_data::Event_job_data()
- :thd(NULL), sphead(NULL), sql_mode(0)
+ :sphead(NULL), sql_mode(0)
{
}
@@ -1239,6 +1282,7 @@ Event_queue_element::compute_next_execution_time()
DBUG_PRINT("info", ("Dropped: %d", dropped));
status= Event_queue_element::DISABLED;
status_changed= TRUE;
+ dropped= TRUE;
goto ret;
}
@@ -1447,32 +1491,6 @@ Event_queue_element::mark_last_executed(THD *thd)
/*
- Drops the event
-
- SYNOPSIS
- Event_queue_element::drop()
- thd thread context
-
- RETURN VALUE
- 0 OK
- -1 Cannot open mysql.event
- -2 Cannot find the event in mysql.event (already deleted?)
-
- others return code from SE in case deletion of the event row
- failed.
-*/
-
-int
-Event_queue_element::drop(THD *thd)
-{
- DBUG_ENTER("Event_queue_element::drop");
-
- DBUG_RETURN(Events::get_instance()->
- drop_event(thd, dbname, name, FALSE, TRUE));
-}
-
-
-/*
Saves status and last_executed_at to the disk if changed.
SYNOPSIS
@@ -1503,13 +1521,13 @@ Event_queue_element::update_timing_fields(THD *thd)
thd->reset_n_backup_open_tables_state(&backup);
- if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table))
+ if (events_event_db_repository.open_event_table(thd, TL_WRITE, &table))
{
ret= TRUE;
goto done;
}
fields= table->field;
- if ((ret= Events::get_instance()->db_repository->
+ if ((ret= events_event_db_repository.
find_named_event(thd, dbname, name, table)))
goto done;
diff --git a/sql/event_data_objects.h b/sql/event_data_objects.h
index e00b0b94eaf..4346b0eb5b8 100644
--- a/sql/event_data_objects.h
+++ b/sql/event_data_objects.h
@@ -27,6 +27,27 @@ class sp_head;
class Sql_alloc;
+class Event_queue_element_for_exec
+{
+public:
+ Event_queue_element_for_exec(){};
+ ~Event_queue_element_for_exec();
+
+ bool
+ init(LEX_STRING dbname, LEX_STRING name);
+
+ LEX_STRING dbname;
+ LEX_STRING name;
+ bool dropped;
+ THD *thd;
+
+private:
+ /* Prevent use of these */
+ Event_queue_element_for_exec(const Event_queue_element_for_exec &);
+ void operator=(Event_queue_element_for_exec &);
+};
+
+
class Event_basic
{
protected:
@@ -96,9 +117,6 @@ public:
bool
compute_next_execution_time();
- int
- drop(THD *thd);
-
void
mark_last_executed(THD *thd);
@@ -160,7 +178,6 @@ public:
class Event_job_data : public Event_basic
{
public:
- THD *thd;
sp_head *sphead;
LEX_STRING body;
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 45d354ea9b6..c4e6a518974 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -16,7 +16,6 @@
#include "mysql_priv.h"
#include "event_queue.h"
#include "event_data_objects.h"
-#include "event_db_repository.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
@@ -136,16 +135,14 @@ Event_queue::deinit_mutexes()
*/
bool
-Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
+Event_queue::init_queue(THD *thd)
{
- bool res;
struct event_queue_param *event_queue_param_value= NULL;
DBUG_ENTER("Event_queue::init_queue");
DBUG_PRINT("enter", ("this: 0x%lx", (long) this));
LOCK_QUEUE_DATA();
- db_repository= db_repo;
if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
0 /*max_on_top*/, event_queue_element_compare_q,
@@ -162,12 +159,8 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
goto err;
}
- res= load_events_from_db(thd);
UNLOCK_QUEUE_DATA();
- if (res)
- deinit_queue();
-
- DBUG_RETURN(res);
+ DBUG_RETURN(FALSE);
err:
UNLOCK_QUEUE_DATA();
@@ -204,37 +197,29 @@ Event_queue::deinit_queue()
Event_queue::create_event()
dbname The schema of the new event
name The name of the new event
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
*/
-int
-Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
+void
+Event_queue::create_event(THD *thd, Event_queue_element *new_element)
{
- int res;
- Event_queue_element *new_element;
DBUG_ENTER("Event_queue::create_event");
- DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd, dbname.str, name.str));
+ DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd,
+ new_element->dbname.str, new_element->name.str));
- new_element= new Event_queue_element();
- res= db_repository->load_named_event(thd, dbname, name, new_element);
- if (res || new_element->status == Event_queue_element::DISABLED)
+ if (new_element->status == Event_queue_element::DISABLED)
delete new_element;
else
{
new_element->compute_next_execution_time();
+ DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
LOCK_QUEUE_DATA();
- DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element));
queue_insert_safe(&queue, (byte *) new_element);
dbug_dump_queue(thd->query_start());
pthread_cond_broadcast(&COND_queue_state);
UNLOCK_QUEUE_DATA();
}
-
- DBUG_RETURN(res);
+ DBUG_VOID_RETURN;
}
@@ -248,32 +233,16 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
name Name of the event
new_schema New schema, in case of RENAME TO, otherwise NULL
new_name New name, in case of RENAME TO, otherwise NULL
-
- RETURN VALUE
- OP_OK OK or scheduler not working
- OP_LOAD_ERROR Error during loading from disk
*/
-int
+void
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- LEX_STRING *new_schema, LEX_STRING *new_name)
+ Event_queue_element *new_element)
{
- int res;
- Event_queue_element *new_element;
-
DBUG_ENTER("Event_queue::update_event");
DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str));
- new_element= new Event_queue_element();
-
- res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
- new_name ? *new_name:name, new_element);
- if (res)
- {
- delete new_element;
- goto end;
- }
- else if (new_element->status == Event_queue_element::DISABLED)
+ if (new_element->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("The event is disabled."));
/*
@@ -300,9 +269,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
dbug_dump_queue(thd->query_start());
UNLOCK_QUEUE_DATA();
-end:
- DBUG_PRINT("info", ("res=%d", res));
- DBUG_RETURN(res);
+ DBUG_VOID_RETURN;
}
@@ -454,133 +421,6 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
/*
- Loads all ENABLED events from mysql.event into the prioritized
- queue. Called during scheduler main thread initialization. Compiles
- the events. Creates Event_queue_element instances for every ENABLED event
- from mysql.event.
-
- SYNOPSIS
- Event_queue::load_events_from_db()
- thd - Thread context. Used for memory allocation in some cases.
-
- RETURN VALUE
- 0 OK
- !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
- EVEX_COMPILE_ERROR) - in all these cases mysql.event was
- tampered.
-
- NOTES
- Reports the error to the console
-*/
-
-int
-Event_queue::load_events_from_db(THD *thd)
-{
- TABLE *table;
- READ_RECORD read_record_info;
- int ret= -1;
- uint count= 0;
- bool clean_the_queue= TRUE;
-
- DBUG_ENTER("Event_queue::load_events_from_db");
- DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
-
- if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
- {
- sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
- DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
- }
-
- init_read_record(&read_record_info, thd, table ,NULL,1,0);
- while (!(read_record_info.read_record(&read_record_info)))
- {
- Event_queue_element *et;
- if (!(et= new Event_queue_element))
- {
- DBUG_PRINT("info", ("Out of memory"));
- break;
- }
- DBUG_PRINT("info", ("Loading event from row."));
-
- if ((ret= et->load_from_row(table)))
- {
- sql_print_error("SCHEDULER: Error while loading from mysql.event. "
- "Table probably corrupted");
- break;
- }
- if (et->status != Event_queue_element::ENABLED)
- {
- DBUG_PRINT("info",("%s is disabled",et->name.str));
- delete et;
- continue;
- }
-
- /* let's find when to be executed */
- if (et->compute_next_execution_time())
- {
- sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
- " Skipping", et->dbname.str, et->name.str);
- continue;
- }
-
- {
- Event_job_data temp_job_data;
- DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
-
- temp_job_data.load_from_row(table);
-
- /*
- We load only on scheduler root just to check whether the body
- compiles.
- */
- switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
- case EVEX_MICROSECOND_UNSUP:
- sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
- "supported but found in mysql.event");
- break;
- case EVEX_COMPILE_ERROR:
- sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
- et->dbname.str, et->name.str);
- break;
- default:
- break;
- }
- thd->end_statement();
- thd->cleanup_after_query();
- }
- if (ret)
- {
- delete et;
- goto end;
- }
-
- queue_insert_safe(&queue, (byte *) et);
- count++;
- }
- clean_the_queue= FALSE;
-end:
- end_read_record(&read_record_info);
-
- if (clean_the_queue)
- {
- empty_queue();
- ret= -1;
- }
- else
- {
- ret= 0;
- sql_print_information("SCHEDULER: Loaded %d event%s", count,
- (count == 1)?"":"s");
- }
-
- close_thread_tables(thd);
-
- DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
- DBUG_RETURN(ret);
-}
-
-
-/*
Recalculates activation times in the queue. There is one reason for
that. Because the values (execute_at) by which the queue is ordered are
changed by calls to compute_next_execution_time() on a request from the
@@ -629,7 +469,7 @@ Event_queue::empty_queue()
{
uint i;
DBUG_ENTER("Event_queue::empty_queue");
- DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
+ DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements));
sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
/* empty the queue */
for (i= 0; i < queue.elements; ++i)
@@ -690,31 +530,27 @@ static const char *queue_wait_msg= "Waiting for next activation";
SYNOPSIS
Event_queue::get_top_for_execution_if_time()
- thd [in] Thread
- job_data [out] The object to execute
+ thd [in] Thread
+ event_name [out] The object to execute
RETURN VALUE
- FALSE No error. If *job_data==NULL then top not elligible for execution.
- Could be that there is no top.
- TRUE Error
-
+ FALSE No error. event_name != NULL
+ TRUE Serious error
*/
bool
-Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
+Event_queue::get_top_for_execution_if_time(THD *thd,
+ Event_queue_element_for_exec **event_name)
{
bool ret= FALSE;
struct timespec top_time;
- Event_queue_element *top= NULL;
- bool to_free= FALSE;
- bool to_drop= FALSE;
- *job_data= NULL;
+ *event_name= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
LOCK_QUEUE_DATA();
for (;;)
{
- int res;
+ Event_queue_element *top= NULL;
/* Break loop if thd has been killed */
if (thd->killed)
@@ -753,39 +589,30 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
continue;
}
- DBUG_PRINT("info", ("Ready for execution"));
- if (!(*job_data= new Event_job_data()))
- {
- ret= TRUE;
- break;
- }
- if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
- *job_data)))
+ if (!(*event_name= new Event_queue_element_for_exec()) ||
+ (*event_name)->init(top->dbname, top->name))
{
- DBUG_PRINT("error", ("Got %d from load_named_event", res));
- delete *job_data;
- *job_data= NULL;
ret= TRUE;
break;
}
+ DBUG_PRINT("info", ("Ready for execution"));
top->mark_last_executed(thd);
if (top->compute_next_execution_time())
top->status= Event_queue_element::DISABLED;
DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));
- (*job_data)->execution_count= top->execution_count;
+ top->execution_count++;
+ (*event_name)->dropped= top->dropped;
top->update_timing_fields(thd);
- if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
- (top->status == Event_queue_element::DISABLED))
+ if (top->status == Event_queue_element::DISABLED)
{
DBUG_PRINT("info", ("removing from the queue"));
sql_print_information("SCHEDULER: Last execution of %s.%s. %s",
top->dbname.str, top->name.str,
top->dropped? "Dropping.":"");
- to_free= TRUE;
- to_drop= top->dropped;
+ delete top;
queue_remove(&queue, 0);
}
else
@@ -796,19 +623,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
}
end:
UNLOCK_QUEUE_DATA();
- if (to_drop)
- {
- DBUG_PRINT("info", ("Dropping from disk"));
- top->drop(thd);
- }
- if (to_free)
- delete top;
- DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *job_data));
+ DBUG_PRINT("info", ("returning %d et_new: 0x%lx ",
+ ret, (long) *event_name));
- if (*job_data)
- DBUG_PRINT("info", ("db: %s name: %s definer=%s", (*job_data)->dbname.str,
- (*job_data)->name.str, (*job_data)->definer.str));
+ if (*event_name)
+ DBUG_PRINT("info", ("db: %s name: %s",
+ (*event_name)->dbname.str, (*event_name)->name.str));
DBUG_RETURN(ret);
}
diff --git a/sql/event_queue.h b/sql/event_queue.h
index 9f48da4914f..a1237e1b52c 100644
--- a/sql/event_queue.h
+++ b/sql/event_queue.h
@@ -16,12 +16,10 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class Event_basic;
-class Event_db_repository;
-class Event_job_data;
class Event_queue_element;
+class Event_queue_element_for_exec;
class THD;
-class Event_scheduler;
class Event_queue
{
@@ -35,19 +33,19 @@ public:
deinit_mutexes();
bool
- init_queue(THD *thd, Event_db_repository *db_repo);
+ init_queue(THD *thd);
void
deinit_queue();
/* Methods for queue management follow */
- int
- create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
+ void
+ create_event(THD *thd, Event_queue_element *new_element);
- int
+ void
update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
- LEX_STRING *new_schema, LEX_STRING *new_name);
+ Event_queue_element *new_element);
void
drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
@@ -59,14 +57,15 @@ public:
recalculate_activation_times(THD *thd);
bool
- get_top_for_execution_if_time(THD *thd, Event_job_data **job_data);
+ get_top_for_execution_if_time(THD *thd,
+ Event_queue_element_for_exec **event_name);
+
void
dump_internal_status();
- int
- load_events_from_db(THD *thd);
-
+ void
+ empty_queue();
protected:
void
find_n_remove_event(LEX_STRING db, LEX_STRING name);
@@ -76,8 +75,6 @@ protected:
drop_matching_events(THD *thd, LEX_STRING pattern,
bool (*)(LEX_STRING, Event_basic *));
- void
- empty_queue();
void
dbug_dump_queue(time_t now);
@@ -86,11 +83,7 @@ protected:
pthread_mutex_t LOCK_event_queue;
pthread_cond_t COND_queue_state;
- Event_db_repository *db_repository;
-
- Event_scheduler *scheduler;
-
- /* The sorted queue with the Event_job_data objects */
+ /* The sorted queue with the Event_queue_element objects */
QUEUE queue;
TIME next_activation_at;
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index a47576cf0c0..1013f5af3a8 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -18,6 +18,7 @@
#include "event_data_objects.h"
#include "event_scheduler.h"
#include "event_queue.h"
+#include "event_db_repository.h"
#ifdef __GNUC__
#if __GNUC__ >= 2
@@ -34,6 +35,11 @@
extern pthread_attr_t connection_attrib;
+
+Event_db_repository *Event_worker_thread::db_repository;
+Events *Event_worker_thread::events_facade;
+
+
static
const LEX_STRING scheduler_states_names[] =
{
@@ -60,8 +66,8 @@ struct scheduler_param {
et The event itself
*/
-static void
-evex_print_warnings(THD *thd, Event_job_data *et)
+void
+Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
{
MYSQL_ERROR *err;
DBUG_ENTER("evex_print_warnings");
@@ -253,49 +259,97 @@ event_worker_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd;
- Event_job_data *event= (Event_job_data *)arg;
- int ret;
+ Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
thd= event->thd;
-
thd->thread_stack= (char *) &thd; // remember where our stack is
- DBUG_ENTER("event_worker_thread");
- if (!post_init_event_thread(thd))
+ Event_worker_thread worker_thread;
+ worker_thread.run(thd, (Event_queue_element_for_exec *)arg);
+
+ deinit_event_thread(thd);
+
+ return 0; // Can't return anything here
+}
+
+
+/*
+ Function that executes an event in a child thread. Setups the
+ environment for the event execution and cleans after that.
+
+ SYNOPSIS
+ Event_worker_thread::run()
+ thd Thread context
+ event The Event_queue_element_for_exec object to be processed
+*/
+
+void
+Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
+{
+ int ret;
+ Event_job_data *job_data= NULL;
+ DBUG_ENTER("Event_worker_thread::run");
+ DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
+ "THD=0x%lx", time(NULL), thd));
+
+ if (post_init_event_thread(thd))
+ goto end;
+
+ if (!(job_data= new Event_job_data()))
+ goto end;
+ else if ((ret= db_repository->
+ load_named_event(thd, event->dbname, event->name, job_data)))
{
- DBUG_PRINT("info", ("Baikonur, time is %ld, BURAN reporting and operational."
- "THD: 0x%lx",
- (long) time(NULL), (long) thd));
-
- sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. "
- "Execution %u",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id,
- event->execution_count);
-
- thd->enable_slow_log= TRUE;
-
- ret= event->execute(thd);
-
- evex_print_warnings(thd, event);
-
- sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
- "RetCode=%d", event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id, ret);
- if (ret == EVEX_COMPILE_ERROR)
- sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
- event->dbname.str, event->name.str,
- event->definer.str);
- else if (ret == EVEX_MICROSECOND_UNSUP)
- sql_print_information("SCHEDULER: MICROSECOND is not supported");
+ DBUG_PRINT("error", ("Got %d from load_named_event", ret));
+ goto end;
+ }
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. ",
+ job_data->dbname.str, job_data->name.str,
+ job_data->definer.str, thd->thread_id);
+
+ thd->enable_slow_log= TRUE;
+
+ ret= job_data->execute(thd);
+
+ print_warnings(thd, job_data);
+
+ sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. "
+ "RetCode=%d", job_data->dbname.str, job_data->name.str,
+ job_data->definer.str, thd->thread_id, ret);
+ if (ret == EVEX_COMPILE_ERROR)
+ sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
+ job_data->dbname.str, job_data->name.str,
+ job_data->definer.str);
+ else if (ret == EVEX_MICROSECOND_UNSUP)
+end:
+ delete job_data;
+
+ if (event->dropped)
+ {
+ sql_print_information("SCHEDULER: Dropping %s.%s", event->dbname.str,
+ event->name.str);
+ /*
+ Using db_repository can lead to a race condition because we access
+ the table without holding LOCK_metadata.
+ Scenario:
+ 1. CREATE EVENT xyz AT ... (conn thread)
+ 2. execute xyz (worker)
+ 3. CREATE EVENT XYZ EVERY ... (conn thread)
+ 4. drop xyz (worker)
+ 5. XYZ was just created on disk but `drop xyz` of the worker dropped it.
+ A consequent load to create Event_queue_element will fail.
+
+ If all operations are performed under LOCK_metadata there is no such
+ problem. However, this comes at the price of introduction bi-directional
+ association between class Events and class Event_worker_thread.
+ */
+ events_facade->drop_event(thd, event->dbname, event->name, FALSE);
}
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
event->name.str));
- delete event;
- deinit_event_thread(thd);
-
- DBUG_RETURN(0); // Can't return anything here
+ delete event;
}
@@ -441,7 +495,6 @@ bool
Event_scheduler::run(THD *thd)
{
int res= FALSE;
- Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
sql_print_information("SCHEDULER: Manager thread started with id %lu",
@@ -454,18 +507,20 @@ Event_scheduler::run(THD *thd)
while (is_running())
{
+ Event_queue_element_for_exec *event_name;
+
/* Gets a minimized version */
- if (queue->get_top_for_execution_if_time(thd, &job_data))
+ if (queue->get_top_for_execution_if_time(thd, &event_name))
{
sql_print_information("SCHEDULER: Serious error during getting next "
"event to execute. Stopping");
break;
}
- DBUG_PRINT("info", ("get_top returned job_data: 0x%lx", (long) job_data));
- if (job_data)
+ DBUG_PRINT("info", ("get_top returned job_data=0x%lx", event_name));
+ if (event_name)
{
- if ((res= execute_top(thd, job_data)))
+ if ((res= execute_top(thd, event_name)))
break;
}
else
@@ -499,7 +554,7 @@ Event_scheduler::run(THD *thd)
*/
bool
-Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
+Event_scheduler::execute_top(THD *thd, Event_queue_element_for_exec *event_name)
{
THD *new_thd;
pthread_t th;
@@ -510,13 +565,13 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
pre_init_event_thread(new_thd);
new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
- job_data->thd= new_thd;
+ event_name->thd= new_thd;
DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition",
- job_data->dbname.str, job_data->name.str));
+ event_name->dbname.str, event_name->name.str));
/* Major failure */
if ((res= pthread_create(&th, &connection_attrib, event_worker_thread,
- job_data)))
+ event_name)))
goto error;
++started_events;
@@ -537,7 +592,7 @@ error:
delete new_thd;
pthread_mutex_unlock(&LOCK_thread_count);
}
- delete job_data;
+ delete event_name;
DBUG_RETURN(TRUE);
}
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index 18625ef35f3..2ab21464057 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -15,8 +15,11 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
class Event_queue;
class Event_job_data;
+class Event_db_repository;
+class Events;
void
pre_init_event_thread(THD* thd);
@@ -27,6 +30,29 @@ post_init_event_thread(THD* thd);
void
deinit_event_thread(THD *thd);
+
+class Event_worker_thread
+{
+public:
+ static void
+ init(Events *events, Event_db_repository *db_repo)
+ {
+ db_repository= db_repo;
+ events_facade= events;
+ }
+
+ void
+ run(THD *thd, Event_queue_element_for_exec *event);
+
+private:
+ void
+ print_warnings(THD *thd, Event_job_data *et);
+
+ static Event_db_repository *db_repository;
+ static Events *events_facade;
+};
+
+
class Event_scheduler
{
public:
@@ -71,10 +97,9 @@ private:
uint
workers_count();
-
/* helper functions */
bool
- execute_top(THD *thd, Event_job_data *job_data);
+ execute_top(THD *thd, Event_queue_element_for_exec *event_name);
/* helper functions for working with mutexes & conditionals */
void
diff --git a/sql/events.cc b/sql/events.cc
index e6224915d6b..425e288dfb7 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -97,7 +97,7 @@ Event_queue events_event_queue;
static
Event_scheduler events_event_scheduler;
-static
+
Event_db_repository events_event_db_repository;
Events Events::singleton;
@@ -296,29 +296,6 @@ Events::Events()
/*
- Opens mysql.event table with specified lock
-
- SYNOPSIS
- Events::open_event_table()
- thd Thread context
- lock_type How to lock the table
- table We will store the open table here
-
- RETURN VALUE
- 1 Cannot lock table
- 2 The table is corrupted - different number of fields
- 0 OK
-*/
-
-int
-Events::open_event_table(THD *thd, enum thr_lock_type lock_type,
- TABLE **table)
-{
- return db_repository->open_event_table(thd, lock_type, table);
-}
-
-
-/*
The function exported to the world for creating of events.
SYNOPSIS
@@ -351,16 +328,24 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, bool if_not_exists)
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->create_event(thd, parse_data, if_not_exists)))
{
- if ((ret= event_queue->create_event(thd, parse_data->dbname,
- parse_data->name)))
+ Event_queue_element *new_element;
+
+ if (!(new_element= new Event_queue_element()))
+ ret= TRUE; // OOM
+ else if ((ret= db_repository->load_named_event(thd, parse_data->dbname,
+ parse_data->name,
+ new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
- my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
+ delete new_element;
}
+ else
+ event_queue->create_event(thd, new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
+
}
@@ -387,6 +372,7 @@ bool
Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
{
int ret;
+ Event_queue_element *new_element;
DBUG_ENTER("Events::update_event");
LEX_STRING *new_dbname= rename_to ? &rename_to->m_db : NULL;
LEX_STRING *new_name= rename_to ? &rename_to->m_name : NULL;
@@ -400,12 +386,20 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->update_event(thd, parse_data, new_dbname, new_name)))
{
- if ((ret= event_queue->update_event(thd, parse_data->dbname,
- parse_data->name, new_dbname, new_name)))
+ LEX_STRING dbname= new_dbname ? *new_dbname : parse_data->dbname;
+ LEX_STRING name= new_name ? *new_name : parse_data->name;
+
+ if (!(new_element= new Event_queue_element()))
+ ret= TRUE; // OOM
+ else if ((ret= db_repository->load_named_event(thd, dbname, name,
+ new_element)))
{
DBUG_ASSERT(ret == OP_LOAD_ERROR);
- my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0));
+ delete new_element;
}
+ else
+ event_queue->update_event(thd, parse_data->dbname, parse_data->name,
+ new_element);
}
pthread_mutex_unlock(&LOCK_event_metadata);
@@ -423,10 +417,6 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
name [in] Event's name
if_exists [in] When set and the event does not exist =>
warning onto the stack
- only_from_disk [in] Whether to remove the event from the queue too.
- In case of Event_job_data::drop() it's needed to
- do only disk drop because Event_queue will handle
- removal from memory queue.
RETURN VALUE
FALSE OK
@@ -434,8 +424,7 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to)
*/
bool
-Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
- bool only_from_disk)
+Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists)
{
int ret;
DBUG_ENTER("Events::drop_event");
@@ -448,10 +437,7 @@ Events::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
pthread_mutex_lock(&LOCK_event_metadata);
/* On error conditions my_error() is called so no need to handle here */
if (!(ret= db_repository->drop_event(thd, dbname, name, if_exists)))
- {
- if (!only_from_disk)
- event_queue->drop_event(thd, dbname, name);
- }
+ event_queue->drop_event(thd, dbname, name);
pthread_mutex_unlock(&LOCK_event_metadata);
DBUG_RETURN(ret);
}
@@ -655,11 +641,12 @@ Events::init()
}
check_system_tables_error= FALSE;
- if (event_queue->init_queue(thd, db_repository))
+ if (event_queue->init_queue(thd) || load_events_from_db(thd))
{
sql_print_error("SCHEDULER: Error while loading from disk.");
goto end;
}
+
scheduler->init_scheduler(event_queue);
DBUG_ASSERT(opt_event_scheduler == Events::EVENTS_ON ||
@@ -667,6 +654,7 @@ Events::init()
if (opt_event_scheduler == Events::EVENTS_ON)
res= scheduler->start();
+ Event_worker_thread::init(this, db_repository);
end:
delete thd;
/* Remember that we don't have a THD */
@@ -903,3 +891,131 @@ Events::check_system_tables(THD *thd)
DBUG_RETURN(ret);
}
+
+
+/*
+ Loads all ENABLED events from mysql.event into the prioritized
+ queue. Called during scheduler main thread initialization. Compiles
+ the events. Creates Event_queue_element instances for every ENABLED event
+ from mysql.event.
+
+ SYNOPSIS
+ Events::load_events_from_db()
+ thd Thread context. Used for memory allocation in some cases.
+
+ RETURN VALUE
+ 0 OK
+ !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP,
+ EVEX_COMPILE_ERROR) - in all these cases mysql.event was
+ tampered.
+
+ NOTES
+ Reports the error to the console
+*/
+
+int
+Events::load_events_from_db(THD *thd)
+{
+ TABLE *table;
+ READ_RECORD read_record_info;
+ int ret= -1;
+ uint count= 0;
+ bool clean_the_queue= TRUE;
+
+ DBUG_ENTER("Events::load_events_from_db");
+ DBUG_PRINT("enter", ("thd=0x%lx", thd));
+
+ if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
+ {
+ sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
+ DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
+ }
+
+ init_read_record(&read_record_info, thd, table ,NULL,1,0);
+ while (!(read_record_info.read_record(&read_record_info)))
+ {
+ Event_queue_element *et;
+ if (!(et= new Event_queue_element))
+ {
+ DBUG_PRINT("info", ("Out of memory"));
+ break;
+ }
+ DBUG_PRINT("info", ("Loading event from row."));
+
+ if ((ret= et->load_from_row(table)))
+ {
+ sql_print_error("SCHEDULER: Error while loading from mysql.event. "
+ "Table probably corrupted");
+ break;
+ }
+ if (et->status != Event_queue_element::ENABLED)
+ {
+ DBUG_PRINT("info",("%s is disabled",et->name.str));
+ delete et;
+ continue;
+ }
+
+ /* let's find when to be executed */
+ if (et->compute_next_execution_time())
+ {
+ sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
+ " Skipping", et->dbname.str, et->name.str);
+ continue;
+ }
+
+ {
+ Event_job_data temp_job_data;
+ DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));
+
+ temp_job_data.load_from_row(table);
+
+ /*
+ We load only on scheduler root just to check whether the body
+ compiles.
+ */
+ switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
+ case EVEX_MICROSECOND_UNSUP:
+ sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
+ "supported but found in mysql.event");
+ break;
+ case EVEX_COMPILE_ERROR:
+ sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
+ et->dbname.str, et->name.str);
+ break;
+ default:
+ break;
+ }
+ thd->end_statement();
+ thd->cleanup_after_query();
+ }
+ if (ret)
+ {
+ delete et;
+ goto end;
+ }
+
+ DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list."));
+ event_queue->create_event(thd, et);
+ count++;
+ }
+ clean_the_queue= FALSE;
+end:
+ end_read_record(&read_record_info);
+
+ if (clean_the_queue)
+ {
+ event_queue->empty_queue();
+ ret= -1;
+ }
+ else
+ {
+ ret= 0;
+ sql_print_information("SCHEDULER: Loaded %d event%s", count,
+ (count == 1)?"":"s");
+ }
+
+ close_thread_tables(thd);
+
+ DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
+ DBUG_RETURN(ret);
+}
diff --git a/sql/events.h b/sql/events.h
index 621ab0ffca5..35ee3c569d0 100644
--- a/sql/events.h
+++ b/sql/events.h
@@ -42,13 +42,6 @@ sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs);
class Events
{
public:
- /*
- Quite NOT the best practice and will be removed once
- Event_timed::drop() and Event_timed is fixed not do drop directly
- or other scheme will be found.
- */
- friend class Event_queue_element;
-
/* The order should match the order in opt_typelib */
enum enum_opt_event_scheduler
{
@@ -92,15 +85,11 @@ public:
update_event(THD *thd, Event_parse_data *parse_data, sp_name *rename_to);
bool
- drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists,
- bool only_from_disk);
+ drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name, bool if_exists);
void
drop_schema_events(THD *thd, char *db);
- int
- open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
-
bool
show_create_event(THD *thd, LEX_STRING dbname, LEX_STRING name);
@@ -119,6 +108,9 @@ private:
bool
check_system_tables(THD *thd);
+ int
+ load_events_from_db(THD *thd);
+
/* Singleton DP is used */
Events();
~Events(){}
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 1b7cf2a342d..9307836731a 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -4047,8 +4047,7 @@ end_with_restore_list:
if (!(res= Events::get_instance()->drop_event(thd,
lex->spname->m_db,
lex->spname->m_name,
- lex->drop_if_exists,
- FALSE)))
+ lex->drop_if_exists)))
send_ok(thd);
}
break;