diff options
Diffstat (limited to 'sockets.py')
-rw-r--r-- | sockets.py | 61 |
1 files changed, 32 insertions, 29 deletions
@@ -30,6 +30,8 @@ import ssl # Local imports. import scheduling +from scheduling import context +from proactor import Future # Errno values indicating the connection was disconnected. _DISCONNECTED = frozenset((errno.ECONNRESET, @@ -61,9 +63,14 @@ class SocketTransport: returns b''. """ assert n >= 0, n + while True: try: - return self.sock.recv(n) + try: + return context._eventloop.proactor.recv(self.sock, n) + except Future as f: + yield from scheduling.block_future(f) + return f.result() except socket.error as err: if err.errno in _TRYAGAIN: pass @@ -71,7 +78,6 @@ class SocketTransport: return b'' else: raise # Unexpected, propagate. - yield from scheduling.block_r(self.sock.fileno()) def send(self, data): """COROUTINE; Send data to the socket, blocking until all written. @@ -80,7 +86,11 @@ class SocketTransport: """ while data: try: - n = self.sock.send(data) + try: + n = context._eventloop.proactor.send(self.sock, data) + except Future as f: + yield from scheduling.block_future(f) + n = f.result() except socket.error as err: if err.errno in _TRYAGAIN: pass @@ -93,8 +103,6 @@ class SocketTransport: if n == len(data): break data = data[n:] - continue - yield from scheduling.block_w(self.sock.fileno()) return True @@ -122,9 +130,9 @@ class SslTransport: try: self.sslsock.do_handshake() except ssl.SSLWantReadError: - yield from scheduling.block_r(self.sslsock.fileno()) + yield from scheduling.block_r(self.sslsock) except ssl.SSLWantWriteError: - yield from scheduling.block_w(self.sslsock.fileno()) + yield from scheduling.block_w(self.sslsock) else: break @@ -137,12 +145,12 @@ class SslTransport: try: return self.sslsock.recv(n) except ssl.SSLWantReadError: - yield from scheduling.block_r(self.sslsock.fileno()) + yield from scheduling.block_r(self.sslsock) except ssl.SSLWantWriteError: - yield from scheduling.block_w(self.sslsock.fileno()) + yield from scheduling.block_w(self.sslsock) except socket.error as err: if err.errno in _TRYAGAIN: - yield from scheduling.block_r(self.sock.fileno()) + yield from scheduling.block_r(self.sslsock) elif err.errno in _DISCONNECTED: # Can this happen? return b'' @@ -155,12 +163,12 @@ class SslTransport: try: n = self.sslsock.send(data) except ssl.SSLWantReadError: - yield from scheduling.block_r(self.sslsock.fileno()) + yield from scheduling.block_r(self.sslsock) except ssl.SSLWantWriteError: - yield from scheduling.block_w(self.sslsock.fileno()) + yield from scheduling.block_w(self.sslsock) except socket.error as err: if err.errno in _TRYAGAIN: - yield from scheduling.block_w(self.sock.fileno()) + yield from scheduling.block_w(self.sslsock) elif err.errno in _DISCONNECTED: return False else: @@ -243,14 +251,10 @@ class BufferedReader: def connect(sock, address): """COROUTINE: Connect a socket to an address.""" try: - sock.connect(address) - except socket.error as err: - if err.errno != errno.EINPROGRESS: - raise - yield from scheduling.block_w(sock.fileno()) - err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if err != 0: - raise IOError(err, 'Connection refused') + return context._eventloop.proactor.connect(sock, address) + except Future as f: + yield from scheduling.block_future(f) + return f.result() def getaddrinfo(host, port, af=0, socktype=0, proto=0): @@ -310,15 +314,14 @@ class Listener: """COROUTINE: Accept a connection.""" while True: try: - conn, addr = self.sock.accept() + try: + return context._eventloop.proactor.accept(self.sock) + except Future as f: + yield from scheduling.block_future(f) + return f.result() except socket.error as err: - if err.errno in _TRYAGAIN: - yield from scheduling.block_r(self.sock.fileno()) - else: - raise # Unexpected, propagate. - else: - conn.setblocking(False) - return conn, addr + if err.errno not in _TRYAGAIN: + raise def create_listener(host, port, af=0, socktype=0, proto=0, |