summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/Makefile.am6
-rw-r--r--sql/event_data_objects.cc324
-rw-r--r--sql/event_queue.cc40
-rw-r--r--sql/event_queue.h4
-rw-r--r--sql/event_scheduler.cc462
-rw-r--r--sql/event_scheduler.h2
6 files changed, 420 insertions, 418 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am
index 3518903d149..d0512f3d9de 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -65,8 +65,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \
parse_file.h sql_view.h sql_trigger.h \
sql_array.h sql_cursor.h events.h \
+ event_db_repository.h event_queue.h \
sql_plugin.h authors.h sql_partition.h event_data_objects.h \
- event_queue.h event_db_repository.h \
partition_info.h partition_element.h event_scheduler.h \
contributors.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
@@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
gstream.cc spatial.cc sql_help.cc sql_cursor.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
- sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\
- events.cc event_data_objects.cc \
+ sp_cache.cc parse_file.cc sql_trigger.cc \
+ event_scheduler.cc events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \
sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc
diff --git a/sql/event_data_objects.cc b/sql/event_data_objects.cc
index 8d990def32d..5f879c6bea0 100644
--- a/sql/event_data_objects.cc
+++ b/sql/event_data_objects.cc
@@ -24,65 +24,12 @@
#define EVEX_MAX_INTERVAL_VALUE 1000000000L
-
-/*
- Switches the security context
- SYNOPSIS
- event_change_security_context()
- thd Thread
- user The user
- host The host of the user
- db The schema for which the security_ctx will be loaded
- backup Where to store the old context
-
- RETURN VALUE
- FALSE OK
- TRUE Error (generates error too)
-*/
-
static bool
event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
- LEX_STRING db, Security_context *backup)
-{
- DBUG_ENTER("event_change_security_context");
- DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str));
-#ifndef NO_EMBEDDED_ACCESS_CHECKS
-
- *backup= thd->main_security_ctx;
- if (acl_getroot_no_password(&thd->main_security_ctx, user.str, host.str,
- host.str, db.str))
- {
- my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str);
- DBUG_RETURN(TRUE);
- }
- thd->security_ctx= &thd->main_security_ctx;
-#endif
- DBUG_RETURN(FALSE);
-}
-
-
-/*
- Restores the security context
- SYNOPSIS
- event_restore_security_context()
- thd Thread
- backup Context to switch to
-*/
+ LEX_STRING db, Security_context *backup);
static void
-event_restore_security_context(THD *thd, Security_context *backup)
-{
- DBUG_ENTER("event_restore_security_context");
-#ifndef NO_EMBEDDED_ACCESS_CHECKS
- if (backup)
- {
- thd->main_security_ctx= *backup;
- thd->security_ctx= &thd->main_security_ctx;
- }
-#endif
- DBUG_VOID_RETURN;
-}
-
+event_restore_security_context(THD *thd, Security_context *backup);
/*
Returns a new instance
@@ -237,47 +184,6 @@ Event_parse_data::init_body(THD *thd)
/*
- Inits definer (definer_user and definer_host) during parsing.
-
- SYNOPSIS
- Event_parse_data::init_definer()
- thd Thread
-*/
-
-void
-Event_parse_data::init_definer(THD *thd)
-{
- int definer_user_len;
- int definer_host_len;
- DBUG_ENTER("Event_parse_data::init_definer");
-
- DBUG_PRINT("info",("init definer_user thd->mem_root=0x%lx "
- "thd->sec_ctx->priv_user=0x%lx", thd->mem_root,
- thd->security_ctx->priv_user));
-
- definer_user_len= strlen(thd->security_ctx->priv_user);
- definer_host_len= strlen(thd->security_ctx->priv_host);
-
- /* + 1 for @ */
- DBUG_PRINT("info",("init definer as whole"));
- definer.length= definer_user_len + definer_host_len + 1;
- definer.str= thd->alloc(definer.length + 1);
-
- DBUG_PRINT("info",("copy the user"));
- memcpy(definer.str, thd->security_ctx->priv_user, definer_user_len);
- definer.str[definer_user_len]= '@';
-
- DBUG_PRINT("info",("copy the host"));
- memcpy(definer.str + definer_user_len + 1, thd->security_ctx->priv_host,
- definer_host_len);
- definer.str[definer.length]= '\0';
- DBUG_PRINT("info",("definer [%s] initted", definer.str));
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
Sets time for execution for one-time event.
SYNOPSIS
@@ -646,6 +552,47 @@ Event_parse_data::check_parse_data(THD *thd)
/*
+ Inits definer (definer_user and definer_host) during parsing.
+
+ SYNOPSIS
+ Event_parse_data::init_definer()
+ thd Thread
+*/
+
+void
+Event_parse_data::init_definer(THD *thd)
+{
+ int definer_user_len;
+ int definer_host_len;
+ DBUG_ENTER("Event_parse_data::init_definer");
+
+ DBUG_PRINT("info",("init definer_user thd->mem_root=0x%lx "
+ "thd->sec_ctx->priv_user=0x%lx", thd->mem_root,
+ thd->security_ctx->priv_user));
+
+ definer_user_len= strlen(thd->security_ctx->priv_user);
+ definer_host_len= strlen(thd->security_ctx->priv_host);
+
+ /* + 1 for @ */
+ DBUG_PRINT("info",("init definer as whole"));
+ definer.length= definer_user_len + definer_host_len + 1;
+ definer.str= thd->alloc(definer.length + 1);
+
+ DBUG_PRINT("info",("copy the user"));
+ memcpy(definer.str, thd->security_ctx->priv_user, definer_user_len);
+ definer.str[definer_user_len]= '@';
+
+ DBUG_PRINT("info",("copy the host"));
+ memcpy(definer.str + definer_user_len + 1, thd->security_ctx->priv_host,
+ definer_host_len);
+ definer.str[definer.length]= '\0';
+ DBUG_PRINT("info",("definer [%s] initted", definer.str));
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
Constructor
SYNOPSIS
@@ -1668,6 +1615,69 @@ Event_job_data::get_fake_create_event(THD *thd, String *buf)
/*
+ Executes the event (the underlying sp_head object);
+
+ SYNOPSIS
+ Event_job_data::execute()
+ thd THD
+
+ RETURN VALUE
+ 0 success
+ -99 No rights on this.dbname.str
+ others retcodes of sp_head::execute_procedure()
+*/
+
+int
+Event_job_data::execute(THD *thd)
+{
+ Security_context save_ctx;
+ /* this one is local and not needed after exec */
+ int ret= 0;
+
+ DBUG_ENTER("Event_job_data::execute");
+ DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
+
+ if ((ret= compile(thd, NULL)))
+ goto done;
+
+ event_change_security_context(thd, definer_user, definer_host, dbname,
+ &save_ctx);
+ /*
+ THD::~THD will clean this or if there is DROP DATABASE in the SP then
+ it will be free there. It should not point to our buffer which is allocated
+ on a mem_root.
+ */
+ thd->db= my_strdup(dbname.str, MYF(0));
+ thd->db_length= dbname.length;
+ if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
+ {
+ List<Item> empty_item_list;
+ empty_item_list.empty();
+ if (thd->enable_slow_log)
+ sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
+ sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
+
+ ret= sphead->execute_procedure(thd, &empty_item_list);
+ }
+ else
+ {
+ DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
+ definer_host.str, dbname.str));
+ ret= -99;
+ }
+
+ event_restore_security_context(thd, &save_ctx);
+done:
+ thd->end_statement();
+ thd->cleanup_after_query();
+
+ DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
+
+ DBUG_RETURN(ret);
+}
+
+
+/*
Compiles an event before it's execution. Compiles the anonymous
sp_head object held by the event
@@ -1800,69 +1810,6 @@ done:
/*
- Executes the event (the underlying sp_head object);
-
- SYNOPSIS
- Event_job_data::execute()
- thd THD
-
- RETURN VALUE
- 0 success
- -99 No rights on this.dbname.str
- others retcodes of sp_head::execute_procedure()
-*/
-
-int
-Event_job_data::execute(THD *thd)
-{
- Security_context save_ctx;
- /* this one is local and not needed after exec */
- int ret= 0;
-
- DBUG_ENTER("Event_job_data::execute");
- DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
-
- if ((ret= compile(thd, NULL)))
- goto done;
-
- event_change_security_context(thd, definer_user, definer_host, dbname,
- &save_ctx);
- /*
- THD::~THD will clean this or if there is DROP DATABASE in the SP then
- it will be free there. It should not point to our buffer which is allocated
- on a mem_root.
- */
- thd->db= my_strdup(dbname.str, MYF(0));
- thd->db_length= dbname.length;
- if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
- {
- List<Item> empty_item_list;
- empty_item_list.empty();
- if (thd->enable_slow_log)
- sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
- sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
-
- ret= sphead->execute_procedure(thd, &empty_item_list);
- }
- else
- {
- DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
- definer_host.str, dbname.str));
- ret= -99;
- }
-
- event_restore_security_context(thd, &save_ctx);
-done:
- thd->end_statement();
- thd->cleanup_after_query();
-
- DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
-
- DBUG_RETURN(ret);
-}
-
-
-/*
Checks whether two events are in the same schema
SYNOPSIS
@@ -1899,3 +1846,62 @@ event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b)
return !sortcmp_lex_string(name, b->name, system_charset_info) &&
!sortcmp_lex_string(db, b->dbname, system_charset_info);
}
+
+
+/*
+ Switches the security context
+ SYNOPSIS
+ event_change_security_context()
+ thd Thread
+ user The user
+ host The host of the user
+ db The schema for which the security_ctx will be loaded
+ backup Where to store the old context
+
+ RETURN VALUE
+ FALSE OK
+ TRUE Error (generates error too)
+*/
+
+static bool
+event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
+ LEX_STRING db, Security_context *backup)
+{
+ DBUG_ENTER("event_change_security_context");
+ DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str));
+#ifndef NO_EMBEDDED_ACCESS_CHECKS
+
+ *backup= thd->main_security_ctx;
+ if (acl_getroot_no_password(&thd->main_security_ctx, user.str, host.str,
+ host.str, db.str))
+ {
+ my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str);
+ DBUG_RETURN(TRUE);
+ }
+ thd->security_ctx= &thd->main_security_ctx;
+#endif
+ DBUG_RETURN(FALSE);
+}
+
+
+/*
+ Restores the security context
+ SYNOPSIS
+ event_restore_security_context()
+ thd Thread
+ backup Context to switch to
+*/
+
+static void
+event_restore_security_context(THD *thd, Security_context *backup)
+{
+ DBUG_ENTER("event_restore_security_context");
+#ifndef NO_EMBEDDED_ACCESS_CHECKS
+ if (backup)
+ {
+ thd->main_security_ctx= *backup;
+ thd->security_ctx= &thd->main_security_ctx;
+ }
+#endif
+ DBUG_VOID_RETURN;
+}
diff --git a/sql/event_queue.cc b/sql/event_queue.cc
index 69114c53118..c6c7d7f14ac 100644
--- a/sql/event_queue.cc
+++ b/sql/event_queue.cc
@@ -41,6 +41,7 @@ struct event_queue_param
Event_queue *queue;
pthread_mutex_t LOCK_loaded;
pthread_cond_t COND_loaded;
+ bool loading_finished;
};
@@ -85,9 +86,14 @@ event_queue_loader_thread(void *arg)
DBUG_ENTER("event_queue_loader_thread");
+
pthread_mutex_lock(&param->LOCK_loaded);
+ param->queue->check_system_tables(thd);
param->queue->load_events_from_db(thd);
+
+ param->loading_finished= TRUE;
pthread_cond_signal(&param->COND_loaded);
+
pthread_mutex_unlock(&param->LOCK_loaded);
end:
@@ -113,8 +119,6 @@ Event_queue::Event_queue()
mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
-
- queue_loaded= FALSE;
}
@@ -195,8 +199,10 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
event_queue_param_value= (struct event_queue_param *)
my_malloc(sizeof(struct event_queue_param), MYF(0));
+
event_queue_param_value->thd= new_thd;
event_queue_param_value->queue= this;
+ event_queue_param_value->loading_finished= FALSE;
pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
@@ -207,8 +213,8 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
{
do {
pthread_cond_wait(&event_queue_param_value->COND_loaded,
- &event_queue_param_value->LOCK_loaded);
- } while (queue_loaded == FALSE);
+ &event_queue_param_value->LOCK_loaded);
+ } while (event_queue_param_value->loading_finished == FALSE);
}
pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
@@ -662,8 +668,6 @@ end:
close_thread_tables(thd);
- queue_loaded= TRUE;
-
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret);
}
@@ -683,7 +687,7 @@ end:
TRUE Error
*/
-bool
+void
Event_queue::check_system_tables(THD *thd)
{
TABLE_LIST tables;
@@ -702,39 +706,35 @@ Event_queue::check_system_tables(THD *thd)
tables.lock_type= TL_READ;
if ((ret= simple_open_n_lock_tables(thd, &tables)))
- sql_print_error("Cannot open mysql.db");
- else
{
- ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
- mysql_db_table_fields, &mysql_db_table_last_check,
- ER_CANNOT_LOAD_FROM_TABLE);
- close_thread_tables(thd);
+ sql_print_error("Cannot open mysql.db");
+ goto end;
}
- if (ret)
- DBUG_RETURN(TRUE);
+ ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
+ mysql_db_table_fields, &mysql_db_table_last_check,
+ ER_CANNOT_LOAD_FROM_TABLE);
+ close_thread_tables(thd);
bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "user";
tables.lock_type= TL_READ;
- if ((ret= simple_open_n_lock_tables(thd, &tables)))
+ if (simple_open_n_lock_tables(thd, &tables))
sql_print_error("Cannot open mysql.db");
else
{
if (tables.table->s->fields < 29 ||
strncmp(tables.table->field[29]->field_name,
STRING_WITH_LEN("Event_priv")))
- {
sql_print_error("mysql.user has no `Event_priv` column at position 29");
- ret= TRUE;
- }
close_thread_tables(thd);
}
+end:
thd->restore_backup_open_tables_state(&backup);
- DBUG_RETURN(ret);
+ DBUG_VOID_RETURN;
}
diff --git a/sql/event_queue.h b/sql/event_queue.h
index 3270938e881..e8b46abde92 100644
--- a/sql/event_queue.h
+++ b/sql/event_queue.h
@@ -56,7 +56,7 @@ public:
void
drop_schema_events(THD *thd, LEX_STRING schema);
- static bool
+ void
check_system_tables(THD *thd);
void
@@ -99,8 +99,6 @@ protected:
/* The sorted queue with the Event_job_data objects */
QUEUE queue;
- bool queue_loaded;
-
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
uint mutex_last_attempted_lock_at_line;
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index 5e6ffb090cb..0c39c1a512b 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -28,19 +28,12 @@
#define SCHED_FUNC "<unknown>"
#endif
-#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
-#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
+#define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__)
+#define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__)
#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
extern pthread_attr_t connection_attrib;
-struct scheduler_param
-{
- THD *thd;
- Event_scheduler *scheduler;
-};
-
-
static
LEX_STRING scheduler_states_names[] =
{
@@ -49,6 +42,11 @@ LEX_STRING scheduler_states_names[] =
{ C_STRING_WITH_LEN("STOPPING")}
};
+struct scheduler_param {
+ THD *thd;
+ Event_scheduler *scheduler;
+};
+
/*
Prints the stack of infos, warnings, errors from thd to
@@ -100,54 +98,6 @@ evex_print_warnings(THD *thd, Event_job_data *et)
/*
- Performs pre- pthread_create() initialisation of THD. Do this
- in the thread that will pass THD to the child thread. In the
- child thread call post_init_event_thread().
-
- SYNOPSIS
- pre_init_event_thread()
- thd The THD of the thread. Has to be allocated by the caller.
-
- NOTES
- 1. The host of the thead is my_localhost
- 2. thd->net is initted with NULL - no communication.
-*/
-
-void
-pre_init_event_thread(THD* thd)
-{
- DBUG_ENTER("pre_init_event_thread");
- thd->client_capabilities= 0;
- thd->security_ctx->master_access= 0;
- thd->security_ctx->db_access= 0;
- thd->security_ctx->host_or_ip= (char*)my_localhost;
- thd->security_ctx->set_user((char*)"event_scheduler");
- my_net_init(&thd->net, NULL);
- thd->net.read_timeout= slave_net_timeout;
- thd->slave_thread= 0;
- thd->options|= OPTION_AUTO_IS_NULL;
- thd->client_capabilities|= CLIENT_MULTI_RESULTS;
- pthread_mutex_lock(&LOCK_thread_count);
- thd->thread_id= thread_id++;
- threads.append(thd);
- thread_count++;
- thread_running++;
- pthread_mutex_unlock(&LOCK_thread_count);
-
- /*
- Guarantees that we will see the thread in SHOW PROCESSLIST though its
- vio is NULL.
- */
-
- thd->proc_info= "Initialized";
- thd->version= refresh_version;
- thd->set_time();
-
- DBUG_VOID_RETURN;
-}
-
-
-/*
Performs post initialization of structures in a new thread.
SYNOPSIS
@@ -202,6 +152,54 @@ deinit_event_thread(THD *thd)
/*
+ Performs pre- pthread_create() initialisation of THD. Do this
+ in the thread that will pass THD to the child thread. In the
+ child thread call post_init_event_thread().
+
+ SYNOPSIS
+ pre_init_event_thread()
+ thd The THD of the thread. Has to be allocated by the caller.
+
+ NOTES
+ 1. The host of the thead is my_localhost
+ 2. thd->net is initted with NULL - no communication.
+*/
+
+void
+pre_init_event_thread(THD* thd)
+{
+ DBUG_ENTER("pre_init_event_thread");
+ thd->client_capabilities= 0;
+ thd->security_ctx->master_access= 0;
+ thd->security_ctx->db_access= 0;
+ thd->security_ctx->host_or_ip= (char*)my_localhost;
+ my_net_init(&thd->net, NULL);
+ thd->security_ctx->set_user((char*)"event_scheduler");
+ thd->net.read_timeout= slave_net_timeout;
+ thd->slave_thread= 0;
+ thd->options|= OPTION_AUTO_IS_NULL;
+ thd->client_capabilities|= CLIENT_MULTI_RESULTS;
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->thread_id= thread_id++;
+ threads.append(thd);
+ thread_count++;
+ thread_running++;
+ pthread_mutex_unlock(&LOCK_thread_count);
+
+ /*
+ Guarantees that we will see the thread in SHOW PROCESSLIST though its
+ vio is NULL.
+ */
+
+ thd->proc_info= "Initialized";
+ thd->version= refresh_version;
+ thd->set_time();
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
Function that executes the scheduler,
SYNOPSIS
@@ -259,33 +257,32 @@ event_worker_thread(void *arg)
thd->thread_stack= (char *) &thd; // remember where our stack is
DBUG_ENTER("event_worker_thread");
- if (post_init_event_thread(thd))
- goto end;
-
- DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
- "THD=0x%lx", time(NULL), thd));
+ if (!post_init_event_thread(thd))
+ {
+ DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
+ "THD=0x%lx", time(NULL), thd));
- sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id);
+ sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
+ event->dbname.str, event->name.str,
+ event->definer.str, thd->thread_id);
- thd->enable_slow_log= TRUE;
+ thd->enable_slow_log= TRUE;
- ret= event->execute(thd);
+ ret= event->execute(thd);
- evex_print_warnings(thd, event);
+ evex_print_warnings(thd, event);
- sql_print_information("SCHEDULER: [%s.%s of %s] executed "
- " in thread thread %lu. RetCode=%d",
- event->dbname.str, event->name.str,
- event->definer.str, thd->thread_id, ret);
- if (ret == EVEX_COMPILE_ERROR)
- sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
+ sql_print_information("SCHEDULER: [%s.%s of %s] executed "
+ " in thread thread %lu. RetCode=%d",
event->dbname.str, event->name.str,
- event->definer.str);
- else if (ret == EVEX_MICROSECOND_UNSUP)
- sql_print_information("SCHEDULER: MICROSECOND is not supported");
-
+ event->definer.str, thd->thread_id, ret);
+ if (ret == EVEX_COMPILE_ERROR)
+ sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
+ event->dbname.str, event->name.str,
+ event->definer.str);
+ else if (ret == EVEX_MICROSECOND_UNSUP)
+ sql_print_information("SCHEDULER: MICROSECOND is not supported");
+ }
end:
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
event->name.str));
@@ -293,7 +290,7 @@ end:
deinit_event_thread(thd);
- DBUG_RETURN(0); // Against gcc warnings
+ DBUG_RETURN(0); // Can't return anything here
}
@@ -305,17 +302,15 @@ end:
Event_scheduler::init_scheduler()
*/
-bool
+void
Event_scheduler::init_scheduler(Event_queue *q)
{
- LOCK_SCHEDULER_DATA();
- thread_id= 0;
- state= INITIALIZED;
+ LOCK_DATA();
queue= q;
started_events= 0;
- UNLOCK_SCHEDULER_DATA();
-
- return FALSE;
+ thread_id= 0;
+ state= INITIALIZED;
+ UNLOCK_DATA();
}
@@ -377,7 +372,7 @@ Event_scheduler::start()
struct scheduler_param *scheduler_param_value;
DBUG_ENTER("Event_scheduler::start");
- LOCK_SCHEDULER_DATA();
+ LOCK_DATA();
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
if (state > INITIALIZED)
goto end;
@@ -408,7 +403,7 @@ Event_scheduler::start()
DBUG_PRINT("info", ("Setting state go RUNNING"));
state= RUNNING;
end:
- UNLOCK_SCHEDULER_DATA();
+ UNLOCK_DATA();
if (ret && new_thd)
{
@@ -427,56 +422,6 @@ end:
/*
- Stops the scheduler (again). Waits for acknowledgement from the
- scheduler that it has stopped - synchronous stopping.
-
- SYNOPSIS
- Event_scheduler::stop()
-
- RETURN VALUE
- FALSE OK
- TRUE Error (not reported)
-*/
-
-bool
-Event_scheduler::stop()
-{
- THD *thd= current_thd;
- DBUG_ENTER("Event_scheduler::stop");
- DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
-
- LOCK_SCHEDULER_DATA();
- DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
- if (state != RUNNING)
- goto end;
-
- state= STOPPING;
-
- DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
- sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
-
- pthread_cond_signal(&COND_state);
-
- /* Guarantee we don't catch spurious signals */
- sql_print_information("SCHEDULER: Waiting the manager thread to reply");
- do {
- DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
- "thread. Current value of state is %s . "
- "workers count=%d", scheduler_states_names[state].str,
- workers_count()));
- /* thd could be 0x0, when shutting down */
- COND_STATE_WAIT(NULL);
- } while (state == STOPPING);
- DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
-
- thread_id= 0;
-end:
- UNLOCK_SCHEDULER_DATA();
- DBUG_RETURN(FALSE);
-}
-
-
-/*
The main loop of the scheduler.
SYNOPSIS
@@ -496,7 +441,7 @@ Event_scheduler::run(THD *thd)
Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run");
- LOCK_SCHEDULER_DATA();
+ LOCK_DATA();
thread_id= thd->thread_id;
sql_print_information("SCHEDULER: Manager thread started with id %lu",
@@ -529,7 +474,7 @@ Event_scheduler::run(THD *thd)
COND_STATE_WAIT(NULL);
thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_state"));
- LOCK_SCHEDULER_DATA();
+ LOCK_DATA();
}
else if (abstime.tv_sec)
{
@@ -545,16 +490,16 @@ Event_scheduler::run(THD *thd)
1. Spurious wake-up
2. The top of the queue was changed (new one becase of create/update)
*/
- /* This will do implicit UNLOCK_SCHEDULER_DATA() */
+ /* This will do implicit UNLOCK_DATA() */
thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
- LOCK_SCHEDULER_DATA();
+ LOCK_DATA();
}
else
{
- UNLOCK_SCHEDULER_DATA();
+ UNLOCK_DATA();
res= execute_top(thd, job_data);
- LOCK_SCHEDULER_DATA();
+ LOCK_DATA();
if (res)
break;
++started_events;
@@ -565,7 +510,7 @@ Event_scheduler::run(THD *thd)
pthread_cond_signal(&COND_state);
error:
state= INITIALIZED;
- UNLOCK_SCHEDULER_DATA();
+ UNLOCK_DATA();
sql_print_information("SCHEDULER: Stopped");
DBUG_RETURN(res);
@@ -627,23 +572,52 @@ error:
/*
- Returns the current state of the scheduler
+ Stops the scheduler (again). Waits for acknowledgement from the
+ scheduler that it has stopped - synchronous stopping.
SYNOPSIS
- Event_scheduler::get_state()
+ Event_scheduler::stop()
RETURN VALUE
- The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
+ FALSE OK
+ TRUE Error (not reported)
*/
-enum Event_scheduler::enum_state
-Event_scheduler::get_state()
+bool
+Event_scheduler::stop()
{
- enum Event_scheduler::enum_state ret;
- LOCK_SCHEDULER_DATA();
- ret= state;
- UNLOCK_SCHEDULER_DATA();
- return ret;
+ THD *thd= current_thd;
+ DBUG_ENTER("Event_scheduler::stop");
+ DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
+
+ LOCK_DATA();
+ DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
+ if (state != RUNNING)
+ goto end;
+
+ state= STOPPING;
+
+ DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
+ sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
+
+ pthread_cond_signal(&COND_state);
+
+ /* Guarantee we don't catch spurious signals */
+ sql_print_information("SCHEDULER: Waiting the manager thread to reply");
+ do {
+ DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
+ "thread. Current value of state is %s . "
+ "workers count=%d", scheduler_states_names[state].str,
+ workers_count()));
+ /* thd could be 0x0, when shutting down */
+ COND_STATE_WAIT(NULL);
+ } while (state == STOPPING);
+ DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
+
+ thread_id= 0;
+end:
+ UNLOCK_DATA();
+ DBUG_RETURN(FALSE);
}
@@ -697,7 +671,7 @@ Event_scheduler::queue_changed()
/*
Auxiliary function for locking LOCK_scheduler_state. Used
- by the LOCK_SCHEDULER_DATA macro.
+ by the LOCK_DATA macro.
SYNOPSIS
Event_scheduler::lock_data()
@@ -720,7 +694,7 @@ Event_scheduler::lock_data(const char *func, uint line)
/*
Auxiliary function for unlocking LOCK_scheduler_state. Used
- by the UNLOCK_SCHEDULER_DATA macro.
+ by the UNLOCK_DATA macro.
SYNOPSIS
Event_scheduler::unlock_data()
@@ -754,30 +728,52 @@ Event_scheduler::unlock_data(const char *func, uint line)
*/
void
-Event_scheduler::cond_wait(struct timespec *abstime,
- const char *func, uint line)
+Event_scheduler::cond_wait(struct timespec *abstime, const char *func,
+ uint line)
{
- DBUG_ENTER("Event_scheduler::cond_wait");
waiting_on_cond= TRUE;
mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
-
- if (abstime)
- pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
- else
+ if (!abstime)
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
-
+ else
+ pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
mutex_scheduler_data_locked= TRUE;
waiting_on_cond= FALSE;
+}
- DBUG_VOID_RETURN;
+
+/*
+ Returns the current state of the scheduler
+
+ SYNOPSIS
+ Event_scheduler::get_state()
+
+ RETURN VALUE
+ The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
+*/
+
+enum Event_scheduler::enum_state
+Event_scheduler::get_state()
+{
+ enum Event_scheduler::enum_state ret;
+ DBUG_ENTER("Event_scheduler::get_state");
+ LOCK_DATA();
+ ret= state;
+ UNLOCK_DATA();
+ DBUG_RETURN(ret);
}
/*
+ REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF
+ Returns whether the scheduler was initialized.
+*/
+
+/*
Dumps the internal status of the scheduler
SYNOPSIS
@@ -805,80 +801,82 @@ Event_scheduler::dump_internal_status(THD *thd)
tmp_string.length(0);
int_string.length(0);
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler state"), scs);
- protocol->store(scheduler_states_names[state].str,
- scheduler_states_names[state].length, scs);
+ do
+ {
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler state"), scs);
+ protocol->store(scheduler_states_names[state].str,
+ scheduler_states_names[state].length, scs);
- if ((ret= protocol->write()))
- goto end;
+ if ((ret= protocol->write()))
+ break;
- /* thread_id */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("thread_id"), scs);
- if (thread_id)
- {
- int_string.set((longlong) thread_id, scs);
- protocol->store(&int_string);
- }
- else
- protocol->store_null();
- if ((ret= protocol->write()))
- goto end;
+ /* thread_id */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("thread_id"), scs);
+ if (thread_id)
+ {
+ int_string.set((longlong) thread_id, scs);
+ protocol->store(&int_string);
+ }
+ else
+ protocol->store_null();
+ if ((ret= protocol->write()))
+ break;
- /* last locked at*/
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
- tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s::%d",
- mutex_last_locked_in_func,
- mutex_last_locked_at_line));
- protocol->store(&tmp_string);
- if ((ret= protocol->write()))
- goto end;
+ /* last locked at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_locked_in_func,
+ mutex_last_locked_at_line));
+ protocol->store(&tmp_string);
+ if ((ret= protocol->write()))
+ break;
- /* last unlocked at*/
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
- tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
- tmp_string.alloced_length(), "%s::%d",
- mutex_last_unlocked_in_func,
- mutex_last_unlocked_at_line));
- protocol->store(&tmp_string);
- if ((ret= protocol->write()))
- goto end;
+ /* last unlocked at*/
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
+ tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
+ tmp_string.alloced_length(), "%s::%d",
+ mutex_last_unlocked_in_func,
+ mutex_last_unlocked_at_line));
+ protocol->store(&tmp_string);
+ if ((ret= protocol->write()))
+ break;
- /* waiting on */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
- int_string.set((longlong) waiting_on_cond, scs);
- protocol->store(&int_string);
- if ((ret= protocol->write()))
- goto end;
+ /* waiting on */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
+ int_string.set((longlong) waiting_on_cond, scs);
+ protocol->store(&int_string);
+ if ((ret= protocol->write()))
+ break;
- /* workers_count */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
- int_string.set((longlong) workers_count(), scs);
- protocol->store(&int_string);
- if ((ret= protocol->write()))
- goto end;
+ /* workers_count */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
+ int_string.set((longlong) workers_count(), scs);
+ protocol->store(&int_string);
+ if ((ret= protocol->write()))
+ break;
- /* workers_count */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
- int_string.set((longlong) started_events, scs);
- protocol->store(&int_string);
- if ((ret= protocol->write()))
- goto end;
+ /* workers_count */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
+ int_string.set((longlong) started_events, scs);
+ protocol->store(&int_string);
+ if ((ret= protocol->write()))
+ break;
- /* scheduler_data_locked */
- protocol->prepare_for_resend();
- protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
- int_string.set((longlong) mutex_scheduler_data_locked, scs);
- protocol->store(&int_string);
- ret= protocol->write();
-end:
+ /* scheduler_data_locked */
+ protocol->prepare_for_resend();
+ protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
+ int_string.set((longlong) mutex_scheduler_data_locked, scs);
+ protocol->store(&int_string);
+ ret= protocol->write();
+ } while (0);
#endif
DBUG_RETURN(ret);
diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h
index 8e00b7b767f..b5c3dae49f8 100644
--- a/sql/event_scheduler.h
+++ b/sql/event_scheduler.h
@@ -56,7 +56,7 @@ public:
bool
run(THD *thd);
- bool
+ void
init_scheduler(Event_queue *queue);
void