diff options
Diffstat (limited to 'eventlet/greenio/py3.py')
| -rw-r--r-- | eventlet/greenio/py3.py | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/eventlet/greenio/py3.py b/eventlet/greenio/py3.py new file mode 100644 index 0000000..22a865e --- /dev/null +++ b/eventlet/greenio/py3.py @@ -0,0 +1,191 @@ +import _pyio as _original_pyio +import errno +import os as _original_os +import socket as _original_socket +from io import ( + BufferedRandom as _OriginalBufferedRandom, + BufferedReader as _OriginalBufferedReader, + BufferedWriter as _OriginalBufferedWriter, + DEFAULT_BUFFER_SIZE, + TextIOWrapper as _OriginalTextIOWrapper, + IOBase as _OriginalIOBase, +) +from types import FunctionType + +from eventlet.greenio.base import ( + _operation_on_closed_file, + greenpipe_doc, + set_nonblocking, + SOCKET_BLOCKING, +) +from eventlet.hubs import notify_close, notify_opened, IOClosed, trampoline +from eventlet.support import get_errno, six + +__all__ = ['_fileobject', 'GreenPipe'] + +# TODO get rid of this, it only seems like the original _fileobject +_fileobject = _original_socket.SocketIO + +# Large part of the following code is copied from the original +# eventlet.greenio module + + +class GreenFileIO(_OriginalIOBase): + def __init__(self, name, mode='r', closefd=True, opener=None): + if isinstance(name, int): + fileno = name + self._name = "<fd:%d>" % fileno + else: + assert isinstance(name, six.string_types) + with open(name, mode) as fd: + self._name = fd.name + fileno = _original_os.dup(fd.fileno()) + + notify_opened(fileno) + self._fileno = fileno + self._mode = mode + self._closed = False + set_nonblocking(self) + self._seekable = None + + @property + def closed(self): + return self._closed + + def seekable(self): + if self._seekable is None: + try: + _original_os.lseek(self._fileno, 0, _original_os.SEEK_CUR) + except IOError as e: + if get_errno(e) == errno.ESPIPE: + self._seekable = False + else: + raise + else: + self._seekable = True + + return self._seekable + + def readable(self): + return 'r' in self._mode or '+' in self._mode + + def writable(self): + return 'w' in self._mode or '+' in self._mode + + def fileno(self): + return self._fileno + + def read(self, buflen): + while True: + try: + return _original_os.read(self._fileno, buflen) + except OSError as e: + if get_errno(e) not in SOCKET_BLOCKING: + raise IOError(*e.args) + self._trampoline(self, read=True) + + def readinto(self, b): + up_to = len(b) + data = self.read(up_to) + bytes_read = len(data) + b[:bytes_read] = data + return bytes_read + + def isatty(self): + try: + return _original_os.isatty(self.fileno()) + except OSError as e: + raise IOError(*e.args) + + def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None): + if self._closed: + # Don't trampoline if we're already closed. + raise IOClosed() + try: + return trampoline(fd, read=read, write=write, timeout=timeout, + timeout_exc=timeout_exc, + mark_as_closed=self._mark_as_closed) + except IOClosed: + # Our fileno has been obsoleted. Defang ourselves to + # prevent spurious closes. + self._mark_as_closed() + raise + + def _mark_as_closed(self): + """ Mark this socket as being closed """ + self._closed = True + + def write(self, data): + while True: + try: + return _original_os.write(self._fileno, data) + except OSError as e: + if get_errno(e) not in SOCKET_BLOCKING: + raise IOError(*e.args) + else: + trampoline(self, write=True) + + def close(self): + _original_os.close(self._fileno) + notify_close(self._fileno) + self._closed = True + for method in [ + 'fileno', 'flush', 'isatty', 'next', 'read', 'readinto', + 'readline', 'readlines', 'seek', 'tell', 'truncate', + 'write', 'xreadlines', '__iter__', '__next__', 'writelines']: + setattr(self, method, _operation_on_closed_file) + + def truncate(self, size=-1): + if size == -1: + size = self.tell() + try: + rv = _original_os.ftruncate(self._fileno, size) + except OSError as e: + raise IOError(*e.args) + else: + self.seek(size) # move position&clear buffer + return rv + + def seek(self, offset, whence=_original_os.SEEK_SET): + try: + return _original_os.lseek(self._fileno, offset, whence) + except OSError as e: + raise IOError(*e.args) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +_open_environment = dict(globals()) +_open_environment.update(dict( + BufferedRandom=_OriginalBufferedRandom, + BufferedWriter=_OriginalBufferedWriter, + BufferedReader=_OriginalBufferedReader, + TextIOWrapper=_OriginalTextIOWrapper, + FileIO=GreenFileIO, + os=_original_os, +)) + +_open = FunctionType( + six.get_function_code(_original_pyio.open), + _open_environment, +) + + +def GreenPipe(name, mode="r", buffering=-1, encoding=None, errors=None, + newline=None, closefd=True, opener=None): + try: + fileno = name.fileno() + except AttributeError: + pass + else: + fileno = _original_os.dup(fileno) + name.close() + name = fileno + + return _open(name, mode, buffering, encoding, errors, newline, closefd, opener) + +GreenPipe.__doc__ = greenpipe_doc |
