diff options
author | vinchen <vinchen13@gmail.com> | 2016-09-19 17:23:23 +0800 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-10-19 13:51:08 +0200 |
commit | 43789901c7887eefda1e965c9d7571dae89bd911 (patch) | |
tree | 06b21bc8e229c7e61110cbd3743c822e70325378 | |
parent | 8eb0f5ca1a3b62d144668ebf960a93a7a67138db (diff) | |
download | mariadb-git-43789901c7887eefda1e965c9d7571dae89bd911.tar.gz |
Control the binlog read speed for compressed protocol
-rw-r--r-- | include/mysql_com.h | 1 | ||||
-rw-r--r-- | sql/net_serv.cc | 116 | ||||
-rw-r--r-- | sql/slave.cc | 17 |
3 files changed, 71 insertions, 63 deletions
diff --git a/include/mysql_com.h b/include/mysql_com.h index 461800f3ce7..fe78e922ffb 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -424,6 +424,7 @@ typedef struct st_net { Pointer to query object in query cache, do not equal NULL (0) for queries in cache that have not stored its results yet */ + unsigned long real_network_read_len; // the my_real_read length for each package #endif void *thd; /* Used by MariaDB server to avoid calling current_thd */ unsigned int last_errno; diff --git a/sql/net_serv.cc b/sql/net_serv.cc index fccc947f3f1..44b8757ad58 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -1142,6 +1142,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) MYSQL_NET_READ_START(); + net->real_network_read_len = 0; #ifdef HAVE_COMPRESS if (!net->compress) { @@ -1154,17 +1155,19 @@ my_net_read_packet(NET *net, my_bool read_from_server) size_t total_length= 0; do { - net->where_b += len; - total_length += len; - len = my_real_read(net,&complen, 0); + net->where_b += len; + total_length += len; + len = my_real_read(net,&complen, 0); } while (len == MAX_PACKET_LENGTH); if (len != packet_error) - len+= total_length; + len+= total_length; net->where_b = save_pos; } net->read_pos = net->buff + net->where_b; - if (len != packet_error) + if (len != packet_error) { net->read_pos[len]=0; /* Safeguard for mysql_use_result */ + net->real_network_read_len = len; + } MYSQL_NET_READ_DONE(0, len); return len; #ifdef HAVE_COMPRESS @@ -1182,7 +1185,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) { buf_length= net->buf_length; /* Data left in old packet */ first_packet_offset= start_of_packet= (net->buf_length - - net->remain_in_buf); + net->remain_in_buf); /* Restore the character that was overwritten by the end 0 */ net->buff[start_of_packet]= net->save_char; } @@ -1197,81 +1200,82 @@ my_net_read_packet(NET *net, my_bool read_from_server) if (buf_length - start_of_packet >= NET_HEADER_SIZE) { - read_length = uint3korr(net->buff+start_of_packet); - if (!read_length) - { - /* End of multi-byte packet */ - start_of_packet += NET_HEADER_SIZE; - break; - } - if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet) - { - if (multi_byte_packet) - { - /* Remove packet header for second packet */ - memmove(net->buff + first_packet_offset + start_of_packet, - net->buff + first_packet_offset + start_of_packet + - NET_HEADER_SIZE, - buf_length - start_of_packet); - start_of_packet += read_length; - buf_length -= NET_HEADER_SIZE; - } - else - start_of_packet+= read_length + NET_HEADER_SIZE; + read_length = uint3korr(net->buff+start_of_packet); + if (!read_length) + { + /* End of multi-byte packet */ + start_of_packet += NET_HEADER_SIZE; + break; + } + if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet) + { + if (multi_byte_packet) + { + /* Remove packet header for second packet */ + memmove(net->buff + first_packet_offset + start_of_packet, + net->buff + first_packet_offset + start_of_packet + + NET_HEADER_SIZE, + buf_length - start_of_packet); + start_of_packet += read_length; + buf_length -= NET_HEADER_SIZE; + } + else + start_of_packet+= read_length + NET_HEADER_SIZE; - if (read_length != MAX_PACKET_LENGTH) /* last package */ - { - multi_byte_packet= 0; /* No last zero len packet */ - break; - } - multi_byte_packet= NET_HEADER_SIZE; - /* Move data down to read next data packet after current one */ - if (first_packet_offset) - { - memmove(net->buff,net->buff+first_packet_offset, - buf_length-first_packet_offset); - buf_length-=first_packet_offset; - start_of_packet -= first_packet_offset; - first_packet_offset=0; - } - continue; - } + if (read_length != MAX_PACKET_LENGTH) /* last package */ + { + multi_byte_packet= 0; /* No last zero len packet */ + break; + } + multi_byte_packet= NET_HEADER_SIZE; + /* Move data down to read next data packet after current one */ + if (first_packet_offset) + { + memmove(net->buff,net->buff+first_packet_offset, + buf_length-first_packet_offset); + buf_length-=first_packet_offset; + start_of_packet -= first_packet_offset; + first_packet_offset=0; + } + continue; + } } /* Move data down to read next data packet after current one */ if (first_packet_offset) { - memmove(net->buff,net->buff+first_packet_offset, - buf_length-first_packet_offset); - buf_length-=first_packet_offset; - start_of_packet -= first_packet_offset; - first_packet_offset=0; + memmove(net->buff,net->buff+first_packet_offset, + buf_length-first_packet_offset); + buf_length-=first_packet_offset; + start_of_packet -= first_packet_offset; + first_packet_offset=0; } net->where_b=buf_length; if ((packet_len = my_real_read(net,&complen, read_from_server)) - == packet_error) + == packet_error) { MYSQL_NET_READ_DONE(1, 0); - return packet_error; + return packet_error; } read_from_server= 0; if (my_uncompress(net->buff + net->where_b, packet_len, - &complen)) + &complen)) { - net->error= 2; /* caller will close socket */ + net->error= 2; /* caller will close socket */ net->last_errno= ER_NET_UNCOMPRESS_ERROR; - MYSQL_SERVER_my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0)); + MYSQL_SERVER_my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0)); MYSQL_NET_READ_DONE(1, 0); - return packet_error; + return packet_error; } buf_length+= complen; + net->real_network_read_len += packet_len; } net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE; net->buf_length= buf_length; net->remain_in_buf= (ulong) (buf_length - start_of_packet); len = ((ulong) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE - - multi_byte_packet); + multi_byte_packet); net->save_char= net->read_pos[len]; /* Must be saved */ net->read_pos[len]=0; /* Safeguard for mysql_use_result */ } diff --git a/sql/slave.cc b/sql/slave.cc index b3bc8ba9d28..96f2479e480 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3308,13 +3308,14 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, try a reconnect. We do not want to print anything to the error log in this case because this a anormal event in an idle server. + network_read_len get the real network read length in VIO, especially using compressed protocol RETURN VALUES 'packet_error' Error number Length of packet */ -static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) +static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ulong* network_read_len) { ulong len; DBUG_ENTER("read_event"); @@ -3356,6 +3357,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) DBUG_RETURN(packet_error); } + *network_read_len = mysql->net.real_network_read_len ; + DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d", len, mysql->net.read_pos[4])); DBUG_RETURN(len - 1); @@ -4420,7 +4423,7 @@ connected: ulonglong tokenamount = opt_read_binlog_speed_limit*1024; while (!io_slave_killed(mi)) { - ulong event_len; + ulong event_len, network_read_len = 0; /* We say "waiting" because read_event() will wait if there's nothing to read. But if there's something to read, it will not wait. The @@ -4428,7 +4431,7 @@ connected: we're in fact receiving nothing. */ THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); - event_len= read_event(mysql, mi, &suppress_warnings); + event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len); if (check_io_slave_killed(mi, NullS)) goto err; @@ -4493,13 +4496,13 @@ Stopping slave I/O thread due to out-of-memory error from master"); ulonglong currenttime = my_micro_time()/1000; tokenamount += (currenttime - lastchecktime)*read_binlog_speed_limit*1024/1000; lastchecktime = currenttime; - if(tokenamount < event_len) + if(tokenamount < network_read_len) { - ulonglong micro_sleeptime = 1000*1000*(event_len - tokenamount) / (read_binlog_speed_limit * 1024); + ulonglong micro_sleeptime = 1000*1000*(network_read_len - tokenamount) / (read_binlog_speed_limit * 1024); my_sleep(micro_sleeptime > 1000 ? micro_sleeptime : 1000); // at least sleep 1000 micro second } - }while(tokenamount < event_len); - tokenamount -= event_len; + }while(tokenamount < network_read_len); + tokenamount -= network_read_len; } /* XXX: 'synced' should be updated by queue_event to indicate |