diff options
41 files changed, 4112 insertions, 2352 deletions
diff --git a/.bzrignore b/.bzrignore index 095c04b36ee..57fcbdd8f73 100644 --- a/.bzrignore +++ b/.bzrignore @@ -1770,3 +1770,4 @@ vio/viotest-sslconnect.cpp vio/viotest.cpp zlib/*.ds? zlib/*.vcproj +libmysqld/event_scheduler.cc diff --git a/libmysqld/Makefile.am b/libmysqld/Makefile.am index dab99d7509e..63e75c34a6d 100644 --- a/libmysqld/Makefile.am +++ b/libmysqld/Makefile.am @@ -68,7 +68,7 @@ sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \ spatial.cc gstream.cc sql_help.cc tztime.cc sql_cursor.cc \ sp_head.cc sp_pcontext.cc sp.cc sp_cache.cc sp_rcontext.cc \ parse_file.cc sql_view.cc sql_trigger.cc my_decimal.cc \ - event_executor.cc event.cc event_timed.cc \ + event_scheduler.cc event.cc event_timed.cc \ rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \ sql_tablespace.cc \ rpl_injector.cc my_user.c partition_info.cc diff --git a/mysql-test/r/events.result b/mysql-test/r/events.result index 01d206be7cb..d02a2af3c9f 100644 --- a/mysql-test/r/events.result +++ b/mysql-test/r/events.result @@ -17,13 +17,13 @@ db_x SHOW TABLES FROM db_x; Tables_in_db_x x_table -SET GLOBAL event_scheduler=0; +SET GLOBAL event_scheduler=2; DROP EVENT e_x1; DROP EVENT e_x2; DROP DATABASE db_x; DROP USER pauline@localhost; USE events_test; -SET GLOBAL event_scheduler=0; +SET GLOBAL event_scheduler=2; drop event if exists event1; Warnings: Note 1305 Event event1 does not exist @@ -100,7 +100,7 @@ a 800219 drop event non_qualif_ev; drop table non_qualif; -set global event_scheduler = 0; +set global event_scheduler = 2; create table t_event3 (a int, b float); drop event if exists event3; Warnings: @@ -324,18 +324,18 @@ events_test one_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED events_test three_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED events_test two_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED "This should show us only 3 events:"; -SHOW FULL EVENTS; +SHOW EVENTS; Db Name Definer Type Execute at Interval value Interval field Starts Ends Status events_test one_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED events_test three_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED events_test two_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED "This should show us only 2 events:"; -SHOW FULL EVENTS LIKE 't%event'; +SHOW EVENTS LIKE 't%event'; Db Name Definer Type Execute at Interval value Interval field Starts Ends Status events_test three_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED events_test two_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED "This should show us no events:"; -SHOW FULL EVENTS FROM test LIKE '%'; +SHOW EVENTS FROM test LIKE '%'; Db Name Definer Type Execute at Interval value Interval field Starts Ends Status DROP DATABASE events_test2; "should see 1 event:"; @@ -343,11 +343,8 @@ SHOW EVENTS; Db Name Definer Type Execute at Interval value Interval field Starts Ends Status events_test one_event root@localhost RECURRING NULL 10 SECOND # # ENABLED "we should see 4 events now:"; -SHOW FULL EVENTS; +SHOW EVENTS; Db Name Definer Type Execute at Interval value Interval field Starts Ends Status -events_test one_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED -events_test three_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED -events_test two_event ev_test@localhost RECURRING NULL 20 SECOND # # ENABLED events_test one_event root@localhost RECURRING NULL 10 SECOND # # ENABLED SELECT EVENT_CATALOG, EVENT_SCHEMA, EVENT_NAME, DEFINER, EVENT_BODY, EVENT_TYPE, EXECUTE_AT, INTERVAL_VALUE, INTERVAL_FIELD, STATUS,ON_COMPLETION, EVENT_COMMENT from information_schema.events; EVENT_CATALOG EVENT_SCHEMA EVENT_NAME DEFINER EVENT_BODY EVENT_TYPE EXECUTE_AT INTERVAL_VALUE INTERVAL_FIELD STATUS ON_COMPLETION EVENT_COMMENT @@ -373,12 +370,12 @@ ERROR HY000: Incorrect AT value: 'definitely not a datetime' set names utf8; create event задачка on schedule every 123 minute starts now() ends now() + interval 1 month do select 1; drop event задачка; -set event_scheduler=0; +set event_scheduler=2; ERROR HY000: Variable 'event_scheduler' is a GLOBAL variable and should be set with SET GLOBAL -set global event_scheduler=2; -ERROR 42000: Variable 'event_scheduler' can't be set to the value of '2' +set global event_scheduler=3; +ERROR 42000: Variable 'event_scheduler' can't be set to the value of '3' "DISABLE the scheduler. Testing that it does not work when the variable is 0" -set global event_scheduler=0; +set global event_scheduler=2; select definer, name, db from mysql.event; definer name db select get_lock("test_lock1", 20); @@ -389,9 +386,10 @@ create event закачка on schedule every 10 hour do select get_lock("test_l select definer, name, db from mysql.event; definer name db root@localhost закачка events_test -"Should be 0 processes" +"Should be only 1 process" select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; user host db command state info +event_scheduler localhost NULL Connect Suspended NULL select release_lock("test_lock1"); release_lock("test_lock1") 1 @@ -409,11 +407,12 @@ get_lock("test_lock2", 20) create event закачка on schedule every 10 hour do select get_lock("test_lock2", 20); "Let some time pass to the event starts" "Should have only 2 processes: the scheduler and the locked event" -select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; +select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; user host db command state info event_scheduler localhost NULL Connect Sleeping NULL root localhost events_test Connect User lock select get_lock("test_lock2", 20) "Release the mutex, the event worker should finish." +"Release the mutex, the event worker should finish." select release_lock("test_lock2"); release_lock("test_lock2") 1 @@ -423,21 +422,17 @@ select get_lock("test_lock2_1", 20); get_lock("test_lock2_1", 20) 1 create event закачка21 on schedule every 10 hour do select get_lock("test_lock2_1", 20); -"Should see 1 process, locked on get_lock(" -"Shutting down the scheduler, it should wait for the running event" -set global event_scheduler=0; -"Should have only 2 processes: the scheduler and the locked event" -select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; +"Should have only 3 processes: the scheduler, our conn and the locked event" +select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; user host db command state info event_scheduler localhost NULL Connect Sleeping NULL root localhost events_test Connect User lock select get_lock("test_lock2_1", 20) -"Release the lock so the child process should finish. Hence the scheduler also" -select release_lock("test_lock2_1"); -release_lock("test_lock2_1") -1 -"Should see 0 processes now:" -select /*5*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; +set global event_scheduler=2; +"Should have only our process now:" +select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; user host db command state info +event_scheduler localhost NULL Connect Suspended NULL +root localhost events_test Connect User lock select get_lock("test_lock2_1", 20) drop event закачка21; create table t_16 (s1 int); create trigger t_16_bi before insert on t_16 for each row create event e_16 on schedule every 1 second do set @a=5; @@ -457,6 +452,9 @@ select 2; select event_schema, event_name, definer, event_body from information_schema.events where event_name='white_space'; event_schema event_name definer event_body events_test white_space root@localhost select 2 +select event_schema, event_name, definer, event_body from information_schema.events where event_name='white_space'; +event_schema event_name definer event_body +events_test white_space root@localhost select 2 drop event white_space; create event white_space on schedule every 10 hour disable do select 3; select event_schema, event_name, definer, event_body from information_schema.events where event_name='white_space'; diff --git a/mysql-test/r/events_bugs.result b/mysql-test/r/events_bugs.result index ef1ccfadecb..bc89c692f9a 100644 --- a/mysql-test/r/events_bugs.result +++ b/mysql-test/r/events_bugs.result @@ -35,7 +35,7 @@ create event e_55 on schedule every 10 hour starts 99990101000000 do drop table ERROR HY000: Incorrect STARTS value: '99990101000000' create event e_55 on schedule every 10 minute ends 99990101000000 do drop table t; ERROR HY000: ENDS is either invalid or before STARTS -set global event_scheduler=0; +set global event_scheduler=2; "Wait a bit to settle down" delete from mysql.event; set global event_scheduler= 1; @@ -57,7 +57,7 @@ root localhost events_test Connect User lock select get_lock('test_bug16407', 60 select release_lock('test_bug16407'); release_lock('test_bug16407') 1 -set global event_scheduler= 0; +set global event_scheduler= 2; select event_schema, event_name, sql_mode from information_schema.events order by event_schema, event_name; event_schema event_name sql_mode events_test e_16407 REAL_AS_FLOAT,PIPES_AS_CONCAT,ANSI_QUOTES,IGNORE_SPACE,ANSI @@ -115,7 +115,7 @@ release_lock('ee_16407_2') select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; user host db command state info event_scheduler localhost NULL Connect Sleeping NULL -set global event_scheduler= 0; +set global event_scheduler= 2; select * from events_smode_test order by ev_name, a; ev_name a ee_16407_3 1980-02-19 @@ -175,7 +175,7 @@ drop event ee_16407_5; drop event ee_16407_6; drop procedure ee_16407_5_pendant; drop procedure ee_16407_6_pendant; -set global event_scheduler= 0; +set global event_scheduler= 2; drop table events_smode_test; set sql_mode=@old_sql_mode; drop database events_test; diff --git a/mysql-test/r/events_logs_tests.result b/mysql-test/r/events_logs_tests.result index ab1666fefb9..911bc8b2d60 100644 --- a/mysql-test/r/events_logs_tests.result +++ b/mysql-test/r/events_logs_tests.result @@ -8,7 +8,7 @@ BEGIN SELECT user_host, argument FROM mysql.general_log WHERE argument LIKE '%alabala%'; END| "Check General Query Log" -SET GLOBAL event_scheduler=0; +SET GLOBAL event_scheduler=2; create event log_general on schedule every 1 minute do SELect 'alabala', sleep(3) from dual; TRUNCATE mysql.general_log; "1 row, the current statement!" @@ -22,7 +22,7 @@ user_host argument root[root] @ localhost [localhost] SELect 'alabala', sleep(3) from dual DROP PROCEDURE select_general_log; DROP EVENT log_general; -SET GLOBAL event_scheduler=0; +SET GLOBAL event_scheduler=2; "Check slow query log" "Save the values" SET @old_global_long_query_time:=(select get_value()); @@ -36,14 +36,14 @@ SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; user_host query_time db sql_text "Set new values" SET GLOBAL long_query_time=4; -SET SESSION long_query_time=2; +SET SESSION long_query_time=1; "Check that logging is working" -SELECT SLEEP(3); -SLEEP(3) +SELECT SLEEP(2); +SLEEP(2) 0 SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; user_host query_time db sql_text -root[root] @ localhost [] SLEEPVAL events_test SELECT SLEEP(3) +root[root] @ localhost [] SLEEPVAL events_test SELECT SLEEP(2) TRUNCATE mysql.slow_log; CREATE TABLE slow_event_test (slo_val tinyint, val tinyint); "This won't go to the slow log" @@ -54,7 +54,7 @@ SET GLOBAL event_scheduler=1; "Sleep some more time than the actual event run will take" SHOW VARIABLES LIKE 'event_scheduler'; Variable_name Value -event_scheduler ON +event_scheduler 1 "Check our table. Should see 1 row" SELECT * FROM slow_event_test; slo_val val @@ -64,18 +64,19 @@ SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; user_host query_time db sql_text "This should go to the slow log" SET SESSION long_query_time=10; +SET GLOBAL long_query_time=1; DROP EVENT long_event; -CREATE EVENT long_event2 ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO slow_event_test SELECT @@long_query_time, SLEEP(5); +CREATE EVENT long_event2 ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO slow_event_test SELECT @@long_query_time, SLEEP(2); "Sleep some more time than the actual event run will take" "Check our table. Should see 2 rows" SELECT * FROM slow_event_test; slo_val val 4 0 -4 0 -"Check slow log. Should see 1 row because 5 is over the threshold of 4 for GLOBAL, though under SESSION which is 10" +1 0 +"Check slow log. Should see 1 row because 4 is over the threshold of 3 for GLOBAL, though under SESSION which is 10" SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; user_host query_time db sql_text -root[root] @ localhost [localhost] SLEEPVAL events_test INSERT INTO slow_event_test SELECT @@long_query_time, SLEEP(5) +root[root] @ localhost [localhost] SLEEPVAL events_test INSERT INTO slow_event_test SELECT @@long_query_time, SLEEP(2) DROP EVENT long_event2; SET GLOBAL long_query_time =@old_global_long_query_time; SET SESSION long_query_time =@old_session_long_query_time; diff --git a/mysql-test/r/events_microsec.result b/mysql-test/r/events_microsec.result index ed15b066b93..b96bd551511 100644 --- a/mysql-test/r/events_microsec.result +++ b/mysql-test/r/events_microsec.result @@ -10,50 +10,4 @@ CREATE EVENT micro_test ON SCHEDULE EVERY 100 MINUTE_MICROSECOND DO SELECT 1; ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' CREATE EVENT micro_test ON SCHEDULE EVERY 100 SECOND_MICROSECOND DO SELECT 1; ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' -"Now create normal event and change it on SQL level" -CREATE EVENT micro_test2 ON SCHEDULE EVERY 1 MONTH DO SELECT 1; -UPDATE mysql.event SET interval_field='MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; -SHOW CREATE EVENT micro_test2; -ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' -SET GLOBAL event_scheduler=0; -"Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -Variable_name Value -event_scheduler OFF -UPDATE mysql.event SET interval_field='DAY_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; -SHOW CREATE EVENT micro_test2; -ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' -SET GLOBAL event_scheduler=0; -"Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -Variable_name Value -event_scheduler OFF -UPDATE mysql.event SET interval_field='SECOND_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; -SHOW CREATE EVENT micro_test2; -ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' -SET GLOBAL event_scheduler=0; -"Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -Variable_name Value -event_scheduler OFF -UPDATE mysql.event SET interval_field='HOUR_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; -SHOW CREATE EVENT micro_test2; -ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' -SET GLOBAL event_scheduler=0; -"Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -Variable_name Value -event_scheduler OFF -UPDATE mysql.event SET interval_field='MINUTE_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; -SHOW CREATE EVENT micro_test2; -ERROR 42000: This version of MySQL doesn't yet support 'MICROSECOND' -SET GLOBAL event_scheduler=0; -"Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -Variable_name Value -event_scheduler OFF -SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER='event_scheduler'; -COUNT(*) -0 -DROP EVENT micro_test2; drop database events_test; diff --git a/mysql-test/r/events_scheduling.result b/mysql-test/r/events_scheduling.result index 8b1f29d320f..aec2053f0e7 100644 --- a/mysql-test/r/events_scheduling.result +++ b/mysql-test/r/events_scheduling.result @@ -14,7 +14,7 @@ ENDS NOW() + INTERVAL 6 SECOND ON COMPLETION PRESERVE DO INSERT INTO table_2 VALUES(1); CREATE EVENT only_one_time ON SCHEDULE EVERY 2 SECOND ENDS NOW() + INTERVAL 1 SECOND DO INSERT INTO table_3 VALUES(1); -CREATE EVENT two_time ON SCHEDULE EVERY 1 SECOND ENDS NOW() + INTERVAL 1 SECOND DO INSERT INTO table_4 VALUES(1); +CREATE EVENT two_time ON SCHEDULE EVERY 1 SECOND ENDS NOW() + INTERVAL 1 SECOND ON COMPLETION PRESERVE DO INSERT INTO table_4 VALUES(1); SELECT IF(SUM(a) >= 4, 'OK', 'ERROR') FROM table_1; IF(SUM(a) >= 4, 'OK', 'ERROR') OK @@ -38,9 +38,12 @@ DROP EVENT start_n_end; "Already dropped because ended. Therefore an error." DROP EVENT only_one_time; ERROR HY000: Unknown event 'only_one_time' -"Already dropped because ended. Therefore an error." +"Should be preserved" +SELECT EVENT_NAME, STATUS FROM INFORMATION_SCHEMA.EVENTS; +EVENT_NAME STATUS +E19170 ENABLED +two_time DISABLED DROP EVENT two_time; -ERROR HY000: Unknown event 'two_time' DROP TABLE table_1; DROP TABLE table_2; DROP TABLE table_3; diff --git a/mysql-test/r/events_stress.result b/mysql-test/r/events_stress.result index 9f95cfad75d..a903a4502bf 100644 --- a/mysql-test/r/events_stress.result +++ b/mysql-test/r/events_stress.result @@ -1,46 +1,61 @@ CREATE DATABASE IF NOT EXISTS events_test; -CREATE DATABASE events_test2; -USE events_test2; +CREATE DATABASE events_conn1_test2; +CREATE TABLE events_test.fill_it(test_name varchar(20), occur datetime); +CREATE USER event_user2@localhost; +CREATE DATABASE events_conn2_db; +GRANT ALL ON *.* TO event_user2@localhost; +CREATE USER event_user3@localhost; +CREATE DATABASE events_conn3_db; +GRANT ALL ON *.* TO event_user3@localhost; +"In the second connection we create some events which won't be dropped till the end" +"In the second connection we create some events which won't be dropped till the end" +USE events_conn1_test2; CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; USE events_test; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS; +COUNT(*) +203 +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; COUNT(*) 3 -DROP DATABASE events_test2; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +DROP DATABASE events_conn1_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; COUNT(*) 0 "Now testing stability - dropping db -> events while they are running" -CREATE DATABASE events_test2; -USE events_test2; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +CREATE DATABASE events_conn1_test2; +USE events_conn1_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; COUNT(*) -1000 +100 SET GLOBAL event_scheduler=1; -DROP DATABASE events_test2; -SET GLOBAL event_scheduler=0; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +DROP DATABASE events_conn1_test2; +SET GLOBAL event_scheduler=2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; COUNT(*) 0 -CREATE DATABASE events_test3; -USE events_test3; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3'; +CREATE DATABASE events_conn1_test3; +USE events_conn1_test3; +SET GLOBAL event_scheduler=1; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test3'; COUNT(*) -950 -CREATE DATABASE events_test4; -USE events_test4; -CREATE DATABASE events_test2; -USE events_test2; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +100 +CREATE DATABASE events_conn1_test4; +USE events_conn1_test4; +CREATE DATABASE events_conn1_test2; +USE events_conn1_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; COUNT(*) -1050 -DROP DATABASE events_test2; -SET GLOBAL event_scheduler=0; -DROP DATABASE events_test3; -SET GLOBAL event_scheduler=1; -DROP DATABASE events_test4; +100 +DROP DATABASE events_conn2_db; +DROP DATABASE events_conn3_db; +DROP DATABASE events_conn1_test2; +DROP DATABASE events_conn1_test3; +SET GLOBAL event_scheduler=2; +DROP DATABASE events_conn1_test4; SET GLOBAL event_scheduler=1; USE events_test; +DROP TABLE fill_it; DROP DATABASE events_test; diff --git a/mysql-test/t/disabled.def b/mysql-test/t/disabled.def index 7cbe4419904..e29e51b8c2c 100644 --- a/mysql-test/t/disabled.def +++ b/mysql-test/t/disabled.def @@ -9,10 +9,10 @@ # Do not use any TAB characters for whitespace. # ############################################################################## -events_bugs : BUG#17619 2006-02-21 andrey Race conditions -events_stress : BUG#17619 2006-02-21 andrey Race conditions -events : BUG#17619 2006-02-21 andrey Race conditions -events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails on some platforms. Has to be checked. +#events_bugs : BUG#17619 2006-02-21 andrey Race conditions +#events_stress : BUG#17619 2006-02-21 andrey Race conditions +#events : BUG#17619 2006-02-21 andrey Race conditions +#events_scheduling : BUG#19170 2006-04-26 andrey Test case of 19170 fails on some platforms. Has to be checked. ndb_autodiscover : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog ndb_autodiscover2 : BUG#18952 2006-02-16 jmiller Needs to be fixed w.r.t binlog ndb_binlog_discover : BUG#19395 2006-04-28 tomas/knielsen mysqld does not always detect cluster shutdown diff --git a/mysql-test/t/events.test b/mysql-test/t/events.test index fbcd4924d56..819d64ccf14 100644 --- a/mysql-test/t/events.test +++ b/mysql-test/t/events.test @@ -15,11 +15,10 @@ CREATE EVENT e_x2 ON SCHEDULE EVERY 1 SECOND DO DROP TABLE x_table; connection default; SHOW DATABASES LIKE 'db_x'; SET GLOBAL event_scheduler=1; ---sleep 2 +--sleep 1.5 SHOW DATABASES LIKE 'db_x'; SHOW TABLES FROM db_x; -SET GLOBAL event_scheduler=0; ---sleep 1 +SET GLOBAL event_scheduler=2; connection priv_conn; DROP EVENT e_x1; DROP EVENT e_x2; @@ -31,8 +30,7 @@ USE events_test; # # END: BUG #17289 Events: missing privilege check for drop database # -SET GLOBAL event_scheduler=0; ---sleep 1 +SET GLOBAL event_scheduler=2; drop event if exists event1; create event event1 on schedule every 15 minute starts now() ends date_add(now(), interval 5 hour) DO begin end; alter event event1 rename to event2 enable; @@ -92,11 +90,11 @@ drop event e_43; --echo "Let's check whether we can use non-qualified names" create table non_qualif(a int); create event non_qualif_ev on schedule every 10 minute do insert into non_qualif values (800219); ---sleep 2 +--sleep 1 select * from non_qualif; drop event non_qualif_ev; drop table non_qualif; -set global event_scheduler = 0; +set global event_scheduler = 2; create table t_event3 (a int, b float); drop event if exists event3; @@ -281,15 +279,15 @@ SHOW EVENTS; --echo "This should show us only 3 events:"; --replace_column 8 # 9 # -SHOW FULL EVENTS; +SHOW EVENTS; --echo "This should show us only 2 events:"; --replace_column 8 # 9 # -SHOW FULL EVENTS LIKE 't%event'; +SHOW EVENTS LIKE 't%event'; --echo "This should show us no events:"; --replace_column 8 # 9 # -SHOW FULL EVENTS FROM test LIKE '%'; +SHOW EVENTS FROM test LIKE '%'; #ok, we are back connection default; DROP DATABASE events_test2; @@ -300,7 +298,7 @@ SHOW EVENTS; --echo "we should see 4 events now:"; --replace_column 8 # 9 # -SHOW FULL EVENTS; +SHOW EVENTS; SELECT EVENT_CATALOG, EVENT_SCHEMA, EVENT_NAME, DEFINER, EVENT_BODY, EVENT_TYPE, EXECUTE_AT, INTERVAL_VALUE, INTERVAL_FIELD, STATUS,ON_COMPLETION, EVENT_COMMENT from information_schema.events; connection ev_con1; @@ -330,21 +328,21 @@ create event задачка on schedule every 123 minute starts now() ends now() drop event задачка; # event_scheduler is a global var ---error 1229 -set event_scheduler=0; -# event_scheduler could be only either 0 or 1 ---error 1231 -set global event_scheduler=2; +--error ER_GLOBAL_VARIABLE +set event_scheduler=2; +# event_scheduler could be only either 1 or 2 +--error ER_WRONG_VALUE_FOR_VAR +set global event_scheduler=3; --echo "DISABLE the scheduler. Testing that it does not work when the variable is 0" -set global event_scheduler=0; +set global event_scheduler=2; select definer, name, db from mysql.event; select get_lock("test_lock1", 20); create event закачка on schedule every 10 hour do select get_lock("test_lock1", 20); --echo "Should return 1 row" select definer, name, db from mysql.event; ---echo "Should be 0 processes" +--echo "Should be only 1 process" select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select release_lock("test_lock1"); drop event закачка; @@ -362,7 +360,7 @@ create event закачка on schedule every 10 hour do select get_lock("test_l --echo "Let some time pass to the event starts" --sleep 2 --echo "Should have only 2 processes: the scheduler and the locked event" -select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; +select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;--echo "Release the mutex, the event worker should finish." --echo "Release the mutex, the event worker should finish." select release_lock("test_lock2"); drop event закачка; @@ -379,18 +377,11 @@ set global event_scheduler=1; select get_lock("test_lock2_1", 20); create event закачка21 on schedule every 10 hour do select get_lock("test_lock2_1", 20); --sleep 1 ---echo "Should see 1 process, locked on get_lock(" -#select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; ---echo "Shutting down the scheduler, it should wait for the running event" -set global event_scheduler=0; ---sleep 1 ---echo "Should have only 2 processes: the scheduler and the locked event" +--echo "Should have only 3 processes: the scheduler, our conn and the locked event" +select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; +set global event_scheduler=2; +--echo "Should have only our process now:" select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; ---echo "Release the lock so the child process should finish. Hence the scheduler also" -select release_lock("test_lock2_1"); ---sleep 1 ---echo "Should see 0 processes now:" -select /*5*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; drop event закачка21; #### @@ -418,6 +409,7 @@ create event white_space on schedule every 10 hour disable do select 2; select event_schema, event_name, definer, event_body from information_schema.events where event_name='white_space'; +select event_schema, event_name, definer, event_body from information_schema.events where event_name='white_space'; drop event white_space; create event white_space on schedule every 10 hour disable do select 3; select event_schema, event_name, definer, event_body from information_schema.events where event_name='white_space'; @@ -426,7 +418,7 @@ drop event white_space; # END: BUG #17453: Creating Event crash the server # -##set global event_scheduler=1; +# # Bug#17403 "Events: packets out of order with show create event" # create event e1 on schedule every 1 year do set @a = 5; @@ -440,7 +432,7 @@ drop event e1; ##select get_lock("test_lock3", 20); ##create event закачка on schedule every 10 hour do select get_lock("test_lock3", 20); ##select sleep(2); -##show processlist; +##select /*5*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; ##drop event закачка; ##select release_lock("test_lock3"); @@ -450,14 +442,14 @@ drop event e1; ##select get_lock("test_lock4", 20); ##create event закачка4 on schedule every 1 second do select get_lock("test_lock4", 20); ##select sleep(3); -##--replace_column 1 # 6 # +##select /*6*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; ##drop event закачка4; ##select release_lock("test_lock4"); -##set global event_scheduler=0; +##set global event_scheduler=2; ##select sleep(2); ##--replace_column 1 # 6 # +##show processlist; ##select count(*) from mysql.event; drop database events_test; - diff --git a/mysql-test/t/events_bugs.test b/mysql-test/t/events_bugs.test index 3f339ff0398..e3b79a6bd13 100644 --- a/mysql-test/t/events_bugs.test +++ b/mysql-test/t/events_bugs.test @@ -30,13 +30,13 @@ set @a=3; CREATE PROCEDURE p_16 () CREATE EVENT e_16 ON SCHEDULE EVERY @a SECOND DO SET @a=5; call p_16(); --echo "Here we used to crash!" ---error 1516 +--error ER_EVENT_ALREADY_EXISTS call p_16(); ---error 1516 +--error ER_EVENT_ALREADY_EXISTS call p_16(); DROP EVENT e_16; CALL p_16(); ---error 1516 +--error ER_EVENT_ALREADY_EXISTS CALL p_16(); DROP PROCEDURE p_16; DROP EVENT e_16; @@ -47,9 +47,9 @@ DROP EVENT e_16; # # Start - 16396: Events: Distant-future dates become past dates # ---error 1504 +--error ER_WRONG_VALUE create event e_55 on schedule at 99990101000000 do drop table t; ---error 1504 +--error ER_WRONG_VALUE create event e_55 on schedule every 10 hour starts 99990101000000 do drop table t; --error ER_EVENT_ENDS_BEFORE_STARTS create event e_55 on schedule every 10 minute ends 99990101000000 do drop table t; @@ -60,7 +60,7 @@ create event e_55 on schedule every 10 minute ends 99990101000000 do drop table # # Start - 16407: Events: Changes in sql_mode won't be taken into account # -set global event_scheduler=0; +set global event_scheduler=2; --echo "Wait a bit to settle down" --sleep 1 delete from mysql.event; @@ -79,7 +79,7 @@ delimiter ;| --echo "Now if everything is fine the event has compiled and is locked select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select release_lock('test_bug16407'); -set global event_scheduler= 0; +set global event_scheduler= 2; select event_schema, event_name, sql_mode from information_schema.events order by event_schema, event_name; --echo "Let's check whether we change the sql_mode on ALTER EVENT" set sql_mode='traditional'; @@ -121,9 +121,9 @@ set global event_scheduler= 1; --sleep 1 select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select release_lock('ee_16407_2'); ---sleep 3 +--sleep 2 select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; -set global event_scheduler= 0; +set global event_scheduler= 2; select * from events_smode_test order by ev_name, a; --echo "OK, last check before we drop them" select event_schema, event_name, sql_mode from information_schema.events order by event_schema, event_name; @@ -156,7 +156,7 @@ set global event_scheduler= 1; --echo "Should have 2 locked processes" select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select release_lock('ee_16407_5'); ---sleep 3 +--sleep 2 --echo "Should have 0 processes locked" select /*5*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select * from events_smode_test order by ev_name, a; @@ -166,7 +166,7 @@ drop event ee_16407_5; drop event ee_16407_6; drop procedure ee_16407_5_pendant; drop procedure ee_16407_6_pendant; -set global event_scheduler= 0; +set global event_scheduler= 2; drop table events_smode_test; set sql_mode=@old_sql_mode; # diff --git a/mysql-test/t/events_logs_tests.test b/mysql-test/t/events_logs_tests.test index 21adc17d5b8..a468685ddc6 100644 --- a/mysql-test/t/events_logs_tests.test +++ b/mysql-test/t/events_logs_tests.test @@ -10,7 +10,7 @@ BEGIN END| delimiter ;| --echo "Check General Query Log" -SET GLOBAL event_scheduler=0; +SET GLOBAL event_scheduler=2; create event log_general on schedule every 1 minute do SELect 'alabala', sleep(3) from dual; TRUNCATE mysql.general_log; --echo "1 row, the current statement!" @@ -22,7 +22,7 @@ SET GLOBAL event_scheduler=1; call select_general_log(); DROP PROCEDURE select_general_log; DROP EVENT log_general; -SET GLOBAL event_scheduler=0; +SET GLOBAL event_scheduler=2; --sleep 1 --echo "Check slow query log" @@ -53,10 +53,10 @@ TRUNCATE mysql.slow_log; SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; --echo "Set new values" SET GLOBAL long_query_time=4; -SET SESSION long_query_time=2; +SET SESSION long_query_time=1; --echo "Check that logging is working" -SELECT SLEEP(3); ---replace_regex /00:00:0[3-5]/SLEEPVAL/ +SELECT SLEEP(2); +--replace_regex /00:00:0[2-4]/SLEEPVAL/ SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; TRUNCATE mysql.slow_log; CREATE TABLE slow_event_test (slo_val tinyint, val tinyint); @@ -73,14 +73,15 @@ SELECT * FROM slow_event_test; SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; --echo "This should go to the slow log" SET SESSION long_query_time=10; +SET GLOBAL long_query_time=1; DROP EVENT long_event; -CREATE EVENT long_event2 ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO slow_event_test SELECT @@long_query_time, SLEEP(5); +CREATE EVENT long_event2 ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO slow_event_test SELECT @@long_query_time, SLEEP(2); --echo "Sleep some more time than the actual event run will take" ---sleep 7 +--sleep 3 --echo "Check our table. Should see 2 rows" SELECT * FROM slow_event_test; ---echo "Check slow log. Should see 1 row because 5 is over the threshold of 4 for GLOBAL, though under SESSION which is 10" ---replace_regex /00:00:0[5-7]/SLEEPVAL/ +--echo "Check slow log. Should see 1 row because 4 is over the threshold of 3 for GLOBAL, though under SESSION which is 10" +--replace_regex /00:00:0[2-4]/SLEEPVAL/ SELECT user_host, query_time, db, sql_text FROM mysql.slow_log; DROP EVENT long_event2; SET GLOBAL long_query_time =@old_global_long_query_time; diff --git a/mysql-test/t/events_microsec.test b/mysql-test/t/events_microsec.test index 34855fdadff..e01120a0756 100644 --- a/mysql-test/t/events_microsec.test +++ b/mysql-test/t/events_microsec.test @@ -1,55 +1,15 @@ create database if not exists events_test; use events_test; ---error 1235 +--error ER_NOT_SUPPORTED_YET CREATE EVENT micro_test ON SCHEDULE EVERY 100 MICROSECOND DO SELECT 1; ---error 1235 +--error ER_NOT_SUPPORTED_YET CREATE EVENT micro_test ON SCHEDULE EVERY 100 DAY_MICROSECOND DO SELECT 1; ---error 1235 +--error ER_NOT_SUPPORTED_YET CREATE EVENT micro_test ON SCHEDULE EVERY 100 HOUR_MICROSECOND DO SELECT 1; ---error 1235 +--error ER_NOT_SUPPORTED_YET CREATE EVENT micro_test ON SCHEDULE EVERY 100 MINUTE_MICROSECOND DO SELECT 1; ---error 1235 +--error ER_NOT_SUPPORTED_YET CREATE EVENT micro_test ON SCHEDULE EVERY 100 SECOND_MICROSECOND DO SELECT 1; ---echo "Now create normal event and change it on SQL level" -CREATE EVENT micro_test2 ON SCHEDULE EVERY 1 MONTH DO SELECT 1; -UPDATE mysql.event SET interval_field='MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; ---error 1235 -SHOW CREATE EVENT micro_test2; -SET GLOBAL event_scheduler=0; ---sleep 1 ---echo "Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -UPDATE mysql.event SET interval_field='DAY_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; ---error 1235 -SHOW CREATE EVENT micro_test2; -SET GLOBAL event_scheduler=0; ---sleep 1 ---echo "Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -UPDATE mysql.event SET interval_field='SECOND_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; ---error 1235 -SHOW CREATE EVENT micro_test2; -SET GLOBAL event_scheduler=0; ---sleep 1 ---echo "Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -UPDATE mysql.event SET interval_field='HOUR_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; ---error 1235 -SHOW CREATE EVENT micro_test2; -SET GLOBAL event_scheduler=0; ---sleep 1 ---echo "Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -UPDATE mysql.event SET interval_field='MINUTE_MICROSECOND' WHERE db=database() AND definer=user() AND name='micro_test2'; ---error 1235 -SHOW CREATE EVENT micro_test2; -SET GLOBAL event_scheduler=0; ---sleep 1 ---echo "Should not be running:" -SHOW VARIABLES like 'event_scheduler'; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER='event_scheduler'; -DROP EVENT micro_test2; - drop database events_test; diff --git a/mysql-test/t/events_scheduling.test b/mysql-test/t/events_scheduling.test index ae3cc7d5fac..a73d25cd8ee 100644 --- a/mysql-test/t/events_scheduling.test +++ b/mysql-test/t/events_scheduling.test @@ -15,7 +15,7 @@ CREATE EVENT start_n_end DO INSERT INTO table_2 VALUES(1); --sleep 5 CREATE EVENT only_one_time ON SCHEDULE EVERY 2 SECOND ENDS NOW() + INTERVAL 1 SECOND DO INSERT INTO table_3 VALUES(1); -CREATE EVENT two_time ON SCHEDULE EVERY 1 SECOND ENDS NOW() + INTERVAL 1 SECOND DO INSERT INTO table_4 VALUES(1); +CREATE EVENT two_time ON SCHEDULE EVERY 1 SECOND ENDS NOW() + INTERVAL 1 SECOND ON COMPLETION PRESERVE DO INSERT INTO table_4 VALUES(1); --sleep 5 SELECT IF(SUM(a) >= 4, 'OK', 'ERROR') FROM table_1; SELECT IF(SUM(a) >= 5, 'OK', 'ERROR') FROM table_2; @@ -28,8 +28,8 @@ DROP EVENT start_n_end; --echo "Already dropped because ended. Therefore an error." --error ER_EVENT_DOES_NOT_EXIST DROP EVENT only_one_time; ---echo "Already dropped because ended. Therefore an error." ---error ER_EVENT_DOES_NOT_EXIST +--echo "Should be preserved" +SELECT EVENT_NAME, STATUS FROM INFORMATION_SCHEMA.EVENTS; DROP EVENT two_time; DROP TABLE table_1; DROP TABLE table_2; diff --git a/mysql-test/t/events_stress.test b/mysql-test/t/events_stress.test index f6eed79425c..24bb5bc89b1 100644 --- a/mysql-test/t/events_stress.test +++ b/mysql-test/t/events_stress.test @@ -2,78 +2,120 @@ CREATE DATABASE IF NOT EXISTS events_test; # # DROP DATABASE test start (bug #16406) # -CREATE DATABASE events_test2; -USE events_test2; +CREATE DATABASE events_conn1_test2; +CREATE TABLE events_test.fill_it(test_name varchar(20), occur datetime); +CREATE USER event_user2@localhost; +CREATE DATABASE events_conn2_db; +GRANT ALL ON *.* TO event_user2@localhost; +CREATE USER event_user3@localhost; +CREATE DATABASE events_conn3_db; +GRANT ALL ON *.* TO event_user3@localhost; +connect (conn2,localhost,event_user2,,events_conn2_db); +--echo "In the second connection we create some events which won't be dropped till the end" +--disable_query_log +let $1= 100; +while ($1) +{ + eval CREATE EVENT conn2_ev$1 ON SCHEDULE EVERY 1 SECOND DO INSERT INTO events_test.fill_it VALUES("conn2_ev$1", NOW()); + dec $1; +} +--enable_query_log +connect (conn3,localhost,event_user3,,events_conn3_db); +--echo "In the second connection we create some events which won't be dropped till the end" +--disable_query_log +let $1= 100; +while ($1) +{ + eval CREATE EVENT conn3_ev$1 ON SCHEDULE EVERY 1 SECOND DO INSERT INTO events_test.fill_it VALUES("conn3_ev$1", NOW()); + dec $1; +} +--enable_query_log +connection default; +USE events_conn1_test2; CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1; USE events_test; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; -DROP DATABASE events_test2; -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; +DROP DATABASE events_conn1_test2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; --echo "Now testing stability - dropping db -> events while they are running" -CREATE DATABASE events_test2; -USE events_test2; +CREATE DATABASE events_conn1_test2; +USE events_conn1_test2; --disable_query_log -let $1= 1000; +let $1= 100; while ($1) { - eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + eval CREATE EVENT conn1_round1_ev$1 ON SCHEDULE EVERY 1 SECOND DO INSERT INTO events_test.fill_it VALUES("conn1_round1_ev$1", NOW()); dec $1; } --enable_query_log -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; SET GLOBAL event_scheduler=1; ---sleep 4 -DROP DATABASE events_test2; +--sleep 6 +DROP DATABASE events_conn1_test2; -SET GLOBAL event_scheduler=0; ---sleep 2 -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; -CREATE DATABASE events_test3; -USE events_test3; +SET GLOBAL event_scheduler=2; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; +CREATE DATABASE events_conn1_test3; +USE events_conn1_test3; --disable_query_log -let $1= 950; +let $1= 100; while ($1) { - eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + eval CREATE EVENT conn1_round2_ev$1 ON SCHEDULE EVERY 1 SECOND DO INSERT INTO events_test.fill_it VALUES("conn1_round2_ev$1", NOW()); dec $1; } --enable_query_log -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3'; ---sleep 3 -CREATE DATABASE events_test4; -USE events_test4; +SET GLOBAL event_scheduler=1; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test3'; +CREATE DATABASE events_conn1_test4; +USE events_conn1_test4; --disable_query_log -let $1= 860; +let $1= 100; while ($1) { - eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + eval CREATE EVENT conn1_round3_ev$1 ON SCHEDULE EVERY 1 SECOND DO INSERT INTO events_test.fill_it VALUES("conn1_round3_ev$1", NOW()); dec $1; } --enable_query_log - -CREATE DATABASE events_test2; -USE events_test2; +CREATE DATABASE events_conn1_test2; +USE events_conn1_test2; --disable_query_log -let $1= 1050; +let $1= 100; while ($1) { - eval CREATE EVENT ev_drop$1 ON SCHEDULE EVERY 1 SECOND DO SELECT $1; + eval CREATE EVENT ev_round4_drop$1 ON SCHEDULE EVERY 1 SECOND DO INSERT INTO events_test.fill_it VALUES("conn1_round4_ev$1", NOW()); dec $1; } --enable_query_log -SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2'; +SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; --sleep 6 -DROP DATABASE events_test2; -SET GLOBAL event_scheduler=0; -DROP DATABASE events_test3; -SET GLOBAL event_scheduler=1; -DROP DATABASE events_test4; +connection conn2; +--send +DROP DATABASE events_conn2_db; +connection conn3; +--send +DROP DATABASE events_conn3_db; +connection default; +--send +DROP DATABASE events_conn1_test2; +DROP DATABASE events_conn1_test3; +SET GLOBAL event_scheduler=2; +DROP DATABASE events_conn1_test4; SET GLOBAL event_scheduler=1; +connection conn2; +reap; +disconnect conn2; +connection conn3; +reap; +disconnect conn3; +connection default; USE events_test; +DROP TABLE fill_it; # # DROP DATABASE test end (bug #16406) # diff --git a/sql/Makefile.am b/sql/Makefile.am index ba9b58c0c5e..8f051b61e74 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -66,7 +66,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ parse_file.h sql_view.h sql_trigger.h \ sql_array.h sql_cursor.h event.h event_priv.h \ sql_plugin.h authors.h sql_partition.h \ - partition_info.h partition_element.h + partition_info.h partition_element.h event_scheduler.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ item_cmpfunc.cc item_strfunc.cc item_timefunc.cc \ @@ -103,7 +103,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ tztime.cc my_time.c my_user.c my_decimal.cc\ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ sp_cache.cc parse_file.cc sql_trigger.cc \ - event_executor.cc event.cc event_timed.cc \ + event_scheduler.cc event.cc event_timed.cc \ sql_plugin.cc sql_binlog.cc \ sql_builtin.cc sql_tablespace.cc partition_info.cc diff --git a/sql/cmakelists.txt b/sql/cmakelists.txt index 05b1efdbe51..2b44fbdcc79 100644 --- a/sql/cmakelists.txt +++ b/sql/cmakelists.txt @@ -51,7 +51,7 @@ ADD_EXECUTABLE(mysqld ../sql-common/client.c derror.cc des_key_file.cc sql_table.cc sql_test.cc sql_trigger.cc sql_udf.cc sql_union.cc sql_update.cc sql_view.cc strfunc.cc table.cc thr_malloc.cc time.cc tztime.cc uniques.cc unireg.cc item_xmlfunc.cc - rpl_tblmap.cc sql_binlog.cc event_executor.cc event_timed.cc + rpl_tblmap.cc sql_binlog.cc event_scheduler.cc event_timed.cc sql_tablespace.cc event.cc ../sql-common/my_user.c partition_info.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc diff --git a/sql/event.cc b/sql/event.cc index 4a3c6aad30c..7c3f17304aa 100644 --- a/sql/event.cc +++ b/sql/event.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2004-2005 MySQL AB +/* Copyright (C) 2004-2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,13 +16,12 @@ #include "event_priv.h" #include "event.h" +#include "event_scheduler.h" #include "sp.h" +#include "sp_head.h" /* TODO list : - - The default value of created/modified should not be 0000-00-00 because of - STRICT mode restricions. - - CREATE EVENT should not go into binary log! Does it now? The SQL statements issued by the EVENT are replicated. I have an idea how to solve the problem at failover. So the status field @@ -38,23 +37,8 @@ ENABLED to DISABLED status change and this is safe for replicating. As well an event may be deleted which is also safe for RBR. - - Maybe move all allocations during parsing to evex_mem_root thus saving - double parsing in evex_create_event! - - - If the server is killed (stopping) try to kill executing events? - - - What happens if one renames an event in the DB while it is in memory? - Or even deleting it? - - - Consider using conditional variable when doing shutdown instead of - waiting till all worker threads end. - - - Make Event_timed::get_show_create_event() work - - Add logging to file - - Move comparison code to class Event_timed - Warning: - For now parallel execution is not possible because the same sp_head cannot be executed few times!!! There is still no lock attached to particular event. @@ -62,12 +46,26 @@ Warning: */ -QUEUE EVEX_EQ_NAME; MEM_ROOT evex_mem_root; time_t mysql_event_last_create_time= 0L; -static TABLE_FIELD_W_TYPE event_table_fields[EVEX_FIELD_COUNT] = { +const char *event_scheduler_state_names[]= + { "OFF", "0", "ON", "1", "SUSPEND", "2", NullS }; + +TYPELIB Events::opt_typelib= +{ + array_elements(event_scheduler_state_names)-1, + "", + event_scheduler_state_names, + NULL +}; + + +ulong Events::opt_event_scheduler= 2; + +static +TABLE_FIELD_W_TYPE event_table_fields[Events::FIELD_COUNT] = { { {(char *) STRING_WITH_LEN("db")}, {(char *) STRING_WITH_LEN("char(64)")}, @@ -186,41 +184,17 @@ LEX_STRING interval_type_to_name[] = { }; - -/* - Inits the scheduler queue - prioritized queue from mysys/queue.c - - Synopsis - evex_queue_init() - - queue - pointer the the memory to be initialized as queue. has to be - allocated from the caller - - Notes - During initialization the queue is sized for 30 events, and when is full - will auto extent with 30. -*/ - -void -evex_queue_init(EVEX_QUEUE_TYPE *queue) -{ - if (init_queue_ex(queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, - event_timed_compare_q, NULL, 30 /*auto_extent*/)) - sql_print_error("Insufficient memory to initialize executing queue."); -} - - /* Compares 2 LEX strings regarding case. - Synopsis + SYNOPSIS my_time_compare() s - first LEX_STRING t - second LEX_STRING cs - charset - RETURNS: + RETURN VALUE -1 - s < t 0 - s == t 1 - s > t @@ -239,32 +213,26 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs) /* Compares 2 TIME structures - Synopsis + SYNOPSIS my_time_compare() a - first TIME b - second time - RETURNS: + RETURN VALUE -1 - a < b 0 - a == b 1 - a > b - Notes + NOTES TIME.second_part is not considered during comparison */ int my_time_compare(TIME *a, TIME *b) { - -#ifdef ENABLE_WHEN_WE_HAVE_MILLISECOND_IN_TIMESTAMPS - my_ulonglong a_t= TIME_to_ulonglong_datetime(a)*100L + a->second_part; - my_ulonglong b_t= TIME_to_ulonglong_datetime(b)*100L + b->second_part; -#else my_ulonglong a_t= TIME_to_ulonglong_datetime(a); my_ulonglong b_t= TIME_to_ulonglong_datetime(b); -#endif if (a_t > b_t) return 1; @@ -276,36 +244,11 @@ my_time_compare(TIME *a, TIME *b) /* - Compares the execute_at members of 2 Event_timed instances - - Synopsis - event_timed_compare() - - a - first Event_timed object - b - second Event_timed object - - RETURNS: - -1 - a->execute_at < b->execute_at - 0 - a->execute_at == b->execute_at - 1 - a->execute_at > b->execute_at - - Notes - execute_at.second_part is not considered during comparison -*/ - -int -event_timed_compare(Event_timed *a, Event_timed *b) -{ - return my_time_compare(&a->execute_at, &b->execute_at); -} - - -/* Compares the execute_at members of 2 Event_timed instances. Used as callback for the prioritized queue when shifting elements inside. - Synopsis + SYNOPSIS event_timed_compare() vptr - not used (set it to NULL) @@ -324,7 +267,8 @@ event_timed_compare(Event_timed *a, Event_timed *b) int event_timed_compare_q(void *vptr, byte* a, byte *b) { - return event_timed_compare((Event_timed *)a, (Event_timed *)b); + return my_time_compare(&((Event_timed *)a)->execute_at, + &((Event_timed *)b)->execute_at); } @@ -335,8 +279,8 @@ event_timed_compare_q(void *vptr, byte* a, byte *b) YEAR_MONTH - expression is in months DAY_MINUTE - expression is in minutes - Synopsis - event_reconstruct_interval_expression() + SYNOPSIS + Events::reconstruct_interval_expression() buf - preallocated String buffer to add the value to interval - the interval type (for instance YEAR_MONTH) expression - the value in the lowest entity @@ -347,9 +291,9 @@ event_timed_compare_q(void *vptr, byte* a, byte *b) */ int -event_reconstruct_interval_expression(String *buf, - interval_type interval, - longlong expression) +Events::reconstruct_interval_expression(String *buf, + interval_type interval, + longlong expression) { ulonglong expr= expression; char tmp_buff[128], *end; @@ -466,19 +410,20 @@ common_1_lev_code: Open mysql.event table for read SYNOPSIS - evex_open_event_table_for_read() + Events::open_event_table() thd Thread context lock_type How to lock the table table The table pointer - RETURN + RETURN VALUE 1 Cannot lock table 2 The table is corrupted - different number of fields 0 OK */ int -evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table) +Events::open_event_table(THD *thd, enum thr_lock_type lock_type, + TABLE **table) { TABLE_LIST tables; DBUG_ENTER("open_proc_table"); @@ -491,7 +436,8 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table) if (simple_open_n_lock_tables(thd, &tables)) DBUG_RETURN(1); - if (table_check_intact(tables.table, EVEX_FIELD_COUNT, event_table_fields, + if (table_check_intact(tables.table, Events::FIELD_COUNT, + event_table_fields, &mysql_event_last_create_time, ER_CANNOT_LOAD_FROM_TABLE)) { @@ -558,50 +504,56 @@ evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname, 'db' and 'name' and the first key is the primary key over the same fields. */ - if (dbname.length > table->field[EVEX_FIELD_DB]->field_length || - ev_name.length > table->field[EVEX_FIELD_NAME]->field_length || - user_name.length > table->field[EVEX_FIELD_DEFINER]->field_length) + if (dbname.length > table->field[Events::FIELD_DB]->field_length || + ev_name.length > table->field[Events::FIELD_NAME]->field_length || + user_name.length > table->field[Events::FIELD_DEFINER]->field_length) DBUG_RETURN(EVEX_KEY_NOT_FOUND); - table->field[EVEX_FIELD_DB]->store(dbname.str, dbname.length, &my_charset_bin); - table->field[EVEX_FIELD_NAME]->store(ev_name.str, ev_name.length, - &my_charset_bin); - table->field[EVEX_FIELD_DEFINER]->store(user_name.str, user_name.length, - &my_charset_bin); + table->field[Events::FIELD_DB]->store(dbname.str, dbname.length, + &my_charset_bin); + table->field[Events::FIELD_NAME]->store(ev_name.str, ev_name.length, + &my_charset_bin); + table->field[Events::FIELD_DEFINER]->store(user_name.str, + user_name.length, + &my_charset_bin); key_copy(key, table->record[0], table->key_info, table->key_info->key_length); if (table->file->index_read_idx(table->record[0], 0, key, table->key_info->key_length,HA_READ_KEY_EXACT)) + { + DBUG_PRINT("info", ("Row not fonud")); DBUG_RETURN(EVEX_KEY_NOT_FOUND); + } + DBUG_PRINT("info", ("Row found!")); DBUG_RETURN(0); } /* - Puts some data common to CREATE and ALTER EVENT into a row. + Puts some data common to CREATE and ALTER EVENT into a row. - SYNOPSIS - evex_fill_row() - thd THD - table the row to fill out - et Event's data + SYNOPSIS + evex_fill_row() + thd THD + table the row to fill out + et Event's data - Returns - 0 - ok - EVEX_GENERAL_ERROR - bad data - EVEX_GET_FIELD_FAILED - field count does not match. table corrupted? + RETURN VALUE + 0 - OK + EVEX_GENERAL_ERROR - bad data + EVEX_GET_FIELD_FAILED - field count does not match. table corrupted? - DESCRIPTION - Used both when an event is created and when it is altered. + DESCRIPTION + Used both when an event is created and when it is altered. */ static int evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update) { - enum evex_table_field field_num; + enum Events::enum_table_field field_num; DBUG_ENTER("evex_fill_row"); @@ -609,19 +561,20 @@ evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update) DBUG_PRINT("info", ("name =[%s]", et->name.str)); DBUG_PRINT("info", ("body =[%s]", et->body.str)); - if (table->field[field_num= EVEX_FIELD_DB]-> + if (table->field[field_num= Events::FIELD_DB]-> store(et->dbname.str, et->dbname.length, system_charset_info)) goto trunc_err; - if (table->field[field_num= EVEX_FIELD_NAME]-> + if (table->field[field_num= Events::FIELD_NAME]-> store(et->name.str, et->name.length, system_charset_info)) goto trunc_err; - /* both ON_COMPLETION and STATUS are NOT NULL thus not calling set_notnull() */ - table->field[EVEX_FIELD_ON_COMPLETION]->store((longlong)et->on_completion, - true); + /* both ON_COMPLETION and STATUS are NOT NULL thus not calling set_notnull()*/ + table->field[Events::FIELD_ON_COMPLETION]-> + store((longlong)et->on_completion, true); - table->field[EVEX_FIELD_STATUS]->store((longlong)et->status, true); + table->field[Events::FIELD_STATUS]-> + store((longlong)et->status, true); /* Change the SQL_MODE only if body was present in an ALTER EVENT and of course @@ -629,53 +582,54 @@ evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update) */ if (et->body.str) { - table->field[EVEX_FIELD_SQL_MODE]->store((longlong)thd->variables.sql_mode, - true); + table->field[Events::FIELD_SQL_MODE]-> + store((longlong)thd->variables.sql_mode, true); - if (table->field[field_num= EVEX_FIELD_BODY]-> + if (table->field[field_num= Events::FIELD_BODY]-> store(et->body.str, et->body.length, system_charset_info)) goto trunc_err; } if (et->expression) { - table->field[EVEX_FIELD_INTERVAL_EXPR]->set_notnull(); - table->field[EVEX_FIELD_INTERVAL_EXPR]->store((longlong)et->expression,true); + table->field[Events::FIELD_INTERVAL_EXPR]->set_notnull(); + table->field[Events::FIELD_INTERVAL_EXPR]-> + store((longlong)et->expression, true); - table->field[EVEX_FIELD_TRANSIENT_INTERVAL]->set_notnull(); + table->field[Events::FIELD_TRANSIENT_INTERVAL]->set_notnull(); /* In the enum (C) intervals start from 0 but in mysql enum valid values start from 1. Thus +1 offset is needed! */ - table->field[EVEX_FIELD_TRANSIENT_INTERVAL]->store((longlong)et->interval+1, - true); + table->field[Events::FIELD_TRANSIENT_INTERVAL]-> + store((longlong)et->interval+1, true); - table->field[EVEX_FIELD_EXECUTE_AT]->set_null(); + table->field[Events::FIELD_EXECUTE_AT]->set_null(); if (!et->starts_null) { - table->field[EVEX_FIELD_STARTS]->set_notnull(); - table->field[EVEX_FIELD_STARTS]-> + table->field[Events::FIELD_STARTS]->set_notnull(); + table->field[Events::FIELD_STARTS]-> store_time(&et->starts, MYSQL_TIMESTAMP_DATETIME); } if (!et->ends_null) { - table->field[EVEX_FIELD_ENDS]->set_notnull(); - table->field[EVEX_FIELD_ENDS]-> + table->field[Events::FIELD_ENDS]->set_notnull(); + table->field[Events::FIELD_ENDS]-> store_time(&et->ends, MYSQL_TIMESTAMP_DATETIME); } } else if (et->execute_at.year) { - table->field[EVEX_FIELD_INTERVAL_EXPR]->set_null(); - table->field[EVEX_FIELD_TRANSIENT_INTERVAL]->set_null(); - table->field[EVEX_FIELD_STARTS]->set_null(); - table->field[EVEX_FIELD_ENDS]->set_null(); + table->field[Events::FIELD_INTERVAL_EXPR]->set_null(); + table->field[Events::FIELD_TRANSIENT_INTERVAL]->set_null(); + table->field[Events::FIELD_STARTS]->set_null(); + table->field[Events::FIELD_ENDS]->set_null(); - table->field[EVEX_FIELD_EXECUTE_AT]->set_notnull(); - table->field[EVEX_FIELD_EXECUTE_AT]->store_time(&et->execute_at, - MYSQL_TIMESTAMP_DATETIME); + table->field[Events::FIELD_EXECUTE_AT]->set_notnull(); + table->field[Events::FIELD_EXECUTE_AT]-> + store_time(&et->execute_at, MYSQL_TIMESTAMP_DATETIME); } else { @@ -686,13 +640,12 @@ evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update) */ } - ((Field_timestamp *)table->field[EVEX_FIELD_MODIFIED])->set_time(); + ((Field_timestamp *)table->field[Events::FIELD_MODIFIED])->set_time(); if (et->comment.str) { - if (table->field[field_num= EVEX_FIELD_COMMENT]->store(et->comment.str, - et->comment.length, - system_charset_info)) + if (table->field[field_num= Events::FIELD_COMMENT]-> + store(et->comment.str, et->comment.length, system_charset_info)) goto trunc_err; } @@ -704,28 +657,30 @@ trunc_err: /* - Creates an event in mysql.event - - SYNOPSIS - db_create_event() - thd THD - et Event_timed object containing information for the event - create_if_not - if an warning should be generated in case event exists - rows_affected - how many rows were affected - - Return value - 0 - OK - EVEX_GENERAL_ERROR - Failure - DESCRIPTION - Creates an event. Relies on evex_fill_row which is shared with - db_update_event. The name of the event is inside "et". + Creates an event in mysql.event + + SYNOPSIS + db_create_event() + thd THD + et Event_timed object containing information for the event + create_if_not If an warning should be generated in case event exists + rows_affected How many rows were affected + + RETURN VALUE + 0 - OK + EVEX_GENERAL_ERROR - Failure + + DESCRIPTION + Creates an event. Relies on evex_fill_row which is shared with + db_update_event. The name of the event is inside "et". */ -static int +int db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, uint *rows_affected) { int ret= 0; + CHARSET_INFO *scs= system_charset_info; TABLE *table; char olddb[128]; bool dbchanged= false; @@ -734,7 +689,7 @@ db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, *rows_affected= 0; DBUG_PRINT("info", ("open mysql.event for update")); - if (evex_open_event_table(thd, TL_WRITE, &table)) + if (Events::open_event_table(thd, TL_WRITE, &table)) { my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0)); goto err; @@ -778,7 +733,7 @@ db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, goto err; } - if (et->body.length > table->field[EVEX_FIELD_BODY]->field_length) + if (et->body.length > table->field[Events::FIELD_BODY]->field_length) { my_error(ER_TOO_LONG_BODY, MYF(0), et->name.str); goto err; @@ -791,15 +746,14 @@ db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, goto err; } - if ((ret=table->field[EVEX_FIELD_DEFINER]->store(et->definer.str, - et->definer.length, - system_charset_info))) + if ((ret=table->field[Events::FIELD_DEFINER]-> + store(et->definer.str, et->definer.length, scs))) { my_error(ER_EVENT_STORE_FAILED, MYF(0), et->name.str, ret); goto err; } - ((Field_timestamp *)table->field[EVEX_FIELD_CREATED])->set_time(); + ((Field_timestamp *)table->field[Events::FIELD_CREATED])->set_time(); /* evex_fill_row() calls my_error() in case of error so no need to @@ -819,8 +773,8 @@ db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, { thd->clear_error(); /* Such a statement can always go directly to binlog, no trans cache */ - thd->binlog_query(THD::MYSQL_QUERY_TYPE, - thd->query, thd->query_length, FALSE, FALSE); + thd->binlog_query(THD::MYSQL_QUERY_TYPE, thd->query, thd->query_length, + FALSE, FALSE); } #endif @@ -842,17 +796,21 @@ err: /* - Used to execute ALTER EVENT. Pendant to evex_update_event(). + Used to execute ALTER EVENT. Pendant to Events::update_event(). - SYNOPSIS - db_update_event() - thd THD - sp_name the name of the event to alter - et event's data + SYNOPSIS + db_update_event() + thd THD + sp_name the name of the event to alter + et event's data + + RETURN VALUE + 0 OK + EVEX_GENERAL_ERROR Error occured (my_error() called) - NOTES - sp_name is passed since this is the name of the event to - alter in case of RENAME TO. + NOTES + sp_name is passed since this is the name of the event to + alter in case of RENAME TO. */ static int @@ -863,12 +821,12 @@ db_update_event(THD *thd, Event_timed *et, sp_name *new_name) DBUG_ENTER("db_update_event"); DBUG_PRINT("enter", ("dbname: %.*s", et->dbname.length, et->dbname.str)); DBUG_PRINT("enter", ("name: %.*s", et->name.length, et->name.str)); - DBUG_PRINT("enter", ("user: %.*s", et->name.length, et->name.str)); + DBUG_PRINT("enter", ("user: %.*s", et->definer.length, et->definer.str)); if (new_name) DBUG_PRINT("enter", ("rename to: %.*s", new_name->m_name.length, new_name->m_name.str)); - if (evex_open_event_table(thd, TL_WRITE, &table)) + if (Events::open_event_table(thd, TL_WRITE, &table)) { my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0)); goto err; @@ -914,9 +872,9 @@ db_update_event(THD *thd, Event_timed *et, sp_name *new_name) if (new_name) { - table->field[EVEX_FIELD_DB]-> + table->field[Events::FIELD_DB]-> store(new_name->m_db.str, new_name->m_db.length, system_charset_info); - table->field[EVEX_FIELD_NAME]-> + table->field[Events::FIELD_NAME]-> store(new_name->m_name.str, new_name->m_name.length, system_charset_info); } @@ -938,33 +896,33 @@ err: /* - Looks for a named event in mysql.event and in case of success returns - an object will data loaded from the table. - - SYNOPSIS - db_find_event() - thd THD - name the name of the event to find - definer who owns the event - ett event's data if event is found - tbl TABLE object to use when not NULL - - NOTES - 1) Use sp_name for look up, return in **ett if found - 2) tbl is not closed at exit - - RETURN - 0 ok In this case *ett is set to the event - # error *ett == 0 + Looks for a named event in mysql.event and in case of success returns + an object will data loaded from the table. + + SYNOPSIS + db_find_event() + thd THD + name the name of the event to find + definer who owns the event + ett event's data if event is found + tbl TABLE object to use when not NULL + + NOTES + 1) Use sp_name for look up, return in **ett if found + 2) tbl is not closed at exit + + RETURN VALUE + 0 ok In this case *ett is set to the event + # error *ett == 0 */ -static int +int db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett, TABLE *tbl, MEM_ROOT *root) { TABLE *table; int ret; - Event_timed *et= 0; + Event_timed *et=NULL; DBUG_ENTER("db_find_event"); DBUG_PRINT("enter", ("name: %*s", name->m_name.length, name->m_name.str)); @@ -973,7 +931,7 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett, if (tbl) table= tbl; - else if (evex_open_event_table(thd, TL_READ, &table)) + else if (Events::open_event_table(thd, TL_READ, &table)) { my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0)); ret= EVEX_GENERAL_ERROR; @@ -1001,7 +959,7 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett, } done: - if (ret) + if (ret && et) { delete et; et= 0; @@ -1015,179 +973,43 @@ done: /* - Looks for a named event in mysql.event and then loads it from - the table, compiles it and insert it into the cache. - - SYNOPSIS - evex_load_and_compile_event() - thd THD - spn the name of the event to alter - definer who is the owner - use_lock whether to obtain a lock on LOCK_event_arrays or not - - RETURN VALUE - 0 - OK - < 0 - error (in this case underlying functions call my_error()). -*/ - -static int -evex_load_and_compile_event(THD * thd, sp_name *spn, LEX_STRING definer, - bool use_lock) -{ - int ret= 0; - MEM_ROOT *tmp_mem_root; - Event_timed *ett; - Open_tables_state backup; - - DBUG_ENTER("db_load_and_compile_event"); - DBUG_PRINT("enter", ("name: %*s", spn->m_name.length, spn->m_name.str)); - - tmp_mem_root= thd->mem_root; - thd->mem_root= &evex_mem_root; - - thd->reset_n_backup_open_tables_state(&backup); - /* no need to use my_error() here because db_find_event() has done it */ - ret= db_find_event(thd, spn, &definer, &ett, NULL, NULL); - thd->restore_backup_open_tables_state(&backup); - if (ret) - goto done; - - ett->compute_next_execution_time(); - if (use_lock) - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - - evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) ett); - - /* - There is a copy in the array which we don't need. sphead won't be - destroyed. - */ - - if (use_lock) - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - -done: - if (thd->mem_root != tmp_mem_root) - thd->mem_root= tmp_mem_root; - - DBUG_RETURN(ret); -} - - -/* - Removes from queue in memory the event which is identified by the tupple - (db, name). - - SYNOPSIS - evex_remove_from_cache() - - db - db name - name - event name - use_lock - whether to lock the mutex LOCK_event_arrays or not in case it - has been already locked outside - is_drop - if an event is currently being executed then we can also delete - the Event_timed instance, so we alarm the event that it should - drop itself if this parameter is set to TRUE. It's false on - ALTER EVENT. - - RETURNS - 0 OK (always) -*/ - -static int -evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock, - bool is_drop) -{ - //ToDo : Add definer to the tuple (db, name) to become triple - uint i; - int ret= 0; - - DBUG_ENTER("evex_remove_from_cache"); - /* - It is possible that 2 (or 1) pass(es) won't find the event in memory. - The reason is that DISABLED events are not cached. - */ - - if (use_lock) - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - - for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) - { - Event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, Event_timed*); - DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?",db->str,name->str, et->dbname.str, - et->name.str)); - if (!sortcmp_lex_string(*name, et->name, system_charset_info) && - !sortcmp_lex_string(*db, et->dbname, system_charset_info)) - { - if (et->can_spawn_now()) - { - DBUG_PRINT("evex_remove_from_cache", ("not running - free and delete")); - et->free_sp(); - delete et; - } - else - { - DBUG_PRINT("evex_remove_from_cache", - ("running.defer mem free. is_drop=%d", is_drop)); - et->flags|= EVENT_EXEC_NO_MORE; - et->dropped= is_drop; - } - DBUG_PRINT("evex_remove_from_cache", ("delete from queue")); - evex_queue_delete_element(&EVEX_EQ_NAME, i); - /* ok, we have cleaned */ - ret= 0; - goto done; - } - } - -done: - if (use_lock) - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); + The function exported to the world for creating of events. - DBUG_RETURN(ret); -} + SYNOPSIS + Events::create_event() + thd THD + et event's data + create_options Options specified when in the query. We are + interested whether there is IF NOT EXISTS + rows_affected How many rows were affected + RETURN VALUE + 0 OK + !0 Error -/* - The function exported to the world for creating of events. - - SYNOPSIS - evex_create_event() - thd THD - et event's data - create_options Options specified when in the query. We are - interested whether there is IF NOT EXISTS - rows_affected How many rows were affected - - NOTES - - in case there is an event with the same name (db) and - IF NOT EXISTS is specified, an warning is put into the W stack. + NOTES + - in case there is an event with the same name (db) and + IF NOT EXISTS is specified, an warning is put into the W stack. */ int -evex_create_event(THD *thd, Event_timed *et, uint create_options, - uint *rows_affected) +Events::create_event(THD *thd, Event_timed *et, uint create_options, + uint *rows_affected) { - int ret = 0; + int ret; - DBUG_ENTER("evex_create_event"); + DBUG_ENTER("Events::create_event"); DBUG_PRINT("enter", ("name: %*s options:%d", et->name.length, et->name.str, create_options)); - if ((ret = db_create_event(thd, et, + if (!(ret = db_create_event(thd, et, create_options & HA_LEX_CREATE_IF_NOT_EXISTS, rows_affected))) - goto done; - - VOID(pthread_mutex_lock(&LOCK_evex_running)); - if (evex_is_running && et->status == MYSQL_EVENT_ENABLED) { - sp_name spn(et->dbname, et->name); - ret= evex_load_and_compile_event(thd, &spn, et->definer, true); + Event_scheduler *scheduler= Event_scheduler::get_instance(); + if (scheduler->initialized() && (ret= scheduler->add_event(thd, et, true))) + my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); } - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - -done: /* No need to close the table, it will be closed in sql_parse::do_command */ DBUG_RETURN(ret); @@ -1195,73 +1017,63 @@ done: /* - The function exported to the world for alteration of events. - - SYNOPSIS - evex_update_event() - thd THD - et event's data - new_name set in case of RENAME TO. - - NOTES - et contains data about dbname and event name. - new_name is the new name of the event, if not null (this means - that RENAME TO was specified in the query) + The function exported to the world for alteration of events. + + SYNOPSIS + Events::update_event() + thd THD + et event's data + new_name set in case of RENAME TO. + + RETURN VALUE + 0 OK + !0 Error + + NOTES + et contains data about dbname and event name. + new_name is the new name of the event, if not null (this means + that RENAME TO was specified in the query) */ int -evex_update_event(THD *thd, Event_timed *et, sp_name *new_name, - uint *rows_affected) +Events::update_event(THD *thd, Event_timed *et, sp_name *new_name, + uint *rows_affected) { int ret; - bool need_second_pass= true; - DBUG_ENTER("evex_update_event"); + DBUG_ENTER("Events::update_event"); DBUG_PRINT("enter", ("name: %*s", et->name.length, et->name.str)); - /* db_update_event() opens & closes the table to prevent crash later in the code when loading and compiling the new definition. Also on error conditions my_error() is called so no need to handle here */ - if ((ret= db_update_event(thd, et, new_name))) - goto done; - - VOID(pthread_mutex_lock(&LOCK_evex_running)); - if (!evex_is_running) - UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_evex_running, done); - - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - evex_remove_from_cache(&et->dbname, &et->name, false, false); - if (et->status == MYSQL_EVENT_ENABLED) + if (!(ret= db_update_event(thd, et, new_name))) { - if (new_name) - ret= evex_load_and_compile_event(thd, new_name, et->definer, false); - else - { - sp_name spn(et->dbname, et->name); - ret= evex_load_and_compile_event(thd, &spn, et->definer, false); - } - if (ret == EVEX_COMPILE_ERROR) - my_error(ER_EVENT_COMPILE_ERROR, MYF(0)); + Event_scheduler *scheduler= Event_scheduler::get_instance(); + if (scheduler->initialized() && + (ret= scheduler->replace_event(thd, et, + new_name? &new_name->m_db: NULL, + new_name? &new_name->m_name: NULL))) + my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); } - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - -done: DBUG_RETURN(ret); } /* - Drops an event - - SYNOPSIS - db_drop_event() - thd THD - et event's name - drop_if_exists if set and the event not existing => warning onto the stack - rows_affected affected number of rows is returned heres + Drops an event + + SYNOPSIS + db_drop_event() + thd THD + et event's name + drop_if_exists if set and the event not existing => warning onto the stack + rows_affected affected number of rows is returned heres + + RETURN VALUE + 0 OK + !0 Error (my_error() called) */ int db_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, @@ -1275,7 +1087,7 @@ int db_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, ret= EVEX_OPEN_TABLE_FAILED; thd->reset_n_backup_open_tables_state(&backup); - if (evex_open_event_table(thd, TL_WRITE, &table)) + if (Events::open_event_table(thd, TL_WRITE, &table)) { my_error(ER_EVENT_OPEN_TABLE_FAILED, MYF(0)); goto done; @@ -1315,58 +1127,54 @@ done: /* - Drops an event - - SYNOPSIS - evex_drop_event() - thd THD - et event's name - drop_if_exists if set and the event not existing => warning onto the stack - rows_affected affected number of rows is returned heres - + Drops an event + + SYNOPSIS + Events::drop_event() + thd THD + et event's name + drop_if_exists if set and the event not existing => warning onto the stack + rows_affected affected number of rows is returned heres + + RETURN VALUE + 0 OK + !0 Error (reported) */ int -evex_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, - uint *rows_affected) +Events::drop_event(THD *thd, Event_timed *et, bool drop_if_exists, + uint *rows_affected) { - int ret= 0; - - DBUG_ENTER("evex_drop_event"); - - - VOID(pthread_mutex_lock(&LOCK_evex_running)); - if (evex_is_running) - ret= evex_remove_from_cache(&et->dbname, &et->name, true, true); - VOID(pthread_mutex_unlock(&LOCK_evex_running)); + int ret; - if (ret == 1) - ret= 0; - else if (ret == 0) - ret= db_drop_event(thd, et, drop_if_exists, rows_affected); - else - my_error(ER_UNKNOWN_ERROR, MYF(0)); + DBUG_ENTER("Events::drop_event"); + if (!(ret= db_drop_event(thd, et, drop_if_exists, rows_affected))) + { + Event_scheduler *scheduler= Event_scheduler::get_instance(); + if (scheduler->initialized() && (ret= scheduler->drop_event(thd, et))) + my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); + } DBUG_RETURN(ret); } /* - SHOW CREATE EVENT + SHOW CREATE EVENT - SYNOPSIS - evex_show_create_event() - thd THD - spn the name of the event (db, name) - definer the definer of the event + SYNOPSIS + Events::show_create_event() + thd THD + spn the name of the event (db, name) + definer the definer of the event - RETURNS - 0 - OK - 1 - Error during writing to the wire + RETURN VALUE + 0 OK + 1 Error during writing to the wire */ int -evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer) +Events::show_create_event(THD *thd, sp_name *spn, LEX_STRING definer) { int ret; Event_timed *et= NULL; @@ -1379,7 +1187,7 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer) ret= db_find_event(thd, spn, &definer, &et, NULL, thd->mem_root); thd->restore_backup_open_tables_state(&backup); - if (et) + if (!ret) { Protocol *protocol= thd->protocol; char show_str_buf[768]; @@ -1389,12 +1197,10 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer) ulong sql_mode_len=0; show_str.length(0); + show_str.set_charset(system_charset_info); if (et->get_create_event(thd, &show_str)) - { - delete et; - DBUG_RETURN(1); - } + goto err; field_list.push_back(new Item_empty_string("Event", NAME_LEN)); @@ -1408,201 +1214,216 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer) show_str.length())); if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) - { - delete et; - DBUG_RETURN(1); - } + goto err; + protocol->prepare_for_resend(); protocol->store(et->name.str, et->name.length, system_charset_info); protocol->store((char*) sql_mode_str, sql_mode_len, system_charset_info); - protocol->store(show_str.ptr(), show_str.length(), system_charset_info); + protocol->store(show_str.c_ptr(), show_str.length(), system_charset_info); ret= protocol->write(); send_eof(thd); - delete et; } - + delete et; DBUG_RETURN(ret); +err: + delete et; + DBUG_RETURN(1); } /* - evex_drop_db_events - Drops all events in the selected database + Drops all events from a schema - thd - Thread - db - ASCIIZ the name of the database - - Returns: - 0 - OK - 1 - Failed to delete a specific row - 2 - Got NULL while reading db name from a row - - Note: - The algo is the following - 1. Go through the in-memory cache, if the scheduler is working - and for every event whose dbname matches the database we drop - check whether is currently in execution: - - Event_timed::can_spawn() returns true -> the event is not - being executed in a child thread. The reason not to use - Event_timed::is_running() is that the latter shows only if - it is being executed, which is 99% of the time in the thread - but there are some initiliazations before and after the - anonymous SP is being called. So if we delete in this moment - -=> *boom*, so we have to check whether the thread has been - spawned and can_spawn() is the right method. - - Event_timed::can_spawn() returns false -> being runned ATM - just set the flags so it should drop itself. + SYNOPSIS + Events::drop_schema_events() + thd Thread + db ASCIIZ schema name + + RETURN VALUE + 0 OK + !0 Error */ int -evex_drop_db_events(THD *thd, char *db) +Events::drop_schema_events(THD *thd, char *db) { - TABLE *table; - READ_RECORD read_record_info; int ret= 0; - uint i; LEX_STRING db_lex= {db, strlen(db)}; DBUG_ENTER("evex_drop_db_events"); - DBUG_PRINT("info",("dropping events from %s", db)); + DBUG_PRINT("enter", ("dropping events from %s", db)); - VOID(pthread_mutex_lock(&LOCK_event_arrays)); + Event_scheduler *scheduler= Event_scheduler::get_instance(); + if (scheduler->initialized()) + ret= scheduler->drop_schema_events(thd, &db_lex); + else + ret= db_drop_events_from_table(thd, &db_lex); - if ((ret= evex_open_event_table(thd, TL_WRITE, &table))) - { - sql_print_error("Table mysql.event is damaged."); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - DBUG_RETURN(SP_OPEN_TABLE_FAILED); - } + DBUG_RETURN(ret); +} - DBUG_PRINT("info",("%d elements in the queue", - evex_queue_num_elements(EVEX_EQ_NAME))); - VOID(pthread_mutex_lock(&LOCK_evex_running)); - if (!evex_is_running) - goto skip_memory; - for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) - { - Event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, Event_timed*); - if (sortcmp_lex_string(et->dbname, db_lex, system_charset_info)) - continue; +/* + Drops all events in the selected database, from mysql.event. - if (et->can_spawn_now_n_lock(thd)) - { - DBUG_PRINT("info",("event %s not running - direct delete", et->name.str)); - if (!(ret= evex_db_find_event_aux(thd, et, table))) - { - DBUG_PRINT("info",("event %s found on disk", et->name.str)); - if ((ret= table->file->ha_delete_row(table->record[0]))) - { - sql_print_error("Error while deleting a row - dropping " - "a database. Skipping the rest."); - my_error(ER_EVENT_DROP_FAILED, MYF(0), et->name.str); - goto end; - } - DBUG_PRINT("info",("deleted event [%s] num [%d]. Time to free mem", - et->name.str, i)); - } - else if (ret == EVEX_KEY_NOT_FOUND) - { - sql_print_error("Expected to find event %s.%s of %s on disk-not there.", - et->dbname.str, et->name.str, et->definer.str); - } - et->free_sp(); - delete et; - et= 0; - /* no need to call et->spawn_unlock because we already cleaned et */ - } - else - { - DBUG_PRINT("info",("event %s is running. setting exec_no_more and dropped", - et->name.str)); - et->flags|= EVENT_EXEC_NO_MORE; - et->dropped= TRUE; - } - DBUG_PRINT("info",("%d elements in the queue", - evex_queue_num_elements(EVEX_EQ_NAME))); - evex_queue_delete_element(&EVEX_EQ_NAME, i);// 0 is top - DBUG_PRINT("info",("%d elements in the queue", - evex_queue_num_elements(EVEX_EQ_NAME))); - /* - decrease so we start at the same position, there will be - less elements in the queue, it will still be ordered so on - next iteration it will be again i the current element or if - no more we finish. - */ - --i; - } + SYNOPSIS + evex_drop_db_events_from_table() + thd Thread + db Schema name -skip_memory: - /* - The reasoning behind having two loops is the following: - If there was only one loop, the table-scan, then for every element which - matches, the queue in memory has to be searched to remove the element. - While if we go first over the queue and remove what's in there we have only - one pass over it and after finishing it, moving to table-scan for the disabled - events. This needs quite less time and means quite less locking on - LOCK_event_arrays. - */ - DBUG_PRINT("info",("Mem-cache checked, now going to db for disabled events")); + RETURN VALUE + 0 OK + !0 Error from ha_delete_row +*/ + +int +db_drop_events_from_table(THD *thd, LEX_STRING *db) +{ + int ret; + TABLE *table; + READ_RECORD read_record_info; + DBUG_ENTER("db_drop_events_from_table"); + DBUG_PRINT("info", ("dropping events from %s", db->str)); + + if ((ret= Events::open_event_table(thd, TL_WRITE, &table))) + { + sql_print_error("Table mysql.event is damaged."); + DBUG_RETURN(ret); + } /* only enabled events are in memory, so we go now and delete the rest */ - init_read_record(&read_record_info, thd, table ,NULL,1,0); + init_read_record(&read_record_info, thd, table, NULL, 1, 0); while (!(read_record_info.read_record(&read_record_info)) && !ret) { - char *et_db; + char *et_db= get_field(thd->mem_root, + table->field[Events::FIELD_DB]); - if ((et_db= get_field(thd->mem_root, table->field[EVEX_FIELD_DB])) == NULL) - { - ret= 2; - break; - } - LEX_STRING et_db_lex= {et_db, strlen(et_db)}; - if (!sortcmp_lex_string(et_db_lex, db_lex, system_charset_info)) + DBUG_PRINT("info", ("Current event %s.%s", et_db, + get_field(thd->mem_root, + table->field[Events::FIELD_NAME]))); + + if (!sortcmp_lex_string(et_db_lex, *db, system_charset_info)) { - Event_timed ett; - char *ptr; - - if ((ptr= get_field(thd->mem_root, table->field[EVEX_FIELD_STATUS])) - == NullS) - { - sql_print_error("Error while loading from mysql.event. " - "Table probably corrupted"); - goto end; - } - /* - When not running nothing is in memory so we have to clean - everything. - We don't delete EVENT_ENABLED events when the scheduler is running - because maybe this is an event which we asked to drop itself when - it is finished and it hasn't finished yet, so we don't touch it. - It will drop itself. The not running ENABLED events has been already - deleted from ha_delete_row() above in the loop over the QUEUE - (in case the executor is running). - 'D' stands for DISABLED, 'E' for ENABLED - it's an enum - */ - if ((evex_is_running && ptr[0] == 'D') || !evex_is_running) - { - DBUG_PRINT("info", ("Dropping %s.%s", et_db, ett.name.str)); - if ((ret= table->file->ha_delete_row(table->record[0]))) - { - my_error(ER_EVENT_DROP_FAILED, MYF(0), ett.name.str); - goto end; - } - } + DBUG_PRINT("info", ("Dropping")); + if ((ret= table->file->ha_delete_row(table->record[0]))) + my_error(ER_EVENT_DROP_FAILED, MYF(0), + get_field(thd->mem_root, + table->field[Events::FIELD_NAME])); } } - DBUG_PRINT("info",("Disk checked for disabled events. Finishing.")); - -end: - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); end_read_record(&read_record_info); - thd->version--; /* Force close to free memory */ close_thread_tables(thd); DBUG_RETURN(ret); } + + + +/* + Inits the scheduler's structures. + + SYNOPSIS + Events::init() + + NOTES + This function is not synchronized. + + RETURN VALUE + 0 OK + 1 Error +*/ + +int +Events::init() +{ + int ret= 0; + DBUG_ENTER("Events::init"); + + /* it should be an assignment! */ + if (opt_event_scheduler) + { + Event_scheduler *scheduler= Event_scheduler::get_instance(); + DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2); + DBUG_RETURN(scheduler->init() || + (opt_event_scheduler == 1? scheduler->start(): + scheduler->start_suspended())); + } + DBUG_RETURN(0); +} + + +/* + Cleans up scheduler's resources. Called at server shutdown. + + SYNOPSIS + Events::shutdown() + + NOTES + This function is not synchronized. +*/ + +void +Events::shutdown() +{ + DBUG_ENTER("Events::shutdown"); + Event_scheduler *scheduler= Event_scheduler::get_instance(); + if (scheduler->initialized()) + { + scheduler->stop(); + scheduler->destroy(); + } + + DBUG_VOID_RETURN; +} + + +/* + Proxy for Event_scheduler::dump_internal_status + + SYNOPSIS + Events::dump_internal_status() + thd Thread + + RETURN VALUE + 0 OK + !0 Error +*/ + +int +Events::dump_internal_status(THD *thd) +{ + return Event_scheduler::dump_internal_status(thd); +} + + +/* + Inits Events mutexes + + SYNOPSIS + Events::init_mutexes() + thd Thread +*/ + +void +Events::init_mutexes() +{ + Event_scheduler::init_mutexes(); +} + + +/* + Destroys Events mutexes + + SYNOPSIS + Events::destroy_mutexes() +*/ + +void +Events::destroy_mutexes() +{ + Event_scheduler::destroy_mutexes(); +} diff --git a/sql/event.h b/sql/event.h index 27de8b46e32..40ede7b0c5f 100644 --- a/sql/event.h +++ b/sql/event.h @@ -1,4 +1,6 @@ -/* Copyright (C) 2004-2005 MySQL AB +#ifndef _EVENT_H_ +#define _EVENT_H_ +/* Copyright (C) 2004-2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -14,66 +16,109 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifndef _EVENT_H_ -#define _EVENT_H_ -#include "sp.h" -#include "sp_head.h" - -#define EVEX_OK SP_OK -#define EVEX_KEY_NOT_FOUND SP_KEY_NOT_FOUND -#define EVEX_OPEN_TABLE_FAILED SP_OPEN_TABLE_FAILED -#define EVEX_WRITE_ROW_FAILED SP_WRITE_ROW_FAILED -#define EVEX_DELETE_ROW_FAILED SP_DELETE_ROW_FAILED -#define EVEX_GET_FIELD_FAILED SP_GET_FIELD_FAILED -#define EVEX_PARSE_ERROR SP_PARSE_ERROR -#define EVEX_INTERNAL_ERROR SP_INTERNAL_ERROR -#define EVEX_NO_DB_ERROR SP_NO_DB_ERROR + +#define EVEX_OK 0 +#define EVEX_KEY_NOT_FOUND -1 +#define EVEX_OPEN_TABLE_FAILED -2 +#define EVEX_WRITE_ROW_FAILED -3 +#define EVEX_DELETE_ROW_FAILED -4 +#define EVEX_GET_FIELD_FAILED -5 +#define EVEX_PARSE_ERROR -6 +#define EVEX_INTERNAL_ERROR -7 +#define EVEX_NO_DB_ERROR -8 #define EVEX_COMPILE_ERROR -19 #define EVEX_GENERAL_ERROR -20 -#define EVEX_BAD_IDENTIFIER SP_BAD_IDENTIFIER -#define EVEX_BODY_TOO_LONG SP_BODY_TOO_LONG -#define EVEX_BAD_PARAMS -21 -#define EVEX_NOT_RUNNING -22 -#define EVEX_MICROSECOND_UNSUP -23 +#define EVEX_BAD_IDENTIFIER -21 +#define EVEX_BODY_TOO_LONG -22 +#define EVEX_BAD_PARAMS -23 +#define EVEX_NOT_RUNNING -24 +#define EVEX_MICROSECOND_UNSUP -25 +#define EVEX_CANT_KILL -26 #define EVENT_EXEC_NO_MORE (1L << 0) #define EVENT_NOT_USED (1L << 1) +#define EVENT_FREE_WHEN_FINISHED (1L << 2) -extern ulong opt_event_executor; +class Event_timed; -enum enum_event_on_completion +class Events { - MYSQL_EVENT_ON_COMPLETION_DROP = 1, - MYSQL_EVENT_ON_COMPLETION_PRESERVE -}; +public: + static ulong opt_event_scheduler; + static TYPELIB opt_typelib; -enum enum_event_status -{ - MYSQL_EVENT_ENABLED = 1, - MYSQL_EVENT_DISABLED + enum enum_table_field + { + FIELD_DB = 0, + FIELD_NAME, + FIELD_BODY, + FIELD_DEFINER, + FIELD_EXECUTE_AT, + FIELD_INTERVAL_EXPR, + FIELD_TRANSIENT_INTERVAL, + FIELD_CREATED, + FIELD_MODIFIED, + FIELD_LAST_EXECUTED, + FIELD_STARTS, + FIELD_ENDS, + FIELD_STATUS, + FIELD_ON_COMPLETION, + FIELD_SQL_MODE, + FIELD_COMMENT, + FIELD_COUNT /* a cool trick to count the number of fields :) */ + }; + + static int + create_event(THD *thd, Event_timed *et, uint create_options, + uint *rows_affected); + + static int + update_event(THD *thd, Event_timed *et, sp_name *new_name, + uint *rows_affected); + + static int + drop_event(THD *thd, Event_timed *et, bool drop_if_exists, + uint *rows_affected); + + static int + open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); + + static int + show_create_event(THD *thd, sp_name *spn, LEX_STRING definer); + + static int + reconstruct_interval_expression(String *buf, interval_type interval, + longlong expression); + + static int + drop_schema_events(THD *thd, char *db); + + static int + dump_internal_status(THD *thd); + + static int + init(); + + static void + shutdown(); + + static void + init_mutexes(); + + static void + destroy_mutexes(); + + +private: + /* Prevent use of these */ + Events(const Events &); + void operator=(Events &); }; -enum evex_table_field -{ - EVEX_FIELD_DB = 0, - EVEX_FIELD_NAME, - EVEX_FIELD_BODY, - EVEX_FIELD_DEFINER, - EVEX_FIELD_EXECUTE_AT, - EVEX_FIELD_INTERVAL_EXPR, - EVEX_FIELD_TRANSIENT_INTERVAL, - EVEX_FIELD_CREATED, - EVEX_FIELD_MODIFIED, - EVEX_FIELD_LAST_EXECUTED, - EVEX_FIELD_STARTS, - EVEX_FIELD_ENDS, - EVEX_FIELD_STATUS, - EVEX_FIELD_ON_COMPLETION, - EVEX_FIELD_SQL_MODE, - EVEX_FIELD_COMMENT, - EVEX_FIELD_COUNT /* a cool trick to count the number of fields :) */ -} ; + + +class sp_head; class Event_timed { @@ -82,12 +127,26 @@ class Event_timed my_bool in_spawned_thread; ulong locked_by_thread_id; my_bool running; + ulong thread_id; pthread_mutex_t LOCK_running; + pthread_cond_t COND_finished; bool status_changed; bool last_executed_changed; public: + enum enum_status + { + ENABLED = 1, + DISABLED + }; + + enum enum_on_completion + { + ON_COMPLETION_DROP = 1, + ON_COMPLETION_PRESERVE + }; + TIME last_executed; LEX_STRING dbname; @@ -111,8 +170,8 @@ public: ulonglong created; ulonglong modified; - enum enum_event_on_completion on_completion; - enum enum_event_status status; + enum enum_on_completion on_completion; + enum enum_status status; sp_head *sphead; ulong sql_mode; const uchar *body_begin; @@ -153,36 +212,15 @@ public: DBUG_ASSERT(0); } + Event_timed(); - Event_timed():in_spawned_thread(0),locked_by_thread_id(0), - running(0), status_changed(false), - last_executed_changed(false), expression(0), created(0), - modified(0), on_completion(MYSQL_EVENT_ON_COMPLETION_DROP), - status(MYSQL_EVENT_ENABLED), sphead(0), sql_mode(0), - body_begin(0), dropped(false), - free_sphead_on_delete(true), flags(0) - - { - pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST); - init(); - } - - ~Event_timed() - { - deinit_mutexes(); - - if (free_sphead_on_delete) - free_sp(); - } + ~Event_timed(); void init(); - + void - deinit_mutexes() - { - pthread_mutex_destroy(&this->LOCK_running); - } + deinit_mutexes(); int init_definer(THD *thd); @@ -214,12 +252,12 @@ public: bool compute_next_execution_time(); - void - mark_last_executed(THD *thd); - int drop(THD *thd); + void + mark_last_executed(THD *thd); + bool update_fields(THD *thd); @@ -227,142 +265,32 @@ public: get_create_event(THD *thd, String *buf); int - execute(THD *thd, MEM_ROOT *mem_root= NULL); + execute(THD *thd, MEM_ROOT *mem_root); int - compile(THD *thd, MEM_ROOT *mem_root= NULL); - - my_bool - is_running() - { - my_bool ret; - - VOID(pthread_mutex_lock(&this->LOCK_running)); - ret= running; - VOID(pthread_mutex_unlock(&this->LOCK_running)); - - return ret; - } - - /* - Checks whether the object is being used in a spawned thread. - This method is for very basic checking. Use ::can_spawn_now_n_lock() - for most of the cases. - */ - - my_bool - can_spawn_now() - { - my_bool ret; - VOID(pthread_mutex_lock(&this->LOCK_running)); - ret= !in_spawned_thread; - VOID(pthread_mutex_unlock(&this->LOCK_running)); - return ret; - } + compile(THD *thd, MEM_ROOT *mem_root); - /* - Checks whether this thread can lock the object for modification -> - preventing being spawned for execution, and locks if possible. - use ::can_spawn_now() only for basic checking because a race - condition may occur between the check and eventual modification (deletion) - of the object. - */ - - my_bool - can_spawn_now_n_lock(THD *thd); - - int - spawn_unlock(THD *thd); + bool + is_running(); int - spawn_now(void * (*thread_func)(void*)); + spawn_now(void * (*thread_func)(void*), void *arg); - void + bool spawn_thread_finish(THD *thd); void - free_sp() - { - delete sphead; - sphead= 0; - } -protected: + free_sp(); + bool - change_security_context(THD *thd, Security_context *s_ctx, - Security_context **backup); + has_equal_db(Event_timed *etn); + + int + kill_thread(THD *thd); void - restore_security_context(THD *thd, Security_context *backup); + set_thread_id(ulong tid) { thread_id= tid; } }; -int -evex_create_event(THD *thd, Event_timed *et, uint create_options, - uint *rows_affected); - -int -evex_update_event(THD *thd, Event_timed *et, sp_name *new_name, - uint *rows_affected); - -int -evex_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, - uint *rows_affected); - -int -evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table); - -int -evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer); - -int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); - -int -event_reconstruct_interval_expression(String *buf, - interval_type interval, - longlong expression); - -int -evex_drop_db_events(THD *thd, char *db); - - -int -init_events(); - -void -shutdown_events(); - - -// auxiliary -int -event_timed_compare(Event_timed **a, Event_timed **b); - - - -/* -CREATE TABLE event ( - db char(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', - name char(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', - body longblob NOT NULL, - definer char(77) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', - execute_at DATETIME default NULL, - interval_value int(11) default NULL, - interval_field ENUM('YEAR','QUARTER','MONTH','DAY','HOUR','MINUTE','WEEK', - 'SECOND','MICROSECOND', 'YEAR_MONTH','DAY_HOUR', - 'DAY_MINUTE','DAY_SECOND', - 'HOUR_MINUTE','HOUR_SECOND', - 'MINUTE_SECOND','DAY_MICROSECOND', - 'HOUR_MICROSECOND','MINUTE_MICROSECOND', - 'SECOND_MICROSECOND') default NULL, - created TIMESTAMP NOT NULL, - modified TIMESTAMP NOT NULL, - last_executed DATETIME default NULL, - starts DATETIME default NULL, - ends DATETIME default NULL, - status ENUM('ENABLED','DISABLED') NOT NULL default 'ENABLED', - on_completion ENUM('DROP','PRESERVE') NOT NULL default 'DROP', - comment varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL default '', - PRIMARY KEY (definer,db,name) -) ENGINE=MyISAM DEFAULT CHARSET=utf8 COMMENT 'Events'; -*/ - #endif /* _EVENT_H_ */ diff --git a/sql/event_executor.cc b/sql/event_executor.cc index 21464dd777b..f236fb47771 100644 --- a/sql/event_executor.cc +++ b/sql/event_executor.cc @@ -13,998 +13,3 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "event_priv.h" -#include "event.h" -#include "sp.h" - -#define WAIT_STATUS_READY 0 -#define WAIT_STATUS_EMPTY_QUEUE 1 -#define WAIT_STATUS_NEW_TOP_EVENT 2 -#define WAIT_STATUS_STOP_EXECUTOR 3 - - -/* - Make this define DBUG_FAULTY_THR to be able to put breakpoints inside - code used by the scheduler's thread(s). In this case user connections - are not possible because the scheduler thread code is ran inside the - main thread (no spawning takes place. If you want to debug client - connection then start with --one-thread and make the define - DBUG_FAULTY_THR ! -*/ -#define DBUG_FAULTY_THR2 - -extern ulong thread_created; -extern const char *my_localhost; -extern pthread_attr_t connection_attrib; - -pthread_mutex_t LOCK_event_arrays, // mutex for when working with the queue - LOCK_workers_count, // mutex for when inc/dec uint workers_count - LOCK_evex_running; // mutes for managing bool evex_is_running - -static pthread_mutex_t LOCK_evex_main_thread; // mutex for when working with the queue -bool scheduler_main_thread_running= false; - -bool evex_is_running= false; - -ulonglong evex_main_thread_id= 0; -ulong opt_event_executor; -my_bool event_executor_running_global_var; -static my_bool evex_mutexes_initted= FALSE; -static uint workers_count; - -static int -evex_load_events_from_db(THD *thd); - -bool -evex_print_warnings(THD *thd, Event_timed *et); - -/* - TODO Andrey: Check for command line option whether to start - the main thread or not. -*/ - -pthread_handler_t -event_executor_worker(void *arg); - -pthread_handler_t -event_executor_main(void *arg); - - -/* - Returns the seconds difference of 2 TIME structs - - SYNOPSIS - evex_time_diff() - a - TIME struct 1 - b - TIME struct 2 - - Returns: - the seconds difference -*/ - -static int -evex_time_diff(TIME *a, TIME *b) -{ - return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b); -} - - -/* - Inits the mutexes used by the scheduler module - - SYNOPSIS - evex_init_mutexes() - - NOTES - The mutexes are : - LOCK_event_arrays - LOCK_workers_count - LOCK_evex_running -*/ - -static void -evex_init_mutexes() -{ - if (evex_mutexes_initted) - return; - - evex_mutexes_initted= TRUE; - pthread_mutex_init(&LOCK_event_arrays, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&LOCK_workers_count, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&LOCK_evex_running, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&LOCK_evex_main_thread, MY_MUTEX_INIT_FAST); - - event_executor_running_global_var= opt_event_executor; -} - -extern TABLE_FIELD_W_TYPE mysql_db_table_fields[]; -extern time_t mysql_db_table_last_check; - -/* - Opens mysql.db and mysql.user and checks whether - 1. mysql.db has column Event_priv at column 20 (0 based); - 2. mysql.user has column Event_priv at column 29 (0 based); - - Synopsis - evex_check_system_tables() -*/ - -void -evex_check_system_tables() -{ - THD *thd= current_thd; - TABLE_LIST tables; - Open_tables_state backup; - - /* thd is 0x0 during boot of the server. Later it's !=0x0 */ - if (!thd) - return; - - thd->reset_n_backup_open_tables_state(&backup); - - bzero((char*) &tables, sizeof(tables)); - tables.db= (char*) "mysql"; - tables.table_name= tables.alias= (char*) "db"; - tables.lock_type= TL_READ; - - if (simple_open_n_lock_tables(thd, &tables)) - sql_print_error("Cannot open mysql.db"); - else - { - table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, mysql_db_table_fields, - &mysql_db_table_last_check,ER_CANNOT_LOAD_FROM_TABLE); - close_thread_tables(thd); - } - - bzero((char*) &tables, sizeof(tables)); - tables.db= (char*) "mysql"; - tables.table_name= tables.alias= (char*) "user"; - tables.lock_type= TL_READ; - - if (simple_open_n_lock_tables(thd, &tables)) - sql_print_error("Cannot open mysql.db"); - else - { - if (tables.table->s->fields < 29 || - strncmp(tables.table->field[29]->field_name, - STRING_WITH_LEN("Event_priv"))) - sql_print_error("mysql.user has no `Event_priv` column at position 29"); - - close_thread_tables(thd); - } - - thd->restore_backup_open_tables_state(&backup); -} - - -/* - Inits the scheduler. Called on server start and every time the scheduler - is started with switching the event_scheduler global variable to TRUE - - SYNOPSIS - init_events() - - NOTES - Inits the mutexes used by the scheduler. Done at server start. -*/ - -int -init_events() -{ - pthread_t th; - DBUG_ENTER("init_events"); - - DBUG_PRINT("info",("Starting events main thread")); - - evex_check_system_tables(); - - evex_init_mutexes(); - - VOID(pthread_mutex_lock(&LOCK_evex_running)); - evex_is_running= false; - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - - if (event_executor_running_global_var) - { -#ifndef DBUG_FAULTY_THR - /* TODO Andrey: Change the error code returned! */ - if (pthread_create(&th, &connection_attrib, event_executor_main,(void*)NULL)) - DBUG_RETURN(ER_SLAVE_THREAD); -#else - event_executor_main(NULL); -#endif - } - - DBUG_RETURN(0); -} - - -/* - Cleans up scheduler memory. Called on server shutdown. - - SYNOPSIS - shutdown_events() - - NOTES - Destroys the mutexes. -*/ - -void -shutdown_events() -{ - DBUG_ENTER("shutdown_events"); - - if (evex_mutexes_initted) - { - evex_mutexes_initted= FALSE; - VOID(pthread_mutex_lock(&LOCK_evex_running)); - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - - pthread_mutex_destroy(&LOCK_event_arrays); - pthread_mutex_destroy(&LOCK_workers_count); - pthread_mutex_destroy(&LOCK_evex_running); - pthread_mutex_destroy(&LOCK_evex_main_thread); - } - DBUG_VOID_RETURN; -} - - -/* - Inits an scheduler thread handler, both the main and a worker - - SYNOPSIS - init_event_thread() - thd - the THD of the thread. Has to be allocated by the caller. - - NOTES - 1. The host of the thead is my_localhost - 2. thd->net is initted with NULL - no communication. - - Returns - 0 - OK - -1 - Error -*/ - -static int -init_event_thread(THD* thd) -{ - DBUG_ENTER("init_event_thread"); - thd->client_capabilities= 0; - thd->security_ctx->master_access= 0; - thd->security_ctx->db_access= 0; - thd->security_ctx->host_or_ip= (char*)my_localhost; - my_net_init(&thd->net, 0); - thd->net.read_timeout = slave_net_timeout; - thd->slave_thread= 0; - thd->options|= OPTION_AUTO_IS_NULL; - thd->client_capabilities= CLIENT_LOCAL_FILES; - thd->real_id=pthread_self(); - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->thread_id= thread_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - if (init_thr_lock() || thd->store_globals()) - { - thd->cleanup(); - delete thd; - DBUG_RETURN(-1); - } - -#if !defined(__WIN__) && !defined(__NETWARE__) - sigset_t set; - VOID(sigemptyset(&set)); // Get mask in use - VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); -#endif - - thd->proc_info= "Initialized"; - thd->version= refresh_version; - thd->set_time(); - DBUG_RETURN(0); -} - - -/* - This function waits till the time next event in the queue should be - executed. - - Returns - WAIT_STATUS_READY There is an event to be executed right now - WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped. - WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled - on top. Restart ticking. - WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0; -*/ - -static int -executor_wait_till_next_event_exec(THD *thd) -{ - Event_timed *et; - TIME time_now; - int t2sleep; - - DBUG_ENTER("executor_wait_till_next_event_exec"); - /* - now let's see how much time to sleep, we know there is at least 1 - element in the queue. - */ - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - { - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - DBUG_RETURN(WAIT_STATUS_EMPTY_QUEUE); - } - et= evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*); - DBUG_ASSERT(et); - if (et->status == MYSQL_EVENT_DISABLED) - { - DBUG_PRINT("evex main thread",("Now it is disabled-exec no more")); - if (et->dropped) - et->drop(thd); - delete et; - evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1 - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - sql_print_information("Event found disabled, dropping."); - DBUG_RETURN(1); - } - - DBUG_PRINT("evex main thread",("computing time to sleep till next exec")); - /* set the internal clock of thd */ - thd->end_time(); - my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); - t2sleep= evex_time_diff(&et->execute_at, &time_now); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - - t2sleep*=20; - DBUG_PRINT("evex main thread",("unlocked LOCK_event_arrays")); - if (t2sleep > 0) - { - ulonglong modified= et->modified; - /* - We sleep t2sleep seconds but we check every second whether this thread - has been killed, or there is a new candidate - */ - while (t2sleep-- && !thd->killed && event_executor_running_global_var && - evex_queue_num_elements(EVEX_EQ_NAME) && - (evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*) == et && - evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*)->modified == - modified)) - { - DBUG_PRINT("evex main thread",("will sleep a bit more.")); - my_sleep(50000); - } - DBUG_PRINT("info",("saved_modified=%llu current=%llu", modified, - evex_queue_num_elements(EVEX_EQ_NAME)? - evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*)->modified: - (ulonglong)~0)); - } - - int ret= WAIT_STATUS_READY; - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - ret= WAIT_STATUS_EMPTY_QUEUE; - else if (evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*) != et) - ret= WAIT_STATUS_NEW_TOP_EVENT; - if (thd->killed && event_executor_running_global_var) - ret= WAIT_STATUS_STOP_EXECUTOR; - - DBUG_RETURN(ret); -} - - -/* - The main scheduler thread. Inits the priority queue on start and - destroys it on thread shutdown. Forks child threads for every event - execution. Sleeps between thread forking and does not do a busy wait. - - SYNOPSIS - event_executor_main() - arg unused - - NOTES - 1. The host of the thead is my_localhost - 2. thd->net is initted with NULL - no communication. - -*/ - -pthread_handler_t -event_executor_main(void *arg) -{ - THD *thd; /* needs to be first for thread_stack */ - uint i=0, j=0; - my_ulonglong cnt= 0; - - DBUG_ENTER("event_executor_main"); - DBUG_PRINT("event_executor_main", ("EVEX thread started")); - - pthread_mutex_lock(&LOCK_evex_main_thread); - if (!scheduler_main_thread_running) - scheduler_main_thread_running= true; - else - { - DBUG_PRINT("event_executor_main", ("already running. thd_id=%d", - evex_main_thread_id)); - pthread_mutex_unlock(&LOCK_evex_main_thread); - my_thread_end(); - pthread_exit(0); - DBUG_RETURN(0); // Can't return anything here - } - pthread_mutex_unlock(&LOCK_evex_main_thread); - - /* init memory root */ - init_alloc_root(&evex_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); - - /* needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff*/ - my_thread_init(); - - if (sizeof(my_time_t) != sizeof(time_t)) - { - sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." - "The scheduler will not work correctly. Stopping."); - DBUG_ASSERT(0); - goto err_no_thd; - } - - /* note that contructor of THD uses DBUG_ ! */ - if (!(thd = new THD)) - { - sql_print_error("SCHEDULER: Cannot create THD for the main thread."); - goto err_no_thd; - } - thd->thread_stack = (char*)&thd; // remember where our stack is - - pthread_detach_this_thread(); - - if (init_event_thread(thd)) - goto finish; - - /* - make this thread visible it has no vio -> show processlist won't see it - unless it's marked as system thread - */ - thd->system_thread= 1; - - VOID(pthread_mutex_lock(&LOCK_thread_count)); - threads.append(thd); - thread_count++; - thread_running++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - DBUG_PRINT("EVEX main thread", ("Initing events_queue")); - - /* - eventually manifest that we are running, not to crashe because of - usage of non-initialized memory structures. - */ - VOID(pthread_mutex_lock(&LOCK_evex_running)); - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - evex_queue_init(&EVEX_EQ_NAME); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - evex_is_running= true; - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - - thd->security_ctx->user= my_strdup("event_scheduler", MYF(0)); - - if (evex_load_events_from_db(thd)) - goto finish; - - evex_main_thread_id= thd->thread_id; - - sql_print_information("SCHEDULER: Main thread started"); - while (!thd->killed) - { - TIME time_now; - Event_timed *et; - - cnt++; - DBUG_PRINT("info", ("EVEX External Loop %d thd->k", cnt)); - - thd->proc_info = "Sleeping"; - if (!event_executor_running_global_var) - { - sql_print_information("SCHEDULER: Asked to stop."); - break; - } - - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - { - my_sleep(100000);// sleep 0.1s - continue; - } - -restart_ticking: - switch (executor_wait_till_next_event_exec(thd)) { - case WAIT_STATUS_READY: // time to execute the event on top - DBUG_PRINT("evex main thread",("time to execute an event")); - break; - case WAIT_STATUS_EMPTY_QUEUE: // no more events - DBUG_PRINT("evex main thread",("no more events")); - continue; - break; - case WAIT_STATUS_NEW_TOP_EVENT: // new event on top in the queue - DBUG_PRINT("evex main thread",("restart ticking")); - goto restart_ticking; - case WAIT_STATUS_STOP_EXECUTOR: - sql_print_information("SCHEDULER: Asked to stop."); - goto finish; - break; - default: - DBUG_ASSERT(0); - } - - - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - thd->end_time(); - my_tz_UTC->gmt_sec_to_TIME(&time_now, thd->query_start()); - - if (!evex_queue_num_elements(EVEX_EQ_NAME)) - { - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - DBUG_PRINT("evex main thread",("empty queue")); - continue; - } - et= evex_queue_first_element(&EVEX_EQ_NAME, Event_timed*); - DBUG_PRINT("evex main thread",("got event from the queue")); - - if (!et->execute_at_null && my_time_compare(&time_now,&et->execute_at) == -1) - { - DBUG_PRINT("evex main thread",("still not the time for execution")); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - continue; - } - - DBUG_PRINT("evex main thread",("it's right time")); - if (et->status == MYSQL_EVENT_ENABLED) - { - int fork_ret_code; - - DBUG_PRINT("evex main thread", ("[%10s] this exec at [%llu]", et->name.str, - TIME_to_ulonglong_datetime(&et->execute_at))); - et->mark_last_executed(thd); - if (et->compute_next_execution_time()) - { - sql_print_error("SCHEDULER: Error while computing time of %s.%s . " - "Disabling after execution.", - et->dbname.str, et->name.str); - et->status= MYSQL_EVENT_DISABLED; - } - DBUG_PRINT("evex main thread", ("[%10s] next exec at [%llu]", et->name.str, - TIME_to_ulonglong_datetime(&et->execute_at))); - - et->update_fields(thd); -#ifndef DBUG_FAULTY_THR - thread_safe_increment(workers_count, &LOCK_workers_count); - switch ((fork_ret_code= et->spawn_now(event_executor_worker))) { - case EVENT_EXEC_CANT_FORK: - thread_safe_decrement(workers_count, &LOCK_workers_count); - sql_print_error("SCHEDULER: Problem while trying to create a thread"); - UNLOCK_MUTEX_AND_BAIL_OUT(LOCK_event_arrays, finish); - case EVENT_EXEC_ALREADY_EXEC: - thread_safe_decrement(workers_count, &LOCK_workers_count); - sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", - et->dbname.str, et->name.str); - break; - default: - DBUG_ASSERT(!fork_ret_code); - if (fork_ret_code) - thread_safe_decrement(workers_count, &LOCK_workers_count); - break; - } -#else - event_executor_worker((void *) et); -#endif - /* - 1. For one-time event : year is > 0 and expression is 0 - 2. For recurring, expression is != -=> check execute_at_null in this case - */ - if ((et->execute_at.year && !et->expression) || et->execute_at_null) - et->flags |= EVENT_EXEC_NO_MORE; - - if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED) - evex_queue_delete_element(&EVEX_EQ_NAME, 0);// 0 is top, internally 1 - else - evex_queue_first_updated(&EVEX_EQ_NAME); - } - DBUG_PRINT("evex main thread",("unlocking")); - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - }/* while */ -finish: - - /* First manifest that this thread does not work and then destroy */ - VOID(pthread_mutex_lock(&LOCK_evex_running)); - evex_is_running= false; - evex_main_thread_id= 0; - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - - - /* - TODO: A better will be with a conditional variable - */ - /* - Read workers_count without lock, no need for locking. - In the worst case we have to wait 1sec more. - */ - sql_print_information("SCHEDULER: Stopping. Waiting for worker threads to finish."); - while (1) - { - VOID(pthread_mutex_lock(&LOCK_workers_count)); - if (!workers_count) - { - VOID(pthread_mutex_unlock(&LOCK_workers_count)); - break; - } - VOID(pthread_mutex_unlock(&LOCK_workers_count)); - my_sleep(1000000);// 1s - } - - /* - First we free all objects ... - Lock because a DROP DATABASE could be running in parallel and it locks on these - */ - sql_print_information("SCHEDULER: Emptying the queue."); - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - for (i= 0; i < evex_queue_num_elements(EVEX_EQ_NAME); ++i) - { - Event_timed *et= evex_queue_element(&EVEX_EQ_NAME, i, Event_timed*); - et->free_sp(); - delete et; - } - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - /* ... then we can thrash the whole queue at once */ - evex_queue_destroy(&EVEX_EQ_NAME); - - thd->proc_info = "Clearing"; - DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); // destructor will not free it, because we are weird - THD_CHECK_SENTRY(thd); - - pthread_mutex_lock(&LOCK_thread_count); - thread_count--; - thread_running--; -#ifndef DBUG_FAULTY_THR - THD_CHECK_SENTRY(thd); - delete thd; -#endif - pthread_mutex_unlock(&LOCK_thread_count); - - -err_no_thd: - VOID(pthread_mutex_lock(&LOCK_evex_running)); - evex_is_running= false; - event_executor_running_global_var= false; - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - - free_root(&evex_mem_root, MYF(0)); - sql_print_information("SCHEDULER: Stopped."); - -#ifndef DBUG_FAULTY_THR - pthread_mutex_lock(&LOCK_evex_main_thread); - scheduler_main_thread_running= false; - pthread_mutex_unlock(&LOCK_evex_main_thread); - - my_thread_end(); - pthread_exit(0); -#endif - DBUG_RETURN(0); // Can't return anything here -} - - -/* - Function that executes an event in a child thread. Setups the - environment for the event execution and cleans after that. - - SYNOPSIS - event_executor_worker() - arg The Event_timed object to be processed -*/ - -pthread_handler_t -event_executor_worker(void *event_void) -{ - THD *thd; /* needs to be first for thread_stack */ - Event_timed *event = (Event_timed *) event_void; - MEM_ROOT worker_mem_root; - - DBUG_ENTER("event_executor_worker"); - - init_alloc_root(&worker_mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); - -#ifndef DBUG_FAULTY_THR - my_thread_init(); - - if (!(thd = new THD)) /* note that contructor of THD uses DBUG_ ! */ - { - sql_print_error("SCHEDULER: Cannot create a THD structure in an worker."); - goto err_no_thd; - } - thd->thread_stack = (char*)&thd; // remember where our stack is - thd->mem_root= &worker_mem_root; - - pthread_detach_this_thread(); - - if (init_event_thread(thd)) - goto err; - - thd->init_for_queries(); - - /* make this thread visible it has no vio -> show processlist needs this flag */ - thd->system_thread= 1; - - VOID(pthread_mutex_lock(&LOCK_thread_count)); - threads.append(thd); - thread_count++; - thread_running++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); -#else - thd= current_thd; -#endif - - thd->enable_slow_log= TRUE; - { - int ret; - sql_print_information("SCHEDULER: Executing event %s.%s of %s [EXPR:%d]", - event->dbname.str, event->name.str, - event->definer.str, (int) event->expression); - - ret= event->execute(thd, &worker_mem_root); - - evex_print_warnings(thd, event); - sql_print_information("SCHEDULER: Executed event %s.%s of %s [EXPR:%d]. " - "RetCode=%d", event->dbname.str, event->name.str, - event->definer.str, (int) event->expression, ret); - if (ret == EVEX_COMPILE_ERROR) - sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of", - event->dbname.str, event->name.str, - event->definer.str); - else if (ret == EVEX_MICROSECOND_UNSUP) - sql_print_information("SCHEDULER: MICROSECOND is not supported"); - } - event->spawn_thread_finish(thd); - - -err: - VOID(pthread_mutex_lock(&LOCK_thread_count)); -#ifndef DBUG_FAULTY_THR - thread_count--; - thread_running--; - /* - Some extra safety, which should not been needed (normally, event deletion - should already have done these assignments (each event which sets these - variables is supposed to set them to 0 before terminating)). - */ - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - thd->proc_info = "Clearing"; - DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); // destructor will not free it, because we are weird - THD_CHECK_SENTRY(thd); - - VOID(pthread_mutex_lock(&LOCK_thread_count)); - THD_CHECK_SENTRY(thd); - delete thd; -#endif - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - -err_no_thd: - - free_root(&worker_mem_root, MYF(0)); - thread_safe_decrement(workers_count, &LOCK_workers_count); - -#ifndef DBUG_FAULTY_THR - my_thread_end(); - pthread_exit(0); -#endif - DBUG_RETURN(0); // Can't return anything here -} - - -/* - Loads all ENABLED events from mysql.event into the prioritized - queue. Called during scheduler main thread initialization. Compiles - the events. Creates Event_timed instances for every ENABLED event - from mysql.event. - - SYNOPSIS - evex_load_events_from_db() - thd - Thread context. Used for memory allocation in some cases. - - RETURNS - 0 OK - !0 Error - - NOTES - Reports the error to the console -*/ - -static int -evex_load_events_from_db(THD *thd) -{ - TABLE *table; - READ_RECORD read_record_info; - int ret= -1; - uint count= 0; - - DBUG_ENTER("evex_load_events_from_db"); - - if ((ret= evex_open_event_table(thd, TL_READ, &table))) - { - sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); - DBUG_RETURN(SP_OPEN_TABLE_FAILED); - } - - VOID(pthread_mutex_lock(&LOCK_event_arrays)); - - init_read_record(&read_record_info, thd, table ,NULL,1,0); - while (!(read_record_info.read_record(&read_record_info))) - { - Event_timed *et; - if (!(et= new Event_timed)) - { - DBUG_PRINT("evex_load_events_from_db", ("Out of memory")); - ret= -1; - goto end; - } - DBUG_PRINT("evex_load_events_from_db", ("Loading event from row.")); - - if ((ret= et->load_from_row(&evex_mem_root, table))) - { - sql_print_error("SCHEDULER: Error while loading from mysql.event. " - "Table probably corrupted"); - goto end; - } - if (et->status != MYSQL_EVENT_ENABLED) - { - DBUG_PRINT("evex_load_events_from_db",("%s is disabled",et->name.str)); - delete et; - continue; - } - - DBUG_PRINT("evex_load_events_from_db", - ("Event %s loaded from row. Time to compile", et->name.str)); - - switch (ret= et->compile(thd, &evex_mem_root)) { - case EVEX_MICROSECOND_UNSUP: - sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " - "supported but found in mysql.event"); - goto end; - case EVEX_COMPILE_ERROR: - sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", - et->dbname.str, et->name.str); - goto end; - default: - break; - } - - /* let's find when to be executed */ - if (et->compute_next_execution_time()) - { - sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." - " Skipping", et->dbname.str, et->name.str); - continue; - } - - DBUG_PRINT("evex_load_events_from_db", ("Adding to the exec list.")); - - evex_queue_insert(&EVEX_EQ_NAME, (EVEX_PTOQEL) et); - DBUG_PRINT("evex_load_events_from_db", ("%p %*s", - et, et->name.length,et->name.str)); - count++; - } - - ret= 0; - -end: - VOID(pthread_mutex_unlock(&LOCK_event_arrays)); - end_read_record(&read_record_info); - - /* Force close to free memory */ - thd->version--; - - close_thread_tables(thd); - if (!ret) - sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); - DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); - - DBUG_RETURN(ret); -} - - -/* - The update method of the global variable event_scheduler. - If event_scheduler is switched from 0 to 1 then the scheduler main - thread is started. - - SYNOPSIS - event_executor_worker() - thd - Thread context (unused) - car - the new value - - Returns - 0 OK (always) -*/ - -bool -sys_var_event_executor::update(THD *thd, set_var *var) -{ - /* here start the thread if not running. */ - DBUG_ENTER("sys_var_event_executor::update"); - VOID(pthread_mutex_lock(&LOCK_evex_running)); - *value= var->save_result.ulong_value; - - DBUG_PRINT("new_value", ("%d", *value)); - if ((my_bool) *value && !evex_is_running) - { - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - init_events(); - } else - VOID(pthread_mutex_unlock(&LOCK_evex_running)); - - DBUG_RETURN(0); -} - - -extern LEX_STRING warning_level_names[]; - -typedef void (*sql_print_xxx_func)(const char *format, ...); -static sql_print_xxx_func sql_print_xxx_handlers[3] = -{ - sql_print_information, - sql_print_warning, - sql_print_error -}; - - -/* - Prints the stack of infos, warnings, errors from thd to - the console so it can be fetched by the logs-into-tables and - checked later. - - Synopsis - evex_print_warnings - thd - thread used during the execution of the event - et - the event itself - - Returns - 0 - OK (always) - -*/ - -bool -evex_print_warnings(THD *thd, Event_timed *et) -{ - MYSQL_ERROR *err; - DBUG_ENTER("evex_show_warnings"); - char msg_buf[1024]; - char prefix_buf[512]; - String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); - prefix.length(0); - - List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); - while ((err= it++)) - { - String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); - /* set it to 0 or we start adding at the end. That's the trick ;) */ - err_msg.length(0); - if (!prefix.length()) - { - prefix.append("SCHEDULER: ["); - - append_identifier(thd,&prefix,et->definer_user.str,et->definer_user.length); - prefix.append('@'); - append_identifier(thd,&prefix,et->definer_host.str,et->definer_host.length); - prefix.append("][", 2); - append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); - prefix.append('.'); - append_identifier(thd,&prefix, et->name.str, et->name.length); - prefix.append("] ", 2); - } - - err_msg.append(prefix); - err_msg.append(err->msg, strlen(err->msg), system_charset_info); - err_msg.append("]"); - DBUG_ASSERT(err->level < 3); - (sql_print_xxx_handlers[err->level])("%*s", err_msg.length(), err_msg.c_ptr()); - } - - - DBUG_RETURN(FALSE); -} diff --git a/sql/event_priv.h b/sql/event_priv.h index 6b23136847e..b0a18377ace 100644 --- a/sql/event_priv.h +++ b/sql/event_priv.h @@ -1,4 +1,6 @@ -/* Copyright (C) 2004-2005 MySQL AB +#ifndef _EVENT_PRIV_H_ +#define _EVENT_PRIV_H_ +/* Copyright (C) 2004-2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -14,8 +16,6 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifndef _EVENT_PRIV_H_ -#define _EVENT_PRIV_H_ #include "mysql_priv.h" @@ -23,11 +23,6 @@ #define EVENT_EXEC_ALREADY_EXEC 1 #define EVENT_EXEC_CANT_FORK 2 -#define EVEX_USE_QUEUE - -#define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \ - { VOID(pthread_mutex_unlock(&__mutex)); goto __label; } - #define EVEX_DB_FIELD_LEN 64 #define EVEX_NAME_FIELD_LEN 64 #define EVEX_MAX_INTERVAL_VALUE 2147483647L @@ -44,39 +39,49 @@ evex_db_find_event_by_name(THD *thd, const LEX_STRING dbname, int event_timed_compare_q(void *vptr, byte* a, byte *b); -int db_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, - uint *rows_affected); - +int +db_drop_event(THD *thd, Event_timed *et, bool drop_if_exists, + uint *rows_affected); +int +db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, Event_timed **ett, + TABLE *tbl, MEM_ROOT *root); -#define EXEC_QUEUE_QUEUE_NAME executing_queue -#define EXEC_QUEUE_DARR_NAME evex_executing_queue +int +db_create_event(THD *thd, Event_timed *et, my_bool create_if_not, + uint *rows_affected); +int +db_drop_events_from_table(THD *thd, LEX_STRING *db); -#define EVEX_QUEUE_TYPE QUEUE -#define EVEX_PTOQEL byte * +int +sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs); -#define EVEX_EQ_NAME executing_queue -#define evex_queue_first_element(queue, __cast) ((__cast)queue_top(queue)) -#define evex_queue_element(queue, idx, __cast) ((__cast)queue_element(queue, idx)) -#define evex_queue_delete_element(queue, idx) queue_remove(queue, idx) -#define evex_queue_destroy(queue) delete_queue(queue) -#define evex_queue_first_updated(queue) queue_replaced(queue) -#define evex_queue_insert(queue, element) queue_insert_safe(queue, element); +/* Compares only the name part of the identifier */ +bool +event_timed_name_equal(Event_timed *et, LEX_STRING *name); +/* Compares only the schema part of the identifier */ +bool +event_timed_db_equal(Event_timed *et, LEX_STRING *db); +/* + Compares only the definer part of the identifier. Use during DROP USER + to drop user's events. (Still not implemented) +*/ +bool +event_timed_definer_equal(Event_timed *et, LEX_STRING *definer); -void -evex_queue_init(EVEX_QUEUE_TYPE *queue); +/* Compares the whole identifier*/ +bool +event_timed_identifier_equal(Event_timed *a, Event_timed *b); -#define evex_queue_num_elements(queue) queue.elements +bool +change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, + LEX_STRING db, Security_context *s_ctx, + Security_context **backup); -extern bool evex_is_running; -extern MEM_ROOT evex_mem_root; -extern pthread_mutex_t LOCK_event_arrays, - LOCK_workers_count, - LOCK_evex_running; -extern ulonglong evex_main_thread_id; -extern QUEUE EVEX_EQ_NAME; +void +restore_security_context(THD *thd, Security_context *backup); #endif /* _EVENT_PRIV_H_ */ diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc new file mode 100644 index 00000000000..a19f14d3726 --- /dev/null +++ b/sql/event_scheduler.cc @@ -0,0 +1,2423 @@ +/* Copyright (C) 2004-2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "event_priv.h" +#include "event.h" +#include "event_scheduler.h" +#include "sp_head.h" + +/* + ToDo: + 1. Talk to Alik to get a check for configure.in for my_time_t and time_t + 2. Look at guardian.h|cc to see its life cycle, has similarities. +*/ + + +/* + The scheduler is implemented as class Event_scheduler. Only one instance is + kept during the runtime of the server, by implementing the Singleton DP. + Object instance is always there because the memory is allocated statically + and initialized when the OS loader loads mysqld. This initialization is + bare. Extended initialization is done during the call to + Event_scheduler::init() in Events::init(). The reason for that late initialization + is that some subsystems needed to boot the Scheduler are not available at + earlier stages of the mysqld boot procedure. Events::init() is called in + mysqld.cc . If the mysqld is started with --event-scheduler=0 then + no initialization takes place and the scheduler is unavailable during this + server run. The server should be started with --event-scheduler=1 to have + the scheduler initialized and able to execute jobs. This starting alwa + s implies that the jobs execution will start immediately. If the server + is started with --event-scheduler=2 then the scheduler is started in suspended + state. Default state, if --event-scheduler is not specified is 2. + + The scheduler only manages execution of the events. Their creation, + alteration and deletion is delegated to other routines found in event.cc . + These routines interact with the scheduler : + - CREATE EVENT -> Event_scheduler::add_event() + - ALTER EVENT -> Event_scheduler::replace_event() + - DROP EVENT -> Event_scheduler::drop_event() + + There is one mutex in the single Event_scheduler object which controls + the simultaneous access to the objects invariants. Using one lock makes + it easy to follow the workflow. This mutex is LOCK_scheduler_data. It is + initialized in Event_scheduler::init(). Which in turn is called by the + Facade class Events in event.cc, coming from init_thread_environment() from + mysqld.cc -> no concurrency at this point. It's destroyed in + Events::destroy_mutexes() called from clean_up_mutexes() in mysqld.cc . + + The full initialization is done in Event_scheduler::init() called from + Events::init(). It's done before any requests coming in, so this is a + guarantee for not having concurrency. + + The scheduler is started with Event_scheduler::start() and stopped with + Event_scheduler::stop(). When the scheduler starts it loads all events + from mysql.event table. Unfortunately, there is a race condition between + the event disk management functions and the scheduler ones + (add/replace/drop_event & load_events_from_db()), because the operations + do not happen under one global lock but the disk operations are guarded + by the MYISAM lock on mysql.event. In the same time, the queue operations + are guarded by LOCK_scheduler_data. If the scheduler is start()-ed during + server startup and stopped()-ed during server shutdown (in Events::shutdown() + called by kill_server() in mysqld.cc) these races does not exist. + + Since the user may want to temporarily inhibit execution of events the + scheduler can be suspended and then it can be forced to resume its + operations. The API call to perform these is + Event_scheduler::suspend_or_resume(enum enum_suspend_or_resume) . + When the scheduler is suspended the main scheduler thread, which ATM + happens to have thread_id 1, locks on a condition COND_suspend_or_resume. + When this is signal is sent for the reverse operation the main scheduler + loops continues to roll and execute events. + + When the scheduler is suspended all add/replace/drop_event() operations + work as expected and the modify the queue but no events execution takes + place. + + In contrast to the previous scheduler implementation, found in + event_executor.cc, the start, shutdown, suspend and resume are synchronous + operations. As a whole all operations are synchronized and no busy waits + are used except in stop_all_running_events(), which waits until all + running event worker threads have finished. It would have been nice to + use a conditional on which this method will wait and the last thread to + finish would signal it but this implies subclassing THD. + + The scheduler does not keep a counter of how many event worker threads are + running, at any specific moment, because this will copy functionality + already existing in the server. Namely, all THDs are registered in the + global `threads` array. THD has member variable system_thread which + identifies the type of thread. Connection threads being NON_SYSTEM_THREAD, + all other have their enum value. Important for the scheduler are + SYSTEM_THREAD_EVENT_SCHEDULER and SYSTEM_THREAD_EVENT_WORKER. + + Class THD subclasses class ilink, which is the linked list of all threads. + When a THD instance is destroyed it's being removed from threads, thus + no manual intervention is needed. On the contrary registering is manual + with threads.append() . Traversing the threads array every time a subclass + of THD, for instance if we would have had THD_scheduler_worker to see + how many events we have and whether the scheduler is shutting down will + take much time and lead to a deadlock. stop_all_running_events() is called + under LOCK_scheduler_data. If the THD_scheduler_worker was aware of + the single Event_scheduler instance it will try to check + Event_scheduler::state but for this it would need to acquire + LOCK_scheduler_data => deadlock. Thus stop_all_running_events() uses a + busy wait. + + DROP DATABASE DDL should drop all events defined in a specific schema. + DROP USER also should drop all events who has as definer the user being + dropped (this one is not addressed at the moment but a hook exists). For + this specific needs Event_scheduler::drop_matching_events() is + implemented. Which expects a callback to be applied on every object in + the queue. Thus events that match specific schema or user, will be + removed from the queue. The exposed interface is : + - Event_scheduler::drop_schema_events() + - Event_scheduler::drop_user_events() + + This bulk dropping happens under LOCK_scheduler_data, thus no two or + more threads can execute it in parallel. However, DROP DATABASE is also + synchronized, currently, in the server thus this does not impact the + overall performance. In addition, DROP DATABASE is not that often + executed DDL. + + Though the interface to the scheduler is only through the public methods + of class Event_scheduler, there are currently few functions which are + used during its operations. Namely : + - static evex_print_warnings() + After every event execution all errors/warnings are dumped, so the user + can see in case of a problem what the problem was. + + - static init_event_thread() + This function is both used by event_scheduler_thread() and + event_worker_thread(). It initializes the THD structure. The + initialization looks pretty similar to the one in slave.cc done for the + replication threads. However, though the similarities it cannot be + factored out to have one routine. + + - static event_scheduler_thread() + Because our way to register functions to be used by the threading library + does not allow usage of static methods this function is used to start the + scheduler in it. It does THD initialization and then calls + Event_scheduler::run(). + + - static event_worker_thread() + With already stated the reason for not being able to use methods, this + function executes the worker threads. + + The execution of events is, to some extent, synchronized to inhibit race + conditions when Event_timed::thread_id is being updated with the thread_id of + the THD in which the event is being executed. The thread_id is in the + Event_timed object because we need to be able to kill quickly a specific + event during ALTER/DROP EVENT without traversing the global `threads` array. + However, this makes the scheduler's code more complicated. The event worker + thread is started by Event_timed::spawn_now(), which in turn calls + pthread_create(). The thread_id which will be associated in init_event_thread + is not known in advance thus the registering takes place in + event_worker_thread(). This registering has to be synchronized under + LOCK_scheduler_data, so no kill_event() on a object in + replace_event/drop_event/drop_matching_events() could take place. + + This synchronization is done through class Worker_thread_param that is + local to this file. Event_scheduler::execute_top() is called under + LOCK_scheduler_data. This method : + 1. Creates an instance of Worker_thread_param on the stack + 2. Locks Worker_thread_param::LOCK_started + 3. Calls Event_timed::spawn_now() which in turn creates a new thread. + 4. Locks on Worker_thread_param::COND_started_or_stopped and waits till the + worker thread send signal. The code is spurious wake-up safe because + Worker_thread_param::started is checked. + 5. The worker thread initializes its THD, then sets Event_timed::thread_id, + sets Worker_thread_param::started to TRUE and sends back + Worker_thread_param::COND_started. From this moment on, the event + is being executed and could be killed by using Event_timed::thread_id. + When Event_timed::spawn_thread_finish() is called in the worker thread, + it sets thread_id to 0. From this moment on, the worker thread should not + touch the Event_timed instance. + + + The life-cycle of the server is a FSA. + enum enum_state Event_scheduler::state keeps the state of the scheduler. + + The states are: + + |---UNINITIALIZED + | + | |------------------> IN_SHUTDOWN + --> INITIALIZED -> COMMENCING ---> RUNNING ----------| + ^ ^ | | ^ | + | |- CANTSTART <--| | |- SUSPENDED <-| + |______________________________| + + - UNINITIALIZED :The object is created and only the mutex is initialized + - INITIALIZED :All member variables are initialized + - COMMENCING :The scheduler is starting, no other attempt to start + should succeed before the state is back to INITIALIZED. + - CANTSTART :Set by the ::run() method in case it can't start for some + reason. In this case the connection thread that tries to + start the scheduler sees that some error has occurred and + returns an error to the user. Finally, the connection + thread sets the state to INITIALIZED, so further attempts + to start the scheduler could be made. + - RUNNING :The scheduler is running. New events could be added, + dropped, altered. The scheduler could be stopped. + - SUSPENDED :Like RUNNING but execution of events does not take place. + Operations on the memory queue are possible. + - IN_SHUTDOWN :The scheduler is shutting down, due to request by setting + the global event_scheduler to 0/FALSE, or because of a + KILL command sent by a user to the master thread. + + In every method the macros LOCK_SCHEDULER_DATA() and UNLOCK_SCHEDULER_DATA() + are used for (un)locking purposes. They are used to save the programmer + from typing everytime + lock_data(__FUNCTION__, __LINE__); + All locking goes through Event_scheduler::lock_data() and ::unlock_data(). + These two functions then record in variables where for last time + LOCK_scheduler_data was locked and unlocked (two different variables). In + multithreaded environment, in some cases they make no sense but are useful for + inspecting deadlocks without having the server debug log turned on and the + server is still running. + + The same strategy is used for conditional variables. + Event_scheduler::cond_wait() is invoked from all places with parameter + an enum enum_cond_vars. In this manner, it's possible to inspect the last + on which condition the last call to cond_wait() was waiting. If the server + was started with debug trace switched on, the trace file also holds information + about conditional variables used. +*/ + + +#define LOCK_SCHEDULER_DATA() lock_data(__FUNCTION__,__LINE__) +#define UNLOCK_SCHEDULER_DATA() unlock_data(__FUNCTION__,__LINE__) + + +#ifndef DBUG_OFF +static +LEX_STRING states_names[] = +{ + {(char*) STRING_WITH_LEN("UNINITIALIZED")}, + {(char*) STRING_WITH_LEN("INITIALIZED")}, + {(char*) STRING_WITH_LEN("COMMENCING")}, + {(char*) STRING_WITH_LEN("CANTSTART")}, + {(char*) STRING_WITH_LEN("RUNNING")}, + {(char*) STRING_WITH_LEN("SUSPENDED")}, + {(char*) STRING_WITH_LEN("IN_SHUTDOWN")} +}; +#endif + + +Event_scheduler +Event_scheduler::singleton; + + +const char * const +Event_scheduler::cond_vars_names[Event_scheduler::COND_LAST] = +{ + "new work", + "started or stopped", + "suspend or resume" +}; + + +class Worker_thread_param +{ +public: + Event_timed *et; + pthread_mutex_t LOCK_started; + pthread_cond_t COND_started; + bool started; + + Worker_thread_param(Event_timed *etn):et(etn), started(FALSE) + { + pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST); + pthread_cond_init(&COND_started, NULL); + } + + ~Worker_thread_param() + { + pthread_mutex_destroy(&LOCK_started); + pthread_cond_destroy(&COND_started); + } +}; + + +/* + Prints the stack of infos, warnings, errors from thd to + the console so it can be fetched by the logs-into-tables and + checked later. + + SYNOPSIS + evex_print_warnings + thd - thread used during the execution of the event + et - the event itself +*/ + +static void +evex_print_warnings(THD *thd, Event_timed *et) +{ + MYSQL_ERROR *err; + DBUG_ENTER("evex_print_warnings"); + if (!thd->warn_list.elements) + DBUG_VOID_RETURN; + + char msg_buf[10 * STRING_BUFFER_USUAL_SIZE]; + char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE]; + String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info); + prefix.length(0); + prefix.append("SCHEDULER: ["); + + append_identifier(thd, &prefix, et->definer_user.str, et->definer_user.length); + prefix.append('@'); + append_identifier(thd, &prefix, et->definer_host.str, et->definer_host.length); + prefix.append("][", 2); + append_identifier(thd,&prefix, et->dbname.str, et->dbname.length); + prefix.append('.'); + append_identifier(thd,&prefix, et->name.str, et->name.length); + prefix.append("] ", 2); + + List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + while ((err= it++)) + { + String err_msg(msg_buf, sizeof(msg_buf), system_charset_info); + /* set it to 0 or we start adding at the end. That's the trick ;) */ + err_msg.length(0); + err_msg.append(prefix); + err_msg.append(err->msg, strlen(err->msg), system_charset_info); + err_msg.append("]"); + DBUG_ASSERT(err->level < 3); + (sql_print_message_handlers[err->level])("%*s", err_msg.length(), + err_msg.c_ptr()); + } + DBUG_VOID_RETURN; +} + + +/* + Inits an scheduler thread handler, both the main and a worker + + SYNOPSIS + init_event_thread() + thd - the THD of the thread. Has to be allocated by the caller. + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. + + RETURN VALUE + 0 OK + -1 Error +*/ + +static int +init_event_thread(THD** t, enum enum_thread_type thread_type) +{ + THD *thd= *t; + thd->thread_stack= (char*)t; // remember where our stack is + DBUG_ENTER("init_event_thread"); + thd->client_capabilities= 0; + thd->security_ctx->master_access= 0; + thd->security_ctx->db_access= 0; + thd->security_ctx->host_or_ip= (char*)my_localhost; + my_net_init(&thd->net, 0); + thd->net.read_timeout= slave_net_timeout; + thd->slave_thread= 0; + thd->options|= OPTION_AUTO_IS_NULL; + thd->client_capabilities|= CLIENT_MULTI_RESULTS; + thd->real_id=pthread_self(); + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->thread_id= thread_id++; + threads.append(thd); + thread_count++; + thread_running++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + if (init_thr_lock() || thd->store_globals()) + { + thd->cleanup(); + DBUG_RETURN(-1); + } + +#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__) + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + /* + Guarantees that we will see the thread in SHOW PROCESSLIST though its + vio is NULL. + */ + thd->system_thread= thread_type; + + thd->proc_info= "Initialized"; + thd->version= refresh_version; + thd->set_time(); + + DBUG_RETURN(0); +} + + +/* + Inits the main scheduler thread and then calls Event_scheduler::run() + of arg. + + SYNOPSIS + event_scheduler_thread() + arg void* ptr to Event_scheduler + + NOTES + 1. The host of the thead is my_localhost + 2. thd->net is initted with NULL - no communication. + 3. The reason to have a proxy function is that it's not possible to + use a method as function to be executed in a spawned thread: + - our pthread_hander_t macro uses extern "C" + - separating thread setup from the real execution loop is also to be + considered good. + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_scheduler_thread(void *arg) +{ + /* needs to be first for thread_stack */ + THD *thd= NULL; + Event_scheduler *scheduler= (Event_scheduler *) arg; + + DBUG_ENTER("event_scheduler_thread"); + + my_thread_init(); + pthread_detach_this_thread(); + + /* note that constructor of THD uses DBUG_ ! */ + if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_SCHEDULER)) + { + sql_print_error("SCHEDULER: Cannot init manager event thread."); + scheduler->report_error_during_start(); + } + else + { + thd->security_ctx->set_user((char*)"event_scheduler"); + + sql_print_information("SCHEDULER: Manager thread booting"); + if (Event_scheduler::check_system_tables(thd)) + scheduler->report_error_during_start(); + else + scheduler->run(thd); + + /* + NOTE: Don't touch `scheduler` after this point because we have notified + the + thread which shuts us down that we have finished cleaning. In this + very moment a new scheduler thread could be started and a crash is + not welcome. + */ + } + + /* + If we cannot create THD then don't decrease because we haven't touched + thread_count and thread_running in init_event_thread() which was never + called. In init_event_thread() thread_count and thread_running are + always increased even in the case the method returns an error. + */ + if (thd) + { + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + thread_running--; + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + } + my_thread_end(); + DBUG_RETURN(0); // Can't return anything here +} + + +/* + Function that executes an event in a child thread. Setups the + environment for the event execution and cleans after that. + + SYNOPSIS + event_worker_thread() + arg The Event_timed object to be processed + + RETURN VALUE + 0 OK +*/ + +pthread_handler_t +event_worker_thread(void *arg) +{ + THD *thd; /* needs to be first for thread_stack */ + Worker_thread_param *param= (Worker_thread_param *) arg; + Event_timed *event= param->et; + MEM_ROOT worker_mem_root; + int ret; + bool startup_error= FALSE; + Security_context *save_ctx; + /* this one is local and not needed after exec */ + Security_context security_ctx; + + DBUG_ENTER("event_worker_thread"); + DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str)); + + my_thread_init(); + pthread_detach_this_thread(); + + if (!(thd= new THD) || init_event_thread(&thd, SYSTEM_THREAD_EVENT_WORKER)) + { + sql_print_error("SCHEDULER: Startup failure."); + startup_error= TRUE; + event->spawn_thread_finish(thd); + } + else + event->set_thread_id(thd->thread_id); + + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + /* + If we don't change it before we send the signal back, then an intermittent + DROP EVENT will take LOCK_scheduler_data and try to kill this thread, because + event->thread_id is already real. However, because thd->security_ctx->user + is not initialized then a crash occurs in kill_one_thread(). Thus, we have + to change the context before sending the signal. We are under + LOCK_scheduler_data being held by Event_scheduler::run() -> ::execute_top(). + */ + change_security_context(thd, event->definer_user, event->definer_host, + event->dbname, &security_ctx, &save_ctx); + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + + /* Signal the scheduler thread that we have started successfully */ + pthread_mutex_lock(¶m->LOCK_started); + param->started= TRUE; + pthread_cond_signal(¶m->COND_started); + pthread_mutex_unlock(¶m->LOCK_started); + + if (!startup_error) + { + uint flags; + + thd->init_for_queries(); + thd->enable_slow_log= TRUE; + + event->set_thread_id(thd->thread_id); + sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu", + event->dbname.str, event->name.str, + event->definer.str, thd->thread_id); + + ret= event->execute(thd, thd->mem_root); + evex_print_warnings(thd, event); + sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d", + event->dbname.str, event->name.str, + event->definer.str, ret); + if (ret == EVEX_COMPILE_ERROR) + sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", + event->dbname.str, event->name.str, + event->definer.str); + else if (ret == EVEX_MICROSECOND_UNSUP) + sql_print_information("SCHEDULER: MICROSECOND is not supported"); + + DBUG_PRINT("info", ("master_access=%d db_access=%d", + thd->security_ctx->master_access, thd->security_ctx->db_access)); + + /* If true is returned, we are expected to free it */ + if (event->spawn_thread_finish(thd)) + { + DBUG_PRINT("info", ("Freeing object pointer")); + delete event; + } + } + + if (thd) + { + thd->proc_info= "Clearing"; + DBUG_ASSERT(thd->net.buff != 0); + /* + Free it here because net.vio is NULL for us => THD::~THD will check it + and won't call net_end(&net); See also replication code. + */ + net_end(&thd->net); + DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id)); + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thread_count--; + thread_running--; + delete thd; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + } + + my_thread_end(); + DBUG_RETURN(0); // Can't return anything here +} + + +/* + Constructor of class Event_scheduler. + + SYNOPSIS + Event_scheduler::Event_scheduler() +*/ + +Event_scheduler::Event_scheduler() + :state(UNINITIALIZED), start_scheduler_suspended(FALSE), + thread_id(0), mutex_last_locked_at_line(0), + mutex_last_unlocked_at_line(0), mutex_last_locked_in_func(""), + mutex_last_unlocked_in_func(""), cond_waiting_on(COND_NONE), + mutex_scheduler_data_locked(FALSE) +{ +} + + +/* + Returns the singleton instance of the class. + + SYNOPSIS + Event_scheduler::get_instance() + + RETURN VALUE + address +*/ + +Event_scheduler* +Event_scheduler::get_instance() +{ + DBUG_ENTER("Event_scheduler::get_instance"); + DBUG_RETURN(&singleton); +} + + +/* + The implementation of full-fledged initialization. + + SYNOPSIS + Event_scheduler::init() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_scheduler::init() +{ + int i= 0; + bool ret= FALSE; + DBUG_ENTER("Event_scheduler::init"); + DBUG_PRINT("enter", ("this=%p", this)); + + LOCK_SCHEDULER_DATA(); + for (;i < COND_LAST; i++) + if (pthread_cond_init(&cond_vars[i], NULL)) + { + sql_print_error("SCHEDULER: Unable to initalize conditions"); + ret= TRUE; + goto end; + } + + /* init memory root */ + init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + + if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, + event_timed_compare_q, NULL, 30 /*auto_extent*/)) + { + sql_print_error("SCHEDULER: Can't initialize the execution queue"); + ret= TRUE; + goto end; + } + + if (sizeof(my_time_t) != sizeof(time_t)) + { + sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ." + "The scheduler may not work correctly. Stopping."); + DBUG_ASSERT(0); + ret= TRUE; + goto end; + } + + state= INITIALIZED; +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(ret); +} + + +/* + Frees all memory allocated by the scheduler object. + + SYNOPSIS + Event_scheduler::destroy() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +void +Event_scheduler::destroy() +{ + DBUG_ENTER("Event_scheduler"); + + LOCK_SCHEDULER_DATA(); + switch (state) { + case UNINITIALIZED: + break; + case INITIALIZED: + delete_queue(&queue); + free_root(&scheduler_root, MYF(0)); + int i; + for (i= 0; i < COND_LAST; i++) + pthread_cond_destroy(&cond_vars[i]); + state= UNINITIALIZED; + break; + default: + sql_print_error("SCHEDULER: Destroying while state is %d", state); + /* I trust my code but ::safe() > ::sorry() */ + DBUG_ASSERT(0); + break; + } + UNLOCK_SCHEDULER_DATA(); + + DBUG_VOID_RETURN; +} + + +/* + Adds an event to the scheduler queue + + SYNOPSIS + Event_scheduler::add_event() + et The event to add + check_existence Whether to check if already loaded. + + RETURN VALUE + OP_OK OK or scheduler not working + OP_LOAD_ERROR Error during loading from disk +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::add_event(THD *thd, Event_timed *et, bool check_existence) +{ + enum enum_error_code res; + Event_timed *et_new; + DBUG_ENTER("Event_scheduler::add_event"); + DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); + } + if (check_existence && find_event(et, FALSE)) + { + res= OP_ALREADY_EXISTS; + goto end; + } + + /* We need to load the event on scheduler_root */ + if (!(res= load_event(thd, et, &et_new))) + { + queue_insert_safe(&queue, (byte *) et_new); + DBUG_PRINT("info", ("Sending COND_new_work")); + pthread_cond_signal(&cond_vars[COND_new_work]); + } + else if (res == OP_DISABLED_EVENT) + res= OP_OK; +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(res); +} + + +/* + Drops an event from the scheduler queue + + SYNOPSIS + Event_scheduler::drop_event() + etn The event to drop + state Wait the event or kill&drop + + RETURN VALUE + FALSE OK (replaced or scheduler not working) + TRUE Failure +*/ + +bool +Event_scheduler::drop_event(THD *thd, Event_timed *et) +{ + int res; + Event_timed *et_old; + DBUG_ENTER("Event_scheduler::drop_event"); + DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et,&LOCK_scheduler_data)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); + } + + if (!(et_old= find_event(et, TRUE))) + DBUG_PRINT("info", ("No such event found, probably DISABLED")); + + UNLOCK_SCHEDULER_DATA(); + + /* See comments in ::replace_event() why this is split in two parts. */ + if (et_old) + { + switch ((res= et_old->kill_thread(thd))) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et_old->flags |= EVENT_FREE_WHEN_FINISHED; + break; + case 0: + /* + kill_thread() waits till the spawned thread finishes after it's + killed. Hence, we delete here memory which is no more referenced from + a running thread. + */ + delete et_old; + /* + We don't signal COND_new_work here because: + 1. Even if the dropped event is on top of the queue this will not + move another one to be executed before the time the one on the + top (but could be at the same second as the dropped one) + 2. If this was the last event on the queue, then pthread_cond_timedwait + in ::run() will finish and then see that the queue is empty and + call cond_wait(). Hence, no need to interrupt the blocked + ::run() thread. + */ + break; + default: + sql_print_error("SCHEDULER: Got unexpected error %d", res); + DBUG_ASSERT(0); + } + } + + DBUG_RETURN(FALSE); +} + + +/* + Replaces an event in the scheduler queue + + SYNOPSIS + Event_scheduler::replace_event() + et The event to replace(add) into the queue + state Async or sync stopping + + RETURN VALUE + OP_OK OK or scheduler not working + OP_LOAD_ERROR Error during loading from disk + OP_ALREADY_EXISTS Event already in the queue +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::replace_event(THD *thd, Event_timed *et, LEX_STRING *new_schema, + LEX_STRING *new_name) +{ + enum enum_error_code res; + Event_timed *et_old, *et_new= NULL; + LEX_STRING old_schema, old_name; + + DBUG_ENTER("Event_scheduler::replace_event"); + DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p", + thd, et, et->dbname.str, et->name.str, &LOCK_scheduler_data)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + DBUG_PRINT("info", ("scheduler not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); + } + + if (!(et_old= find_event(et, TRUE))) + DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED", + et->dbname.str, et->name.str)); + + if (new_schema && new_name) + { + old_schema= et->dbname; + old_name= et->name; + et->dbname= *new_schema; + et->name= *new_name; + } + /* + We need to load the event (it's strings but on the object itself) + on scheduler_root. et_new could be NULL : + 1. Error occured + 2. If the replace is DISABLED, we don't load it into the queue. + */ + if (!(res= load_event(thd, et, &et_new))) + { + queue_insert_safe(&queue, (byte *) et_new); + DBUG_PRINT("info", ("Sending COND_new_work")); + pthread_cond_signal(&cond_vars[COND_new_work]); + } + else if (res == OP_DISABLED_EVENT) + res= OP_OK; + + if (new_schema && new_name) + { + et->dbname= old_schema; + et->name= old_name; + } + + UNLOCK_SCHEDULER_DATA(); + /* + Andrey: Is this comment still truthful ??? + + We don't move this code above because a potential kill_thread will call + THD::awake(). Which in turn will try to acqure mysys_var->current_mutex, + which is LOCK_scheduler_data on which the COND_new_work in ::run() locks. + Hence, we try to acquire a lock which we have already acquired and we run + into an assert. Holding LOCK_scheduler_data however is not needed because + we don't touch any invariant of the scheduler anymore. ::drop_event() does + the same. + */ + if (et_old) + { + switch (et_old->kill_thread(thd)) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et_old->flags |= EVENT_FREE_WHEN_FINISHED; + break; + case 0: + /* + kill_thread() waits till the spawned thread finishes after it's + killed. Hence, we delete here memory which is no more referenced from + a running thread. + */ + delete et_old; + /* + We don't signal COND_new_work here because: + 1. Even if the dropped event is on top of the queue this will not + move another one to be executed before the time the one on the + top (but could be at the same second as the dropped one) + 2. If this was the last event on the queue, then pthread_cond_timedwait + in ::run() will finish and then see that the queue is empty and + call cond_wait(). Hence, no need to interrupt the blocked + ::run() thread. + */ + break; + default: + DBUG_ASSERT(0); + } + } + + DBUG_RETURN(res); +} + + +/* + Searches for an event in the scheduler queue + + SYNOPSIS + Event_scheduler::find_event() + etn The event to find + comparator The function to use for comparing + remove_from_q If found whether to remove from the Q + + RETURN VALUE + NULL Not found + otherwise Address + + NOTE + The caller should do the locking also the caller is responsible for + actual signalling in case an event is removed from the queue + (signalling COND_new_work for instance). +*/ + +Event_timed * +Event_scheduler::find_event(Event_timed *etn, bool remove_from_q) +{ + uint i; + DBUG_ENTER("Event_scheduler::find_event"); + + for (i= 0; i < queue.elements; ++i) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", etn->dbname.str, etn->name.str, + et->dbname.str, et->name.str)); + if (event_timed_identifier_equal(etn, et)) + { + if (remove_from_q) + queue_remove(&queue, i); + DBUG_RETURN(et); + } + } + + DBUG_RETURN(NULL); +} + + +/* + Drops all events from the in-memory queue and disk that match + certain pattern evaluated by a comparator function + + SYNOPSIS + Event_scheduler::drop_matching_events() + thd THD + pattern A pattern string + comparator The function to use for comparing + + RETURN VALUE + -1 Scheduler not working + >=0 Number of dropped events + + NOTE + Expected is the caller to acquire lock on LOCK_scheduler_data +*/ + +void +Event_scheduler::drop_matching_events(THD *thd, LEX_STRING *pattern, + bool (*comparator)(Event_timed *,LEX_STRING *)) +{ + DBUG_ENTER("Event_scheduler::drop_matching_events"); + DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern->length, pattern->str, + state)); + if (is_running_or_suspended()) + { + uint i= 0, dropped= 0; + while (i < queue.elements) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str)); + if (comparator(et, pattern)) + { + /* + The queue is ordered. If we remove an element, then all elements after + it will shift one position to the left, if we imagine it as an array + from left to the right. In this case we should not increment the + counter and the (i < queue.elements) condition is ok. + */ + queue_remove(&queue, i); + + /* See replace_event() */ + switch (et->kill_thread(thd)) { + case EVEX_CANT_KILL: + /* Don't delete but continue */ + et->flags |= EVENT_FREE_WHEN_FINISHED; + ++dropped; + break; + case 0: + delete et; + ++dropped; + break; + default: + DBUG_ASSERT(0); + } + } + else + i++; + } + DBUG_PRINT("info", ("Dropped %lu", dropped)); + } + /* + Don't send COND_new_work because no need to wake up the scheduler thread. + When it wakes next time up it will recalculate how much more it should + sleep if the top of the queue has been changed by this method. + */ + + DBUG_VOID_RETURN; +} + + +/* + Drops all events from the in-memory queue and disk that are from + certain schema. + + SYNOPSIS + Event_scheduler::drop_schema_events() + thd THD + db The schema name + + RETURN VALUE + -1 Scheduler not working + >=0 Number of dropped events +*/ + +int +Event_scheduler::drop_schema_events(THD *thd, LEX_STRING *schema) +{ + int ret; + DBUG_ENTER("Event_scheduler::drop_schema_events"); + LOCK_SCHEDULER_DATA(); + if (is_running_or_suspended()) + drop_matching_events(thd, schema, event_timed_db_equal); + + ret= db_drop_events_from_table(thd, schema); + UNLOCK_SCHEDULER_DATA(); + + DBUG_RETURN(ret); +} + + +extern pthread_attr_t connection_attrib; + + +/* + Starts the event scheduler + + SYNOPSIS + Event_scheduler::start() + + RETURN VALUE + FALSE OK + TRUE Error +*/ + +bool +Event_scheduler::start() +{ + bool ret= FALSE; + pthread_t th; + DBUG_ENTER("Event_scheduler::start"); + + LOCK_SCHEDULER_DATA(); + /* If already working or starting don't make another attempt */ + DBUG_ASSERT(state == INITIALIZED); + if (state > INITIALIZED) + { + DBUG_PRINT("info", ("scheduler is already running or starting")); + ret= TRUE; + goto end; + } + + /* + Now if another thread calls start it will bail-out because the branch + above will be executed. Thus no two or more child threads will be forked. + If the child thread cannot start for some reason then `state` is set + to CANTSTART and COND_started is also signaled. In this case we + set `state` back to INITIALIZED so another attempt to start the scheduler + can be made. + */ + state= COMMENCING; + /* Fork */ + if (pthread_create(&th, &connection_attrib, event_scheduler_thread, + (void*)this)) + { + DBUG_PRINT("error", ("cannot create a new thread")); + state= INITIALIZED; + ret= TRUE; + goto end; + } + + /* Wait till the child thread has booted (w/ or wo success) */ + while (!is_running_or_suspended() && state != CANTSTART) + cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); + + /* + If we cannot start for some reason then don't prohibit further attempts. + Set back to INITIALIZED. + */ + if (state == CANTSTART) + { + state= INITIALIZED; + ret= TRUE; + goto end; + } + +end: + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(ret); +} + + +/* + Starts the event scheduler in suspended mode. + + SYNOPSIS + Event_scheduler::start_suspended() + + RETURN VALUE + TRUE OK + FALSE Error +*/ + +bool +Event_scheduler::start_suspended() +{ + DBUG_ENTER("Event_scheduler::start_suspended"); + start_scheduler_suspended= TRUE; + DBUG_RETURN(start()); +} + + + +/* + Report back that we cannot start. Used for ocasions where + we can't go into ::run() and have to report externally. + + SYNOPSIS + Event_scheduler::report_error_during_start() +*/ + +inline void +Event_scheduler::report_error_during_start() +{ + DBUG_ENTER("Event_scheduler::report_error_during_start"); + + LOCK_SCHEDULER_DATA(); + state= CANTSTART; + DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); + pthread_cond_signal(&cond_vars[COND_started_or_stopped]); + UNLOCK_SCHEDULER_DATA(); + + DBUG_VOID_RETURN; +} + + +/* + The internal loop of the event scheduler + + SYNOPSIS + Event_scheduler::run() + thd Thread + + RETURN VALUE + FALSE OK + TRUE Failure +*/ + +bool +Event_scheduler::run(THD *thd) +{ + int ret; + struct timespec abstime; + DBUG_ENTER("Event_scheduler::run"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + LOCK_SCHEDULER_DATA(); + ret= load_events_from_db(thd); + + if (!ret) + { + thread_id= thd->thread_id; + state= start_scheduler_suspended? SUSPENDED:RUNNING; + start_scheduler_suspended= FALSE; + } + else + state= CANTSTART; + + DBUG_PRINT("info", ("Sending back COND_started_or_stopped")); + pthread_cond_signal(&cond_vars[COND_started_or_stopped]); + if (ret) + { + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(TRUE); + } + if (!check_n_suspend_if_needed(thd)) + UNLOCK_SCHEDULER_DATA(); + + sql_print_information("SCHEDULER: Manager thread started with id %lu", + thd->thread_id); + abstime.tv_nsec= 0; + while (is_running_or_suspended()) + { + TIME time_now_utc; + Event_timed *et; + my_bool tmp; + time_t now_utc; + + LOCK_SCHEDULER_DATA(); + if (check_n_wait_for_non_empty_queue(thd)) + continue; + + /* On TRUE data is unlocked, go back to the beginning */ + if (check_n_suspend_if_needed(thd)) + continue; + + /* Guaranteed locked here */ + if (state == IN_SHUTDOWN || shutdown_in_progress) + { + UNLOCK_SCHEDULER_DATA(); + break; + } + DBUG_ASSERT(state == RUNNING); + + et= (Event_timed *)queue_top(&queue); + + /* Skip disabled events */ + if (et->status != Event_timed::ENABLED) + { + sql_print_error("SCHEDULER: Found a disabled event %*s.%*s in the queue", + et->dbname.length, et->dbname.str, et->name.length, + et->name.str); + queue_remove(&queue, 0); + /* ToDo: check this again */ + delete et; + UNLOCK_SCHEDULER_DATA(); + continue; + } + thd->proc_info= (char *)"Computing"; + DBUG_PRINT("evex manager",("computing time to sleep till next exec")); + /* Timestamp is in UTC */ + abstime.tv_sec= sec_since_epoch_TIME(&et->execute_at); + + thd->end_time(); + if (abstime.tv_sec > thd->query_start()) + { + /* Event trigger time is in the future */ + thd->proc_info= (char *)"Sleep"; + DBUG_PRINT("info", ("Going to sleep. Should wakeup after approx %d secs", + abstime.tv_sec - thd->query_start())); + DBUG_PRINT("info", ("Entering condition because waiting for activation")); + /* + Use THD::enter_cond()/exit_cond() or we won't be able to kill a + sleeping thread. Though ::stop() can do it by sending COND_new_work + an user can't by just issuing 'KILL x'; . In the latter case + pthread_cond_timedwait() will wait till `abstime`. + "Sleeping until next time" + */ + thd->enter_cond(&cond_vars[COND_new_work],&LOCK_scheduler_data,"Sleeping"); + + pthread_cond_timedwait(&cond_vars[COND_new_work], &LOCK_scheduler_data, + &abstime); + + DBUG_PRINT("info", ("Manager woke up. state is %d", state)); + /* + If we get signal we should recalculate the whether it's the right time + because there could be : + 1. Spurious wake-up + 2. The top of the queue was changed (new one becase of add/drop/replace) + */ + /* This will do implicit UNLOCK_SCHEDULER_DATA() */ + thd->exit_cond(""); + } + else + { + thd->proc_info= (char *)"Executing"; + /* + Execute the event. An error may occur if a thread cannot be forked. + In this case stop the manager. + We should enter ::execute_top() with locked LOCK_scheduler_data. + */ + int ret= execute_top(thd); + UNLOCK_SCHEDULER_DATA(); + if (ret) + break; + } + } +end_loop: + thd->proc_info= (char *)"Cleaning"; + + LOCK_SCHEDULER_DATA(); + /* + It's possible that a user has used (SQL)COM_KILL. Hence set the appropriate + state because it is only set by ::stop(). + */ + if (state != IN_SHUTDOWN) + { + DBUG_PRINT("info", ("We got KILL but the but not from ::stop()")); + state= IN_SHUTDOWN; + } + UNLOCK_SCHEDULER_DATA(); + + sql_print_information("SCHEDULER: Shutting down"); + + thd->proc_info= (char *)"Cleaning queue"; + clean_queue(thd); + THD_CHECK_SENTRY(thd); + + /* free mamager_root memory but don't destroy the root */ + thd->proc_info= (char *)"Cleaning memory root"; + free_root(&scheduler_root, MYF(0)); + THD_CHECK_SENTRY(thd); + + /* + We notify the waiting thread which shutdowns us that we have cleaned. + There are few more instructions to be executed in this pthread but + they don't affect manager structures thus it's safe to signal already + at this point. + */ + LOCK_SCHEDULER_DATA(); + thd->proc_info= (char *)"Sending shutdown signal"; + DBUG_PRINT("info", ("Sending COND_started_or_stopped")); + if (state == IN_SHUTDOWN) + pthread_cond_signal(&cond_vars[COND_started_or_stopped]); + + state= INITIALIZED; + /* + We set it here because ::run() can stop not only because of ::stop() + call but also because of `KILL x` + */ + thread_id= 0; + sql_print_information("SCHEDULER: Stopped"); + UNLOCK_SCHEDULER_DATA(); + + /* We have modified, we set back */ + thd->query= NULL; + thd->query_length= 0; + + DBUG_RETURN(FALSE); +} + + +/* + Executes the top element of the queue. Auxiliary method for ::run(). + + SYNOPSIS + Event_scheduler::execute_top() + + RETURN VALUE + FALSE OK + TRUE Failure + + NOTE + NO locking is done. EXPECTED is that the caller should have locked + the queue (w/ LOCK_scheduler_data). +*/ + +bool +Event_scheduler::execute_top(THD *thd) +{ + int spawn_ret_code; + bool ret= FALSE; + DBUG_ENTER("Event_scheduler::execute_top"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + Event_timed *et= (Event_timed *)queue_top(&queue); + + /* Is it good idea to pass a stack address ?*/ + Worker_thread_param param(et); + + pthread_mutex_lock(¶m.LOCK_started); + /* + We don't lock LOCK_scheduler_data fpr workers_increment() because it's a + pre-requisite for calling the current_method. + */ + switch ((spawn_ret_code= et->spawn_now(event_worker_thread, ¶m))) { + case EVENT_EXEC_CANT_FORK: + /* + We don't lock LOCK_scheduler_data here because it's a pre-requisite + for calling the current_method. + */ + sql_print_error("SCHEDULER: Problem while trying to create a thread"); + ret= TRUE; + break; + case EVENT_EXEC_ALREADY_EXEC: + /* + We don't lock LOCK_scheduler_data here because it's a pre-requisite + for calling the current_method. + */ + sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", + et->dbname.str, et->name.str); + if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) + queue_remove(&queue, 0);// 0 is top, internally 1 + else + queue_replaced(&queue); + break; + default: + DBUG_ASSERT(!spawn_ret_code); + if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) + queue_remove(&queue, 0);// 0 is top, internally 1 + else + queue_replaced(&queue); + /* + We don't lock LOCK_scheduler_data here because it's a pre-requisite + for calling the current_method. + */ + if (likely(!spawn_ret_code)) + { + /* Wait the forked thread to start */ + do { + pthread_cond_wait(¶m.COND_started, ¶m.LOCK_started); + } while (!param.started); + } + /* + param was allocated on the stack so no explicit delete as well as + in this moment it's no more used in the spawned thread so it's safe + to be deleted. + */ + break; + } + pthread_mutex_unlock(¶m.LOCK_started); + /* `param` is on the stack and will be destructed by the compiler */ + + DBUG_RETURN(ret); +} + + +/* + Cleans the scheduler's queue. Auxiliary method for ::run(). + + SYNOPSIS + Event_scheduler::clean_queue() + thd Thread +*/ + +void +Event_scheduler::clean_queue(THD *thd) +{ + CHARSET_INFO *scs= system_charset_info; + uint i; + int ret; + DBUG_ENTER("Event_scheduler::clean_queue"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + LOCK_SCHEDULER_DATA(); + stop_all_running_events(thd); + UNLOCK_SCHEDULER_DATA(); + + sql_print_information("SCHEDULER: Emptying the queue"); + + /* empty the queue */ + for (i= 0; i < queue.elements; ++i) + { + Event_timed *et= (Event_timed *) queue_element(&queue, i); + et->free_sp(); + delete et; + } + resize_queue(&queue, 0); + + DBUG_VOID_RETURN; +} + + +/* + Stops all running events + + SYNOPSIS + Event_scheduler::stop_all_running_events() + thd Thread + + NOTE + LOCK_scheduler data must be acquired prior to call to this method +*/ + +void +Event_scheduler::stop_all_running_events(THD *thd) +{ + CHARSET_INFO *scs= system_charset_info; + uint i; + DYNAMIC_ARRAY running_threads; + THD *tmp; + DBUG_ENTER("Event_scheduler::stop_all_running_events"); + DBUG_PRINT("enter", ("workers_count=%d", workers_count())); + + my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10); + + bool had_super= FALSE; + VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + push_dynamic(&running_threads, (gptr) &tmp->thread_id); + } + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + /* We need temporarily SUPER_ACL to be able to kill our offsprings */ + if (!(thd->security_ctx->master_access & SUPER_ACL)) + thd->security_ctx->master_access|= SUPER_ACL; + else + had_super= TRUE; + + char tmp_buff[10*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + + for (i= 0; i < running_threads.elements; ++i) + { + int ret; + ulong thd_id= *dynamic_element(&running_threads, i, ulong*); + + int_string.set((longlong) thd_id,scs); + tmp_string.append(int_string); + if (i < running_threads.elements - 1) + tmp_string.append(' '); + + if ((ret= kill_one_thread(thd, thd_id, FALSE))) + { + sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret); + break; + } + } + if (running_threads.elements) + sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr()); + + if (!had_super) + thd->security_ctx->master_access &= ~SUPER_ACL; + + delete_dynamic(&running_threads); + + sql_print_information("SCHEDULER: Waiting for worker threads to finish"); + + while (workers_count()) + my_sleep(100000); + + DBUG_VOID_RETURN; +} + + +/* + Stops the event scheduler + + SYNOPSIS + Event_scheduler::stop() + + RETURN VALUE + OP_OK OK + OP_CANT_KILL Error during stopping of manager thread + OP_NOT_RUNNING Manager not working + + NOTE + The caller must have acquited LOCK_scheduler_data. +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::stop() +{ + int ret; + THD *thd= current_thd; + DBUG_ENTER("Event_scheduler::stop"); + DBUG_PRINT("enter", ("thd=%p", current_thd)); + + LOCK_SCHEDULER_DATA(); + if (!is_running_or_suspended()) + { + /* + One situation to be here is if there was a start that forked a new + thread but the new thread did not acquire yet LOCK_scheduler_data. + Hence, in this case return an error. + */ + DBUG_PRINT("info", ("manager not running but %d. doing nothing", state)); + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_NOT_RUNNING); + } + state= IN_SHUTDOWN; + + DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); + sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); + + /* + Sending the COND_new_work to ::run() is a way to get this working without + race conditions. If we use kill_one_thread() it will call THD::awake() and + because in ::run() both THD::enter_cond()/::exit_cond() are used, + THD::awake() will try to lock LOCK_scheduler_data. If we UNLOCK it before, + then the pthread_cond_signal(COND_started_or_stopped) could be signaled in + ::run() and we can miss the signal before we relock. A way is to use + another mutex for this shutdown procedure but better not. + */ + pthread_cond_signal(&cond_vars[COND_new_work]); + /* Or we are suspended - then we should wake up */ + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + + /* Guarantee we don't catch spurious signals */ + sql_print_information("SCHEDULER: Waiting the manager thread to reply"); + while (state != INITIALIZED) + { + DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " + "thread. Current value of state is %d . " + "workers count=%d", state, workers_count())); + cond_wait(COND_started_or_stopped, &LOCK_scheduler_data); + } + DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); + UNLOCK_SCHEDULER_DATA(); + + DBUG_RETURN(OP_OK); +} + + +/* + Suspends or resumes the scheduler. + SUSPEND - it won't execute any event till resumed. + RESUME - it will resume if suspended. + + SYNOPSIS + Event_scheduler::suspend_or_resume() + + RETURN VALUE + OP_OK OK +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::suspend_or_resume( + enum Event_scheduler::enum_suspend_or_resume action) +{ + enum enum_error_code ret; + DBUG_ENTER("Event_scheduler::suspend_or_resume"); + DBUG_PRINT("enter", ("action=%d", action)); + + LOCK_SCHEDULER_DATA(); + + if ((action == SUSPEND && state == SUSPENDED) || + (action == RESUME && state == RUNNING)) + { + DBUG_PRINT("info", ("Either trying to suspend suspended or resume " + "running scheduler. Doing nothing.")); + } + else + { + /* Wake the main thread up if he is asleep */ + DBUG_PRINT("info", ("Sending signal")); + if (action==SUSPEND) + { + state= SUSPENDED; + pthread_cond_signal(&cond_vars[COND_new_work]); + } + else + { + state= RUNNING; + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + } + DBUG_PRINT("info", ("Waiting on COND_suspend_or_resume")); + cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); + DBUG_PRINT("info", ("Got response")); + } + UNLOCK_SCHEDULER_DATA(); + DBUG_RETURN(OP_OK); +} + + +/* + Returns the number of executing events. + + SYNOPSIS + Event_scheduler::workers_count() +*/ + +uint +Event_scheduler::workers_count() +{ + THD *tmp; + uint count= 0; + + DBUG_ENTER("Event_scheduler::workers_count"); + VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (tmp->command == COM_DAEMON) + continue; + if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) + ++count; + } + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + DBUG_PRINT("exit", ("%d", count)); + DBUG_RETURN(count); +} + + +/* + Checks and suspends if needed + + SYNOPSIS + Event_scheduler::check_n_suspend_if_needed() + thd Thread + + RETURN VALUE + FALSE Not suspended, we haven't slept + TRUE We were suspended. LOCK_scheduler_data is unlocked. + + NOTE + The caller should have locked LOCK_scheduler_data! + The mutex will be unlocked in case this function returns TRUE +*/ + +bool +Event_scheduler::check_n_suspend_if_needed(THD *thd) +{ + bool was_suspended= FALSE; + DBUG_ENTER("Event_scheduler::check_n_suspend_if_needed"); + if (thd->killed && !shutdown_in_progress) + { + state= SUSPENDED; + thd->killed= THD::NOT_KILLED; + } + if (state == SUSPENDED) + { + thd->enter_cond(&cond_vars[COND_suspend_or_resume], &LOCK_scheduler_data, + "Suspended"); + /* Send back signal to the thread that asked us to suspend operations */ + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + sql_print_information("SCHEDULER: Suspending operations"); + was_suspended= TRUE; + } + while (state == SUSPENDED) + { + cond_wait(COND_suspend_or_resume, &LOCK_scheduler_data); + DBUG_PRINT("info", ("Woke up after waiting on COND_suspend_or_resume")); + if (state != SUSPENDED) + { + pthread_cond_signal(&cond_vars[COND_suspend_or_resume]); + sql_print_information("SCHEDULER: Resuming operations"); + } + } + if (was_suspended) + { + if (queue.elements) + { + uint i; + DBUG_PRINT("info", ("We have to recompute the execution times")); + + for (i= 0; i < queue.elements; i++) + ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); + queue_fix(&queue); + } + /* This will implicitly unlock LOCK_scheduler_data */ + thd->exit_cond(""); + } + DBUG_RETURN(was_suspended); +} + + +/* + Checks for empty queue and waits till new element gets in + + SYNOPSIS + Event_scheduler::check_n_wait_for_non_empty_queue() + thd Thread + + RETURN VALUE + FALSE Did not wait - LOCK_scheduler_data still locked. + TRUE Waited - LOCK_scheduler_data unlocked. + + NOTE + The caller should have locked LOCK_scheduler_data! +*/ + +bool +Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) +{ + bool slept= FALSE; + DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); + DBUG_PRINT("enter", ("q.elements=%lu state=%s", + queue.elements, states_names[state])); + + if (!queue.elements) + thd->enter_cond(&cond_vars[COND_new_work], &LOCK_scheduler_data, + "Empty queue, sleeping"); + + /* Wait in a loop protecting against catching spurious signals */ + while (!queue.elements && state == RUNNING) + { + slept= TRUE; + DBUG_PRINT("info", ("Entering condition because of empty queue")); + cond_wait(COND_new_work, &LOCK_scheduler_data); + DBUG_PRINT("info", ("Manager woke up. Hope we have events now. state=%d", + state)); + /* + exit_cond does implicit mutex_UNLOCK, we needed it locked if + 1. we loop again + 2. end the current loop and start doing calculations + */ + } + if (slept) + thd->exit_cond(""); + + DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", + queue.elements, states_names[state], thd->killed)); + + DBUG_RETURN(slept); +} + + +/* + Wrapper for pthread_mutex_lock + + SYNOPSIS + Event_scheduler::lock_data() + mutex Mutex to lock + line The line number on which the lock is done + + RETURN VALUE + Error code of pthread_mutex_lock() +*/ + +inline int +Event_scheduler::lock_data(const char *func, uint line) +{ + int ret; + DBUG_ENTER("Event_scheduler::lock_mutex"); + DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", + &LOCK_scheduler_data, func, line)); + ret= pthread_mutex_lock(&LOCK_scheduler_data); + mutex_last_locked_in_func= func; + mutex_last_locked_at_line= line; + mutex_scheduler_data_locked= TRUE; + DBUG_RETURN(ret); +} + + +/* + Wrapper for pthread_mutex_unlock + + SYNOPSIS + Event_scheduler::unlock_data() + mutex Mutex to unlock + line The line number on which the unlock is done + + RETURN VALUE + Error code of pthread_mutex_lock() +*/ + +inline int +Event_scheduler::unlock_data(const char *func, uint line) +{ + DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); + DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", + &LOCK_scheduler_data, func, line)); + mutex_last_unlocked_at_line= line; + mutex_scheduler_data_locked= FALSE; + mutex_last_unlocked_in_func= func; + DBUG_RETURN(pthread_mutex_unlock(&LOCK_scheduler_data)); +} + + +/* + Wrapper for pthread_cond_wait + + SYNOPSIS + Event_scheduler::cond_wait() + cond Conditional to wait for + mutex Mutex of the conditional + + RETURN VALUE + Error code of pthread_cond_wait() +*/ + +inline int +Event_scheduler::cond_wait(enum Event_scheduler::enum_cond_vars cond, + pthread_mutex_t *mutex) +{ + int ret; + DBUG_ENTER("Event_scheduler::cond_wait"); + DBUG_PRINT("enter", ("cond=%s mutex=%p", cond_vars_names[cond], mutex)); + ret= pthread_cond_wait(&cond_vars[cond_waiting_on=cond], mutex); + cond_waiting_on= COND_NONE; + DBUG_RETURN(ret); +} + + +/* + Checks whether the scheduler is in a running or suspended state. + + SYNOPSIS + Event_scheduler::is_running_or_suspended() + + RETURN VALUE + TRUE Either running or suspended + FALSE IN_SHUTDOWN, not started, etc. +*/ + +inline bool +Event_scheduler::is_running_or_suspended() +{ + return (state == SUSPENDED || state == RUNNING); +} + + +/* + Returns the current state of the scheduler + + SYNOPSIS + Event_scheduler::get_state() +*/ + +enum Event_scheduler::enum_state +Event_scheduler::get_state() +{ + enum Event_scheduler::enum_state ret; + DBUG_ENTER("Event_scheduler::get_state"); + /* lock_data & unlock_data are not static */ + pthread_mutex_lock(&singleton.LOCK_scheduler_data); + ret= singleton.state; + pthread_mutex_unlock(&singleton.LOCK_scheduler_data); + DBUG_RETURN(ret); +} + + +/* + Returns whether the scheduler was initialized. + + SYNOPSIS + Event_scheduler::initialized() + + RETURN VALUE + FALSE Was not initialized so far + TRUE Was initialized +*/ + +bool +Event_scheduler::initialized() +{ + DBUG_ENTER("Event_scheduler::initialized"); + DBUG_RETURN(Event_scheduler::get_state() != UNINITIALIZED); +} + + +/* + Returns the number of elements in the queue + + SYNOPSIS + Event_scheduler::events_count() + + RETURN VALUE + 0 Number of Event_timed objects in the queue +*/ + +uint +Event_scheduler::events_count() +{ + uint n; + DBUG_ENTER("Event_scheduler::events_count"); + LOCK_SCHEDULER_DATA(); + n= queue.elements; + UNLOCK_SCHEDULER_DATA(); + + DBUG_RETURN(n); +} + + +/* + Looks for a named event in mysql.event and then loads it from + the table, compiles and inserts it into the cache. + + SYNOPSIS + Event_scheduler::load_event() + thd THD + etn The name of the event to load and compile on scheduler's root + etn_new The loaded event + + RETURN VALUE + NULL Error during compile or the event is non-enabled. + otherwise Address +*/ + +enum Event_scheduler::enum_error_code +Event_scheduler::load_event(THD *thd, Event_timed *etn, Event_timed **etn_new) +{ + int ret= 0; + MEM_ROOT *tmp_mem_root; + Event_timed *et_loaded= NULL; + Open_tables_state backup; + + DBUG_ENTER("Event_scheduler::load_and_compile_event"); + DBUG_PRINT("enter",("thd=%p name:%*s",thd, etn->name.length, etn->name.str)); + + thd->reset_n_backup_open_tables_state(&backup); + /* No need to use my_error() here because db_find_event() has done it */ + { + sp_name spn(etn->dbname, etn->name); + ret= db_find_event(thd, &spn, &etn->definer, &et_loaded, NULL, + &scheduler_root); + } + thd->restore_backup_open_tables_state(&backup); + /* In this case no memory was allocated so we don't need to clean */ + if (ret) + DBUG_RETURN(OP_LOAD_ERROR); + + if (et_loaded->status != Event_timed::ENABLED) + { + /* + We don't load non-enabled events. + In db_find_event() `et_new` was allocated on the heap and not on + scheduler_root therefore we delete it here. + */ + delete et_loaded; + DBUG_RETURN(OP_DISABLED_EVENT); + } + + et_loaded->compute_next_execution_time(); + *etn_new= et_loaded; + + DBUG_RETURN(OP_OK); +} + + +/* + Loads all ENABLED events from mysql.event into the prioritized + queue. Called during scheduler main thread initialization. Compiles + the events. Creates Event_timed instances for every ENABLED event + from mysql.event. + + SYNOPSIS + Event_scheduler::load_events_from_db() + thd - Thread context. Used for memory allocation in some cases. + + RETURN VALUE + 0 OK + !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, + EVEX_COMPILE_ERROR) - in all these cases mysql.event was + tampered. + + NOTES + Reports the error to the console +*/ + +int +Event_scheduler::load_events_from_db(THD *thd) +{ + TABLE *table; + READ_RECORD read_record_info; + int ret= -1; + uint count= 0; + bool clean_the_queue= FALSE; + /* Compile the events on this root but only for syntax check, then discard */ + MEM_ROOT boot_root; + + DBUG_ENTER("Event_scheduler::load_events_from_db"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + if (state > COMMENCING) + { + DBUG_ASSERT(0); + sql_print_error("SCHEDULER: Trying to load events while already running."); + DBUG_RETURN(EVEX_GENERAL_ERROR); + } + + if ((ret= Events::open_event_table(thd, TL_READ, &table))) + { + sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open."); + DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); + } + + init_alloc_root(&boot_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC); + init_read_record(&read_record_info, thd, table ,NULL,1,0); + while (!(read_record_info.read_record(&read_record_info))) + { + Event_timed *et; + if (!(et= new Event_timed)) + { + DBUG_PRINT("info", ("Out of memory")); + clean_the_queue= TRUE; + break; + } + DBUG_PRINT("info", ("Loading event from row.")); + + if ((ret= et->load_from_row(&scheduler_root, table))) + { + clean_the_queue= TRUE; + sql_print_error("SCHEDULER: Error while loading from mysql.event. " + "Table probably corrupted"); + break; + } + if (et->status != Event_timed::ENABLED) + { + DBUG_PRINT("info",("%s is disabled",et->name.str)); + delete et; + continue; + } + + DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); + + /* We load only on scheduler root just to check whether the body compiles */ + switch (ret= et->compile(thd, &boot_root)) { + case EVEX_MICROSECOND_UNSUP: + et->free_sp(); + sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " + "supported but found in mysql.event"); + goto end; + case EVEX_COMPILE_ERROR: + sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load.", + et->dbname.str, et->name.str); + goto end; + default: + /* Free it, it will be compiled again on the worker thread */ + et->free_sp(); + break; + } + + /* let's find when to be executed */ + if (et->compute_next_execution_time()) + { + sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." + " Skipping", et->dbname.str, et->name.str); + continue; + } + + DBUG_PRINT("load_events_from_db", ("Adding %p to the exec list.")); + queue_insert_safe(&queue, (byte *) et); + count++; + } +end: + end_read_record(&read_record_info); + free_root(&boot_root, MYF(0)); + + if (clean_the_queue) + { + for (count= 0; count < queue.elements; ++count) + queue_remove(&queue, 0); + ret= -1; + } + else + { + ret= 0; + sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); + } + + /* Force close to free memory */ + thd->version--; + + close_thread_tables(thd); + + DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); + DBUG_RETURN(ret); +} + + +/* + Opens mysql.db and mysql.user and checks whether: + 1. mysql.db has column Event_priv at column 20 (0 based); + 2. mysql.user has column Event_priv at column 29 (0 based); + + SYNOPSIS + Event_scheduler::check_system_tables() +*/ + +bool +Event_scheduler::check_system_tables(THD *thd) +{ + TABLE_LIST tables; + bool not_used; + Open_tables_state backup; + bool ret; + + DBUG_ENTER("Event_scheduler::check_system_tables"); + DBUG_PRINT("enter", ("thd=%p", thd)); + + thd->reset_n_backup_open_tables_state(&backup); + + bzero((char*) &tables, sizeof(tables)); + tables.db= (char*) "mysql"; + tables.table_name= tables.alias= (char*) "db"; + tables.lock_type= TL_READ; + + if ((ret= simple_open_n_lock_tables(thd, &tables))) + sql_print_error("Cannot open mysql.db"); + else + { + ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, + mysql_db_table_fields, &mysql_db_table_last_check, + ER_CANNOT_LOAD_FROM_TABLE); + close_thread_tables(thd); + } + if (ret) + DBUG_RETURN(TRUE); + + bzero((char*) &tables, sizeof(tables)); + tables.db= (char*) "mysql"; + tables.table_name= tables.alias= (char*) "user"; + tables.lock_type= TL_READ; + + if ((ret= simple_open_n_lock_tables(thd, &tables))) + sql_print_error("Cannot open mysql.db"); + else + { + if (tables.table->s->fields < 29 || + strncmp(tables.table->field[29]->field_name, + STRING_WITH_LEN("Event_priv"))) + { + sql_print_error("mysql.user has no `Event_priv` column at position 29"); + ret= TRUE; + } + close_thread_tables(thd); + } + + thd->restore_backup_open_tables_state(&backup); + + DBUG_RETURN(ret); +} + + +/* + Inits mutexes. + + SYNOPSIS + Event_scheduler::init_mutexes() +*/ + +void +Event_scheduler::init_mutexes() +{ + pthread_mutex_init(&singleton.LOCK_scheduler_data, MY_MUTEX_INIT_FAST); +} + + +/* + Destroys mutexes. + + SYNOPSIS + Event_scheduler::destroy_mutexes() +*/ + +void +Event_scheduler::destroy_mutexes() +{ + pthread_mutex_destroy(&singleton.LOCK_scheduler_data); +} + + +/* + Dumps some data about the internal status of the scheduler. + + SYNOPSIS + Event_scheduler::dump_internal_status() + thd THD + + RETURN VALUE + 0 OK + 1 Error +*/ + +int +Event_scheduler::dump_internal_status(THD *thd) +{ + DBUG_ENTER("dump_internal_status"); +#ifndef DBUG_OFF + CHARSET_INFO *scs= system_charset_info; + Protocol *protocol= thd->protocol; + List<Item> field_list; + int ret; + char tmp_buff[5*STRING_BUFFER_USUAL_SIZE]; + char int_buff[STRING_BUFFER_USUAL_SIZE]; + String tmp_string(tmp_buff, sizeof(tmp_buff), scs); + String int_string(int_buff, sizeof(int_buff), scs); + tmp_string.length(0); + int_string.length(0); + + field_list.push_back(new Item_empty_string("Name", 20)); + field_list.push_back(new Item_empty_string("Value",20)); + if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) + DBUG_RETURN(1); + + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("state"), scs); + protocol->store(states_names[singleton.state].str, + states_names[singleton.state].length, + scs); + + ret= protocol->write(); + /* + If not initialized - don't show anything else. get_instance() + will otherwise implicitly initialize it. We don't want that. + */ + if (singleton.state >= INITIALIZED) + { + /* last locked at*/ + /* + The first thing to do, or get_instance() will overwrite the values. + mutex_last_locked_at_line / mutex_last_unlocked_at_line + */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("last locked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + singleton.mutex_last_locked_in_func, + singleton.mutex_last_locked_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); + + /* last unlocked at*/ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("last unlocked at"), scs); + tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s::%d", + singleton.mutex_last_unlocked_in_func, + singleton.mutex_last_unlocked_at_line)); + protocol->store(&tmp_string); + ret= protocol->write(); + + /* waiting on */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("waiting on condition"), scs); + tmp_string.length(scs->cset-> + snprintf(scs, (char*) tmp_string.ptr(), + tmp_string.alloced_length(), "%s", + (singleton.cond_waiting_on != COND_NONE) ? + cond_vars_names[singleton.cond_waiting_on]: + "NONE")); + protocol->store(&tmp_string); + ret= protocol->write(); + + Event_scheduler *scheduler= get_instance(); + + /* workers_count */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("workers_count"), scs); + int_string.set((longlong) scheduler->workers_count(), scs); + protocol->store(&int_string); + ret= protocol->write(); + + /* queue.elements */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("queue.elements"), scs); + int_string.set((longlong) scheduler->queue.elements, scs); + protocol->store(&int_string); + ret= protocol->write(); + + /* scheduler_data_locked */ + protocol->prepare_for_resend(); + protocol->store(STRING_WITH_LEN("scheduler data locked"), scs); + int_string.set((longlong) scheduler->mutex_scheduler_data_locked, scs); + protocol->store(&int_string); + ret= protocol->write(); + } + send_eof(thd); +#endif + DBUG_RETURN(0); +} diff --git a/sql/event_scheduler.h b/sql/event_scheduler.h new file mode 100644 index 00000000000..e7bf4b633c5 --- /dev/null +++ b/sql/event_scheduler.h @@ -0,0 +1,254 @@ +#ifndef _EVENT_SCHEDULER_H_ +#define _EVENT_SCHEDULER_H_ +/* Copyright (C) 2004-2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +class THD; +typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*); + +int +events_init(); + +void +events_shutdown(); + + +class Event_scheduler +{ +public: + /* Return codes */ + enum enum_error_code + { + OP_OK= 0, + OP_NOT_RUNNING, + OP_CANT_KILL, + OP_CANT_INIT, + OP_DISABLED_EVENT, + OP_LOAD_ERROR, + OP_ALREADY_EXISTS + }; + + enum enum_state + { + UNINITIALIZED= 0, + INITIALIZED, + COMMENCING, + CANTSTART, + RUNNING, + SUSPENDED, + IN_SHUTDOWN + }; + + enum enum_suspend_or_resume + { + SUSPEND= 1, + RESUME= 2 + }; + + /* Singleton access */ + static Event_scheduler* + get_instance(); + + /* Methods for queue management follow */ + + enum enum_error_code + add_event(THD *thd, Event_timed *et, bool check_existence); + + bool + drop_event(THD *thd, Event_timed *et); + + enum enum_error_code + replace_event(THD *thd, Event_timed *et, LEX_STRING *new_schema, + LEX_STRING *new_name); + + int + drop_schema_events(THD *thd, LEX_STRING *schema); + + int + drop_user_events(THD *thd, LEX_STRING *definer, uint *dropped_num) + { DBUG_ASSERT(0); return 0;} + + uint + events_count(); + + /* State changing methods follow */ + + bool + start(); + + enum enum_error_code + stop(); + + bool + start_suspended(); + + bool + run(THD *thd); + + enum enum_error_code + suspend_or_resume(enum enum_suspend_or_resume action); + + bool + init(); + + void + destroy(); + + static void + init_mutexes(); + + static void + destroy_mutexes(); + + void + report_error_during_start(); + + /* Information retrieving methods follow */ + + enum enum_state + get_state(); + + bool + initialized(); + + static int + dump_internal_status(THD *thd); + + static bool + check_system_tables(THD *thd); + +private: + Event_timed * + find_event(Event_timed *etn, bool remove_from_q); + + uint + workers_count(); + + bool + is_running_or_suspended(); + + /* helper functions */ + bool + execute_top(THD *thd); + + void + clean_queue(THD *thd); + + void + stop_all_running_events(THD *thd); + + enum enum_error_code + load_event(THD *thd, Event_timed *etn, Event_timed **etn_new); + + int + load_events_from_db(THD *thd); + + void + drop_matching_events(THD *thd, LEX_STRING *pattern, + bool (*)(Event_timed *,LEX_STRING *)); + + bool + check_n_suspend_if_needed(THD *thd); + + bool + check_n_wait_for_non_empty_queue(THD *thd); + + /* Singleton DP is used */ + Event_scheduler(); + + enum enum_cond_vars + { + COND_NONE= -1, + /* + COND_new_work is a conditional used to signal that there is a change + of the queue that should inform the executor thread that new event should + be executed sooner than previously expected, because of add/replace event. + */ + COND_new_work= 0, + /* + COND_started is a conditional used to synchronize the thread in which + ::start() was called and the spawned thread. ::start() spawns a new thread + and then waits on COND_started but also checks when awaken that `state` is + either RUNNING or CANTSTART. Then it returns back. + */ + COND_started_or_stopped, + /* + Conditional used for signalling from the scheduler thread back to the + thread that calls ::suspend() or ::resume. Synchronizing the calls. + */ + COND_suspend_or_resume, + /* Must be always last */ + COND_LAST, + }; + + /* Singleton instance */ + static Event_scheduler singleton; + + /* This is the current status of the life-cycle of the manager. */ + enum enum_state state; + + /* Set to start the scheduler in suspended state */ + bool start_scheduler_suspended; + + /* + LOCK_scheduler_data is the mutex which protects the access to the + manager's queue as well as used when signalling COND_new_work, + COND_started and COND_shutdown. + */ + pthread_mutex_t LOCK_scheduler_data; + + /* + Holds the thread id of the executor thread or 0 if the executor is not + running. It is used by ::shutdown() to know which thread to kill with + kill_one_thread(). The latter wake ups a thread if it is waiting on a + conditional variable and sets thd->killed to non-zero. + */ + ulong thread_id; + + pthread_cond_t cond_vars[COND_LAST]; + static const char * const cond_vars_names[COND_LAST]; + + /* The MEM_ROOT of the object */ + MEM_ROOT scheduler_root; + + /* The sorted queue with the Event_timed objects */ + QUEUE queue; + + uint mutex_last_locked_at_line; + uint mutex_last_unlocked_at_line; + const char* mutex_last_locked_in_func; + const char* mutex_last_unlocked_in_func; + enum enum_cond_vars cond_waiting_on; + bool mutex_scheduler_data_locked; + + /* helper functions for working with mutexes & conditionals */ + int + lock_data(const char *func, uint line); + + int + unlock_data(const char *func, uint line); + + int + cond_wait(enum enum_cond_vars, pthread_mutex_t *mutex); + +private: + /* Prevent use of these */ + Event_scheduler(const Event_scheduler &); + void operator=(Event_scheduler &); +}; + +#endif /* _EVENT_SCHEDULER_H_ */ diff --git a/sql/event_timed.cc b/sql/event_timed.cc index 879f4d6a3c9..96ecb4dd024 100644 --- a/sql/event_timed.cc +++ b/sql/event_timed.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2004-2005 MySQL AB +/* Copyright (C) 2004-2006 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,7 +17,82 @@ #define MYSQL_LEX 1 #include "event_priv.h" #include "event.h" -#include "sp.h" +#include "sp_head.h" + + +/* + Constructor + + SYNOPSIS + Event_timed::Event_timed() +*/ + +Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0), + running(0), thread_id(0), status_changed(false), + last_executed_changed(false), expression(0), + created(0), modified(0), + on_completion(Event_timed::ON_COMPLETION_DROP), + status(Event_timed::ENABLED), sphead(0), + sql_mode(0), body_begin(0), dropped(false), + free_sphead_on_delete(true), flags(0) + +{ + pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST); + pthread_cond_init(&this->COND_finished, NULL); + init(); +} + + +/* + Destructor + + SYNOPSIS + Event_timed::~Event_timed() +*/ + +Event_timed::~Event_timed() +{ + deinit_mutexes(); + + if (free_sphead_on_delete) + free_sp(); +} + + +/* + Destructor + + SYNOPSIS + Event_timed::~deinit_mutexes() +*/ + +void +Event_timed::deinit_mutexes() +{ + pthread_mutex_destroy(&this->LOCK_running); + pthread_cond_destroy(&this->COND_finished); +} + + +/* + Checks whether the event is running + + SYNOPSIS + Event_timed::is_running() +*/ + +bool +Event_timed::is_running() +{ + bool ret; + + VOID(pthread_mutex_lock(&this->LOCK_running)); + ret= running; + VOID(pthread_mutex_unlock(&this->LOCK_running)); + + return ret; +} + /* Init all member variables @@ -238,7 +313,7 @@ Event_timed::init_execute_at(THD *thd, Item *expr) expr how much? new_interval what is the interval - RETURNS + RETURN VALUE 0 OK EVEX_PARSE_ERROR fix_fields failed EVEX_BAD_PARAMS Interval is not positive @@ -342,7 +417,7 @@ Event_timed::init_interval(THD *thd, Item *expr, interval_type new_interval) DATE_ADD(NOW(), INTERVAL 1 DAY) -- start tommorow at same time. - RETURNS + RETURN VALUE 0 OK EVEX_PARSE_ERROR fix_fields failed EVEX_BAD_PARAMS starts before now @@ -408,7 +483,7 @@ Event_timed::init_starts(THD *thd, Item *new_starts) DATE_ADD(NOW(), INTERVAL 1 DAY) -- end tommorow at same time. - RETURNS + RETURN VALUE 0 OK EVEX_PARSE_ERROR fix_fields failed ER_WRONG_VALUE starts distant date (after year 2037) @@ -492,6 +567,9 @@ Event_timed::init_comment(THD *thd, LEX_STRING *set_comment) SYNOPSIS Event_timed::init_definer() + + RETURN VALUE + 0 OK */ int @@ -534,6 +612,10 @@ Event_timed::init_definer(THD *thd) SYNOPSIS Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) + RETURN VALUE + 0 OK + EVEX_GET_FIELD_FAILED Error + NOTES This method is silent on errors and should behave like that. Callers should handle throwing of error messages. The reason is that the class @@ -555,29 +637,29 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) et= this; - if (table->s->fields != EVEX_FIELD_COUNT) + if (table->s->fields != Events::FIELD_COUNT) goto error; if ((et->dbname.str= get_field(mem_root, - table->field[EVEX_FIELD_DB])) == NULL) + table->field[Events::FIELD_DB])) == NULL) goto error; et->dbname.length= strlen(et->dbname.str); if ((et->name.str= get_field(mem_root, - table->field[EVEX_FIELD_NAME])) == NULL) + table->field[Events::FIELD_NAME])) == NULL) goto error; et->name.length= strlen(et->name.str); if ((et->body.str= get_field(mem_root, - table->field[EVEX_FIELD_BODY])) == NULL) + table->field[Events::FIELD_BODY])) == NULL) goto error; et->body.length= strlen(et->body.str); if ((et->definer.str= get_field(mem_root, - table->field[EVEX_FIELD_DEFINER])) == NullS) + table->field[Events::FIELD_DEFINER])) == NullS) goto error; et->definer.length= strlen(et->definer.str); @@ -594,69 +676,71 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) et->definer_host.str= strmake_root(mem_root, ptr + 1, len);/* 1:because of @*/ et->definer_host.length= len; - et->starts_null= table->field[EVEX_FIELD_STARTS]->is_null(); - res1= table->field[EVEX_FIELD_STARTS]->get_date(&et->starts,TIME_NO_ZERO_DATE); + et->starts_null= table->field[Events::FIELD_STARTS]->is_null(); + res1= table->field[Events::FIELD_STARTS]-> + get_date(&et->starts,TIME_NO_ZERO_DATE); - et->ends_null= table->field[EVEX_FIELD_ENDS]->is_null(); - res2= table->field[EVEX_FIELD_ENDS]->get_date(&et->ends, TIME_NO_ZERO_DATE); + et->ends_null= table->field[Events::FIELD_ENDS]->is_null(); + res2= table->field[Events::FIELD_ENDS]->get_date(&et->ends, TIME_NO_ZERO_DATE); - if (!table->field[EVEX_FIELD_INTERVAL_EXPR]->is_null()) - et->expression= table->field[EVEX_FIELD_INTERVAL_EXPR]->val_int(); + if (!table->field[Events::FIELD_INTERVAL_EXPR]->is_null()) + et->expression= table->field[Events::FIELD_INTERVAL_EXPR]->val_int(); else et->expression= 0; /* If res1 and res2 are true then both fields are empty. - Hence if EVEX_FIELD_EXECUTE_AT is empty there is an error. + Hence if Events::FIELD_EXECUTE_AT is empty there is an error. */ - et->execute_at_null= table->field[EVEX_FIELD_EXECUTE_AT]->is_null(); + et->execute_at_null= + table->field[Events::FIELD_EXECUTE_AT]->is_null(); DBUG_ASSERT(!(et->starts_null && et->ends_null && !et->expression && et->execute_at_null)); if (!et->expression && - table->field[EVEX_FIELD_EXECUTE_AT]->get_date(&et->execute_at, - TIME_NO_ZERO_DATE)) + table->field[Events::FIELD_EXECUTE_AT]-> get_date(&et->execute_at, + TIME_NO_ZERO_DATE)) goto error; /* In DB the values start from 1 but enum interval_type starts from 0 */ - if (!table->field[EVEX_FIELD_TRANSIENT_INTERVAL]->is_null()) - et->interval= (interval_type) - ((ulonglong) table->field[EVEX_FIELD_TRANSIENT_INTERVAL]->val_int() - 1); + if (!table->field[Events::FIELD_TRANSIENT_INTERVAL]->is_null()) + et->interval= (interval_type) ((ulonglong) + table->field[Events::FIELD_TRANSIENT_INTERVAL]->val_int() - 1); else et->interval= (interval_type) 0; - et->created= table->field[EVEX_FIELD_CREATED]->val_int(); - et->modified= table->field[EVEX_FIELD_MODIFIED]->val_int(); + et->created= table->field[Events::FIELD_CREATED]->val_int(); + et->modified= table->field[Events::FIELD_MODIFIED]->val_int(); - table->field[EVEX_FIELD_LAST_EXECUTED]-> + table->field[Events::FIELD_LAST_EXECUTED]-> get_date(&et->last_executed, TIME_NO_ZERO_DATE); last_executed_changed= false; /* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */ - if ((ptr= get_field(mem_root, table->field[EVEX_FIELD_STATUS])) == NullS) + if ((ptr= get_field(mem_root, table->field[Events::FIELD_STATUS])) == NullS) goto error; DBUG_PRINT("load_from_row", ("Event [%s] is [%s]", et->name.str, ptr)); - et->status= (ptr[0]=='E'? MYSQL_EVENT_ENABLED:MYSQL_EVENT_DISABLED); + et->status= (ptr[0]=='E'? Event_timed::ENABLED:Event_timed::DISABLED); /* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */ if ((ptr= get_field(mem_root, - table->field[EVEX_FIELD_ON_COMPLETION])) == NullS) + table->field[Events::FIELD_ON_COMPLETION])) == NullS) goto error; - et->on_completion= (ptr[0]=='D'? MYSQL_EVENT_ON_COMPLETION_DROP: - MYSQL_EVENT_ON_COMPLETION_PRESERVE); + et->on_completion= (ptr[0]=='D'? Event_timed::ON_COMPLETION_DROP: + Event_timed::ON_COMPLETION_PRESERVE); - et->comment.str= get_field(mem_root, table->field[EVEX_FIELD_COMMENT]); + et->comment.str= get_field(mem_root, table->field[Events::FIELD_COMMENT]); if (et->comment.str != NullS) et->comment.length= strlen(et->comment.str); else et->comment.length= 0; - et->sql_mode= (ulong) table->field[EVEX_FIELD_SQL_MODE]->val_int(); + et->sql_mode= (ulong) table->field[Events::FIELD_SQL_MODE]->val_int(); DBUG_RETURN(0); error: @@ -676,7 +760,7 @@ error: i_value quantity of time type interval to add i_type type of interval to add (SECOND, MINUTE, HOUR, WEEK ...) - RETURNS + RETURN VALUE 0 OK 1 Error @@ -834,6 +918,10 @@ done: SYNOPSIS Event_timed::compute_next_execution_time() + RETURN VALUE + FALSE OK + TRUE Error + NOTES The time is set in execute_at, if no more executions the latter is set to 0000-00-00. @@ -852,7 +940,7 @@ Event_timed::compute_next_execution_time() TIME_to_ulonglong_datetime(&ends), TIME_to_ulonglong_datetime(&last_executed))); - if (status == MYSQL_EVENT_DISABLED) + if (status == Event_timed::DISABLED) { DBUG_PRINT("compute_next_execution_time", ("Event %s is DISABLED", name.str)); @@ -866,14 +954,15 @@ Event_timed::compute_next_execution_time() { DBUG_PRINT("info",("One-time event %s.%s of was already executed", dbname.str, name.str, definer.str)); - dropped= (on_completion == MYSQL_EVENT_ON_COMPLETION_DROP); + dropped= (on_completion == Event_timed::ON_COMPLETION_DROP); DBUG_PRINT("info",("One-time event will be dropped=%d.", dropped)); - status= MYSQL_EVENT_DISABLED; + status= Event_timed::DISABLED; status_changed= true; } goto ret; } + current_thd->end_time(); my_tz_UTC->gmt_sec_to_TIME(&time_now, current_thd->query_start()); DBUG_PRINT("info",("NOW=[%llu]", TIME_to_ulonglong_datetime(&time_now))); @@ -885,9 +974,9 @@ Event_timed::compute_next_execution_time() /* time_now is after ends. don't execute anymore */ set_zero_time(&execute_at, MYSQL_TIMESTAMP_DATETIME); execute_at_null= TRUE; - if (on_completion == MYSQL_EVENT_ON_COMPLETION_DROP) + if (on_completion == Event_timed::ON_COMPLETION_DROP) dropped= true; - status= MYSQL_EVENT_DISABLED; + status= Event_timed::DISABLED; status_changed= true; goto ret; @@ -937,7 +1026,6 @@ Event_timed::compute_next_execution_time() { TIME next_exec; - DBUG_PRINT("info", ("Executed at least once")); if (get_next_time(&next_exec, &starts, &time_now, last_executed.year? &last_executed:&starts, expression, interval)) @@ -946,12 +1034,15 @@ Event_timed::compute_next_execution_time() /* There was previous execution */ if (my_time_compare(&ends, &next_exec) == -1) { - DBUG_PRINT("info", ("Next execution after ENDS. Stop executing.")); + DBUG_PRINT("info", ("Next execution of %s after ENDS. Stop executing.", + name.str)); /* Next execution after ends. No more executions */ set_zero_time(&execute_at, MYSQL_TIMESTAMP_DATETIME); execute_at_null= TRUE; - if (on_completion == MYSQL_EVENT_ON_COMPLETION_DROP) + if (on_completion == Event_timed::ON_COMPLETION_DROP) dropped= true; + status= Event_timed::DISABLED; + status_changed= true; } else { @@ -1006,7 +1097,6 @@ Event_timed::compute_next_execution_time() { TIME next_exec; - DBUG_PRINT("info", ("Executed at least once.")); if (get_next_time(&next_exec, &starts, &time_now, last_executed.year? &last_executed:&starts, expression, interval)) @@ -1042,7 +1132,9 @@ Event_timed::compute_next_execution_time() DBUG_PRINT("info", ("Next execution after ENDS. Stop executing.")); set_zero_time(&execute_at, MYSQL_TIMESTAMP_DATETIME); execute_at_null= TRUE; - if (on_completion == MYSQL_EVENT_ON_COMPLETION_DROP) + status= Event_timed::DISABLED; + status_changed= true; + if (on_completion == Event_timed::ON_COMPLETION_DROP) dropped= true; } else @@ -1083,9 +1175,6 @@ Event_timed::mark_last_executed(THD *thd) my_tz_UTC->gmt_sec_to_TIME(&time_now, (my_time_t) thd->query_start()); last_executed= time_now; /* was execute_at */ -#ifdef ANDREY_0 - last_executed= execute_at; -#endif last_executed_changed= true; } @@ -1125,7 +1214,7 @@ Event_timed::drop(THD *thd) RETURN VALUE 0 OK - SP_OPEN_TABLE_FAILED Error while opening mysql.event for writing + EVEX_OPEN_TABLE_FAILED Error while opening mysql.event for writing EVEX_WRITE_ROW_FAILED On error to write to disk others return code from SE in case deletion of the event @@ -1149,9 +1238,9 @@ Event_timed::update_fields(THD *thd) thd->reset_n_backup_open_tables_state(&backup); - if (evex_open_event_table(thd, TL_WRITE, &table)) + if (Events::open_event_table(thd, TL_WRITE, &table)) { - ret= SP_OPEN_TABLE_FAILED; + ret= EVEX_OPEN_TABLE_FAILED; goto done; } @@ -1165,15 +1254,15 @@ Event_timed::update_fields(THD *thd) if (last_executed_changed) { - table->field[EVEX_FIELD_LAST_EXECUTED]->set_notnull(); - table->field[EVEX_FIELD_LAST_EXECUTED]->store_time(&last_executed, - MYSQL_TIMESTAMP_DATETIME); + table->field[Events::FIELD_LAST_EXECUTED]->set_notnull(); + table->field[Events::FIELD_LAST_EXECUTED]->store_time(&last_executed, + MYSQL_TIMESTAMP_DATETIME); last_executed_changed= false; } if (status_changed) { - table->field[EVEX_FIELD_STATUS]->set_notnull(); - table->field[EVEX_FIELD_STATUS]->store((longlong)status, true); + table->field[Events::FIELD_STATUS]->set_notnull(); + table->field[Events::FIELD_STATUS]->store((longlong)status, true); status_changed= false; } @@ -1215,8 +1304,8 @@ Event_timed::get_create_event(THD *thd, String *buf) DBUG_ENTER("get_create_event"); DBUG_PRINT("ret_info",("body_len=[%d]body=[%s]", body.length, body.str)); - if (expression && - event_reconstruct_interval_expression(&expr_buf, interval, expression)) + if (expression && Events::reconstruct_interval_expression(&expr_buf, interval, + expression)) DBUG_RETURN(EVEX_MICROSECOND_UNSUP); buf->append(STRING_WITH_LEN("CREATE EVENT ")); @@ -1243,12 +1332,12 @@ Event_timed::get_create_event(THD *thd, String *buf) buf->append(STRING_WITH_LEN("'")); } - if (on_completion == MYSQL_EVENT_ON_COMPLETION_DROP) + if (on_completion == Event_timed::ON_COMPLETION_DROP) buf->append(STRING_WITH_LEN(" ON COMPLETION NOT PRESERVE ")); else buf->append(STRING_WITH_LEN(" ON COMPLETION PRESERVE ")); - if (status == MYSQL_EVENT_ENABLED) + if (status == Event_timed::ENABLED) buf->append(STRING_WITH_LEN("ENABLE")); else buf->append(STRING_WITH_LEN("DISABLE")); @@ -1273,7 +1362,7 @@ Event_timed::get_create_event(THD *thd, String *buf) thd THD mem_root If != NULL use it to compile the event on it - RETURNS + RETURN VALUE 0 success -99 No rights on this.dbname.str -100 event in execution (parallel execution is impossible) @@ -1301,12 +1390,6 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root) running= true; VOID(pthread_mutex_unlock(&this->LOCK_running)); - DBUG_PRINT("info", ("master_access=%d db_access=%d", - thd->security_ctx->master_access, thd->security_ctx->db_access)); - change_security_context(thd, &security_ctx, &save_ctx); - DBUG_PRINT("info", ("master_access=%d db_access=%d", - thd->security_ctx->master_access, thd->security_ctx->db_access)); - if (!sphead && (ret= compile(thd, mem_root))) goto done; /* Now we are sure we have valid this->sphead so we can copy the context */ @@ -1334,12 +1417,11 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root) definer_host.str, dbname.str)); ret= -99; } - restore_security_context(thd, save_ctx); - DBUG_PRINT("info", ("master_access=%d db_access=%d", - thd->security_ctx->master_access, thd->security_ctx->db_access)); VOID(pthread_mutex_lock(&this->LOCK_running)); running= false; + /* Will compile every time a new sp_head on different root */ + free_sp(); VOID(pthread_mutex_unlock(&this->LOCK_running)); done: @@ -1361,55 +1443,16 @@ done: /* - Switches the security context - Synopsis - Event_timed::change_security_context() - thd - thread - backup - where to store the old context - - RETURN - 0 - OK - 1 - Error (generates error too) -*/ -bool -Event_timed::change_security_context(THD *thd, Security_context *s_ctx, - Security_context **backup) -{ - DBUG_ENTER("Event_timed::change_security_context"); - DBUG_PRINT("info",("%s@%s@%s",definer_user.str,definer_host.str, dbname.str)); -#ifndef NO_EMBEDDED_ACCESS_CHECKS - s_ctx->init(); - *backup= 0; - if (acl_getroot_no_password(s_ctx, definer_user.str, definer_host.str, - definer_host.str, dbname.str)) - { - my_error(ER_NO_SUCH_USER, MYF(0), definer_user.str, definer_host.str); - DBUG_RETURN(true); - } - *backup= thd->security_ctx; - thd->security_ctx= s_ctx; -#endif - DBUG_RETURN(false); -} - - -/* - Restores the security context - Synopsis - Event_timed::restore_security_context() - thd - thread - backup - switch to this context + Frees the memory of the sp_head object we hold + SYNOPSIS + Event_timed::free_sp() */ void -Event_timed::restore_security_context(THD *thd, Security_context *backup) +Event_timed::free_sp() { - DBUG_ENTER("Event_timed::restore_security_context"); -#ifndef NO_EMBEDDED_ACCESS_CHECKS - if (backup) - thd->security_ctx= backup; -#endif - DBUG_VOID_RETURN; + delete sphead; + sphead= 0; } @@ -1445,6 +1488,9 @@ Event_timed::compile(THD *thd, MEM_ROOT *mem_root) CHARSET_INFO *old_character_set_client, *old_collation_connection, *old_character_set_results; + Security_context *save_ctx; + /* this one is local and not needed after exec */ + Security_context security_ctx; DBUG_ENTER("Event_timed::compile"); @@ -1488,8 +1534,10 @@ Event_timed::compile(THD *thd, MEM_ROOT *mem_root) thd->query= show_create.c_ptr(); thd->query_length= show_create.length(); - DBUG_PRINT("Event_timed::compile", ("query:%s",thd->query)); + DBUG_PRINT("info", ("query:%s",thd->query)); + change_security_context(thd, definer_user, definer_host, dbname, + &security_ctx, &save_ctx); thd->lex= &lex; lex_start(thd, (uchar*)thd->query, thd->query_length); lex.et_compile_phase= TRUE; @@ -1527,6 +1575,7 @@ done: lex.et->deinit_mutexes(); lex_end(&lex); + restore_security_context(thd, save_ctx); DBUG_PRINT("note", ("return old data on its place. set back NAMES")); thd->lex= old_lex; @@ -1548,72 +1597,63 @@ done: } -/* - Checks whether this thread can lock the object for modification -> - preventing being spawned for execution, and locks if possible. - use ::can_spawn_now() only for basic checking because a race - condition may occur between the check and eventual modification (deletion) - of the object. - - Returns - true - locked - false - cannot lock -*/ - -my_bool -Event_timed::can_spawn_now_n_lock(THD *thd) -{ - my_bool ret= FALSE; - VOID(pthread_mutex_lock(&this->LOCK_running)); - if (!in_spawned_thread) - { - in_spawned_thread= TRUE; - ret= TRUE; - locked_by_thread_id= thd->thread_id; - } - VOID(pthread_mutex_unlock(&this->LOCK_running)); - return ret; -} - - extern pthread_attr_t connection_attrib; /* Checks whether is possible and forks a thread. Passes self as argument. - Returns - EVENT_EXEC_STARTED - OK - EVENT_EXEC_ALREADY_EXEC - Thread not forked, already working - EVENT_EXEC_CANT_FORK - Unable to spawn thread (error) + RETURN VALUE + EVENT_EXEC_STARTED OK + EVENT_EXEC_ALREADY_EXEC Thread not forked, already working + EVENT_EXEC_CANT_FORK Unable to spawn thread (error) */ int -Event_timed::spawn_now(void * (*thread_func)(void*)) +Event_timed::spawn_now(void * (*thread_func)(void*), void *arg) { + THD *thd= current_thd; int ret= EVENT_EXEC_STARTED; - static uint exec_num= 0; DBUG_ENTER("Event_timed::spawn_now"); - DBUG_PRINT("info", ("this=0x%lx", this)); DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str)); VOID(pthread_mutex_lock(&this->LOCK_running)); + + DBUG_PRINT("info", ("SCHEDULER: execute_at of %s is %lld", name.str, + TIME_to_ulonglong_datetime(&execute_at))); + mark_last_executed(thd); + if (compute_next_execution_time()) + { + sql_print_error("SCHEDULER: Error while computing time of %s.%s . " + "Disabling after execution.", dbname.str, name.str); + status= DISABLED; + } + DBUG_PRINT("evex manager", ("[%10s] next exec at [%llu]", name.str, + TIME_to_ulonglong_datetime(&execute_at))); + /* + 1. For one-time event : year is > 0 and expression is 0 + 2. For recurring, expression is != -=> check execute_at_null in this case + */ + if ((execute_at.year && !expression) || execute_at_null) + { + sql_print_information("SCHEDULER: [%s.%s of %s] no more executions " + "after this one", dbname.str, name.str, + definer.str); + flags |= EVENT_EXEC_NO_MORE | EVENT_FREE_WHEN_FINISHED; + } + + update_fields(thd); + if (!in_spawned_thread) { pthread_t th; in_spawned_thread= true; - if (pthread_create(&th, &connection_attrib, thread_func, (void*)this)) + + if (pthread_create(&th, &connection_attrib, thread_func, arg)) { DBUG_PRINT("info", ("problem while spawning thread")); ret= EVENT_EXEC_CANT_FORK; in_spawned_thread= false; } -#ifndef DBUG_OFF - else - { - sql_print_information("SCHEDULER: Started thread %d", ++exec_num); - DBUG_PRINT("info", ("thread spawned")); - } -#endif } else { @@ -1626,55 +1666,207 @@ Event_timed::spawn_now(void * (*thread_func)(void*)) } -void +bool Event_timed::spawn_thread_finish(THD *thd) { + bool should_free; DBUG_ENTER("Event_timed::spawn_thread_finish"); - VOID(pthread_mutex_lock(&this->LOCK_running)); + VOID(pthread_mutex_lock(&LOCK_running)); in_spawned_thread= false; - if ((flags & EVENT_EXEC_NO_MORE) || status == MYSQL_EVENT_DISABLED) + DBUG_PRINT("info", ("Sending COND_finished for thread %d", thread_id)); + thread_id= 0; + if (dropped) + drop(thd); + pthread_cond_broadcast(&COND_finished); + should_free= flags & EVENT_FREE_WHEN_FINISHED; + VOID(pthread_mutex_unlock(&LOCK_running)); + DBUG_RETURN(should_free); +} + + +/* + Kills a running event + SYNOPSIS + Event_timed::kill_thread() + + RETURN VALUE + 0 OK + -1 EVEX_CANT_KILL + !0 Error +*/ + +int +Event_timed::kill_thread(THD *thd) +{ + int ret= 0; + DBUG_ENTER("Event_timed::kill_thread"); + pthread_mutex_lock(&LOCK_running); + DBUG_PRINT("info", ("thread_id=%lu", thread_id)); + + if (thread_id == thd->thread_id) { - DBUG_PRINT("info", ("%s exec no more. to drop=%d", name.str, dropped)); - if (dropped) - drop(thd); - VOID(pthread_mutex_unlock(&this->LOCK_running)); - delete this; - DBUG_VOID_RETURN; + /* + We don't kill ourselves in cases like : + alter event e_43 do alter event e_43 do set @a = 4 because + we will never receive COND_finished. + */ + DBUG_PRINT("info", ("It's not safe to kill ourselves in self altering queries")); + ret= EVEX_CANT_KILL; } - VOID(pthread_mutex_unlock(&this->LOCK_running)); - DBUG_VOID_RETURN; + else if (thread_id && !(ret= kill_one_thread(thd, thread_id, false))) + { + thd->enter_cond(&COND_finished, &LOCK_running, "Waiting for finished"); + DBUG_PRINT("info", ("Waiting for COND_finished from thread %d", thread_id)); + while (thread_id) + pthread_cond_wait(&COND_finished, &LOCK_running); + + DBUG_PRINT("info", ("Got COND_finished")); + /* This will implicitly unlock LOCK_running. Hence we return before that */ + thd->exit_cond(""); + + DBUG_RETURN(0); + } + else if (!thread_id && in_spawned_thread) + { + /* + Because the manager thread waits for the forked thread to update thread_id + this situation is impossible. + */ + DBUG_ASSERT(0); + } + pthread_mutex_unlock(&LOCK_running); + DBUG_PRINT("exit", ("%d", ret)); + DBUG_RETURN(ret); +} + + +/* + Checks whether two events have the same name + + SYNOPSIS + event_timed_name_equal() + + RETURN VALUE + TRUE names are equal + FALSE names are not equal +*/ + +bool +event_timed_name_equal(Event_timed *et, LEX_STRING *name) +{ + return !sortcmp_lex_string(et->name, *name, system_charset_info); } /* - Unlocks the object after it has been locked with ::can_spawn_now_n_lock() + Checks whether two events are in the same schema + + SYNOPSIS + event_timed_db_equal() + + RETURN VALUE + TRUE schemas are equal + FALSE schemas are not equal +*/ + +bool +event_timed_db_equal(Event_timed *et, LEX_STRING *db) +{ + return !sortcmp_lex_string(et->dbname, *db, system_charset_info); +} + + +/* + Checks whether two events have the same definer + + SYNOPSIS + event_timed_definer_equal() Returns - 0 - ok - 1 - not locked by this thread + TRUE definers are equal + FALSE definers are not equal */ -int -Event_timed::spawn_unlock(THD *thd) +bool +event_timed_definer_equal(Event_timed *et, LEX_STRING *definer) { - int ret= 0; - VOID(pthread_mutex_lock(&this->LOCK_running)); - if (!in_spawned_thread) + return !sortcmp_lex_string(et->definer, *definer, system_charset_info); +} + + +/* + Checks whether two events are equal by identifiers + + SYNOPSIS + event_timed_identifier_equal() + + RETURN VALUE + TRUE equal + FALSE not equal +*/ + +bool +event_timed_identifier_equal(Event_timed *a, Event_timed *b) +{ + return event_timed_name_equal(a, &b->name) && + event_timed_db_equal(a, &b->dbname) && + event_timed_definer_equal(a, &b->definer); +} + + +/* + Switches the security context + SYNOPSIS + change_security_context() + thd Thread + user The user + host The host of the user + db The schema for which the security_ctx will be loaded + s_ctx Security context to load state into + backup Where to store the old context + + RETURN VALUE + 0 - OK + 1 - Error (generates error too) +*/ + +bool +change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, + LEX_STRING db, Security_context *s_ctx, + Security_context **backup) +{ + DBUG_ENTER("change_security_context"); + DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str)); +#ifndef NO_EMBEDDED_ACCESS_CHECKS + s_ctx->init(); + *backup= 0; + if (acl_getroot_no_password(s_ctx, user.str, host.str, host.str, db.str)) { - if (locked_by_thread_id == thd->thread_id) - { - in_spawned_thread= FALSE; - locked_by_thread_id= 0; - } - else - { - sql_print_error("A thread tries to unlock when he hasn't locked. " - "thread_id=%ld locked by %ld", - thd->thread_id, locked_by_thread_id); - DBUG_ASSERT(0); - ret= 1; - } + my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str); + DBUG_RETURN(TRUE); } - VOID(pthread_mutex_unlock(&this->LOCK_running)); - return ret; + *backup= thd->security_ctx; + thd->security_ctx= s_ctx; +#endif + DBUG_RETURN(FALSE); +} + + +/* + Restores the security context + SYNOPSIS + restore_security_context() + thd - thread + backup - switch to this context +*/ + +void +restore_security_context(THD *thd, Security_context *backup) +{ + DBUG_ENTER("restore_security_context"); +#ifndef NO_EMBEDDED_ACCESS_CHECKS + if (backup) + thd->security_ctx= backup; +#endif + DBUG_VOID_RETURN; } diff --git a/sql/lex.h b/sql/lex.h index 555a68dc388..d541a3fb228 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -453,6 +453,7 @@ static SYMBOL symbols[] = { { "RTREE", SYM(RTREE_SYM)}, { "SAVEPOINT", SYM(SAVEPOINT_SYM)}, { "SCHEDULE", SYM(SCHEDULE_SYM)}, + { "SCHEDULER", SYM(SCHEDULER_SYM)}, { "SCHEMA", SYM(DATABASE)}, { "SCHEMAS", SYM(DATABASES)}, { "SECOND", SYM(SECOND_SYM)}, diff --git a/sql/log.cc b/sql/log.cc index fa86064682d..31133a71757 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -55,6 +55,14 @@ static int binlog_commit(THD *thd, bool all); static int binlog_rollback(THD *thd, bool all); static int binlog_prepare(THD *thd, bool all); +sql_print_message_func sql_print_message_handlers[3] = +{ + sql_print_information, + sql_print_warning, + sql_print_error +}; + + /* This is a POD. Please keep it that way! diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 93db68d1b50..4da098f4b48 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -79,7 +79,8 @@ char *sql_strmake_with_convert(const char *str, uint32 arg_length, CHARSET_INFO *from_cs, uint32 max_res_length, CHARSET_INFO *to_cs, uint32 *result_length); -void kill_one_thread(THD *thd, ulong id, bool only_kill_query); +uint kill_one_thread(THD *thd, ulong id, bool only_kill_query); +void sql_kill(THD *thd, ulong id, bool only_kill_query); bool net_request_file(NET* net, const char* fname); char* query_table_status(THD *thd,const char *db,const char *table_name); @@ -1385,10 +1386,13 @@ bool init_errmessage(void); #endif /* MYSQL_SERVER */ void sql_perror(const char *message); + int vprint_msg_to_log(enum loglevel level, const char *format, va_list args); void sql_print_error(const char *format, ...); void sql_print_warning(const char *format, ...); void sql_print_information(const char *format, ...); +typedef void (*sql_print_message_func)(const char *format, ...); +extern sql_print_message_func sql_print_message_handlers[]; /* type of the log table */ #define QUERY_LOG_SLOW 1 diff --git a/sql/mysqld.cc b/sql/mysqld.cc index f44c7d87ada..0584bcc5ba1 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -863,8 +863,8 @@ static void close_connections(void) { DBUG_PRINT("quit",("Informing thread %ld that it's time to die", tmp->thread_id)); - /* We skip slave threads on this first loop through. */ - if (tmp->slave_thread) + /* We skip slave threads & scheduler on this first loop through. */ + if (tmp->slave_thread || tmp->system_thread == SYSTEM_THREAD_EVENT_SCHEDULER) continue; tmp->killed= THD::KILL_CONNECTION; @@ -883,6 +883,7 @@ static void close_connections(void) } (void) pthread_mutex_unlock(&LOCK_thread_count); // For unlink from list + Events::shutdown(); end_slave(); if (thread_count) @@ -1292,6 +1293,7 @@ static void clean_up_mutexes() (void) pthread_mutex_destroy(&LOCK_bytes_sent); (void) pthread_mutex_destroy(&LOCK_bytes_received); (void) pthread_mutex_destroy(&LOCK_user_conn); + Events::destroy_mutexes(); #ifdef HAVE_OPENSSL (void) pthread_mutex_destroy(&LOCK_des_key_file); #ifndef HAVE_YASSL @@ -2847,6 +2849,7 @@ static int init_thread_environment() (void) pthread_mutex_init(&LOCK_server_started, MY_MUTEX_INIT_FAST); (void) pthread_cond_init(&COND_server_started,NULL); sp_cache_init(); + Events::init_mutexes(); /* Parameter for threads created for connections */ (void) pthread_attr_init(&connection_attrib); (void) pthread_attr_setdetachstate(&connection_attrib, @@ -2992,7 +2995,6 @@ static int init_server_components() #ifdef HAVE_REPLICATION init_slave_list(); #endif - init_events(); /* Setup logs */ @@ -3566,6 +3568,7 @@ we force server id to 2, but this MySQL server will not act as a slave."); if (!opt_noacl) { + Events::init(); plugin_load(); #ifdef HAVE_DLOPEN udf_init(); @@ -3667,7 +3670,6 @@ we force server id to 2, but this MySQL server will not act as a slave."); clean_up(1); wait_for_signal_thread_to_end(); clean_up_mutexes(); - shutdown_events(); my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); exit(0); @@ -4658,7 +4660,7 @@ enum options_mysqld OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, OPT_SAFE_USER_CREATE, OPT_SQL_MODE, OPT_HAVE_NAMED_PIPE, - OPT_DO_PSTACK, OPT_EVENT_EXECUTOR, OPT_REPORT_HOST, + OPT_DO_PSTACK, OPT_EVENT_SCHEDULER, OPT_REPORT_HOST, OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT, OPT_SHOW_SLAVE_AUTH_INFO, OPT_SLAVE_LOAD_TMPDIR, OPT_NO_MIX_TYPE, @@ -4991,9 +4993,9 @@ Disable with --skip-bdb (will save memory).", (gptr*) &global_system_variables.engine_condition_pushdown, (gptr*) &global_system_variables.engine_condition_pushdown, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, - {"event-scheduler", OPT_EVENT_EXECUTOR, "Enable/disable the event scheduler.", - (gptr*) &opt_event_executor, (gptr*) &opt_event_executor, 0, GET_BOOL, NO_ARG, - 0/*default*/, 0/*min-value*/, 1/*max-value*/, 0, 0, 0}, + {"event-scheduler", OPT_EVENT_SCHEDULER, "Enable/disable the event scheduler.", + (gptr*) &Events::opt_event_scheduler, (gptr*) &Events::opt_event_scheduler, 0, GET_STR, + REQUIRED_ARG, 2/*default*/, 0/*min-value*/, 2/*max-value*/, 0, 0, 0}, {"exit-info", 'T', "Used for debugging; Use at your own risk!", 0, 0, 0, GET_LONG, OPT_ARG, 0, 0, 0, 0, 0, 0}, {"external-locking", OPT_USE_LOCKING, "Use system (external) locking. With this option enabled you can run myisamchk to test (not repair) tables while the MySQL server is running.", @@ -7320,6 +7322,24 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), break; } #endif + case OPT_EVENT_SCHEDULER: + if (!argument) + Events::opt_event_scheduler= 2; + else + { + int type; + if ((type=find_type(argument, &Events::opt_typelib, 1)) <= 0) + { + fprintf(stderr,"Unknown option to event-scheduler: %s\n",argument); + exit(1); + } + /* + type= 1 2 3 4 5 6 + (OFF | 0) - (ON | 1) - (2 | SUSPEND) + */ + Events::opt_event_scheduler= (type-1) / 2; + } + break; case (int) OPT_SKIP_NEW: opt_specialflag|= SPECIAL_NO_NEW_FUNC; delay_key_write_options= (uint) DELAY_KEY_WRITE_NONE; diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 9cabe1a3df0..66e2aa1c31c 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -61,12 +61,13 @@ static Slave_log_event* find_slave_event(IO_CACHE* log, static int init_failsafe_rpl_thread(THD* thd) { DBUG_ENTER("init_failsafe_rpl_thread"); + thd->system_thread = SYSTEM_THREAD_DELAYED_INSERT; /* thd->bootstrap is to report errors barely to stderr; if this code is enable again one day, one should check if bootstrap is still needed (maybe this thread has no other error reporting method). */ - thd->system_thread = thd->bootstrap = 1; + thd->bootstrap = 1; thd->security_ctx->skip_grants(); my_net_init(&thd->net, 0); thd->net.read_timeout = slave_net_timeout; diff --git a/sql/set_var.cc b/sql/set_var.cc index ae380bdf2d5..9b06e0f833f 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -56,6 +56,8 @@ #include <thr_alarm.h> #include <myisam.h> +#include "event_scheduler.h" + /* WITH_BERKELEY_STORAGE_ENGINE */ extern bool berkeley_shared_data; extern ulong berkeley_max_lock, berkeley_log_buffer_size; @@ -106,7 +108,6 @@ extern ulong ndb_report_thresh_binlog_mem_usage; -extern my_bool event_executor_running_global_var; static HASH system_variable_hash; const char *bool_type_names[]= { "OFF", "ON", NullS }; @@ -222,9 +223,8 @@ sys_var_long_ptr sys_delayed_insert_timeout("delayed_insert_timeout", &delayed_insert_timeout); sys_var_long_ptr sys_delayed_queue_size("delayed_queue_size", &delayed_queue_size); -sys_var_event_executor sys_event_executor("event_scheduler", - (my_bool *) - &event_executor_running_global_var); + +sys_var_event_scheduler sys_event_scheduler("event_scheduler"); sys_var_long_ptr sys_expire_logs_days("expire_logs_days", &expire_logs_days); sys_var_bool_ptr sys_flush("flush", &myisam_flush); @@ -747,7 +747,7 @@ SHOW_VAR init_vars[]= { {sys_div_precincrement.name,(char*) &sys_div_precincrement,SHOW_SYS}, {sys_engine_condition_pushdown.name, (char*) &sys_engine_condition_pushdown, SHOW_SYS}, - {sys_event_executor.name, (char*) &sys_event_executor, SHOW_SYS}, + {sys_event_scheduler.name, (char*) &sys_event_scheduler, SHOW_SYS}, {sys_expire_logs_days.name, (char*) &sys_expire_logs_days, SHOW_SYS}, {sys_flush.name, (char*) &sys_flush, SHOW_SYS}, {sys_flush_time.name, (char*) &sys_flush_time, SHOW_SYS}, @@ -3579,6 +3579,68 @@ byte *sys_var_thd_dbug::value_ptr(THD *thd, enum_var_type type, LEX_STRING *b) return (byte*) thd->strdup(buf); } + +/* + The update method of the global variable event_scheduler. + If event_scheduler is switched from 0 to 1 then the scheduler main + thread is resumed and if from 1 to 0 the scheduler thread is suspended + + SYNOPSIS + sys_var_event_scheduler::update() + thd Thread context (unused) + var The new value + + Returns + FALSE OK + TRUE Error +*/ + +bool +sys_var_event_scheduler::update(THD *thd, set_var *var) +{ + enum Event_scheduler::enum_error_code res; + Event_scheduler *scheduler= Event_scheduler::get_instance(); + /* here start the thread if not running. */ + DBUG_ENTER("sys_var_event_scheduler::update"); + + DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value)); + if (!scheduler->initialized()) + { + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--event-scheduler=0"); + DBUG_RETURN(true); + } + + if (var->save_result.ulong_value < 1 || var->save_result.ulong_value > 2) + { + char buf[64]; + my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "event_scheduler", + llstr(var->save_result.ulong_value, buf)); + DBUG_RETURN(true); + } + if ((res= scheduler->suspend_or_resume(var->save_result.ulong_value == 1? + Event_scheduler::RESUME: + Event_scheduler::SUSPEND))) + my_error(ER_EVENT_SET_VAR_ERROR, MYF(0), (uint) res); + DBUG_RETURN((bool) res); +} + + +byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type, + LEX_STRING *base) +{ + Event_scheduler *scheduler= Event_scheduler::get_instance(); + + if (!scheduler->initialized()) + thd->sys_var_tmp.long_value= 0; + else if (scheduler->get_state() == Event_scheduler::RUNNING) + thd->sys_var_tmp.long_value= 1; + else + thd->sys_var_tmp.long_value= 2; + + return (byte*) &thd->sys_var_tmp; +} + + /**************************************************************************** Used templates ****************************************************************************/ diff --git a/sql/set_var.h b/sql/set_var.h index 8076f10bb0a..0ae88144c0f 100644 --- a/sql/set_var.h +++ b/sql/set_var.h @@ -842,13 +842,14 @@ public: }; -class sys_var_event_executor :public sys_var_bool_ptr +class sys_var_event_scheduler :public sys_var_long_ptr { /* We need a derived class only to have a warn_deprecated() */ public: - sys_var_event_executor(const char *name_arg, my_bool *value_arg) : - sys_var_bool_ptr(name_arg, value_arg) {}; + sys_var_event_scheduler(const char *name_arg) : + sys_var_long_ptr(name_arg, NULL, NULL) {}; bool update(THD *thd, set_var *var); + byte *value_ptr(THD *thd, enum_var_type type, LEX_STRING *base); }; extern void fix_binlog_format_after_update(THD *thd, enum_var_type type); diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index a2bc77714bb..f34002ab0cf 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -5042,7 +5042,7 @@ ER_OPTION_PREVENTS_STATEMENT ger "Der MySQL-Server läuft mit der Option %s und kann diese Anweisung deswegen nicht ausführen" por "O servidor MySQL está rodando com a opção %s razão pela qual não pode executar esse commando" spa "El servidor MySQL está rodando con la opción %s tal que no puede ejecutar este comando" - swe "MySQL är startad med --skip-grant-tables. Pga av detta kan du inte använda detta kommando" + swe "MySQL är startad med %s. Pga av detta kan du inte använda detta kommando" ER_DUPLICATED_VALUE_IN_TYPE eng "Column '%-.100s' has duplicated value '%-.64s' in %s" ger "Feld '%-.100s' hat doppelten Wert '%-.64s' in %s" @@ -5842,3 +5842,7 @@ ER_WRONG_PARTITION_NAME swe "Felaktigt partitionsnamn" ER_CANT_CHANGE_TX_ISOLATION 25001 eng "Transaction isolation level can't be changed while a transaction is in progress" +ER_EVENT_MODIFY_QUEUE_ERROR + eng "Internal scheduler error %d" +ER_EVENT_SET_VAR_ERROR + eng "Error during starting/stopping of the scheduler. Error code %u" diff --git a/sql/sql_class.cc b/sql/sql_class.cc index ebf0d988e7e..a2ccaf092ee 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -253,7 +253,8 @@ THD::THD() net.last_error[0]=0; // If error on boot net.query_cache_query=0; // If error on boot ull=0; - system_thread= cleanup_done= abort_on_warning= no_warnings_for_error= 0; + system_thread= NON_SYSTEM_THREAD; + cleanup_done= abort_on_warning= no_warnings_for_error= 0; peer_port= 0; // For SHOW PROCESSLIST #ifdef HAVE_ROW_BASED_REPLICATION transaction.m_pending_rows_event= 0; @@ -512,6 +513,8 @@ void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var) void THD::awake(THD::killed_state state_to_set) { + DBUG_ENTER("THD::awake"); + DBUG_PRINT("enter", ("this=0x%lx", this)); THD_CHECK_SENTRY(this); safe_mutex_assert_owner(&LOCK_delete); @@ -555,6 +558,7 @@ void THD::awake(THD::killed_state state_to_set) } pthread_mutex_unlock(&mysys_var->mutex); } + DBUG_VOID_RETURN; } /* @@ -2031,6 +2035,13 @@ void Security_context::skip_grants() } +bool Security_context::set_user(char *user_arg) +{ + safeFree(user); + user= my_strdup(user_arg, MYF(0)); + return user == 0; +} + /**************************************************************************** Handling of open and locked tables states. diff --git a/sql/sql_class.h b/sql/sql_class.h index 04095ff9acd..a0971b22d3d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -629,6 +629,8 @@ public: { return (*priv_host ? priv_host : (char *)"%"); } + + bool set_user(char *user_arg); }; @@ -770,6 +772,19 @@ public: }; +/* Flags for the THD::system_thread variable */ +enum enum_thread_type +{ + NON_SYSTEM_THREAD= 0, + SYSTEM_THREAD_DELAYED_INSERT= 1, + SYSTEM_THREAD_SLAVE_IO= 2, + SYSTEM_THREAD_SLAVE_SQL= 4, + SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8, + SYSTEM_THREAD_EVENT_SCHEDULER= 16, + SYSTEM_THREAD_EVENT_WORKER= 32 +}; + + /* For each client connection we create a separate thread with THD serving as a thread/connection descriptor @@ -1103,7 +1118,8 @@ public: long dbug_thread_id; pthread_t real_id; uint tmp_table, global_read_lock; - uint server_status,open_options,system_thread; + uint server_status,open_options; + enum enum_thread_type system_thread; uint32 db_length; uint select_number; //number of select (used for EXPLAIN) /* variables.transaction_isolation is reset to this after each commit */ @@ -1404,11 +1420,6 @@ public: #define reenable_binlog(A) (A)->options= tmp_disable_binlog__save_options;} -/* Flags for the THD::system_thread (bitmap) variable */ -#define SYSTEM_THREAD_DELAYED_INSERT 1 -#define SYSTEM_THREAD_SLAVE_IO 2 -#define SYSTEM_THREAD_SLAVE_SQL 4 -#define SYSTEM_THREAD_NDBCLUSTER_BINLOG 8 /* Used to hold information about file and file structure in exchainge diff --git a/sql/sql_db.cc b/sql/sql_db.cc index 3d035359b6f..6d5362c2554 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -871,7 +871,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent) exit: (void)sp_drop_db_routines(thd, db); /* QQ Ignore errors for now */ - (void)evex_drop_db_events(thd, db); /* QQ Ignore errors for now */ + error= Events::drop_schema_events(thd, db); start_waiting_global_read_lock(thd); /* If this database was the client's selected database, we silently change the diff --git a/sql/sql_error.h b/sql/sql_error.h index 223b50be744..b5cac24d894 100644 --- a/sql/sql_error.h +++ b/sql/sql_error.h @@ -40,3 +40,5 @@ void push_warning_printf(THD *thd, MYSQL_ERROR::enum_warning_level level, uint code, const char *format, ...); void mysql_reset_errors(THD *thd, bool force); bool mysqld_show_warnings(THD *thd, ulong levels_to_show); + +extern LEX_STRING warning_level_names[]; diff --git a/sql/sql_lex.h b/sql/sql_lex.h index f0bd85367d0..3342165a97a 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -111,7 +111,8 @@ enum enum_sql_command { SQLCOM_SHOW_AUTHORS, SQLCOM_BINLOG_BASE64_EVENT, SQLCOM_SHOW_PLUGINS, SQLCOM_CREATE_EVENT, SQLCOM_ALTER_EVENT, SQLCOM_DROP_EVENT, - SQLCOM_SHOW_CREATE_EVENT, SQLCOM_SHOW_EVENTS, + SQLCOM_SHOW_CREATE_EVENT, SQLCOM_SHOW_EVENTS, + SQLCOM_SHOW_SCHEDULER_STATUS, /* This should be the last !!! */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index ae2a173c936..826f09af425 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2049,7 +2049,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, { statistic_increment(thd->status_var.com_stat[SQLCOM_KILL], &LOCK_status); ulong id=(ulong) uint4korr(packet); - kill_one_thread(thd,id,false); + sql_kill(thd,id,false); break; } case COM_SET_OPTION: @@ -3834,14 +3834,17 @@ end_with_restore_list: switch (lex->sql_command) { case SQLCOM_CREATE_EVENT: - res= evex_create_event(thd, lex->et, (uint) lex->create_info.options, - &rows_affected); + res= Events::create_event(thd, lex->et, + (uint) lex->create_info.options, + &rows_affected); break; case SQLCOM_ALTER_EVENT: - res= evex_update_event(thd, lex->et, lex->spname, &rows_affected); + res= Events::update_event(thd, lex->et, lex->spname, + &rows_affected); break; case SQLCOM_DROP_EVENT: - res= evex_drop_event(thd, lex->et, lex->drop_if_exists, &rows_affected); + res= Events::drop_event(thd, lex->et, lex->drop_if_exists, + &rows_affected); default:; } DBUG_PRINT("info", ("CREATE/ALTER/DROP returned error code=%d af_rows=%d", @@ -3879,9 +3882,16 @@ end_with_restore_list: my_error(ER_TOO_LONG_IDENT, MYF(0), lex->spname->m_name.str); goto error; } - res= evex_show_create_event(thd, lex->spname, lex->et->definer); + res= Events::show_create_event(thd, lex->spname, lex->et->definer); break; } +#ifndef DBUG_OFF + case SQLCOM_SHOW_SCHEDULER_STATUS: + { + res= Events::dump_internal_status(thd); + break; + } +#endif case SQLCOM_CREATE_FUNCTION: // UDF function { if (check_access(thd,INSERT_ACL,"mysql",0,1,0,0)) @@ -4121,7 +4131,7 @@ end_with_restore_list: MYF(0)); goto error; } - kill_one_thread(thd, (ulong)it->val_int(), lex->type & ONLY_KILL_QUERY); + sql_kill(thd, (ulong)it->val_int(), lex->type & ONLY_KILL_QUERY); break; } #ifndef NO_EMBEDDED_ACCESS_CHECKS @@ -6914,22 +6924,26 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables, return result; } + /* - kill on thread + kills a thread SYNOPSIS kill_one_thread() thd Thread class id Thread id + only_kill_query Should it kill the query or the connection NOTES This is written such that we have a short lock on LOCK_thread_count */ -void kill_one_thread(THD *thd, ulong id, bool only_kill_query) +uint kill_one_thread(THD *thd, ulong id, bool only_kill_query) { THD *tmp; uint error=ER_NO_SUCH_THREAD; + DBUG_ENTER("kill_one_thread"); + DBUG_PRINT("enter", ("id=%lu only_kill=%d", id, only_kill_query)); VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list I_List_iterator<THD> it(threads); while ((tmp=it++)) @@ -6955,8 +6969,25 @@ void kill_one_thread(THD *thd, ulong id, bool only_kill_query) error=ER_KILL_DENIED_ERROR; pthread_mutex_unlock(&tmp->LOCK_delete); } + DBUG_PRINT("exit", ("%d", error)); + DBUG_RETURN(error); +} + - if (!error) +/* + kills a thread and sends response + + SYNOPSIS + sql_kill() + thd Thread class + id Thread id + only_kill_query Should it kill the query or the connection +*/ + +void sql_kill(THD *thd, ulong id, bool only_kill_query) +{ + uint error; + if (!(error= kill_one_thread(thd, id, only_kill_query))) send_ok(thd); else my_error(error, MYF(0), id); diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 4071f86989f..7c8499f2a2c 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -4049,8 +4049,9 @@ fill_events_copy_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table) /* type */ sch_table->field[5]->store(STRING_WITH_LEN("RECURRING"), scs); - if (event_reconstruct_interval_expression(&show_str, et.interval, - et.expression)) + if (Events::reconstruct_interval_expression(&show_str, + et.interval, + et.expression)) DBUG_RETURN(1); sch_table->field[7]->set_notnull(); @@ -4080,13 +4081,13 @@ fill_events_copy_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table) } //status - if (et.status == MYSQL_EVENT_ENABLED) + if (et.status == Event_timed::ENABLED) sch_table->field[12]->store(STRING_WITH_LEN("ENABLED"), scs); else sch_table->field[12]->store(STRING_WITH_LEN("DISABLED"), scs); //on_completion - if (et.on_completion == MYSQL_EVENT_ON_COMPLETION_DROP) + if (et.on_completion == Event_timed::ON_COMPLETION_DROP) sch_table->field[13]->store(STRING_WITH_LEN("NOT PRESERVE"), scs); else sch_table->field[13]->store(STRING_WITH_LEN("PRESERVE"), scs); @@ -4138,7 +4139,7 @@ int fill_schema_events(THD *thd, TABLE_LIST *tables, COND *cond) thd->reset_n_backup_open_tables_state(&backup); - if ((ret= evex_open_event_table(thd, TL_READ, &event_table))) + if ((ret= Events::open_event_table(thd, TL_READ, &event_table))) { sql_print_error("Table mysql.event is damaged."); ret= 1; @@ -4147,13 +4148,10 @@ int fill_schema_events(THD *thd, TABLE_LIST *tables, COND *cond) event_table->file->ha_index_init(0, 1); - /* - see others' events only if you have PROCESS_ACL !! - thd->lex->verbose is set either if SHOW FULL EVENTS or - in case of SELECT FROM I_S.EVENTS - */ - verbose= (thd->lex->verbose - && (thd->security_ctx->master_access & PROCESS_ACL)); + /* see others' events only if you have PROCESS_ACL !! */ + verbose= ((thd->lex->verbose || + thd->lex->orig_sql_command != SQLCOM_SHOW_EVENTS) && + (thd->security_ctx->master_access & PROCESS_ACL)); if (verbose && thd->security_ctx->user) { @@ -4162,12 +4160,13 @@ int fill_schema_events(THD *thd, TABLE_LIST *tables, COND *cond) } else { - event_table->field[EVEX_FIELD_DEFINER]->store(definer, strlen(definer), scs); + event_table->field[Events::FIELD_DEFINER]-> + store(definer, strlen(definer), scs); key_len= event_table->key_info->key_part[0].store_length; if (thd->lex->select_lex.db) { - event_table->field[EVEX_FIELD_DB]-> + event_table->field[Events::FIELD_DB]-> store(thd->lex->select_lex.db, strlen(thd->lex->select_lex.db), scs); key_len+= event_table->key_info->key_part[1].store_length; } diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 32b5bc8adb9..aa42cd901d8 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -567,6 +567,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token RTREE_SYM %token SAVEPOINT_SYM %token SCHEDULE_SYM +%token SCHEDULER_SYM %token SECOND_MICROSECOND_SYM %token SECOND_SYM %token SECURITY_SYM @@ -1400,7 +1401,7 @@ opt_ev_status: /* empty */ { $$= 0; } { LEX *lex=Lex; if (!lex->et_compile_phase) - lex->et->status= MYSQL_EVENT_ENABLED; + lex->et->status= Event_timed::ENABLED; $$= 1; } | DISABLE_SYM @@ -1408,7 +1409,7 @@ opt_ev_status: /* empty */ { $$= 0; } LEX *lex=Lex; if (!lex->et_compile_phase) - lex->et->status= MYSQL_EVENT_DISABLED; + lex->et->status= Event_timed::DISABLED; $$= 1; } ; @@ -1472,14 +1473,14 @@ ev_on_completion: { LEX *lex=Lex; if (!lex->et_compile_phase) - lex->et->on_completion= MYSQL_EVENT_ON_COMPLETION_PRESERVE; + lex->et->on_completion= Event_timed::ON_COMPLETION_PRESERVE; $$= 1; } | ON COMPLETION_SYM NOT_SYM PRESERVE_SYM { LEX *lex=Lex; if (!lex->et_compile_phase) - lex->et->on_completion= MYSQL_EVENT_ON_COMPLETION_DROP; + lex->et->on_completion= Event_timed::ON_COMPLETION_DROP; $$= 1; } ; @@ -8049,15 +8050,24 @@ show_param: if (prepare_schema_table(YYTHD, lex, 0, SCH_TRIGGERS)) YYABORT; } - | opt_full EVENTS_SYM opt_db wild_and_where + | EVENTS_SYM opt_db wild_and_where { LEX *lex= Lex; lex->sql_command= SQLCOM_SELECT; lex->orig_sql_command= SQLCOM_SHOW_EVENTS; - lex->select_lex.db= $3; + lex->select_lex.db= $2; if (prepare_schema_table(YYTHD, lex, 0, SCH_EVENTS)) YYABORT; } + | SCHEDULER_SYM STATUS_SYM + { +#ifndef DBUG_OFF + Lex->sql_command= SQLCOM_SHOW_SCHEDULER_STATUS; +#else + yyerror(ER(ER_SYNTAX_ERROR)); + YYABORT; +#endif + } | TABLE_SYM STATUS_SYM opt_db wild_and_where { LEX *lex= Lex; @@ -9488,6 +9498,7 @@ keyword_sp: | ROW_SYM {} | RTREE_SYM {} | SCHEDULE_SYM {} + | SCHEDULER_SYM {} | SECOND_SYM {} | SERIAL_SYM {} | SERIALIZABLE_SYM {} diff --git a/sql/table.cc b/sql/table.cc index bacb703a28c..fb9733c9180 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -2392,9 +2392,7 @@ table_check_intact(TABLE *table, uint table_f_count, table running on a old server will be valid. */ field->sql_type(sql_type); - if (sql_type.length() < table_def->type.length - 1 || - strncmp(sql_type.ptr(), - table_def->type.str, + if (strncmp(sql_type.c_ptr_safe(), table_def->type.str, table_def->type.length - 1)) { sql_print_error("(%s) Expected field %s at position %d to have type " |