diff options
author | Clark Boylan <clark.boylan@gmail.com> | 2014-12-08 16:17:39 -0800 |
---|---|---|
committer | Clark Boylan <clark.boylan@gmail.com> | 2014-12-19 14:46:21 -0800 |
commit | 3495c55a114ad38a86e5b74defd819a68f7c0e8c (patch) | |
tree | 176394ceef385153054336c631015874ba556f68 /gear/__init__.py | |
parent | 5d9f44fd8173db3140b9fc04a93ab01b7d7db154 (diff) | |
download | gear-3495c55a114ad38a86e5b74defd819a68f7c0e8c.tar.gz |
Read more bytes per readPacket iteration
readPacket() was calling conn.recv(1) and reading one byte at a time.
This is probably not great for the python scheduler and we can get
better performance reading up to 4k bytes at a time. Rewrite readPacket
to read up to 4096 bytes in each conn.recv() call.
Note this requires a change to how client/workers perform blocking
reads. They must read packets until there are no packets to process
whenever a connection has a POLLIN event. If they do not do this those
connections may never raise POLLIN again and client/workers will lock
up.
Change-Id: I3b497bc44f6d068366c6ca00d9b9fae41cb9371d
Diffstat (limited to 'gear/__init__.py')
-rw-r--r-- | gear/__init__.py | 131 |
1 files changed, 86 insertions, 45 deletions
diff --git a/gear/__init__.py b/gear/__init__.py index f27ebc5..0cab110 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -135,6 +135,7 @@ class Connection(object): self.use_ssl = True self.input_buffer = b'' + self.need_bytes = False self.echo_lock = threading.Lock() self._init() @@ -202,6 +203,8 @@ class Connection(object): self.conn = s self.connected = True self.connect_time = time.time() + self.input_buffer = b'' + self.need_bytes = False def disconnect(self): """Disconnect from the server and remove all associated state @@ -290,41 +293,55 @@ class Connection(object): ptype = None admin = None admin_request = None - packet = self.input_buffer + need_bytes = self.need_bytes + raw_bytes = self.input_buffer try: while True: try: - c = self._readRawBytes(1) + if not raw_bytes or need_bytes: + segment = self._readRawBytes(4096) + if not segment: + # This occurs when the connection is closed. The + # the connect method will reset input_buffer and + # need_bytes for us. + return None + raw_bytes += segment + need_bytes = False except socket.error as e: if e.errno == errno.EAGAIN: if admin_request: self._putAdminRequest(admin_request) raise - if not c: - packet = b'' - return None - packet += c if admin is None: - if packet[0] == b'\x00': + if raw_bytes[0] == b'\x00': admin = False else: admin = True admin_request = self._getAdminRequest() if admin: - if admin_request.isComplete(packet): - packet = b'' + complete, remainder = admin_request.isComplete(raw_bytes) + raw_bytes = remainder + if complete: return admin_request else: - if code is None and len(packet) >= 12: + length = len(raw_bytes) + if code is None and length >= 12: code, ptype, datalen = struct.unpack('!4sii', - packet[:12]) - if len(packet) == datalen + 12: - p = Packet(code, ptype, packet[12:], + raw_bytes[:12]) + if length >= datalen + 12: + end = 12 + datalen + p = Packet(code, ptype, raw_bytes[12:end], connection=self) - packet = b'' + raw_bytes = raw_bytes[end:] return p + # If we don't return a packet above then we need more data + need_bytes = True finally: - self.input_buffer = packet + self.input_buffer = raw_bytes + self.need_bytes = need_bytes + + def hasPendingData(self): + return self.input_buffer != b'' def sendAdminRequest(self, request, timeout=90): """Send an administrative request to the server. @@ -439,13 +456,22 @@ class AdminRequest(object): return cmd def isComplete(self, data): - if (data[-3:] == b'\n.\n' or - data[-5:] == b'\r\n.\r\n' or - data == b'.\n' or - data == b'.\r\n'): - self.response = data - return True - return False + x = -1 + end_index_newline = data.find(b'\n.\n') + end_index_return = data.find(b'\r\n.\r\n') + if end_index_newline != -1: + x = end_index_newline + 3 + elif end_index_return != -1: + x = end_index_return + 5 + elif data.startswith(b'.\n'): + x = 2 + elif data.startswith(b'.\r\n'): + x = 3 + if x != -1: + self.response = data[:x] + return (True, data[x:]) + else: + return (False, data) def setComplete(self): self.wait_event.set() @@ -504,10 +530,13 @@ class CancelJobAdminRequest(AdminRequest): super(CancelJobAdminRequest, self).__init__(handle) def isComplete(self, data): - if data[-1:] == b'\n': - self.response = data - return True - return False + end_index_newline = data.find(b'\n') + if end_index_newline != -1: + x = end_index_newline + 1 + self.response = data[:x] + return (True, data[x:]) + else: + return (False, data) class VersionAdminRequest(AdminRequest): @@ -522,10 +551,13 @@ class VersionAdminRequest(AdminRequest): super(VersionAdminRequest, self).__init__() def isComplete(self, data): - if data[-1:] == b'\n': - self.response = data - return True - return False + end_index_newline = data.find(b'\n') + if end_index_newline != -1: + x = end_index_newline + 1 + self.response = data[:x] + return (True, data[x:]) + else: + return (False, data) class WorkersAdminRequest(AdminRequest): @@ -828,17 +860,22 @@ class BaseClientServer(object): return conn = conn_dict[fd] if event & select.POLLIN: - self.log.debug("Processing input on %s" % conn) - p = conn.readPacket() - if p: - if isinstance(p, Packet): - self.handlePacket(p) + # Process all packets that may have been read in this + # round of recv's by readPacket. + while True: + self.log.debug("Processing input on %s" % conn) + p = conn.readPacket() + if p: + if isinstance(p, Packet): + self.handlePacket(p) + else: + self.handleAdminRequest(p) else: - self.handleAdminRequest(p) - else: - self.log.debug("Received no data on %s" % conn) - self._lostConnection(conn) - return + self.log.debug("Received no data on %s" % conn) + self._lostConnection(conn) + return + if not conn.hasPendingData(): + break else: self.log.debug("Received error event on %s" % conn) self._lostConnection(conn) @@ -2215,10 +2252,14 @@ class ServerAdminRequest(AdminRequest): self.connection = connection def isComplete(self, data): - if data[-1:] == b'\n': - self.command = data.strip() - return True - return False + end_index_newline = data.find(b'\n') + if end_index_newline != -1: + self.command = data[:end_index_newline] + # Remove newline from data + x = end_index_newline + 1 + return (True, data[x:]) + else: + return (False, data) class NonBlockingConnection(Connection): @@ -2235,7 +2276,6 @@ class NonBlockingConnection(Connection): super(NonBlockingConnection, self).connect() if self.connected and self.conn: self.conn.setblocking(0) - self.input_buffer = b'' def sendPacket(self, packet): """Append a packet to this connection's send queue. The Client or @@ -2301,6 +2341,7 @@ class ServerConnection(NonBlockingConnection): self.conn = conn self.conn.setblocking(0) self.input_buffer = b'' + self.need_bytes = False self.use_ssl = use_ssl self.client_id = None self.functions = set() |