diff options
| author | Vladislav Vaintroub <wlad@mariadb.com> | 2016-07-16 00:49:45 +0000 |
|---|---|---|
| committer | Vladislav Vaintroub <wlad@mariadb.com> | 2016-07-16 00:49:45 +0000 |
| commit | a170bd337e84fd13fa7e64cbb38d6a8baa2a2244 (patch) | |
| tree | bf78c6d79a7ea58482bb95a7c69a9e8d75aca028 | |
| parent | 627c5d9b57c461cda64f703c2e683c2a0b6ac53b (diff) | |
| download | mariadb-git-a170bd337e84fd13fa7e64cbb38d6a8baa2a2244.tar.gz | |
Unix threadpool refactoring (Actually makes Windows possible to use generic implementation
| -rw-r--r-- | sql/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | sql/mysqld.cc | 2 | ||||
| -rw-r--r-- | sql/sys_vars.cc | 12 | ||||
| -rw-r--r-- | sql/threadpool.h | 6 | ||||
| -rw-r--r-- | sql/threadpool_common.cc | 20 | ||||
| -rw-r--r-- | sql/threadpool_unix.cc | 409 | ||||
| -rw-r--r-- | sql/threadpool_win.cc | 12 |
7 files changed, 252 insertions, 213 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 089d793b2b0..4e3c194598a 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -155,9 +155,9 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS) IF(WIN32) SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc) - ELSE() - SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc) ENDIF() + SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc) + ENDIF() MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY diff --git a/sql/mysqld.cc b/sql/mysqld.cc index fa8f143335d..bc2d673e3ca 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -4416,7 +4416,7 @@ static int init_common_variables() #endif /* HAVE_SOLARIS_LARGE_PAGES */ -#if defined(HAVE_POOL_OF_THREADS) && !defined(_WIN32) +#if defined(HAVE_POOL_OF_THREADS) if (IS_SYSVAR_AUTOSIZE(&threadpool_size)) SYSVAR_AUTOSIZE(threadpool_size, my_getncpus()); #endif diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index db054a635af..f0635611414 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3217,23 +3217,17 @@ static Sys_var_ulong Sys_thread_cache_size( #ifdef HAVE_POOL_OF_THREADS static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type) { -#ifdef _WIN32 tp_set_max_threads(threadpool_max_threads); -#endif return false; } -#ifdef _WIN32 static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type) { tp_set_min_threads(threadpool_min_threads); return false; } -#endif - -#ifndef _WIN32 static bool check_threadpool_size(sys_var *self, THD *thd, set_var *var) { ulonglong v= var->save_result.ulonglong_value; @@ -3258,9 +3252,7 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type) tp_set_threadpool_stall_limit(threadpool_stall_limit); return false; } -#endif -#ifdef _WIN32 static Sys_var_uint Sys_threadpool_min_threads( "thread_pool_min_threads", "Minimum number of threads in the thread pool.", @@ -3269,7 +3261,7 @@ static Sys_var_uint Sys_threadpool_min_threads( NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_tp_min_threads) ); -#else + static Sys_var_uint Sys_threadpool_idle_thread_timeout( "thread_pool_idle_timeout", "Timeout in seconds for an idle thread in the thread pool." @@ -3304,7 +3296,7 @@ static Sys_var_uint Sys_threadpool_stall_limit( NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_threadpool_stall_limit) ); -#endif /* !WIN32 */ + static Sys_var_uint Sys_threadpool_max_threads( "thread_pool_max_threads", "Maximum allowed number of worker threads in the thread pool", diff --git a/sql/threadpool.h b/sql/threadpool.h index 57a3e052a97..8583e793678 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -122,6 +122,7 @@ struct TP_pool virtual int set_pool_size(uint){ return 0; } virtual int set_idle_timeout(uint){ return 0; } virtual int set_oversubscribe(uint){ return 0; } + virtual int set_stall_limit(uint){ return 0; } virtual int get_thread_count() { return tp_stats.num_worker_threads; } virtual int get_idle_thread_count(){ return 0; } }; @@ -141,11 +142,10 @@ struct TP_pool_win:TP_pool struct TP_pool_unix :TP_pool { TP_pool_unix(); + ~TP_pool_unix(); virtual TP_connection *new_connection(CONNECT *c); virtual void add(TP_connection *); - virtual int set_max_threads(uint); virtual int set_pool_size(uint); - virtual int set_idle_timeout(uint); - virtual int set_oversubscribe(uint); + virtual int set_stall_limit(uint); virtual int get_idle_thread_count(); }; diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 7e3492ae3ef..730ea2bd365 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -344,15 +344,17 @@ static bool tp_end_thread(THD *, bool) return 0; } -TP_pool *pool; +static TP_pool *pool; static bool tp_init() { + #ifdef _WIN32 pool = new (std::nothrow) TP_pool_win; return 0; #else -#error No threadpool + pool= new (std::nothrow) TP_pool_unix; + return 0; #endif } @@ -389,6 +391,19 @@ void tp_set_max_threads(uint val) pool->set_max_threads(val); } +void tp_set_threadpool_size(uint val) +{ + if (pool) + pool->set_pool_size(val); +} + + +void tp_set_threadpool_stall_limit(uint val) +{ + if (pool) + pool->set_stall_limit(val); +} + void tp_timeout_handler(TP_connection *c) { @@ -433,6 +448,7 @@ static void tp_end() delete pool; } + static scheduler_functions tp_scheduler_functions= { 0, // max_threads diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index 4079091e217..005afde464b 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -22,6 +22,17 @@ #ifdef HAVE_POOL_OF_THREADS +#ifdef _WIN32 +/* AIX may define this, too ?*/ +#define HAVE_IOCP +#endif + +#ifdef HAVE_IOCP +#define OPTIONAL_IO_POLL_READ_PARAM &overlapped +#else +#define OPTIONAL_IOCP_READ_PARAM 0 +#endif + #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> @@ -38,10 +49,23 @@ typedef struct kevent native_event; #elif defined (__sun) #include <port.h> typedef port_event_t native_event; +#elif defined (HAVE_IOCP) +typedef OVERLAPPED_ENTRY native_event; #else #error threadpool is not available on this platform #endif + +static void io_poll_close(int fd) +{ +#ifdef _WIN32 + CloseHandle((HANDLE)fd); +#else + close(fd); +#endif +} + + /** Maximum number of native events a listener can read in one go */ #define MAX_EVENTS 1024 @@ -108,26 +132,34 @@ typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t, > worker_list_t; -struct connection_t +struct TP_connection_unix:public TP_connection { + TP_connection_unix(CONNECT *c); + virtual int init(){ return 0; }; + virtual void set_io_timeout(int sec); + virtual int start_io(); + virtual void wait_begin(int type); + virtual void wait_end(); - THD *thd; thread_group_t *thread_group; - connection_t *next_in_queue; - connection_t **prev_in_queue; + TP_connection_unix *next_in_queue; + TP_connection_unix **prev_in_queue; ulonglong abs_wait_timeout; - CONNECT* connect; - bool logged_in; bool bound_to_poll_descriptor; bool waiting; +#ifdef HAVE_IOCP + OVERLAPPED overlapped; +#endif }; -typedef I_P_List<connection_t, - I_P_List_adapter<connection_t, - &connection_t::next_in_queue, - &connection_t::prev_in_queue>, +typedef TP_connection_unix TP_connection_unix; + +typedef I_P_List<TP_connection_unix, + I_P_List_adapter<TP_connection_unix, + &TP_connection_unix::next_in_queue, + &TP_connection_unix::prev_in_queue>, I_P_List_null_counter, - I_P_List_fast_push_back<connection_t> > + I_P_List_fast_push_back<TP_connection_unix> > connection_queue_t; struct thread_group_t @@ -175,15 +207,12 @@ struct pool_timer_t static pool_timer_t pool_timer; -static void queue_put(thread_group_t *thread_group, connection_t *connection); +static void queue_put(thread_group_t *thread_group, TP_connection_unix *connection); static int wake_thread(thread_group_t *thread_group); -static void handle_event(connection_t *connection); static int wake_or_create_thread(thread_group_t *thread_group); static int create_worker(thread_group_t *thread_group); static void *worker_main(void *param); static void check_stall(thread_group_t *thread_group); -static void connection_abort(connection_t *connection); -static void set_wait_timeout(connection_t *connection); static void set_next_timeout_check(ulonglong abstime); static void print_pool_blocked_message(bool); @@ -208,7 +237,7 @@ static void print_pool_blocked_message(bool); Creates an io_poll descriptor On Linux: epoll_create() - - io_poll_associate_fd(int poll_fd, int fd, void *data) + - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt) Associate file descriptor with io poll descriptor On Linux : epoll_ctl(..EPOLL_CTL_ADD)) @@ -217,7 +246,7 @@ static void print_pool_blocked_message(bool); On Linux: epoll_ctl(..EPOLL_CTL_DEL) - - io_poll_start_read(int poll_fd,int fd, void *data) + - io_poll_start_read(int poll_fd,int fd, void *data, void *opt) The same as io_poll_associate_fd(), but cannot be used before io_poll_associate_fd() was called. On Linux : epoll_ctl(..EPOLL_CTL_MOD) @@ -245,7 +274,7 @@ static int io_poll_create() } -int io_poll_associate_fd(int pollfd, int fd, void *data) +int io_poll_associate_fd(int pollfd, int fd, void *data, void*) { struct epoll_event ev; ev.data.u64= 0; /* Keep valgrind happy */ @@ -256,7 +285,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data) -int io_poll_start_read(int pollfd, int fd, void *data) +int io_poll_start_read(int pollfd, int fd, void *data, void *) { struct epoll_event ev; ev.data.u64= 0; /* Keep valgrind happy */ @@ -315,7 +344,7 @@ int io_poll_create() return kqueue(); } -int io_poll_start_read(int pollfd, int fd, void *data) +int io_poll_start_read(int pollfd, int fd, void *data,void *) { struct kevent ke; MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, @@ -324,12 +353,12 @@ int io_poll_start_read(int pollfd, int fd, void *data) } -int io_poll_associate_fd(int pollfd, int fd, void *data) +int io_poll_associate_fd(int pollfd, int fd, void *data,void *) { struct kevent ke; MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, data); - return io_poll_start_read(pollfd,fd, data); + return io_poll_start_read(pollfd,fd, data, 0); } @@ -371,14 +400,14 @@ static int io_poll_create() return port_create(); } -int io_poll_start_read(int pollfd, int fd, void *data) +int io_poll_start_read(int pollfd, int fd, void *data, void *) { return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data); } -static int io_poll_associate_fd(int pollfd, int fd, void *data) +static int io_poll_associate_fd(int pollfd, int fd, void *data, void *) { - return io_poll_start_read(pollfd, fd, data); + return io_poll_start_read(pollfd, fd, data, 0); } int io_poll_disassociate_fd(int pollfd, int fd) @@ -410,16 +439,77 @@ static void* native_event_get_userdata(native_event *event) { return event->portev_user; } + +#elif defined(HAVE_IOCP) + +static int io_poll_create() +{ + HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); + return (int)h; +} + + +int io_poll_start_read(int pollfd, int fd, void *, void *opt) +{ + DWORD num_bytes = 0; + static char c; + + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; + + if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0) + return 0; + + if (GetLastError() == ERROR_IO_PENDING) + return 0; + + return 1; +} + + +static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt) +{ + HANDLE h= CreateIoCompletionPort((HANDLE)fd, (HANDLE)pollfd, (ULONG_PTR)data, 0); + if (!h) + return -1; + return io_poll_start_read(pollfd,fd, 0, opt); +} + + +int io_poll_disassociate_fd(int pollfd, int fd) +{ + /* Not possible to unbind/rebind file descriptor in IOCP. */ + return 0; +} + + +int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms) +{ + ULONG n; + BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events, + maxevents, &n, timeout_ms, FALSE); + + return ok ? (int)n : -1; +} + + +static void* native_event_get_userdata(native_event *event) +{ + return (void *)event->lpCompletionKey; +} + #endif /* Dequeue element from a workqueue */ -static connection_t *queue_get(thread_group_t *thread_group) +static TP_connection_unix *queue_get(thread_group_t *thread_group) { DBUG_ENTER("queue_get"); thread_group->queue_event_count++; - connection_t *c= thread_group->queue.front(); + TP_connection_unix *c= thread_group->queue.front(); if (c) { thread_group->queue.remove(c); @@ -450,7 +540,7 @@ static void timeout_check(pool_timer_t *timer) if (thd->net.reading_or_writing != 1) continue; - connection_t *connection= (connection_t *)thd->event_scheduler.data; + TP_connection_unix *connection= (TP_connection_unix *)thd->event_scheduler.data; if (!connection) { /* @@ -462,11 +552,7 @@ static void timeout_check(pool_timer_t *timer) if(connection->abs_wait_timeout < timer->current_microtime) { - /* Wait timeout exceeded, kill connection. */ - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->killed = KILL_CONNECTION; - post_kill_notification(thd); - mysql_mutex_unlock(&thd->LOCK_thd_data); + tp_timeout_handler(connection); } else { @@ -636,11 +722,11 @@ static void stop_timer(pool_timer_t *timer) @return a ready connection, or NULL on shutdown */ -static connection_t * listener(worker_thread_t *current_thread, +static TP_connection_unix * listener(worker_thread_t *current_thread, thread_group_t *thread_group) { DBUG_ENTER("listener"); - connection_t *retval= NULL; + TP_connection_unix *retval= NULL; for(;;) { @@ -721,14 +807,14 @@ static connection_t * listener(worker_thread_t *current_thread, */ for(int i=(listener_picks_event)?1:0; i < cnt ; i++) { - connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]); + TP_connection_unix *c= (TP_connection_unix *)native_event_get_userdata(&ev[i]); thread_group->queue.push_back(c); } if (listener_picks_event) { /* Handle the first event. */ - retval= (connection_t *)native_event_get_userdata(&ev[0]); + retval= (TP_connection_unix *)native_event_get_userdata(&ev[0]); mysql_mutex_unlock(&thread_group->mutex); break; } @@ -924,9 +1010,10 @@ void thread_group_destroy(thread_group_t *thread_group) mysql_mutex_destroy(&thread_group->mutex); if (thread_group->pollfd != -1) { - close(thread_group->pollfd); + io_poll_close(thread_group->pollfd); thread_group->pollfd= -1; } +#ifndef HAVE_IOCP for(int i=0; i < 2; i++) { if(thread_group->shutdown_pipe[i] != -1) @@ -935,6 +1022,8 @@ void thread_group_destroy(thread_group_t *thread_group) thread_group->shutdown_pipe[i]= -1; } } +#endif + if (my_atomic_add32(&shutdown_group_count, -1) == 1) my_free(all_groups); } @@ -957,7 +1046,32 @@ static int wake_thread(thread_group_t *thread_group) DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ } +/* + Wake listener thread (during shutdown) + Self-pipe trick is used in most cases,except IOCP. +*/ +static int wake_listener(thread_group_t *thread_group) +{ +#ifndef HAVE_IOCP + if (pipe(thread_group->shutdown_pipe)) + { + return -1; + } + /* Wake listener */ + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], NULL)) + { + return -1; + } + char c= 0; + if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) + return -1; +#else + PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0); +#endif + return 0; +} /** Initiate shutdown for thread group. @@ -981,20 +1095,7 @@ static void thread_group_close(thread_group_t *thread_group) thread_group->shutdown= true; thread_group->listener= NULL; - if (pipe(thread_group->shutdown_pipe)) - { - DBUG_VOID_RETURN; - } - - /* Wake listener */ - if (io_poll_associate_fd(thread_group->pollfd, - thread_group->shutdown_pipe[0], NULL)) - { - DBUG_VOID_RETURN; - } - char c= 0; - if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) - DBUG_VOID_RETURN; + wake_listener(thread_group); /* Wake all workers. */ while(wake_thread(thread_group) == 0) @@ -1015,7 +1116,7 @@ static void thread_group_close(thread_group_t *thread_group) */ -static void queue_put(thread_group_t *thread_group, connection_t *connection) +static void queue_put(thread_group_t *thread_group, TP_connection_unix *connection) { DBUG_ENTER("queue_put"); @@ -1061,11 +1162,11 @@ static bool too_many_threads(thread_group_t *thread_group) NULL is returned if timeout has expired,or on shutdown. */ -connection_t *get_event(worker_thread_t *current_thread, +TP_connection_unix *get_event(worker_thread_t *current_thread, thread_group_t *thread_group, struct timespec *abstime) { DBUG_ENTER("get_event"); - connection_t *connection = NULL; + TP_connection_unix *connection = NULL; int err=0; mysql_mutex_lock(&thread_group->mutex); @@ -1111,7 +1212,7 @@ connection_t *get_event(worker_thread_t *current_thread, if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1) { thread_group->io_event_count++; - connection = (connection_t *)native_event_get_userdata(&nev); + connection= (TP_connection_unix *)native_event_get_userdata(&nev); break; } } @@ -1206,45 +1307,38 @@ void wait_end(thread_group_t *thread_group) Allocate/initialize a new connection structure. */ -connection_t *alloc_connection() +TP_connection_unix::TP_connection_unix(CONNECT *c): + TP_connection(c), + waiting(false), + bound_to_poll_descriptor(false), + abs_wait_timeout(ULONGLONG_MAX), + thread_group(0), + next_in_queue(0), + prev_in_queue(0), +#ifdef HAVE_IOCP + overlapped() +#endif { - connection_t* connection; - DBUG_ENTER("alloc_connection"); - DBUG_EXECUTE_IF("simulate_failed_connection_1", DBUG_RETURN(0); ); - - if ((connection = (connection_t *)my_malloc(sizeof(connection_t),0))) - { - connection->waiting= false; - connection->logged_in= false; - connection->bound_to_poll_descriptor= false; - connection->abs_wait_timeout= ULONGLONG_MAX; - connection->thd= 0; - } - DBUG_RETURN(connection); } +TP_connection * TP_pool_unix::new_connection(CONNECT *c) +{ + return new (std::nothrow) TP_connection_unix(c); +} /** Add a new connection to thread pool.. */ -void tp_add_connection(CONNECT *connect) +void TP_pool_unix::add(TP_connection *c) { - connection_t *connection; DBUG_ENTER("tp_add_connection"); - connection= alloc_connection(); - if (!connection) - { - connect->close_and_delete(); - DBUG_VOID_RETURN; - } - connection->connect= connect; - + TP_connection_unix *connection=(TP_connection_unix *)c; /* Assign connection to a group. */ thread_group_t *group= - &all_groups[connect->thread_id%group_count]; + &all_groups[c->connect->thread_id%group_count]; connection->thread_group=group; @@ -1261,44 +1355,18 @@ void tp_add_connection(CONNECT *connect) } -/** - Terminate connection. -*/ - -static void connection_abort(connection_t *connection) -{ - DBUG_ENTER("connection_abort"); - thread_group_t *group= connection->thread_group; - - if (connection->thd) - { - threadpool_remove_connection(connection->thd); - } - - mysql_mutex_lock(&group->mutex); - group->connection_count--; - mysql_mutex_unlock(&group->mutex); - - my_free(connection); - DBUG_VOID_RETURN; -} - /** MySQL scheduler callback: wait begin */ -void tp_wait_begin(THD *thd, int type) +void TP_connection_unix::wait_begin(int type) { - DBUG_ENTER("tp_wait_begin"); - DBUG_ASSERT(thd); - connection_t *connection = (connection_t *)thd->event_scheduler.data; - if (connection) - { - DBUG_ASSERT(!connection->waiting); - connection->waiting= true; - wait_begin(connection->thread_group); - } + DBUG_ENTER("wait_begin"); + + DBUG_ASSERT(!waiting); + waiting= true; + ::wait_begin(thread_group); DBUG_VOID_RETURN; } @@ -1307,18 +1375,13 @@ void tp_wait_begin(THD *thd, int type) MySQL scheduler callback: wait end */ -void tp_wait_end(THD *thd) +void TP_connection_unix::wait_end() { - DBUG_ENTER("tp_wait_end"); + DBUG_ENTER("wait_end"); DBUG_ASSERT(thd); - - connection_t *connection = (connection_t *)thd->event_scheduler.data; - if (connection) - { - DBUG_ASSERT(connection->waiting); - connection->waiting = false; - wait_end(connection->thread_group); - } + DBUG_ASSERT(waiting); + waiting = false; + ::wait_end(thread_group); DBUG_VOID_RETURN; } @@ -1340,7 +1403,7 @@ static void set_next_timeout_check(ulonglong abstime) Set wait timeout for connection. */ -static void set_wait_timeout(connection_t *c) +void TP_connection_unix::set_io_timeout(int timeout_sec) { DBUG_ENTER("set_wait_timeout"); /* @@ -1351,11 +1414,11 @@ static void set_wait_timeout(connection_t *c) one tick interval. */ - c->abs_wait_timeout= pool_timer.current_microtime + + abs_wait_timeout= pool_timer.current_microtime + 1000LL*pool_timer.tick_interval + - 1000000LL*c->thd->variables.net_wait_timeout; + 1000000LL*timeout_sec; - set_next_timeout_check(c->abs_wait_timeout); + set_next_timeout_check(abs_wait_timeout); DBUG_VOID_RETURN; } @@ -1367,7 +1430,7 @@ static void set_wait_timeout(connection_t *c) after thread_pool_size setting. */ -static int change_group(connection_t *c, +static int change_group(TP_connection_unix *c, thread_group_t *old_group, thread_group_t *new_group) { @@ -1398,10 +1461,11 @@ static int change_group(connection_t *c, } -static int start_io(connection_t *connection) +int TP_connection_unix::start_io() { - int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket); + int fd= mysql_socket_getfd(thd->net.vio->mysql_socket); +#ifndef HAVE_IOCP /* Usually, connection will stay in the same group for the entire connection's life. However, we do allow group_count to @@ -1413,56 +1477,25 @@ static int start_io(connection_t *connection) on thread_id and current group count, and migrate if necessary. */ thread_group_t *group = - &all_groups[connection->thd->thread_id%group_count]; + &all_groups[thd->thread_id%group_count]; - if (group != connection->thread_group) + if (group != thread_group) { - if (change_group(connection, connection->thread_group, group)) + if (change_group(this, thread_group, group)) return -1; } - +#endif + /* Bind to poll descriptor if not yet done. */ - if (!connection->bound_to_poll_descriptor) + if (!bound_to_poll_descriptor) { - connection->bound_to_poll_descriptor= true; - return io_poll_associate_fd(group->pollfd, fd, connection); + bound_to_poll_descriptor= true; + return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM); } - return io_poll_start_read(group->pollfd, fd, connection); -} - - - -static void handle_event(connection_t *connection) -{ - - DBUG_ENTER("handle_event"); - int err; - - if (!connection->logged_in) - { - connection->thd = threadpool_add_connection(connection->connect, connection); - err= (connection->thd == NULL); - connection->logged_in= true; - } - else - { - err= threadpool_process_request(connection->thd); - } - - if(err) - goto end; - - set_wait_timeout(connection); - err= start_io(connection); - -end: - if (err) - connection_abort(connection); - - DBUG_VOID_RETURN; + return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM); } @@ -1490,14 +1523,14 @@ static void *worker_main(void *param) /* Run event loop */ for(;;) { - connection_t *connection; + TP_connection_unix *connection; struct timespec ts; set_timespec(ts,threadpool_idle_timeout); connection = get_event(&this_thread, thread_group, &ts); if (!connection) break; this_thread.event_count++; - handle_event(connection); + tp_callback(connection); } /* Thread shutdown: cleanup per-worker-thread structure. */ @@ -1518,16 +1551,17 @@ static void *worker_main(void *param) } -bool tp_init() +TP_pool_unix::TP_pool_unix() { - DBUG_ENTER("tp_init"); + DBUG_ENTER("TP_pool_unix::TP_pool_unix"); threadpool_max_size= MY_MAX(threadpool_size, 128); all_groups= (thread_group_t *) my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL)); if (!all_groups) { threadpool_max_size= 0; - DBUG_RETURN(1); + sql_print_error("Allocation failed"); + exit(1); } threadpool_started= true; scheduler_init(); @@ -1536,12 +1570,12 @@ bool tp_init() { thread_group_init(&all_groups[i], get_connection_attrib()); } - tp_set_threadpool_size(threadpool_size); + set_pool_size(threadpool_size); if(group_count == 0) { /* Something went wrong */ sql_print_error("Can't set threadpool size to %d",threadpool_size); - DBUG_RETURN(1); + exit(1); } PSI_register(mutex); PSI_register(cond); @@ -1549,10 +1583,10 @@ bool tp_init() pool_timer.tick_interval= threadpool_stall_limit; start_timer(&pool_timer); - DBUG_RETURN(0); + DBUG_VOID_RETURN; } -void tp_end() +TP_pool_unix::~TP_pool_unix() { DBUG_ENTER("tp_end"); @@ -1571,13 +1605,10 @@ void tp_end() /** Ensure that poll descriptors are created when threadpool_size changes */ - -void tp_set_threadpool_size(uint size) +int TP_pool_unix::set_pool_size(uint size) { bool success= true; - if (!threadpool_started) - return; - + for(uint i=0; i< size; i++) { thread_group_t *group= &all_groups[i]; @@ -1596,20 +1627,20 @@ void tp_set_threadpool_size(uint size) if (!success) { group_count= i; - return; + return -1; } } group_count= size; + return 0; } -void tp_set_threadpool_stall_limit(uint limit) +int TP_pool_unix::set_stall_limit(uint limit) { - if (!threadpool_started) - return; mysql_mutex_lock(&(pool_timer.mutex)); pool_timer.tick_interval= limit; mysql_mutex_unlock(&(pool_timer.mutex)); mysql_cond_signal(&(pool_timer.cond)); + return 0; } @@ -1620,7 +1651,7 @@ void tp_set_threadpool_stall_limit(uint limit) Don't do any locking, it is not required for stats. */ -int tp_get_idle_thread_count() +int TP_pool_unix::get_idle_thread_count() { int sum=0; for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++) diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index abc41681aa5..2c012a9e77b 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -13,11 +13,11 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifdef _WIN32_winT -#undef _WIN32_winT +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT #endif -#define _WIN32_winT 0x0601 +#define _WIN32_WINNT 0x0601 #include <my_global.h> #include <violite.h> @@ -64,9 +64,9 @@ static void tp_log_warning(const char *msg, const char *fct) } -PTP_POOL pool; -TP_CALLBACK_ENVIRON callback_environ; -DWORD fls; +static PTP_POOL pool; +static TP_CALLBACK_ENVIRON callback_environ; +static DWORD fls; static bool skip_completion_port_on_success = false; |
