summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvinchen <vinchen13@gmail.com>2016-09-19 17:23:23 +0800
committerKristian Nielsen <knielsen@knielsen-hq.org>2016-10-19 13:51:08 +0200
commit43789901c7887eefda1e965c9d7571dae89bd911 (patch)
tree06b21bc8e229c7e61110cbd3743c822e70325378
parent8eb0f5ca1a3b62d144668ebf960a93a7a67138db (diff)
downloadmariadb-git-43789901c7887eefda1e965c9d7571dae89bd911.tar.gz
Control the binlog read speed for compressed protocol
-rw-r--r--include/mysql_com.h1
-rw-r--r--sql/net_serv.cc116
-rw-r--r--sql/slave.cc17
3 files changed, 71 insertions, 63 deletions
diff --git a/include/mysql_com.h b/include/mysql_com.h
index 461800f3ce7..fe78e922ffb 100644
--- a/include/mysql_com.h
+++ b/include/mysql_com.h
@@ -424,6 +424,7 @@ typedef struct st_net {
Pointer to query object in query cache, do not equal NULL (0) for
queries in cache that have not stored its results yet
*/
+ unsigned long real_network_read_len; // the my_real_read length for each package
#endif
void *thd; /* Used by MariaDB server to avoid calling current_thd */
unsigned int last_errno;
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index fccc947f3f1..44b8757ad58 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -1142,6 +1142,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
MYSQL_NET_READ_START();
+ net->real_network_read_len = 0;
#ifdef HAVE_COMPRESS
if (!net->compress)
{
@@ -1154,17 +1155,19 @@ my_net_read_packet(NET *net, my_bool read_from_server)
size_t total_length= 0;
do
{
- net->where_b += len;
- total_length += len;
- len = my_real_read(net,&complen, 0);
+ net->where_b += len;
+ total_length += len;
+ len = my_real_read(net,&complen, 0);
} while (len == MAX_PACKET_LENGTH);
if (len != packet_error)
- len+= total_length;
+ len+= total_length;
net->where_b = save_pos;
}
net->read_pos = net->buff + net->where_b;
- if (len != packet_error)
+ if (len != packet_error) {
net->read_pos[len]=0; /* Safeguard for mysql_use_result */
+ net->real_network_read_len = len;
+ }
MYSQL_NET_READ_DONE(0, len);
return len;
#ifdef HAVE_COMPRESS
@@ -1182,7 +1185,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
{
buf_length= net->buf_length; /* Data left in old packet */
first_packet_offset= start_of_packet= (net->buf_length -
- net->remain_in_buf);
+ net->remain_in_buf);
/* Restore the character that was overwritten by the end 0 */
net->buff[start_of_packet]= net->save_char;
}
@@ -1197,81 +1200,82 @@ my_net_read_packet(NET *net, my_bool read_from_server)
if (buf_length - start_of_packet >= NET_HEADER_SIZE)
{
- read_length = uint3korr(net->buff+start_of_packet);
- if (!read_length)
- {
- /* End of multi-byte packet */
- start_of_packet += NET_HEADER_SIZE;
- break;
- }
- if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet)
- {
- if (multi_byte_packet)
- {
- /* Remove packet header for second packet */
- memmove(net->buff + first_packet_offset + start_of_packet,
- net->buff + first_packet_offset + start_of_packet +
- NET_HEADER_SIZE,
- buf_length - start_of_packet);
- start_of_packet += read_length;
- buf_length -= NET_HEADER_SIZE;
- }
- else
- start_of_packet+= read_length + NET_HEADER_SIZE;
+ read_length = uint3korr(net->buff+start_of_packet);
+ if (!read_length)
+ {
+ /* End of multi-byte packet */
+ start_of_packet += NET_HEADER_SIZE;
+ break;
+ }
+ if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet)
+ {
+ if (multi_byte_packet)
+ {
+ /* Remove packet header for second packet */
+ memmove(net->buff + first_packet_offset + start_of_packet,
+ net->buff + first_packet_offset + start_of_packet +
+ NET_HEADER_SIZE,
+ buf_length - start_of_packet);
+ start_of_packet += read_length;
+ buf_length -= NET_HEADER_SIZE;
+ }
+ else
+ start_of_packet+= read_length + NET_HEADER_SIZE;
- if (read_length != MAX_PACKET_LENGTH) /* last package */
- {
- multi_byte_packet= 0; /* No last zero len packet */
- break;
- }
- multi_byte_packet= NET_HEADER_SIZE;
- /* Move data down to read next data packet after current one */
- if (first_packet_offset)
- {
- memmove(net->buff,net->buff+first_packet_offset,
- buf_length-first_packet_offset);
- buf_length-=first_packet_offset;
- start_of_packet -= first_packet_offset;
- first_packet_offset=0;
- }
- continue;
- }
+ if (read_length != MAX_PACKET_LENGTH) /* last package */
+ {
+ multi_byte_packet= 0; /* No last zero len packet */
+ break;
+ }
+ multi_byte_packet= NET_HEADER_SIZE;
+ /* Move data down to read next data packet after current one */
+ if (first_packet_offset)
+ {
+ memmove(net->buff,net->buff+first_packet_offset,
+ buf_length-first_packet_offset);
+ buf_length-=first_packet_offset;
+ start_of_packet -= first_packet_offset;
+ first_packet_offset=0;
+ }
+ continue;
+ }
}
/* Move data down to read next data packet after current one */
if (first_packet_offset)
{
- memmove(net->buff,net->buff+first_packet_offset,
- buf_length-first_packet_offset);
- buf_length-=first_packet_offset;
- start_of_packet -= first_packet_offset;
- first_packet_offset=0;
+ memmove(net->buff,net->buff+first_packet_offset,
+ buf_length-first_packet_offset);
+ buf_length-=first_packet_offset;
+ start_of_packet -= first_packet_offset;
+ first_packet_offset=0;
}
net->where_b=buf_length;
if ((packet_len = my_real_read(net,&complen, read_from_server))
- == packet_error)
+ == packet_error)
{
MYSQL_NET_READ_DONE(1, 0);
- return packet_error;
+ return packet_error;
}
read_from_server= 0;
if (my_uncompress(net->buff + net->where_b, packet_len,
- &complen))
+ &complen))
{
- net->error= 2; /* caller will close socket */
+ net->error= 2; /* caller will close socket */
net->last_errno= ER_NET_UNCOMPRESS_ERROR;
- MYSQL_SERVER_my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0));
+ MYSQL_SERVER_my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0));
MYSQL_NET_READ_DONE(1, 0);
- return packet_error;
+ return packet_error;
}
buf_length+= complen;
+ net->real_network_read_len += packet_len;
}
net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
net->buf_length= buf_length;
net->remain_in_buf= (ulong) (buf_length - start_of_packet);
len = ((ulong) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE -
- multi_byte_packet);
+ multi_byte_packet);
net->save_char= net->read_pos[len]; /* Must be saved */
net->read_pos[len]=0; /* Safeguard for mysql_use_result */
}
diff --git a/sql/slave.cc b/sql/slave.cc
index b3bc8ba9d28..96f2479e480 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3308,13 +3308,14 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
try a reconnect. We do not want to print anything to
the error log in this case because this a anormal
event in an idle server.
+ network_read_len get the real network read length in VIO, especially using compressed protocol
RETURN VALUES
'packet_error' Error
number Length of packet
*/
-static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
+static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ulong* network_read_len)
{
ulong len;
DBUG_ENTER("read_event");
@@ -3356,6 +3357,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
DBUG_RETURN(packet_error);
}
+ *network_read_len = mysql->net.real_network_read_len ;
+
DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d",
len, mysql->net.read_pos[4]));
DBUG_RETURN(len - 1);
@@ -4420,7 +4423,7 @@ connected:
ulonglong tokenamount = opt_read_binlog_speed_limit*1024;
while (!io_slave_killed(mi))
{
- ulong event_len;
+ ulong event_len, network_read_len = 0;
/*
We say "waiting" because read_event() will wait if there's nothing to
read. But if there's something to read, it will not wait. The
@@ -4428,7 +4431,7 @@ connected:
we're in fact receiving nothing.
*/
THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
- event_len= read_event(mysql, mi, &suppress_warnings);
+ event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len);
if (check_io_slave_killed(mi, NullS))
goto err;
@@ -4493,13 +4496,13 @@ Stopping slave I/O thread due to out-of-memory error from master");
ulonglong currenttime = my_micro_time()/1000;
tokenamount += (currenttime - lastchecktime)*read_binlog_speed_limit*1024/1000;
lastchecktime = currenttime;
- if(tokenamount < event_len)
+ if(tokenamount < network_read_len)
{
- ulonglong micro_sleeptime = 1000*1000*(event_len - tokenamount) / (read_binlog_speed_limit * 1024);
+ ulonglong micro_sleeptime = 1000*1000*(network_read_len - tokenamount) / (read_binlog_speed_limit * 1024);
my_sleep(micro_sleeptime > 1000 ? micro_sleeptime : 1000); // at least sleep 1000 micro second
}
- }while(tokenamount < event_len);
- tokenamount -= event_len;
+ }while(tokenamount < network_read_len);
+ tokenamount -= network_read_len;
}
/* XXX: 'synced' should be updated by queue_event to indicate