diff options
Diffstat (limited to 'Lib/asyncio/windows_events.py')
-rw-r--r-- | Lib/asyncio/windows_events.py | 43 |
1 files changed, 26 insertions, 17 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 315455aa7c..7d0dbe9d01 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -29,6 +29,12 @@ INFINITE = 0xffffffff ERROR_CONNECTION_REFUSED = 1225 ERROR_CONNECTION_ABORTED = 1236 +# Initial delay in seconds for connect_pipe() before retrying to connect +CONNECT_PIPE_INIT_DELAY = 0.001 + +# Maximum delay in seconds for connect_pipe() before retrying to connect +CONNECT_PIPE_MAX_DELAY = 0.100 + class _OverlappedFuture(futures.Future): """Subclass of Future which represents an overlapped operation. @@ -495,25 +501,28 @@ class IocpProactor: return self._register(ov, pipe, finish_accept_pipe, register=False) - def connect_pipe(self, address): - ov = _overlapped.Overlapped(NULL) - ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address) - - def finish_connect_pipe(err, handle, ov): - # err, handle were arguments passed to PostQueuedCompletionStatus() - # in a function run in a thread pool. - if err == _overlapped.ERROR_SEM_TIMEOUT: - # Connection did not succeed within time limit. - msg = _overlapped.FormatMessage(err) - raise ConnectionRefusedError(0, msg, None, err) - elif err != 0: - msg = _overlapped.FormatMessage(err) - raise OSError(0, msg, None, err) + def _connect_pipe(self, fut, address, delay): + # Unfortunately there is no way to do an overlapped connect to a pipe. + # Call CreateFile() in a loop until it doesn't fail with + # ERROR_PIPE_BUSY + try: + handle = _overlapped.ConnectPipe(address) + except OSError as exc: + if exc.winerror == _overlapped.ERROR_PIPE_BUSY: + # Polling: retry later + delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) + self._loop.call_later(delay, + self._connect_pipe, fut, address, delay) else: - return windows_utils.PipeHandle(handle) + fut.set_exception(exc) + else: + pipe = windows_utils.PipeHandle(handle) + fut.set_result(pipe) - return self._register(ov, None, finish_connect_pipe, - wait_for_post=True) + def connect_pipe(self, address): + fut = futures.Future(loop=self._loop) + self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY) + return fut def wait_for_handle(self, handle, timeout=None): """Wait for a handle. |