summaryrefslogtreecommitdiff
path: root/sql/sql_parse.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_parse.cc')
-rw-r--r--sql/sql_parse.cc81
1 files changed, 66 insertions, 15 deletions
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 1d830f60da0..847ca3f5b40 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1155,24 +1155,48 @@ static enum enum_server_command fetch_command(THD *thd, char *packet)
/**
Read one command from connection and execute it (query or simple command).
- This function is called in loop from thread function.
+ This function is to be used by different schedulers (one-thread-per-connection,
+ pool-of-threads)
For profiling to work, it must never be called recursively.
+ @param thd - client connection context
+
+ @param blocking - wait for command to finish.
+ if true, will wait for outstanding operations (e.g group commit) to finish,
+ before returning. otherwise, it might return DISPATCH_COMMAND_WOULDBLOCK,
+ in this case another do_command() needs to be executed to finish the current
+ command.
+
+ @retval
+ DISPATCH_COMMAND_SUCCESS(0) - success
@retval
- 0 success
+ DISPATCH_COMMAND_ERROR request of THD shutdown (see dispatch_command() description)
@retval
- 1 request of thread shutdown (see dispatch_command() description)
+ DISPATCH_COMMAND_WOULDBLOCK - need to wait for commit notification
*/
-bool do_command(THD *thd)
+dispatch_command_return do_command(THD *thd, bool blocking)
{
- bool return_value;
+ dispatch_command_return return_value;
char *packet= 0;
ulong packet_length;
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
+ DBUG_ASSERT(!thd->async_state.pending_ops());
+
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME)
+ {
+ /*
+ Resuming previously suspended command.
+ Restore the state
+ */
+ command = thd->async_state.m_command;
+ packet = thd->async_state.m_packet.str;
+ packet_length = (ulong)thd->async_state.m_packet.length;
+ goto resume;
+ }
/*
indicator of uninitialized lex => normal flow of errors handling
@@ -1240,12 +1264,12 @@ bool do_command(THD *thd)
if (net->error != 3)
{
- return_value= TRUE; // We have to close it.
+ return_value= DISPATCH_COMMAND_ERROR; // We have to close it.
goto out;
}
net->error= 0;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
goto out;
}
@@ -1302,7 +1326,7 @@ bool do_command(THD *thd)
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
@@ -1328,7 +1352,7 @@ bool do_command(THD *thd)
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
}
@@ -1339,10 +1363,19 @@ bool do_command(THD *thd)
DBUG_ASSERT(packet_length);
DBUG_ASSERT(!thd->apc_target.is_enabled());
+
+resume:
return_value= dispatch_command(command, thd, packet+1,
- (uint) (packet_length-1));
- DBUG_ASSERT(!thd->apc_target.is_enabled());
+ (uint) (packet_length-1), blocking);
+ if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
+ {
+ /* Store current state for suspend/resume in threadpool*/
+ thd->async_state.m_command= command;
+ thd->async_state.m_packet={packet,packet_length};
+ DBUG_RETURN(return_value);
+ }
+ DBUG_ASSERT(!thd->apc_target.is_enabled());
out:
thd->lex->restore_set_statement_var();
/* The statement instrumentation must be closed in all cases. */
@@ -1499,8 +1532,8 @@ public:
1 request of thread shutdown, i. e. if command is
COM_QUIT/COM_SHUTDOWN
*/
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length)
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking)
{
NET *net= &thd->net;
bool error= 0;
@@ -1512,6 +1545,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
"<?>")));
bool drop_more_results= 0;
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUME)
+ {
+ thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
+ goto resume;
+ }
+
/* keep it withing 1 byte */
compile_time_assert(COM_END == 255);
@@ -2242,6 +2281,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, NullS);
my_eof(thd);
break;
+
case COM_SLEEP:
case COM_CONNECT: // Impossible here
case COM_TIME: // Impossible from client
@@ -2255,7 +2295,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
dispatch_end:
- do_end_of_statement= true;
+ /*
+ For the threadpool i.e if non-blocking call, if not all async operations
+ are finished, return without cleanup. The cleanup will be done on
+ later, when command execution is resumed.
+ */
+ if (!blocking && !error && thd->async_state.pending_ops())
+ {
+ DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
+ }
+
+resume:
+
#ifdef WITH_WSREP
/*
Next test should really be WSREP(thd), but that causes a failure when doing
@@ -2367,7 +2418,7 @@ dispatch_end:
/* Check that some variables are reset properly */
DBUG_ASSERT(thd->abort_on_warning == 0);
thd->lex->restore_set_statement_var();
- DBUG_RETURN(error);
+ DBUG_RETURN(error?DISPATCH_COMMAND_ERROR: DISPATCH_COMMAND_SUCCESS);
}
static bool slow_filter_masked(THD *thd, ulonglong mask)