summaryrefslogtreecommitdiff
path: root/sql/sql_parse.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_parse.cc')
-rw-r--r--sql/sql_parse.cc317
1 files changed, 113 insertions, 204 deletions
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index cb960a89cb6..bbb880cb117 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -113,9 +113,7 @@
#include "wsrep_trans_observer.h" /* wsrep transaction hooks */
static bool wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
- Parser_state *parser_state,
- bool is_com_multi,
- bool is_next_command);
+ Parser_state *parser_state);
#endif /* WITH_WSREP */
/**
@@ -391,7 +389,7 @@ const LEX_CSTRING command_name[257]={
{ STRING_WITH_LEN("Slave_worker") }, //251
{ STRING_WITH_LEN("Slave_IO") }, //252
{ STRING_WITH_LEN("Slave_SQL") }, //253
- { STRING_WITH_LEN("Com_multi") }, //254
+ { 0, 0},
{ STRING_WITH_LEN("Error") } // Last command number 255
};
@@ -490,7 +488,7 @@ void init_update_queries(void)
memset(server_command_flags, 0, sizeof(server_command_flags));
server_command_flags[COM_STATISTICS]= CF_SKIP_QUERY_ID | CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK;
- server_command_flags[COM_PING]= CF_SKIP_QUERY_ID | CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK | CF_NO_COM_MULTI;
+ server_command_flags[COM_PING]= CF_SKIP_QUERY_ID | CF_SKIP_QUESTIONS | CF_SKIP_WSREP_CHECK;
server_command_flags[COM_QUIT]= CF_SKIP_WSREP_CHECK;
server_command_flags[COM_PROCESS_INFO]= CF_SKIP_WSREP_CHECK;
@@ -519,7 +517,6 @@ void init_update_queries(void)
server_command_flags[COM_STMT_EXECUTE]= CF_SKIP_WSREP_CHECK;
server_command_flags[COM_STMT_SEND_LONG_DATA]= CF_SKIP_WSREP_CHECK;
server_command_flags[COM_REGISTER_SLAVE]= CF_SKIP_WSREP_CHECK;
- server_command_flags[COM_MULTI]= CF_SKIP_WSREP_CHECK | CF_NO_COM_MULTI;
/* Initialize the sql command flags array. */
memset(sql_command_flags, 0, sizeof(sql_command_flags));
@@ -958,7 +955,7 @@ void execute_init_command(THD *thd, LEX_STRING *init_command,
save_vio= thd->net.vio;
thd->net.vio= 0;
thd->clear_error(1);
- dispatch_command(COM_QUERY, thd, buf, (uint)len, FALSE, FALSE);
+ dispatch_command(COM_QUERY, thd, buf, (uint)len);
thd->client_capabilities= save_client_capabilities;
thd->net.vio= save_vio;
@@ -1084,7 +1081,7 @@ int bootstrap(MYSQL_FILE *file)
break;
}
- mysql_parse(thd, thd->query(), length, &parser_state, FALSE, FALSE);
+ mysql_parse(thd, thd->query(), length, &parser_state);
bootstrap_error= thd->is_error();
thd->protocol->end_statement();
@@ -1132,23 +1129,6 @@ void cleanup_items(Item *item)
DBUG_VOID_RETURN;
}
-static enum enum_server_command fetch_command(THD *thd, char *packet)
-{
- enum enum_server_command
- command= (enum enum_server_command) (uchar) packet[0];
- DBUG_ENTER("fetch_command");
-
- if (command >= COM_END ||
- (command >= COM_MDB_GAP_BEG && command <= COM_MDB_GAP_END))
- command= COM_END; // Wrong command
-
- DBUG_PRINT("info",("Command on %s = %d (%s)",
- vio_description(thd->net.vio), command,
- command_name[command].str));
- DBUG_RETURN(command);
-}
-
-
#ifdef WITH_WSREP
static bool wsrep_tables_accessible_when_detached(const TABLE_LIST *tables)
{
@@ -1170,28 +1150,73 @@ static bool wsrep_command_no_result(char command)
}
#endif /* WITH_WSREP */
#ifndef EMBEDDED_LIBRARY
+static enum enum_server_command fetch_command(THD *thd, char *packet)
+{
+ enum enum_server_command
+ command= (enum enum_server_command) (uchar) packet[0];
+ DBUG_ENTER("fetch_command");
+
+ if (command >= COM_END ||
+ (command >= COM_MDB_GAP_BEG && command <= COM_MDB_GAP_END))
+ command= COM_END; // Wrong command
+
+ DBUG_PRINT("info",("Command on %s = %d (%s)",
+ vio_description(thd->net.vio), command,
+ command_name[command].str));
+ DBUG_RETURN(command);
+}
/**
Read one command from connection and execute it (query or simple command).
- This function is called in loop from thread function.
+ This function is to be used by different schedulers (one-thread-per-connection,
+ pool-of-threads)
For profiling to work, it must never be called recursively.
+ @param thd - client connection context
+
+ @param blocking - wait for command to finish.
+ if false (nonblocking), then the function might
+ return when command is "half-finished", with
+ DISPATCH_COMMAND_WOULDBLOCK.
+ Currenly, this can *only* happen when using
+ threadpool. The command will resume, after all outstanding
+ async operations (i.e group commit) finish.
+ Threadpool scheduler takes care of "resume".
+
+ @retval
+ DISPATCH_COMMAND_SUCCESS - success
@retval
- 0 success
+ DISPATCH_COMMAND_CLOSE_CONNECTION request of THD shutdown
+ (s. dispatch_command() description)
@retval
- 1 request of thread shutdown (see dispatch_command() description)
+ DISPATCH_COMMAND_WOULDBLOCK - need to wait for asyncronous operations
+ to finish. Only returned if parameter
+ 'blocking' is false.
*/
-bool do_command(THD *thd)
+dispatch_command_return do_command(THD *thd, bool blocking)
{
- bool return_value;
+ dispatch_command_return return_value;
char *packet= 0;
ulong packet_length;
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
+ DBUG_ASSERT(!thd->async_state.pending_ops());
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ {
+ /*
+ Resuming previously suspended command.
+ Restore the state
+ */
+ command = thd->async_state.m_command;
+ packet = thd->async_state.m_packet.str;
+ packet_length = (ulong)thd->async_state.m_packet.length;
+ goto resume;
+ }
+
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
@@ -1258,12 +1283,12 @@ bool do_command(THD *thd)
if (net->error != 3)
{
- return_value= TRUE; // We have to close it.
+ return_value= DISPATCH_COMMAND_CLOSE_CONNECTION; // We have to close it.
goto out;
}
net->error= 0;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
goto out;
}
@@ -1330,7 +1355,7 @@ bool do_command(THD *thd)
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
@@ -1356,7 +1381,7 @@ bool do_command(THD *thd)
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- return_value= FALSE;
+ return_value= DISPATCH_COMMAND_SUCCESS;
wsrep_after_command_before_result(thd);
goto out;
}
@@ -1367,8 +1392,18 @@ bool do_command(THD *thd)
DBUG_ASSERT(packet_length);
DBUG_ASSERT(!thd->apc_target.is_enabled());
+
+resume:
return_value= dispatch_command(command, thd, packet+1,
- (uint) (packet_length-1), FALSE, FALSE);
+ (uint) (packet_length-1), blocking);
+ if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
+ {
+ /* Save current state, and resume later.*/
+ thd->async_state.m_command= command;
+ thd->async_state.m_packet={packet,packet_length};
+ DBUG_RETURN(return_value);
+ }
+
DBUG_ASSERT(!thd->apc_target.is_enabled());
out:
@@ -1479,45 +1514,6 @@ static void wsrep_copy_query(THD *thd)
}
#endif /* WITH_WSREP */
-/**
- check COM_MULTI packet
-
- @param thd thread handle
- @param packet pointer on the packet of commands
- @param packet_length length of this packet
-
- @retval 0 - Error
- @retval # - Number of commands in the batch
-*/
-
-uint maria_multi_check(THD *thd, char *packet, size_t packet_length)
-{
- uint counter= 0;
- DBUG_ENTER("maria_multi_check");
- while (packet_length)
- {
- char *packet_start= packet;
- size_t subpacket_length= net_field_length((uchar **)&packet_start);
- size_t length_length= packet_start - packet;
- // length of command + 3 bytes where that length was stored
- DBUG_PRINT("info", ("sub-packet length: %zu + %zu command: %x",
- subpacket_length, length_length,
- packet_start[3]));
-
- if (subpacket_length == 0 ||
- (subpacket_length + length_length) > packet_length)
- {
- my_message(ER_UNKNOWN_COM_ERROR, ER_THD(thd, ER_UNKNOWN_COM_ERROR),
- MYF(0));
- DBUG_RETURN(0);
- }
-
- counter++;
- packet= packet_start + subpacket_length;
- packet_length-= (subpacket_length + length_length);
- }
- DBUG_RETURN(counter);
-}
#if defined(WITH_ARIA_STORAGE_ENGINE)
@@ -1554,8 +1550,13 @@ public:
@param packet_length length of packet + 1 (to show that data is
null-terminated) except for COM_SLEEP, where it
can be zero.
- @param is_com_multi recursive call from COM_MULTI
- @param is_next_command there will be more command in the COM_MULTI batch
+ @param blocking if false (nonblocking), then the function might
+ return when command is "half-finished", with
+ DISPATCH_COMMAND_WOULDBLOCK.
+ Currenly, this can *only* happen when using threadpool.
+ The current command will resume, after all outstanding
+ async operations (i.e group commit) finish.
+ Threadpool scheduler takes care of "resume".
@todo
set thd->lex->sql_command to SQLCOM_END here.
@@ -1568,9 +1569,8 @@ public:
1 request of thread shutdown, i. e. if command is
COM_QUIT/COM_SHUTDOWN
*/
-bool dispatch_command(enum enum_server_command command, THD *thd,
- char* packet, uint packet_length, bool is_com_multi,
- bool is_next_command)
+dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
+ char* packet, uint packet_length, bool blocking)
{
NET *net= &thd->net;
bool error= 0;
@@ -1582,6 +1582,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
"<?>")));
bool drop_more_results= 0;
+ if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ {
+ thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
+ goto resume;
+ }
+
/* keep it withing 1 byte */
compile_time_assert(COM_END == 255);
@@ -1651,14 +1657,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
beginning of each command.
*/
thd->server_status&= ~SERVER_STATUS_CLEAR_SET;
- if (is_next_command)
- {
- drop_more_results= !MY_TEST(thd->server_status &
- SERVER_MORE_RESULTS_EXISTS);
- thd->server_status|= SERVER_MORE_RESULTS_EXISTS;
- if (is_com_multi)
- thd->get_stmt_da()->set_skip_flush();
- }
if (unlikely(thd->security_ctx->password_expired &&
command != COM_QUERY &&
@@ -1873,8 +1871,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
if (WSREP(thd))
{
if (wsrep_mysql_parse(thd, thd->query(), thd->query_length(),
- &parser_state,
- is_com_multi, is_next_command))
+ &parser_state))
{
WSREP_DEBUG("Deadlock error for: %s", thd->query());
mysql_mutex_lock(&thd->LOCK_thd_data);
@@ -1886,8 +1883,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
else
#endif /* WITH_WSREP */
- mysql_parse(thd, thd->query(), thd->query_length(), &parser_state,
- is_com_multi, is_next_command);
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) &&
! thd->is_error())
@@ -1971,8 +1967,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
if (WSREP(thd))
{
if (wsrep_mysql_parse(thd, beginning_of_next_stmt,
- length, &parser_state,
- is_com_multi, is_next_command))
+ length, &parser_state))
{
WSREP_DEBUG("Deadlock error for: %s", thd->query());
mysql_mutex_lock(&thd->LOCK_thd_data);
@@ -1985,8 +1980,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
}
else
#endif /* WITH_WSREP */
- mysql_parse(thd, beginning_of_next_stmt, length, &parser_state,
- is_com_multi, is_next_command);
+ mysql_parse(thd, beginning_of_next_stmt, length, &parser_state);
}
@@ -2037,13 +2031,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
break;
}
packet= arg_end + 1;
- // thd->reset_for_next_command reset state => restore it
- if (is_next_command)
- {
- thd->server_status|= SERVER_MORE_RESULTS_EXISTS;
- if (is_com_multi)
- thd->get_stmt_da()->set_skip_flush();
- }
lex_start(thd);
/* Must be before we init the table list. */
@@ -2332,84 +2319,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
general_log_print(thd, command, NullS);
my_eof(thd);
break;
- case COM_MULTI:
- {
- uint counter;
- uint current_com= 0;
- DBUG_ASSERT(!is_com_multi);
- if (!(thd->client_capabilities & CLIENT_MULTI_RESULTS))
- {
- /* The client does not support multiple result sets being sent back */
- my_error(ER_COMMULTI_BADCONTEXT, MYF(0));
- break;
- }
-
- if (!(counter= maria_multi_check(thd, packet, packet_length)))
- break;
-
- {
- char *packet_start= packet;
- /* We have to store next length because it will be destroyed by '\0' */
- size_t next_subpacket_length= net_field_length((uchar **)&packet_start);
- size_t next_length_length= packet_start - packet;
- unsigned char *readbuff= net->buff;
-
- if (net_allocate_new_packet(net, thd, MYF(0)))
- break;
-
- PSI_statement_locker *save_locker= thd->m_statement_psi;
- sql_digest_state *save_digest= thd->m_digest;
- thd->m_statement_psi= NULL;
- thd->m_digest= NULL;
-
- while (packet_length)
- {
- current_com++;
- size_t subpacket_length= next_subpacket_length + next_length_length;
- size_t length_length= next_length_length;
- if (subpacket_length < packet_length)
- {
- packet_start= packet + subpacket_length;
- next_subpacket_length= net_field_length((uchar**)&packet_start);
- next_length_length= packet_start - (packet + subpacket_length);
- }
- /* safety like in do_command() */
- packet[subpacket_length]= '\0';
-
- enum enum_server_command subcommand=
- fetch_command(thd, (packet + length_length));
- if (server_command_flags[subcommand] & CF_NO_COM_MULTI)
- {
- my_error(ER_BAD_COMMAND_IN_MULTI, MYF(0),
- command_name[subcommand].str);
- goto com_multi_end;
- }
-
- if (dispatch_command(subcommand, thd, packet + (1 + length_length),
- (uint)(subpacket_length - (1 + length_length)), TRUE,
- (current_com != counter)))
- {
- DBUG_ASSERT(thd->is_error());
- goto com_multi_end;
- }
-
- DBUG_ASSERT(subpacket_length <= packet_length);
- packet+= subpacket_length;
- packet_length-= (uint)subpacket_length;
- }
-
-com_multi_end:
- thd->m_statement_psi= save_locker;
- thd->m_digest= save_digest;
-
- /* release old buffer */
- net_flush(net);
- DBUG_ASSERT(net->buff == net->write_pos); // nothing to send
- my_free(readbuff);
- }
- break;
- }
case COM_SLEEP:
case COM_CONNECT: // Impossible here
case COM_TIME: // Impossible from client
@@ -2423,7 +2333,18 @@ com_multi_end:
}
dispatch_end:
- do_end_of_statement= true;
+ /*
+ For the threadpool i.e if non-blocking call, if not all async operations
+ are finished, return without cleanup. The cleanup will be done on
+ later, when command execution is resumed.
+ */
+ if (!blocking && !error && thd->async_state.pending_ops())
+ {
+ DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
+ }
+
+resume:
+
#ifdef WITH_WSREP
/*
Next test should really be WSREP(thd), but that causes a failure when doing
@@ -2470,11 +2391,8 @@ dispatch_end:
thd_proc_info(thd, "Updating status");
/* Finalize server status flags after executing a command. */
thd->update_server_status();
- if (command != COM_MULTI)
- {
- thd->protocol->end_statement();
- query_cache_end_of_result(thd);
- }
+ thd->protocol->end_statement();
+ query_cache_end_of_result(thd);
}
if (drop_more_results)
thd->server_status&= ~SERVER_MORE_RESULTS_EXISTS;
@@ -2502,8 +2420,7 @@ dispatch_end:
thd->m_statement_psi= NULL;
thd->m_digest= NULL;
- if (!is_com_multi)
- thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory
+ thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory
thd->reset_kill_query(); /* Ensure that killed_errmsg is released */
/*
@@ -2531,7 +2448,7 @@ dispatch_end:
/* Check that some variables are reset properly */
DBUG_ASSERT(thd->abort_on_warning == 0);
thd->lex->restore_set_statement_var();
- DBUG_RETURN(error);
+ DBUG_RETURN(error?DISPATCH_COMMAND_CLOSE_CONNECTION: DISPATCH_COMMAND_SUCCESS);
}
static bool slow_filter_masked(THD *thd, ulonglong mask)
@@ -3856,6 +3773,11 @@ mysql_execute_command(THD *thd)
thd->set_query_timer();
#ifdef WITH_WSREP
+ /* Check wsrep_mode rules before command execution. */
+ if (WSREP(thd) &&
+ wsrep_thd_is_local(thd) && !wsrep_check_mode_before_cmd_execute(thd))
+ goto error;
+
/*
Always start a new transaction for a wsrep THD unless the
current command is DDL or explicit BEGIN. This will guarantee that
@@ -7852,9 +7774,7 @@ static void wsrep_prepare_for_autocommit_retry(THD* thd,
}
static bool wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
- Parser_state *parser_state,
- bool is_com_multi,
- bool is_next_command)
+ Parser_state *parser_state)
{
bool is_autocommit=
!thd->in_multi_stmt_transaction_mode() &&
@@ -7863,7 +7783,7 @@ static bool wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
do
{
retry_autocommit= false;
- mysql_parse(thd, rawbuf, length, parser_state, is_com_multi, is_next_command);
+ mysql_parse(thd, rawbuf, length, parser_state);
/*
Convert all ER_QUERY_INTERRUPTED errors to ER_LOCK_DEADLOCK
@@ -7969,15 +7889,10 @@ static bool wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
@param thd Current thread
@param rawbuf Begining of the query text
@param length Length of the query text
- @param[out] found_semicolon For multi queries, position of the character of
- the next query in the query text.
- @param is_next_command there will be more command in the COM_MULTI batch
*/
void mysql_parse(THD *thd, char *rawbuf, uint length,
- Parser_state *parser_state,
- bool is_com_multi,
- bool is_next_command)
+ Parser_state *parser_state)
{
DBUG_ENTER("mysql_parse");
DBUG_EXECUTE_IF("parser_debug", turn_parser_debug_on_MYSQLparse(););
@@ -8001,12 +7916,6 @@ void mysql_parse(THD *thd, char *rawbuf, uint length,
*/
lex_start(thd);
thd->reset_for_next_command();
- if (is_next_command)
- {
- thd->server_status|= SERVER_MORE_RESULTS_EXISTS;
- if (is_com_multi)
- thd->get_stmt_da()->set_skip_flush();
- }
if (query_cache_send_result_to_client(thd, rawbuf, length) <= 0)
{