diff options
Diffstat (limited to 'sql/sql_parse.cc')
-rw-r--r-- | sql/sql_parse.cc | 81 |
1 files changed, 66 insertions, 15 deletions
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 1d830f60da0..847ca3f5b40 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1155,24 +1155,48 @@ static enum enum_server_command fetch_command(THD *thd, char *packet) /** Read one command from connection and execute it (query or simple command). - This function is called in loop from thread function. + This function is to be used by different schedulers (one-thread-per-connection, + pool-of-threads) For profiling to work, it must never be called recursively. + @param thd - client connection context + + @param blocking - wait for command to finish. + if true, will wait for outstanding operations (e.g group commit) to finish, + before returning. otherwise, it might return DISPATCH_COMMAND_WOULDBLOCK, + in this case another do_command() needs to be executed to finish the current + command. + + @retval + DISPATCH_COMMAND_SUCCESS(0) - success @retval - 0 success + DISPATCH_COMMAND_ERROR request of THD shutdown (see dispatch_command() description) @retval - 1 request of thread shutdown (see dispatch_command() description) + DISPATCH_COMMAND_WOULDBLOCK - need to wait for commit notification */ -bool do_command(THD *thd) +dispatch_command_return do_command(THD *thd, bool blocking) { - bool return_value; + dispatch_command_return return_value; char *packet= 0; ulong packet_length; NET *net= &thd->net; enum enum_server_command command; DBUG_ENTER("do_command"); + DBUG_ASSERT(!thd->async_state.pending_ops()); + + if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME) + { + /* + Resuming previously suspended command. + Restore the state + */ + command = thd->async_state.m_command; + packet = thd->async_state.m_packet.str; + packet_length = (ulong)thd->async_state.m_packet.length; + goto resume; + } /* indicator of uninitialized lex => normal flow of errors handling @@ -1240,12 +1264,12 @@ bool do_command(THD *thd) if (net->error != 3) { - return_value= TRUE; // We have to close it. + return_value= DISPATCH_COMMAND_ERROR; // We have to close it. goto out; } net->error= 0; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; goto out; } @@ -1302,7 +1326,7 @@ bool do_command(THD *thd) MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); thd->m_statement_psi= NULL; thd->m_digest= NULL; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; wsrep_after_command_before_result(thd); goto out; @@ -1328,7 +1352,7 @@ bool do_command(THD *thd) thd->m_statement_psi= NULL; thd->m_digest= NULL; - return_value= FALSE; + return_value= DISPATCH_COMMAND_SUCCESS; wsrep_after_command_before_result(thd); goto out; } @@ -1339,10 +1363,19 @@ bool do_command(THD *thd) DBUG_ASSERT(packet_length); DBUG_ASSERT(!thd->apc_target.is_enabled()); + +resume: return_value= dispatch_command(command, thd, packet+1, - (uint) (packet_length-1)); - DBUG_ASSERT(!thd->apc_target.is_enabled()); + (uint) (packet_length-1), blocking); + if (return_value == DISPATCH_COMMAND_WOULDBLOCK) + { + /* Store current state for suspend/resume in threadpool*/ + thd->async_state.m_command= command; + thd->async_state.m_packet={packet,packet_length}; + DBUG_RETURN(return_value); + } + DBUG_ASSERT(!thd->apc_target.is_enabled()); out: thd->lex->restore_set_statement_var(); /* The statement instrumentation must be closed in all cases. */ @@ -1499,8 +1532,8 @@ public: 1 request of thread shutdown, i. e. if command is COM_QUIT/COM_SHUTDOWN */ -bool dispatch_command(enum enum_server_command command, THD *thd, - char* packet, uint packet_length) +dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd, + char* packet, uint packet_length, bool blocking) { NET *net= &thd->net; bool error= 0; @@ -1512,6 +1545,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd, "<?>"))); bool drop_more_results= 0; + if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME) + { + thd->async_state.m_state = thd_async_state::enum_async_state::NONE; + goto resume; + } + /* keep it withing 1 byte */ compile_time_assert(COM_END == 255); @@ -2242,6 +2281,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, general_log_print(thd, command, NullS); my_eof(thd); break; + case COM_SLEEP: case COM_CONNECT: // Impossible here case COM_TIME: // Impossible from client @@ -2255,7 +2295,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd, } dispatch_end: - do_end_of_statement= true; + /* + For the threadpool i.e if non-blocking call, if not all async operations + are finished, return without cleanup. The cleanup will be done on + later, when command execution is resumed. + */ + if (!blocking && !error && thd->async_state.pending_ops()) + { + DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK); + } + +resume: + #ifdef WITH_WSREP /* Next test should really be WSREP(thd), but that causes a failure when doing @@ -2367,7 +2418,7 @@ dispatch_end: /* Check that some variables are reset properly */ DBUG_ASSERT(thd->abort_on_warning == 0); thd->lex->restore_set_statement_var(); - DBUG_RETURN(error); + DBUG_RETURN(error?DISPATCH_COMMAND_ERROR: DISPATCH_COMMAND_SUCCESS); } static bool slow_filter_masked(THD *thd, ulonglong mask) |