summaryrefslogtreecommitdiff
path: root/gear/__init__.py
diff options
context:
space:
mode:
authorClark Boylan <clark.boylan@gmail.com>2014-12-08 16:17:39 -0800
committerClark Boylan <clark.boylan@gmail.com>2014-12-19 14:46:21 -0800
commit3495c55a114ad38a86e5b74defd819a68f7c0e8c (patch)
tree176394ceef385153054336c631015874ba556f68 /gear/__init__.py
parent5d9f44fd8173db3140b9fc04a93ab01b7d7db154 (diff)
downloadgear-3495c55a114ad38a86e5b74defd819a68f7c0e8c.tar.gz
Read more bytes per readPacket iteration
readPacket() was calling conn.recv(1) and reading one byte at a time. This is probably not great for the python scheduler and we can get better performance reading up to 4k bytes at a time. Rewrite readPacket to read up to 4096 bytes in each conn.recv() call. Note this requires a change to how client/workers perform blocking reads. They must read packets until there are no packets to process whenever a connection has a POLLIN event. If they do not do this those connections may never raise POLLIN again and client/workers will lock up. Change-Id: I3b497bc44f6d068366c6ca00d9b9fae41cb9371d
Diffstat (limited to 'gear/__init__.py')
-rw-r--r--gear/__init__.py131
1 files changed, 86 insertions, 45 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index f27ebc5..0cab110 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -135,6 +135,7 @@ class Connection(object):
self.use_ssl = True
self.input_buffer = b''
+ self.need_bytes = False
self.echo_lock = threading.Lock()
self._init()
@@ -202,6 +203,8 @@ class Connection(object):
self.conn = s
self.connected = True
self.connect_time = time.time()
+ self.input_buffer = b''
+ self.need_bytes = False
def disconnect(self):
"""Disconnect from the server and remove all associated state
@@ -290,41 +293,55 @@ class Connection(object):
ptype = None
admin = None
admin_request = None
- packet = self.input_buffer
+ need_bytes = self.need_bytes
+ raw_bytes = self.input_buffer
try:
while True:
try:
- c = self._readRawBytes(1)
+ if not raw_bytes or need_bytes:
+ segment = self._readRawBytes(4096)
+ if not segment:
+ # This occurs when the connection is closed. The
+ # the connect method will reset input_buffer and
+ # need_bytes for us.
+ return None
+ raw_bytes += segment
+ need_bytes = False
except socket.error as e:
if e.errno == errno.EAGAIN:
if admin_request:
self._putAdminRequest(admin_request)
raise
- if not c:
- packet = b''
- return None
- packet += c
if admin is None:
- if packet[0] == b'\x00':
+ if raw_bytes[0] == b'\x00':
admin = False
else:
admin = True
admin_request = self._getAdminRequest()
if admin:
- if admin_request.isComplete(packet):
- packet = b''
+ complete, remainder = admin_request.isComplete(raw_bytes)
+ raw_bytes = remainder
+ if complete:
return admin_request
else:
- if code is None and len(packet) >= 12:
+ length = len(raw_bytes)
+ if code is None and length >= 12:
code, ptype, datalen = struct.unpack('!4sii',
- packet[:12])
- if len(packet) == datalen + 12:
- p = Packet(code, ptype, packet[12:],
+ raw_bytes[:12])
+ if length >= datalen + 12:
+ end = 12 + datalen
+ p = Packet(code, ptype, raw_bytes[12:end],
connection=self)
- packet = b''
+ raw_bytes = raw_bytes[end:]
return p
+ # If we don't return a packet above then we need more data
+ need_bytes = True
finally:
- self.input_buffer = packet
+ self.input_buffer = raw_bytes
+ self.need_bytes = need_bytes
+
+ def hasPendingData(self):
+ return self.input_buffer != b''
def sendAdminRequest(self, request, timeout=90):
"""Send an administrative request to the server.
@@ -439,13 +456,22 @@ class AdminRequest(object):
return cmd
def isComplete(self, data):
- if (data[-3:] == b'\n.\n' or
- data[-5:] == b'\r\n.\r\n' or
- data == b'.\n' or
- data == b'.\r\n'):
- self.response = data
- return True
- return False
+ x = -1
+ end_index_newline = data.find(b'\n.\n')
+ end_index_return = data.find(b'\r\n.\r\n')
+ if end_index_newline != -1:
+ x = end_index_newline + 3
+ elif end_index_return != -1:
+ x = end_index_return + 5
+ elif data.startswith(b'.\n'):
+ x = 2
+ elif data.startswith(b'.\r\n'):
+ x = 3
+ if x != -1:
+ self.response = data[:x]
+ return (True, data[x:])
+ else:
+ return (False, data)
def setComplete(self):
self.wait_event.set()
@@ -504,10 +530,13 @@ class CancelJobAdminRequest(AdminRequest):
super(CancelJobAdminRequest, self).__init__(handle)
def isComplete(self, data):
- if data[-1:] == b'\n':
- self.response = data
- return True
- return False
+ end_index_newline = data.find(b'\n')
+ if end_index_newline != -1:
+ x = end_index_newline + 1
+ self.response = data[:x]
+ return (True, data[x:])
+ else:
+ return (False, data)
class VersionAdminRequest(AdminRequest):
@@ -522,10 +551,13 @@ class VersionAdminRequest(AdminRequest):
super(VersionAdminRequest, self).__init__()
def isComplete(self, data):
- if data[-1:] == b'\n':
- self.response = data
- return True
- return False
+ end_index_newline = data.find(b'\n')
+ if end_index_newline != -1:
+ x = end_index_newline + 1
+ self.response = data[:x]
+ return (True, data[x:])
+ else:
+ return (False, data)
class WorkersAdminRequest(AdminRequest):
@@ -828,17 +860,22 @@ class BaseClientServer(object):
return
conn = conn_dict[fd]
if event & select.POLLIN:
- self.log.debug("Processing input on %s" % conn)
- p = conn.readPacket()
- if p:
- if isinstance(p, Packet):
- self.handlePacket(p)
+ # Process all packets that may have been read in this
+ # round of recv's by readPacket.
+ while True:
+ self.log.debug("Processing input on %s" % conn)
+ p = conn.readPacket()
+ if p:
+ if isinstance(p, Packet):
+ self.handlePacket(p)
+ else:
+ self.handleAdminRequest(p)
else:
- self.handleAdminRequest(p)
- else:
- self.log.debug("Received no data on %s" % conn)
- self._lostConnection(conn)
- return
+ self.log.debug("Received no data on %s" % conn)
+ self._lostConnection(conn)
+ return
+ if not conn.hasPendingData():
+ break
else:
self.log.debug("Received error event on %s" % conn)
self._lostConnection(conn)
@@ -2215,10 +2252,14 @@ class ServerAdminRequest(AdminRequest):
self.connection = connection
def isComplete(self, data):
- if data[-1:] == b'\n':
- self.command = data.strip()
- return True
- return False
+ end_index_newline = data.find(b'\n')
+ if end_index_newline != -1:
+ self.command = data[:end_index_newline]
+ # Remove newline from data
+ x = end_index_newline + 1
+ return (True, data[x:])
+ else:
+ return (False, data)
class NonBlockingConnection(Connection):
@@ -2235,7 +2276,6 @@ class NonBlockingConnection(Connection):
super(NonBlockingConnection, self).connect()
if self.connected and self.conn:
self.conn.setblocking(0)
- self.input_buffer = b''
def sendPacket(self, packet):
"""Append a packet to this connection's send queue. The Client or
@@ -2301,6 +2341,7 @@ class ServerConnection(NonBlockingConnection):
self.conn = conn
self.conn.setblocking(0)
self.input_buffer = b''
+ self.need_bytes = False
self.use_ssl = use_ssl
self.client_id = None
self.functions = set()