summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@hp.com>2014-12-23 09:36:01 -0800
committerJames E. Blair <jeblair@hp.com>2014-12-23 10:44:35 -0800
commitb14b9932ce9280e2672f7517b7416e2cdf1ac6a8 (patch)
treec45669306466ad5f3c99884a179a41b3eb62e5db
parenta11d0959b53e9e3a0eb0c26704fdb0308bd9df12 (diff)
downloadgear-b14b9932ce9280e2672f7517b7416e2cdf1ac6a8.tar.gz
Fix SSL non-blocking IO support
When an SSL socket is in non-blocking mode, EAGAIN is essentially masked by WANT_READ or WANT_WRITE. So treat EAGAIN, WANT_READ, and WANT_WRITE as equivalent. The reason for treating them as equivalent in non-blocking mode is that the following cases can happen: While reading, we might get: SSL_WANT_READ: This is the same as EAGAIN. SSL_WANT_WRITE: We need to wait until the edge poll triggers with POLLOUT signalling that we can write again. When that happens, we should retry this read call. While writing, we might get: SSL_WANT_READ: We need to wait until the edge poll triggers with POLLIN signalling that we can read again. When that happens, we should retry this write call. SSL_WANT_WRITE: This is the same as EAGAIN. This suggests that any time we receive either of the SSL_WANT responses, we should wait until either IN or OUT is edge-triggered, and always attempt to both read and write every time IN or OUT is triggered. To do this, mask WANT_READ, WANT_WRITE, and EAGAIN with a new exception that simply indicates internally to gear to retry the IO operation after the next poll trigger. Change-Id: Ib2be2fc97f13a95b4d082a54222013be4ecd4384
-rw-r--r--gear/__init__.py56
-rw-r--r--gear/tests/test_gear.py9
2 files changed, 35 insertions, 30 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index 0cab110..577767b 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -77,6 +77,10 @@ class DisconnectError(Exception):
pass
+class RetryIOError(Exception):
+ pass
+
+
def convert_to_bytes(data):
try:
data = data.encode('utf8')
@@ -267,15 +271,6 @@ class Connection(object):
else:
raise
break
-
- bytes_read = len(buff)
- if self.use_ssl and (bytes_read < bytes_to_read):
- remaining = self.conn.pending()
- while remaining and (bytes_read < bytes_to_read):
- buff += self.conn.recv(bytes_to_read - bytes_read)
- remaining = self.conn.pending()
- bytes_read = len(buff)
-
return buff
def _putAdminRequest(self, req):
@@ -307,10 +302,9 @@ class Connection(object):
return None
raw_bytes += segment
need_bytes = False
- except socket.error as e:
- if e.errno == errno.EAGAIN:
- if admin_request:
- self._putAdminRequest(admin_request)
+ except RetryIOError:
+ if admin_request:
+ self._putAdminRequest(admin_request)
raise
if admin is None:
if raw_bytes[0] == b'\x00':
@@ -2277,6 +2271,23 @@ class NonBlockingConnection(Connection):
if self.connected and self.conn:
self.conn.setblocking(0)
+ def _readRawBytes(self, bytes_to_read):
+ try:
+ buff = self.conn.recv(bytes_to_read)
+ except ssl.SSLError as e:
+ if e.errno == ssl.SSL_ERROR_WANT_READ:
+ raise RetryIOError()
+ elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+ raise RetryIOError()
+ raise
+ except socket.error as e:
+ if e.errno == errno.EAGAIN:
+ # Read operation would block, we're done until
+ # epoll flags this connection again
+ raise RetryIOError()
+ raise
+ return buff
+
def sendPacket(self, packet):
"""Append a packet to this connection's send queue. The Client or
Server must manage actually sending the data.
@@ -2308,16 +2319,16 @@ class NonBlockingConnection(Connection):
r = self.conn.send(data)
except ssl.SSLError as e:
if e.errno == ssl.SSL_ERROR_WANT_READ:
- pass
+ raise RetryIOError()
elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
- pass
+ raise RetryIOError()
else:
raise
except socket.error as e:
if e.errno == errno.EAGAIN:
self.log.debug("Write operation on %s would block"
% self)
- return
+ raise RetryIOError()
raise
finally:
data = data[r:]
@@ -2514,12 +2525,10 @@ class Server(BaseClientServer):
self.log.debug("Processing input on %s" % conn)
try:
p = conn.readPacket()
- except socket.error as e:
- if e.errno == errno.EAGAIN:
- # Read operation would block, we're done until
- # epoll flags this connection again
- return
- raise
+ except RetryIOError:
+ # Read operation would block, we're done until
+ # epoll flags this connection again
+ return
if p:
if isinstance(p, Packet):
self.handlePacket(p)
@@ -2544,9 +2553,8 @@ class Server(BaseClientServer):
self.log.debug("Received error event on %s: %s" % (
conn, event))
raise DisconnectError()
- if event & select.POLLIN:
+ if event & (select.POLLIN | select.POLLOUT):
self.readFromConnection(conn)
- if event & select.POLLOUT:
self.writeToConnection(conn)
except socket.error as e:
if e.errno == errno.ECONNRESET:
diff --git a/gear/tests/test_gear.py b/gear/tests/test_gear.py
index 6f8596a..220ca53 100644
--- a/gear/tests/test_gear.py
+++ b/gear/tests/test_gear.py
@@ -131,12 +131,10 @@ class TestServerConnection(tests.BaseTestCase):
def assertEndOfData(self):
# End of data
- with testtools.ExpectedException(
- socket.error, ".* Resource temporarily unavailable"):
+ with testtools.ExpectedException(gear.RetryIOError):
self.conn.readPacket()
# Still end of data
- with testtools.ExpectedException(
- socket.error, ".* Resource temporarily unavailable"):
+ with testtools.ExpectedException(gear.RetryIOError):
self.conn.readPacket()
def test_readPacket_admin(self):
@@ -192,8 +190,7 @@ class TestServerConnection(tests.BaseTestCase):
self.socket._set_data([p1.toBinary()[:1448],
p1.toBinary()[1448:] + p2.toBinary()])
# First half of first packet
- with testtools.ExpectedException(
- socket.error, ".* Resource temporarily unavailable"):
+ with testtools.ExpectedException(gear.RetryIOError):
self.conn.readPacket()
# Second half of first packet
r1 = self.conn.readPacket()