summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2012-12-06 11:30:48 -0800
committerGuido van Rossum <guido@python.org>2012-12-06 11:30:48 -0800
commit547e8accb5290b5e513a09213c3b61bfd34316ff (patch)
treebd46ae8d2e58a98f851c9abe343ad03a4be6357b
parent52ed361f753ef324d589ec06be9a4b083b78b135 (diff)
parent6596bfba52727f63361d2f6c2f631bd999b8537d (diff)
downloadtrollius-547e8accb5290b5e513a09213c3b61bfd34316ff.tar.gz
Merge default -> proactor (only TODO is affected).
-rw-r--r--TODO4
-rw-r--r--main.py2
-rw-r--r--overlapped.c997
-rw-r--r--polling.py314
-rw-r--r--proactor.py432
-rw-r--r--scheduling.py55
-rw-r--r--setup.cfg2
-rw-r--r--setup.py4
-rw-r--r--sockets.py61
9 files changed, 1554 insertions, 317 deletions
diff --git a/TODO b/TODO
index df868cc..a341b51 100644
--- a/TODO
+++ b/TODO
@@ -1,5 +1,5 @@
# -*- Mode: text -*-
-
+
TO DO SMALLER TASKS
- Make Task more like Future; getting result() should re-raise exception.
@@ -87,7 +87,7 @@ TO DO LATER
- Support ZeroMQ "sockets" which are user objects. Though possibly
this can be supported by getting the underlying fd? See
http://mail.python.org/pipermail/python-ideas/2012-October/017532.html
- OTOH see
+ OTOH see
https://github.com/zeromq/pyzmq/blob/master/zmq/eventloop/ioloop.py
- Study goroutines (again).
diff --git a/main.py b/main.py
index c1f9d0a..0941760 100644
--- a/main.py
+++ b/main.py
@@ -51,7 +51,7 @@ def doit2():
def doit():
- TIMEOUT = 2
+ TIMEOUT = 5
tasks = set()
# This references NDB's default test service.
diff --git a/overlapped.c b/overlapped.c
new file mode 100644
index 0000000..8b4bdc1
--- /dev/null
+++ b/overlapped.c
@@ -0,0 +1,997 @@
+/*
+ * Support for overlapped IO
+ *
+ * Some code borrowed from Modules/_winapi.c of CPython
+ */
+
+/* XXX check overflow and DWORD <-> Py_ssize_t conversions
+ Check itemsize */
+
+#include "Python.h"
+#include "structmember.h"
+
+#define WINDOWS_LEAN_AND_MEAN
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <mswsock.h>
+
+#if defined(MS_WIN32) && !defined(MS_WIN64)
+# define F_POINTER "k"
+# define T_POINTER T_ULONG
+#else
+# define F_POINTER "K"
+# define T_POINTER T_ULONGLONG
+#endif
+
+#define F_HANDLE F_POINTER
+#define F_ULONG_PTR F_POINTER
+#define F_DWORD "k"
+#define F_BOOL "i"
+#define F_UINT "I"
+
+#define T_HANDLE T_POINTER
+
+enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_WRITE, TYPE_ACCEPT,
+ TYPE_CONNECT, TYPE_DISCONNECT};
+
+/*
+ * Some functions should be loaded at runtime
+ */
+
+static LPFN_ACCEPTEX Py_AcceptEx = NULL;
+static LPFN_CONNECTEX Py_ConnectEx = NULL;
+static LPFN_DISCONNECTEX Py_DisconnectEx = NULL;
+static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED) = NULL;
+
+#define GET_WSA_POINTER(s, x) \
+ (SOCKET_ERROR != WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, \
+ &Guid##x, sizeof(Guid##x), &Py_##x, \
+ sizeof(Py_##x), &dwBytes, NULL, NULL))
+
+static int
+initialize_function_pointers(void)
+{
+ GUID GuidAcceptEx = WSAID_ACCEPTEX;
+ GUID GuidConnectEx = WSAID_CONNECTEX;
+ GUID GuidDisconnectEx = WSAID_DISCONNECTEX;
+ HINSTANCE hKernel32;
+ SOCKET s;
+ DWORD dwBytes;
+
+ s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == INVALID_SOCKET) {
+ PyErr_SetFromWindowsErr(WSAGetLastError());
+ return -1;
+ }
+
+ if (!GET_WSA_POINTER(s, AcceptEx) ||
+ !GET_WSA_POINTER(s, ConnectEx) ||
+ !GET_WSA_POINTER(s, DisconnectEx))
+ {
+ closesocket(s);
+ PyErr_SetFromWindowsErr(WSAGetLastError());
+ return -1;
+ }
+
+ closesocket(s);
+
+ /* On WinXP we will have Py_CancelIoEx == NULL */
+ hKernel32 = GetModuleHandle("KERNEL32");
+ *(FARPROC *)&Py_CancelIoEx = GetProcAddress(hKernel32, "CancelIoEx");
+ return 0;
+}
+
+/*
+ * Completion port stuff
+ */
+
+PyDoc_STRVAR(
+ CreateIoCompletionPort_doc,
+ "CreateIoCompletionPort(handle, port, key, concurrency) -> port\n\n"
+ "Create a completion port or register a handle with a port.");
+
+static PyObject *
+overlapped_CreateIoCompletionPort(PyObject *self, PyObject *args)
+{
+ HANDLE FileHandle;
+ HANDLE ExistingCompletionPort;
+ ULONG_PTR CompletionKey;
+ DWORD NumberOfConcurrentThreads;
+ HANDLE ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE F_ULONG_PTR F_DWORD,
+ &FileHandle, &ExistingCompletionPort, &CompletionKey,
+ &NumberOfConcurrentThreads))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = CreateIoCompletionPort(FileHandle, ExistingCompletionPort,
+ CompletionKey, NumberOfConcurrentThreads);
+ Py_END_ALLOW_THREADS
+
+ if (ret == NULL)
+ return PyErr_SetFromWindowsErr(0);
+ return Py_BuildValue(F_HANDLE, ret);
+}
+
+PyDoc_STRVAR(
+ GetQueuedCompletionStatus_doc,
+ "GetQueuedCompletionStatus(port, msecs) -> (err, bytes, key, address)\n\n"
+ "Get a message from completion port. Wait for up to msecs milliseconds.");
+
+static PyObject *
+overlapped_GetQueuedCompletionStatus(PyObject *self, PyObject *args)
+{
+ HANDLE CompletionPort;
+ DWORD Milliseconds;
+ DWORD NumberOfBytes;
+ ULONG_PTR CompletionKey;
+ OVERLAPPED *Overlapped = NULL;
+ DWORD err;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD,
+ &CompletionPort, &Milliseconds))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = GetQueuedCompletionStatus(CompletionPort, &NumberOfBytes,
+ &CompletionKey, &Overlapped, Milliseconds);
+ Py_END_ALLOW_THREADS
+
+ err = ret ? ERROR_SUCCESS : GetLastError();
+ if (Overlapped == NULL) {
+ if (err == WAIT_TIMEOUT)
+ Py_RETURN_NONE;
+ else
+ return PyErr_SetFromWindowsErr(err);
+ }
+ return Py_BuildValue(F_DWORD F_DWORD F_ULONG_PTR F_POINTER,
+ err, NumberOfBytes, CompletionKey, Overlapped);
+}
+
+PyDoc_STRVAR(
+ PostQueuedCompletionStatus_doc,
+ "PostQueuedCompletionStatus(port, bytes, key, address) -> None\n\n"
+ "Post a message to completion port.");
+
+static PyObject *
+overlapped_PostQueuedCompletionStatus(PyObject *self, PyObject *args)
+{
+ HANDLE CompletionPort;
+ DWORD NumberOfBytes;
+ ULONG_PTR CompletionKey;
+ OVERLAPPED *Overlapped;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD F_ULONG_PTR F_POINTER,
+ &CompletionPort, &NumberOfBytes, &CompletionKey,
+ &Overlapped))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = PostQueuedCompletionStatus(CompletionPort, NumberOfBytes,
+ CompletionKey, Overlapped);
+ Py_END_ALLOW_THREADS
+
+ if (!ret)
+ return PyErr_SetFromWindowsErr(0);
+ Py_RETURN_NONE;
+}
+
+/*
+ * Bind socket handle to local port without doing slow getaddrinfo()
+ */
+
+PyDoc_STRVAR(
+ BindLocal_doc,
+ "BindLocal(handle, length_of_address_tuple) -> None\n\n"
+ "Bind a socket handle to an arbitrary local port.\n"
+ "If length_of_address_tuple is 2 then an AF_INET address is used.\n"
+ "If length_of_address_tuple is 4 then an AF_INET6 address is used.");
+
+static PyObject *
+overlapped_BindLocal(PyObject *self, PyObject *args)
+{
+ SOCKET Socket;
+ int TupleLength;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "i", &Socket, &TupleLength))
+ return NULL;
+
+ if (TupleLength == 2) {
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_port = 0;
+ addr.sin_addr.S_un.S_addr = INADDR_ANY;
+ ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR;
+ } else if (TupleLength == 4) {
+ struct sockaddr_in6 addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin6_family = AF_INET6;
+ addr.sin6_port = 0;
+ addr.sin6_addr = in6addr_any;
+ ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR;
+ } else {
+ PyErr_SetString(PyExc_ValueError, "expected tuple of length 2 or 4");
+ return NULL;
+ }
+
+ if (!ret)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ Py_RETURN_NONE;
+}
+
+/*
+ * Set notification mode for the handle
+ */
+
+PyDoc_STRVAR(
+ SetFileCompletionNotificationModes_doc,
+ "SetFileCompletionNotificationModes(FileHandle, Flags) -> None\n\n"
+ "Set whether notification happens if operation succeeds without blocking");
+
+static PyObject *
+overlapped_SetFileCompletionNotificationModes(PyObject *self, PyObject *args)
+{
+ HANDLE FileHandle;
+ UCHAR Flags;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_BOOL, &FileHandle, &Flags))
+ return NULL;
+
+ if (!SetFileCompletionNotificationModes(FileHandle, Flags))
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+
+ Py_RETURN_NONE;
+}
+
+/*
+ * A Python object wrapping an OVERLAPPED structure and other useful data
+ * for overlapped I/O
+ */
+
+PyDoc_STRVAR(
+ Overlapped_doc,
+ "Overlapped object");
+
+typedef struct {
+ PyObject_HEAD
+ OVERLAPPED overlapped;
+ /* For convenience, we store the file handle too */
+ HANDLE handle;
+ /* Error returned by last method call */
+ DWORD error;
+ /* Type of operation */
+ DWORD type;
+ /* Buffer used for reading (optional) */
+ PyObject *read_buffer;
+ /* Buffer used for writing (optional) */
+ Py_buffer write_buffer;
+} OverlappedObject;
+
+
+static PyObject *
+Overlapped_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ OverlappedObject *self;
+ HANDLE event = INVALID_HANDLE_VALUE;
+ static char *kwlist[] = {"event", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "|" F_HANDLE, kwlist, &event))
+ return NULL;
+
+ if (event == INVALID_HANDLE_VALUE) {
+ event = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (event == NULL)
+ return PyErr_SetExcFromWindowsErr(PyExc_OSError, 0);
+ }
+
+ self = PyObject_New(OverlappedObject, type);
+ if (self == NULL) {
+ if (event != NULL)
+ CloseHandle(event);
+ return NULL;
+ }
+
+ self->handle = NULL;
+ self->error = 0;
+ self->type = TYPE_NONE;
+ self->read_buffer = NULL;
+ memset(&self->overlapped, 0, sizeof(OVERLAPPED));
+ memset(&self->write_buffer, 0, sizeof(Py_buffer));
+ if (event)
+ self->overlapped.hEvent = event;
+ return (PyObject *)self;
+}
+
+static void
+Overlapped_dealloc(OverlappedObject *self)
+{
+ DWORD bytes;
+ DWORD olderr = GetLastError();
+ BOOL wait = FALSE;
+ BOOL ret;
+
+ if (!HasOverlappedIoCompleted(&self->overlapped) &&
+ self->type != TYPE_NOT_STARTED)
+ {
+ if (Py_CancelIoEx && Py_CancelIoEx(self->handle, &self->overlapped))
+ wait = TRUE;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = GetOverlappedResult(self->handle, &self->overlapped,
+ &bytes, wait);
+ Py_END_ALLOW_THREADS
+
+ switch (ret ? ERROR_SUCCESS : GetLastError()) {
+ case ERROR_SUCCESS:
+ case ERROR_NOT_FOUND:
+ case ERROR_OPERATION_ABORTED:
+ break;
+ default:
+ PyErr_SetString(
+ PyExc_RuntimeError,
+ "I/O operations still in flight while destroying "
+ "Overlapped object, the process may crash");
+ PyErr_WriteUnraisable(NULL);
+ }
+ }
+
+ if (self->overlapped.hEvent != NULL)
+ CloseHandle(self->overlapped.hEvent);
+
+ if (self->write_buffer.obj)
+ PyBuffer_Release(&self->write_buffer);
+
+ Py_CLEAR(self->read_buffer);
+ PyObject_Del(self);
+ SetLastError(olderr);
+}
+
+PyDoc_STRVAR(
+ Overlapped_cancel_doc,
+ "cancel() -> None\n\n"
+ "Cancel overlapped operation");
+
+static PyObject *
+Overlapped_cancel(OverlappedObject *self)
+{
+ BOOL ret = TRUE;
+
+ if (self->type == TYPE_NOT_STARTED)
+ Py_RETURN_NONE;
+
+ if (!HasOverlappedIoCompleted(&self->overlapped)) {
+ Py_BEGIN_ALLOW_THREADS
+ if (Py_CancelIoEx)
+ ret = Py_CancelIoEx(self->handle, &self->overlapped);
+ else
+ ret = CancelIo(self->handle);
+ Py_END_ALLOW_THREADS
+ }
+
+ /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */
+ if (!ret && GetLastError() != ERROR_NOT_FOUND)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(
+ Overlapped_getresult_doc,
+ "getresult(wait=False) -> result\n\n"
+ "Retrieve result of operation. If wait is true then it blocks\n"
+ "until the operation is finished. If wait is false and the\n"
+ "operation is still pending then an error is raised.");
+
+static PyObject *
+Overlapped_getresult(OverlappedObject *self, PyObject *args)
+{
+ BOOL wait = FALSE;
+ DWORD transferred = 0;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait))
+ return NULL;
+
+ if (self->type == TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation not yet attempted");
+ return NULL;
+ }
+
+ if (self->type == TYPE_NOT_STARTED) {
+ PyErr_SetString(PyExc_ValueError, "operation failed to start");
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = GetOverlappedResult(self->handle, &self->overlapped, &transferred,
+ wait);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ break;
+ case ERROR_BROKEN_PIPE:
+ if (self->read_buffer != NULL)
+ break;
+ /* fall through */
+ default:
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+
+ switch (self->type) {
+ case TYPE_READ:
+ assert(PyBytes_CheckExact(self->read_buffer));
+ if (transferred != PyBytes_GET_SIZE(self->read_buffer) &&
+ _PyBytes_Resize(&self->read_buffer, transferred))
+ return NULL;
+ Py_INCREF(self->read_buffer);
+ return self->read_buffer;
+ case TYPE_ACCEPT:
+ case TYPE_CONNECT:
+ case TYPE_DISCONNECT:
+ Py_RETURN_NONE;
+ default:
+ return PyLong_FromUnsignedLong((unsigned long) transferred);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_ReadFile_doc,
+ "ReadFile(handle, size) -> Overlapped[message]\n\n"
+ "Start overlapped read");
+
+static PyObject *
+Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ DWORD size;
+ DWORD nread;
+ PyObject *buf;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &handle, &size))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+ size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+ buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+ if (buf == NULL)
+ return NULL;
+
+ self->type = TYPE_READ;
+ self->handle = handle;
+ self->read_buffer = buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread,
+ &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ case ERROR_BROKEN_PIPE:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WSARecv_doc,
+ "RecvFile(handle, size, flags) -> Overlapped[message]\n\n"
+ "Start overlapped receive");
+
+static PyObject *
+Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ DWORD size;
+ DWORD flags;
+ DWORD nread;
+ PyObject *buf;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD F_DWORD,
+ &handle, &size, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+ size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+ buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+ if (buf == NULL)
+ return NULL;
+
+ self->type = TYPE_READ;
+ self->handle = handle;
+ self->read_buffer = buf;
+ wsabuf.len = size;
+ wsabuf.buf = PyBytes_AS_STRING(buf);
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags,
+ &self->overlapped, NULL);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ case ERROR_BROKEN_PIPE:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WriteFile_doc,
+ "WriteFile(handle, buf) -> Overlapped[bytes_transferred]\n\n"
+ "Start overlapped write");
+
+static PyObject *
+Overlapped_WriteFile(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+ DWORD written;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O", &handle, &bufobj))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+ return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) {
+ PyBuffer_Release(&self->write_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer to large");
+ return NULL;
+ }
+#endif
+
+ self->type = TYPE_WRITE;
+ self->handle = handle;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WriteFile(handle, self->write_buffer.buf, self->write_buffer.len,
+ &written, &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WSASend_doc,
+ "WSASend(handle, buf, flags) -> Overlapped[bytes_transferred]\n\n"
+ "Start overlapped send");
+
+static PyObject *
+Overlapped_WSASend(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+ DWORD flags;
+ DWORD written;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD,
+ &handle, &bufobj, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+ return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) {
+ PyBuffer_Release(&self->write_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer to large");
+ return NULL;
+ }
+#endif
+
+ self->type = TYPE_WRITE;
+ self->handle = handle;
+ wsabuf.len = (DWORD)self->write_buffer.len;
+ wsabuf.buf = self->write_buffer.buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WSASend((SOCKET)handle, &wsabuf, 1, &written, flags,
+ &self->overlapped, NULL);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_AcceptEx_doc,
+ "AcceptEx(listen_handle, accept_handle) -> Overlapped[address_as_bytes]\n\n"
+ "Start overlapped wait for client to connect");
+
+static PyObject *
+Overlapped_AcceptEx(OverlappedObject *self, PyObject *args)
+{
+ SOCKET ListenSocket;
+ SOCKET AcceptSocket;
+ DWORD BytesReceived;
+ DWORD size;
+ PyObject *buf;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE,
+ &ListenSocket, &AcceptSocket))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ size = sizeof(struct sockaddr_in6) + 16;
+ buf = PyBytes_FromStringAndSize(NULL, size*2);
+ if (!buf)
+ return NULL;
+
+ self->type = TYPE_ACCEPT;
+ self->handle = (HANDLE)ListenSocket;
+ self->read_buffer = buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = Py_AcceptEx(ListenSocket, AcceptSocket, PyBytes_AS_STRING(buf),
+ 0, size, size, &BytesReceived, &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+
+static int
+parse_address(PyObject *obj, SOCKADDR *Address, int Length)
+{
+ char *Host;
+ unsigned short Port;
+ unsigned long FlowInfo;
+ unsigned long ScopeId;
+
+ memset(Address, 0, Length);
+
+ if (PyArg_ParseTuple(obj, "sH", &Host, &Port))
+ {
+ Address->sa_family = AF_INET;
+ if (WSAStringToAddressA(Host, AF_INET, NULL, Address, &Length) < 0) {
+ PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ return -1;
+ }
+ ((SOCKADDR_IN*)Address)->sin_port = htons(Port);
+ return Length;
+ }
+ else if (PyArg_ParseTuple(obj, "sHkk", &Host, &Port, &FlowInfo, &ScopeId))
+ {
+ PyErr_Clear();
+ Address->sa_family = AF_INET6;
+ if (WSAStringToAddressA(Host, AF_INET6, NULL, Address, &Length) < 0) {
+ PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ return -1;
+ }
+ ((SOCKADDR_IN6*)Address)->sin6_port = htons(Port);
+ ((SOCKADDR_IN6*)Address)->sin6_flowinfo = FlowInfo;
+ ((SOCKADDR_IN6*)Address)->sin6_scope_id = ScopeId;
+ return Length;
+ }
+
+ return -1;
+}
+
+
+PyDoc_STRVAR(
+ Overlapped_ConnectEx_doc,
+ "ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n"
+ "Start overlapped connect. client_handle should be unbound.");
+
+static PyObject *
+Overlapped_ConnectEx(OverlappedObject *self, PyObject *args)
+{
+ SOCKET ConnectSocket;
+ PyObject *AddressObj;
+ char AddressBuf[sizeof(struct sockaddr_in6)];
+ SOCKADDR *Address = (SOCKADDR*)AddressBuf;
+ int Length;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ Length = sizeof(AddressBuf);
+ Length = parse_address(AddressObj, Address, Length);
+ if (Length < 0)
+ return NULL;
+
+ self->type = TYPE_CONNECT;
+ self->handle = (HANDLE)ConnectSocket;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = Py_ConnectEx(ConnectSocket, Address, Length,
+ NULL, 0, NULL, &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_DisconnectEx_doc,
+ "DisconnectEx(handle, flags) -> Overlapped[None]\n\n"
+ "Start overlapped connect. client_handle should be unbound.");
+
+static PyObject *
+Overlapped_DisconnectEx(OverlappedObject *self, PyObject *args)
+{
+ SOCKET Socket;
+ DWORD flags;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &Socket, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ self->type = TYPE_DISCONNECT;
+ self->handle = (HANDLE)Socket;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = Py_DisconnectEx(Socket, &self->overlapped, flags, 0);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+static PyObject*
+Overlapped_getaddress(OverlappedObject *self)
+{
+ return PyLong_FromVoidPtr(&self->overlapped);
+}
+
+static PyObject*
+Overlapped_getpending(OverlappedObject *self)
+{
+ return PyBool_FromLong(!HasOverlappedIoCompleted(&self->overlapped) &&
+ self->type != TYPE_NOT_STARTED);
+}
+
+static PyMethodDef Overlapped_methods[] = {
+ {"getresult", (PyCFunction) Overlapped_getresult,
+ METH_VARARGS, Overlapped_getresult_doc},
+ {"cancel", (PyCFunction) Overlapped_cancel,
+ METH_NOARGS, Overlapped_cancel_doc},
+ {"ReadFile", (PyCFunction) Overlapped_ReadFile,
+ METH_VARARGS, Overlapped_ReadFile_doc},
+ {"WSARecv", (PyCFunction) Overlapped_WSARecv,
+ METH_VARARGS, Overlapped_WSARecv_doc},
+ {"WriteFile", (PyCFunction) Overlapped_WriteFile,
+ METH_VARARGS, Overlapped_WriteFile_doc},
+ {"WSASend", (PyCFunction) Overlapped_WSASend,
+ METH_VARARGS, Overlapped_WSASend_doc},
+ {"AcceptEx", (PyCFunction) Overlapped_AcceptEx,
+ METH_VARARGS, Overlapped_AcceptEx_doc},
+ {"ConnectEx", (PyCFunction) Overlapped_ConnectEx,
+ METH_VARARGS, Overlapped_ConnectEx_doc},
+ {"DisconnectEx", (PyCFunction) Overlapped_DisconnectEx,
+ METH_VARARGS, Overlapped_DisconnectEx_doc},
+ {NULL}
+};
+
+static PyMemberDef Overlapped_members[] = {
+ {"error", T_ULONG,
+ offsetof(OverlappedObject, error),
+ READONLY, "Error from last operation"},
+ {"event", T_HANDLE,
+ offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent),
+ READONLY, "Overlapped event handle"},
+ {NULL}
+};
+
+static PyGetSetDef Overlapped_getsets[] = {
+ {"address", (getter)Overlapped_getaddress, NULL,
+ "Address of overlapped structure"},
+ {"pending", (getter)Overlapped_getpending, NULL,
+ "Whether the operation is pending"},
+ {NULL},
+};
+
+PyTypeObject OverlappedType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ /* tp_name */ "_overlapped.Overlapped",
+ /* tp_basicsize */ sizeof(OverlappedObject),
+ /* tp_itemsize */ 0,
+ /* tp_dealloc */ (destructor) Overlapped_dealloc,
+ /* tp_print */ 0,
+ /* tp_getattr */ 0,
+ /* tp_setattr */ 0,
+ /* tp_reserved */ 0,
+ /* tp_repr */ 0,
+ /* tp_as_number */ 0,
+ /* tp_as_sequence */ 0,
+ /* tp_as_mapping */ 0,
+ /* tp_hash */ 0,
+ /* tp_call */ 0,
+ /* tp_str */ 0,
+ /* tp_getattro */ 0,
+ /* tp_setattro */ 0,
+ /* tp_as_buffer */ 0,
+ /* tp_flags */ Py_TPFLAGS_DEFAULT,
+ /* tp_doc */ "OVERLAPPED structure wrapper",
+ /* tp_traverse */ 0,
+ /* tp_clear */ 0,
+ /* tp_richcompare */ 0,
+ /* tp_weaklistoffset */ 0,
+ /* tp_iter */ 0,
+ /* tp_iternext */ 0,
+ /* tp_methods */ Overlapped_methods,
+ /* tp_members */ Overlapped_members,
+ /* tp_getset */ Overlapped_getsets,
+ /* tp_base */ 0,
+ /* tp_dict */ 0,
+ /* tp_descr_get */ 0,
+ /* tp_descr_set */ 0,
+ /* tp_dictoffset */ 0,
+ /* tp_init */ 0,
+ /* tp_alloc */ 0,
+ /* tp_new */ Overlapped_new,
+};
+
+static PyMethodDef overlapped_functions[] = {
+ {"CreateIoCompletionPort", overlapped_CreateIoCompletionPort,
+ METH_VARARGS, CreateIoCompletionPort_doc},
+ {"GetQueuedCompletionStatus", overlapped_GetQueuedCompletionStatus,
+ METH_VARARGS, GetQueuedCompletionStatus_doc},
+ {"PostQueuedCompletionStatus", overlapped_PostQueuedCompletionStatus,
+ METH_VARARGS, PostQueuedCompletionStatus_doc},
+ {"BindLocal", overlapped_BindLocal,
+ METH_VARARGS, BindLocal_doc},
+ {"SetFileCompletionNotificationModes",
+ overlapped_SetFileCompletionNotificationModes,
+ METH_VARARGS, SetFileCompletionNotificationModes_doc},
+ {NULL}
+};
+
+static struct PyModuleDef overlapped_module = {
+ PyModuleDef_HEAD_INIT,
+ "_overlapped",
+ NULL,
+ -1,
+ overlapped_functions,
+ NULL,
+ NULL,
+ NULL,
+ NULL
+};
+
+#define WINAPI_CONSTANT(fmt, con) \
+ PyDict_SetItemString(d, #con, Py_BuildValue(fmt, con))
+
+PyMODINIT_FUNC
+PyInit__overlapped(void)
+{
+ PyObject *m, *d;
+
+ /* Ensure WSAStartup() called before initializing function pointers */
+ m = PyImport_ImportModule("_socket");
+ if (!m)
+ return NULL;
+ Py_DECREF(m);
+
+ if (initialize_function_pointers() < 0)
+ return NULL;
+
+ if (PyType_Ready(&OverlappedType) < 0)
+ return NULL;
+
+ m = PyModule_Create(&overlapped_module);
+ if (PyModule_AddObject(m, "Overlapped", (PyObject *)&OverlappedType) < 0)
+ return NULL;
+
+ d = PyModule_GetDict(m);
+
+ WINAPI_CONSTANT(F_DWORD, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
+ WINAPI_CONSTANT(F_DWORD, INFINITE);
+ WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
+ WINAPI_CONSTANT(F_HANDLE, NULL);
+ WINAPI_CONSTANT(F_DWORD, SO_UPDATE_ACCEPT_CONTEXT);
+ WINAPI_CONSTANT(F_DWORD, SO_UPDATE_CONNECT_CONTEXT);
+ WINAPI_CONSTANT(F_DWORD, TF_REUSE_SOCKET);
+
+ return m;
+}
diff --git a/polling.py b/polling.py
index 8e160b3..a9e1491 100644
--- a/polling.py
+++ b/polling.py
@@ -41,224 +41,10 @@ import logging
import os
import select
import time
+import socket
-
-class PollsterBase:
- """Base class for all polling implementations.
-
- This defines an interface to register and unregister readers and
- writers for specific file descriptors, and an interface to get a
- list of events. There's also an interface to check whether any
- readers or writers are currently registered.
- """
-
- def __init__(self):
- super().__init__()
- self.readers = {} # {fd: token, ...}.
- self.writers = {} # {fd: token, ...}.
-
- def pollable(self):
- """Return True if any readers or writers are currently registered."""
- return bool(self.readers or self.writers)
-
- # Subclasses are expected to extend the add/remove methods.
-
- def register_reader(self, fd, token):
- """Add or update a reader for a file descriptor."""
- self.readers[fd] = token
-
- def register_writer(self, fd, token):
- """Add or update a writer for a file descriptor."""
- self.writers[fd] = token
-
- def unregister_reader(self, fd):
- """Remove the reader for a file descriptor."""
- del self.readers[fd]
-
- def unregister_writer(self, fd):
- """Remove the writer for a file descriptor."""
- del self.writers[fd]
-
- def poll(self, timeout=None):
- """Poll for events. A subclass must implement this.
-
- If timeout is omitted or None, this blocks until at least one
- event is ready. Otherwise, timeout gives a maximum time to
- wait (an int of float in seconds) -- the method returns as
- soon as at least one event is ready or when the timeout is
- expired. For a non-blocking poll, pass 0.
-
- The return value is a list of events; it is empty when the
- timeout expired before any events were ready. Each event
- is a token previously passed to register_reader/writer().
- """
- raise NotImplementedError
-
-
-class SelectPollster(PollsterBase):
- """Pollster implementation using select."""
-
- def poll(self, timeout=None):
- readable, writable, _ = select.select(self.readers, self.writers,
- [], timeout)
- events = []
- events += (self.readers[fd] for fd in readable)
- events += (self.writers[fd] for fd in writable)
- return events
-
-
-class PollPollster(PollsterBase):
- """Pollster implementation using poll."""
-
- def __init__(self):
- super().__init__()
- self._poll = select.poll()
-
- def _update(self, fd):
- assert isinstance(fd, int), fd
- flags = 0
- if fd in self.readers:
- flags |= select.POLLIN
- if fd in self.writers:
- flags |= select.POLLOUT
- if flags:
- self._poll.register(fd, flags)
- else:
- self._poll.unregister(fd)
-
- def register_reader(self, fd, callback, *args):
- super().register_reader(fd, callback, *args)
- self._update(fd)
-
- def register_writer(self, fd, callback, *args):
- super().register_writer(fd, callback, *args)
- self._update(fd)
-
- def unregister_reader(self, fd):
- super().unregister_reader(fd)
- self._update(fd)
-
- def unregister_writer(self, fd):
- super().unregister_writer(fd)
- self._update(fd)
-
- def poll(self, timeout=None):
- # Timeout is in seconds, but poll() takes milliseconds.
- msecs = None if timeout is None else int(round(1000 * timeout))
- events = []
- for fd, flags in self._poll.poll(msecs):
- if flags & (select.POLLIN | select.POLLHUP):
- if fd in self.readers:
- events.append(self.readers[fd])
- if flags & (select.POLLOUT | select.POLLHUP):
- if fd in self.writers:
- events.append(self.writers[fd])
- return events
-
-
-class EPollPollster(PollsterBase):
- """Pollster implementation using epoll."""
-
- def __init__(self):
- super().__init__()
- self._epoll = select.epoll()
-
- def _update(self, fd):
- assert isinstance(fd, int), fd
- eventmask = 0
- if fd in self.readers:
- eventmask |= select.EPOLLIN
- if fd in self.writers:
- eventmask |= select.EPOLLOUT
- if eventmask:
- try:
- self._epoll.register(fd, eventmask)
- except IOError:
- self._epoll.modify(fd, eventmask)
- else:
- self._epoll.unregister(fd)
-
- def register_reader(self, fd, callback, *args):
- super().register_reader(fd, callback, *args)
- self._update(fd)
-
- def register_writer(self, fd, callback, *args):
- super().register_writer(fd, callback, *args)
- self._update(fd)
-
- def unregister_reader(self, fd):
- super().unregister_reader(fd)
- self._update(fd)
-
- def unregister_writer(self, fd):
- super().unregister_writer(fd)
- self._update(fd)
-
- def poll(self, timeout=None):
- if timeout is None:
- timeout = -1 # epoll.poll() uses -1 to mean "wait forever".
- events = []
- for fd, eventmask in self._epoll.poll(timeout):
- if eventmask & select.EPOLLIN:
- if fd in self.readers:
- events.append(self.readers[fd])
- if eventmask & select.EPOLLOUT:
- if fd in self.writers:
- events.append(self.writers[fd])
- return events
-
-
-class KqueuePollster(PollsterBase):
- """Pollster implementation using kqueue."""
-
- def __init__(self):
- super().__init__()
- self._kqueue = select.kqueue()
-
- def register_reader(self, fd, callback, *args):
- if fd not in self.readers:
- kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
- self._kqueue.control([kev], 0, 0)
- return super().register_reader(fd, callback, *args)
-
- def register_writer(self, fd, callback, *args):
- if fd not in self.readers:
- kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
- self._kqueue.control([kev], 0, 0)
- return super().register_writer(fd, callback, *args)
-
- def unregister_reader(self, fd):
- super().unregister_reader(fd)
- kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
- self._kqueue.control([kev], 0, 0)
-
- def unregister_writer(self, fd):
- super().unregister_writer(fd)
- kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
- self._kqueue.control([kev], 0, 0)
-
- def poll(self, timeout=None):
- events = []
- max_ev = len(self.readers) + len(self.writers)
- for kev in self._kqueue.control(None, max_ev, timeout):
- fd = kev.ident
- flag = kev.filter
- if flag == select.KQ_FILTER_READ and fd in self.readers:
- events.append(self.readers[fd])
- elif flag == select.KQ_FILTER_WRITE and fd in self.writers:
- events.append(self.writers[fd])
- return events
-
-
-# Pick the best pollster class for the platform.
-if hasattr(select, 'kqueue'):
- best_pollster = KqueuePollster
-elif hasattr(select, 'epoll'):
- best_pollster = EPollPollster
-elif hasattr(select, 'poll'):
- best_pollster = PollPollster
-else:
- best_pollster = SelectPollster
+# local imports
+from proactor import Proactor, Future
class DelayedCall:
@@ -295,35 +81,15 @@ class EventLoop:
This class's instance variables are not part of its API.
"""
- def __init__(self, pollster=None):
+ def __init__(self, proactor=None):
super().__init__()
- if pollster is None:
- logging.info('Using pollster: %s', best_pollster.__name__)
- pollster = best_pollster()
- self.pollster = pollster
+ if proactor is None:
+ logging.info('Using proactor: %s', Proactor.__name__)
+ proactor = Proactor()
+ self.proactor = proactor
self.ready = collections.deque() # [(callback, args), ...]
self.scheduled = [] # [(when, callback, args), ...]
- def add_reader(self, fd, callback, *args):
- """Add a reader callback. Return a DelayedCall instance."""
- dcall = DelayedCall(None, callback, args)
- self.pollster.register_reader(fd, dcall)
- return dcall
-
- def remove_reader(self, fd):
- """Remove a reader callback."""
- self.pollster.unregister_reader(fd)
-
- def add_writer(self, fd, callback, *args):
- """Add a writer callback. Return a DelayedCall instance."""
- dcall = DelayedCall(None, callback, args)
- self.pollster.register_writer(fd, dcall)
- return dcall
-
- def remove_writer(self, fd):
- """Remove a writer callback."""
- self.pollster.unregister_writer(fd)
-
def add_callback(self, dcall):
"""Add a DelayedCall to ready or scheduled."""
if dcall.cancelled:
@@ -408,14 +174,15 @@ class EventLoop:
heapq.heappop(self.scheduled)
# Inspect the poll queue.
- if self.pollster.pollable():
+ if self.proactor.pollable():
if self.scheduled:
when = self.scheduled[0].when
timeout = max(0, when - time.time())
else:
timeout = None
t0 = time.time()
- events = self.pollster.poll(timeout)
+ # done callbacks added to ready futures get run by poll()
+ self.proactor.poll(timeout)
t1 = time.time()
argstr = '' if timeout is None else ' %.3f' % timeout
if t1-t0 >= 1:
@@ -423,8 +190,6 @@ class EventLoop:
else:
level = logging.DEBUG
logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
- for dcall in events:
- self.add_callback(dcall)
# Handle 'later' callbacks that are ready.
while self.scheduled:
@@ -441,12 +206,24 @@ class EventLoop:
writable file descriptors, or scheduled callbacks (of either
variety).
"""
- while self.ready or self.scheduled or self.pollster.pollable():
+ while self.ready or self.scheduled or self.proactor.pollable():
self.run_once()
+
MAX_WORKERS = 5 # Default max workers when creating an executor.
+try:
+ from socket import socketpair
+except ImportError:
+ def socketpair():
+ with socket.socket() as l:
+ l.bind(('127.0.0.1', 0))
+ l.listen(1)
+ c = socket.socket()
+ c.connect(l.getsockname())
+ a, _ = l.accept()
+ return a, c
class ThreadRunner:
"""Helper to submit work to a thread pool and wait for it.
@@ -461,19 +238,36 @@ class ThreadRunner:
def __init__(self, eventloop, executor=None):
self.eventloop = eventloop
self.executor = executor # Will be constructed lazily.
- self.pipe_read_fd, self.pipe_write_fd = os.pipe()
+ self.rsock, self.wsock = socketpair()
+ self.rsock.settimeout(0)
self.active_count = 0
+ self.read_future = None
+
+ def start_read(self):
+ while self.active_count > 0:
+ try:
+ res = self.eventloop.proactor.recv(self.rsock, 8192)
+ except Future as f:
+ self.read_future = f
+ self.read_future.add_done_callback(self.read_callback)
+ break
+ else:
+ self.active_count -= len(res)
- def read_callback(self):
- """Semi-permanent callback while at least one future is active."""
+ def read_callback(self, f):
assert self.active_count > 0, self.active_count
- data = os.read(self.pipe_read_fd, 8192) # Traditional buffer size.
- self.active_count -= len(data)
- if self.active_count == 0:
- self.eventloop.remove_reader(self.pipe_read_fd)
+ assert f is self.read_future
+ try:
+ self.active_count -= len(self.read_future.result())
+ except OSError:
+ pass
+ finally:
+ self.read_future = None
assert self.active_count >= 0, self.active_count
+ if self.active_count > 0:
+ self.start_read()
- def submit(self, func, *args, executor=None):
+ def submit(self, func, *args, executor=None, insert_callback=None):
"""Submit a function to the thread pool.
This returns a concurrent.futures.Future instance. The caller
@@ -487,11 +281,13 @@ class ThreadRunner:
executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
self.executor = executor
assert self.active_count >= 0, self.active_count
- future = executor.submit(func, *args)
- if self.active_count == 0:
- self.eventloop.add_reader(self.pipe_read_fd, self.read_callback)
self.active_count += 1
+ future = executor.submit(func, *args)
+ if self.read_future is None or self.read_future.done():
+ self.start_read()
+ if insert_callback is not None:
+ future.add_done_callback(insert_callback)
def done_callback(future):
- os.write(self.pipe_write_fd, b'x')
+ self.wsock.sendall(b'x')
future.add_done_callback(done_callback)
return future
diff --git a/proactor.py b/proactor.py
new file mode 100644
index 0000000..ffe5b17
--- /dev/null
+++ b/proactor.py
@@ -0,0 +1,432 @@
+#
+# Module implementing the Proactor pattern
+#
+# A proactor is used to initiate asynchronous I/O, and to wait for
+# completion of previously initiated operations.
+#
+
+import os
+import sys
+import errno
+import socket
+import select
+import time
+import warnings
+
+
+__all__ = ['SelectProactor']
+
+#
+# Future class
+#
+
+class Future(Exception):
+
+ def __init__(self):
+ self._callbacks = []
+
+ def result(self):
+ # does not block for operation to complete
+ assert self.done()
+ if self.success:
+ return self.value
+ else:
+ raise self.value
+
+ def set_result(self, value):
+ assert not self.done()
+ self.success = True
+ self.value = value
+ self._invoke_callbacks()
+
+ def set_exception(self, value):
+ assert not self.done()
+ self.success = False
+ self.value = value
+ self._invoke_callbacks()
+
+ def done(self):
+ return hasattr(self, 'success')
+
+ def add_done_callback(self, func):
+ if self.done():
+ func(self)
+ else:
+ self._callbacks.append(func)
+
+ def _invoke_callbacks(self):
+ for func in self._callbacks:
+ try:
+ func(self)
+ except Exception:
+ sys.excepthook(*sys.exc_info())
+ del self._callbacks
+
+#
+# Base class for all proactors
+#
+
+class BaseProactor:
+ _Future = Future
+
+ def __init__(self):
+ self._results = []
+
+ def poll(self, timeout=None):
+ if not self._results:
+ self._poll(timeout)
+ tmp, self._results = self._results, []
+ return tmp
+
+ def filteredpoll(self, penders, timeout=None):
+ if timeout is None:
+ deadline = None
+ elif timeout < 0:
+ raise ValueError('negative timeout')
+ else:
+ deadline = time.monotonic() + timeout
+ S = set(penders)
+ while True:
+ filtered = [x for x in self._results if x[0] in S]
+ if filtered:
+ self._results = [x for x in self._results if x[0] not in S]
+ return filtered
+ self._poll(timeout)
+ if deadline is not None:
+ timeout = deadline - time.monotonic()
+ if timeout <= 0:
+ break
+
+ def close(self):
+ pass
+
+#
+# Initiator methods for proactors based on select()/poll()/epoll()/kqueue()
+#
+
+READABLE = 0
+WRITABLE = 1
+
+class ReadyBaseProactor(BaseProactor):
+ def __init__(self):
+ super().__init__()
+ self._queue = [{}, {}]
+
+ def pollable(self):
+ return any(self._queue)
+
+ def recv(self, sock, nbytes, flags=0):
+ try:
+ return sock.recv(nbytes, flags)
+ except BlockingIOError:
+ raise self._register(sock.fileno(), READABLE,
+ sock.recv, nbytes, flags)
+
+ def send(self, sock, buf, flags=0):
+ try:
+ return sock.send(buf, flags)
+ except BlockingIOError:
+ raise self._register(sock.fileno(), WRITABLE,
+ sock.send, buf, flags)
+
+ def accept(self, sock):
+ def _accept():
+ conn, addr = sock.accept()
+ conn.settimeout(0)
+ return conn, addr
+ try:
+ return _accept()
+ except BlockingIOError:
+ raise self._register(sock.fileno(), READABLE, _accept)
+
+ def connect(self, sock, addr):
+ assert sock.gettimeout() == 0
+ err = sock.connect_ex(addr)
+ if err not in self._connection_errors:
+ raise OSError(err, os.strerror(err))
+ def _connect():
+ err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err != 0:
+ raise OSError(err, os.strerror(err))
+ raise self._register(sock.fileno(), WRITABLE, _connect)
+
+ # hacks to support SSL
+ def _readable(self, sock):
+ return self._register(sock.fileno(), READABLE, lambda:None)
+
+ def _writable(self, sock):
+ return self._register(sock.fileno(), WRITABLE, lambda:None)
+
+#
+# Proactor using select()
+#
+
+class SelectProactor(ReadyBaseProactor):
+ _connection_errors = {0, errno.EINPROGRESS}
+ _select = select.select
+
+ def _poll(self, timeout=None):
+ rfds, wfds, xfds = self._select(self._queue[READABLE].keys(),
+ self._queue[WRITABLE].keys(),
+ (), timeout)
+ for fd in rfds:
+ self._handle(fd, READABLE)
+ for fd in wfds:
+ self._handle(fd, WRITABLE)
+
+ def _handle(self, fd, kind):
+ Q = self._queue[kind][fd]
+ f, callback, args = Q.pop(0)
+ try:
+ f.set_result(callback(*args))
+ except OSError as e:
+ f.set_exception(e)
+ self._results.append(f)
+ if not Q:
+ del self._queue[kind][fd]
+
+ def _register(self, fd, kind, callback, *args):
+ f = self._Future()
+ queue = self._queue[kind]
+ if fd not in queue:
+ queue[fd] = []
+ queue[fd].append((f, callback, args))
+ return f
+
+ if sys.platform == 'win32':
+ # Windows insists on being awkward...
+ _connection_errors = {0, errno.WSAEWOULDBLOCK}
+
+ def _select(self, rfds, wfds, _, timeout=None):
+ if not (rfds or wfds):
+ time.sleep(timeout)
+ return [], [], []
+ else:
+ rfds, wfds, xfds = select.select(rfds, wfds, wfds, timeout)
+ return rfds, wfds + xfds, []
+
+
+#
+# Proactor using poll()
+#
+
+if hasattr(select, 'poll'):
+ __all__.append('PollProactor')
+
+ from select import POLLIN, POLLPRI, POLLOUT, POLLHUP, POLLERR, POLLNVAL
+
+ FLAG = [POLLIN, POLLOUT]
+ READ_EXTRA_FLAGS = POLLIN | POLLHUP | POLLNVAL | POLLERR
+ WRITE_EXTRA_FLAGS = POLLOUT | POLLHUP | POLLNVAL | POLLERR
+
+ class PollProactor(ReadyBaseProactor):
+ _connection_errors = {0, errno.EINPROGRESS}
+ _make_poller = select.poll
+ _uses_msecs = True
+
+ def __init__(self):
+ super().__init__()
+ self._poller = self._make_poller()
+ self._flag = {}
+
+ def _poll(self, timeout=None):
+ if timeout is None:
+ timeout = -1
+ elif timeout < 0:
+ raise ValueError('negative timeout')
+ elif self._uses_msecs:
+ timeout = int(timeout*1000 + 0.5)
+ ready = self._poller.poll(timeout)
+ for fd, flags in ready:
+ if fd in self._queue[READABLE] and flags & READ_EXTRA_FLAGS:
+ self._handle(fd, READABLE)
+ if fd in self._queue[WRITABLE] and flags & WRITE_EXTRA_FLAGS:
+ self._handle(fd, WRITABLE)
+
+ def _handle(self, fd, kind):
+ Q = self._queue[kind][fd]
+ f, callback, args = Q.pop(0)
+ try:
+ f.set_result(callback(*args))
+ except OSError as e:
+ f.set_exception(e)
+ self._results.append(f)
+ if not Q:
+ del self._queue[kind][fd]
+ flag = self._flag[fd] = self._flag[fd] & ~FLAG[kind]
+ if flag == 0:
+ del self._flag[fd]
+ self._poller.unregister(fd)
+ else:
+ self._poller.modify(fd, flag)
+
+ def _register(self, fd, kind, callback, *args):
+ f = self._Future()
+ queue = self._queue[kind]
+ if fd not in queue:
+ queue[fd] = []
+ old_flag = self._flag.get(fd, 0)
+ flag = self._flag[fd] = old_flag | FLAG[kind]
+ if old_flag == 0:
+ self._poller.register(fd, flag)
+ else:
+ self._poller.modify(fd, flag)
+ queue[fd].append((f, callback, args))
+ return f
+
+#
+# Proactor using epoll()
+#
+
+if hasattr(select, 'epoll'):
+ assert (select.EPOLLIN, select.EPOLLOUT) == (POLLIN, POLLOUT)
+
+ __all__.append('EpollProactor')
+
+ class EpollProactor(PollProactor):
+ _make_poller = select.epoll
+ _uses_msecs = False
+
+
+#
+# Proactor using overlapped IO and a completion port
+#
+
+try:
+ from _overlapped import *
+except ImportError:
+ if sys.platform == 'win32':
+ warnings.warn('IOCP support not compiled')
+else:
+ __all__.append('IocpProactor')
+
+ from _winapi import CloseHandle
+ import weakref
+
+ class IocpProactor(BaseProactor):
+ def __init__(self, concurrency=0xffffffff):
+ super().__init__()
+ self._iocp = CreateIoCompletionPort(
+ INVALID_HANDLE_VALUE, NULL, 0, concurrency)
+ self._cache = {}
+ self._registered = weakref.WeakSet()
+
+ def pollable(self):
+ return bool(self._cache)
+
+ def recv(self, conn, nbytes, flags=0):
+ self._register_obj(conn)
+ ov = Overlapped(NULL)
+ ov.WSARecv(conn.fileno(), nbytes, flags)
+ if ov.pending:
+ raise self._register(ov, conn, ov.getresult)
+ return ov.getresult()
+
+ def send(self, conn, buf, flags=0):
+ self._register_obj(conn)
+ ov = Overlapped(NULL)
+ ov.WSASend(conn.fileno(), buf, flags)
+ if ov.pending:
+ raise self._register(ov, conn, ov.getresult)
+ return ov.getresult()
+
+ def accept(self, listener):
+ self._register_obj(listener)
+ conn = self._get_accept_socket()
+ ov = Overlapped(NULL)
+ ov.AcceptEx(listener.fileno(), conn.fileno())
+ def finish_accept():
+ addr = ov.getresult()
+ conn.setsockopt(socket.SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT, listener.fileno())
+ conn.settimeout(listener.gettimeout())
+ return conn, conn.getpeername()
+ if ov.pending:
+ raise self._register(ov, listener, finish_accept)
+ return ov.getresult()
+
+ def connect(self, conn, address):
+ self._register_obj(conn)
+ BindLocal(conn.fileno(), len(address))
+ ov = Overlapped(NULL)
+ ov.ConnectEx(conn.fileno(), address)
+ def finish_connect():
+ ov.getresult()
+ conn.setsockopt(socket.SOL_SOCKET,
+ SO_UPDATE_CONNECT_CONTEXT, 0)
+ return conn
+ if ov.pending:
+ raise self._register(ov, conn, finish_connect)
+ return ov.getresult()
+
+ def _readable(self, sock):
+ raise NotImplementedError('IocpProactor._readable()')
+
+ def _writable(self, sock):
+ raise NotImplementedError('IocpProactor._writable()')
+
+ def _register_obj(self, obj):
+ if obj not in self._registered:
+ self._registered.add(obj)
+ CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
+ SetFileCompletionNotificationModes(obj.fileno(),
+ FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
+
+ def _register(self, ov, obj, callback, discard=False):
+ # we prevent ov and obj from being garbage collected
+ f = None if discard else self._Future()
+ self._cache[ov.address] = (f, ov, obj, callback)
+ return f
+
+ def _get_accept_socket(self):
+ s = socket.socket()
+ s.settimeout(0)
+ return s
+
+ def _poll(self, timeout=None):
+ if timeout is None:
+ ms = INFINITE
+ elif timeout < 0:
+ raise ValueError("negative timeout")
+ else:
+ ms = int(timeout * 1000 + 0.5)
+ if ms >= INFINITE:
+ raise ValueError("timeout too big")
+ while True:
+ status = GetQueuedCompletionStatus(self._iocp, ms)
+ if status is None:
+ return
+ f, ov, obj, callback = self._cache.pop(status[3])
+ try:
+ value = callback()
+ except OSError as e:
+ if f is None:
+ sys.excepthook(*sys.exc_info())
+ continue
+ f.set_exception(e)
+ self._results.append(f)
+ else:
+ if f is None:
+ continue
+ f.set_result(value)
+ self._results.append(f)
+ ms = 0
+
+ def close(self, *, CloseHandle=CloseHandle):
+ if self._iocp is not None:
+ CloseHandle(self._iocp)
+ self._iocp = None
+
+ __del__ = close
+
+#
+# Select default proactor (IocpReactor does not support SSL)
+#
+
+for _ in ('EpollProactor', 'IocpProactor', 'PollProactor', 'SelectProactor'):
+ if _ in globals():
+ Proactor = globals()[_]
+ break
+del _
+
+# Proactor = SelectProactor
diff --git a/scheduling.py b/scheduling.py
index ce649f5..50b7399 100644
--- a/scheduling.py
+++ b/scheduling.py
@@ -187,16 +187,6 @@ class Task:
self.unblocker = None
self.eventloop.call_soon(self.step)
- def block_io(self, fd, flag):
- assert isinstance(fd, int), repr(fd)
- assert flag in ('r', 'w'), repr(flag)
- if flag == 'r':
- self.block(self.eventloop.remove_reader, fd)
- self.eventloop.add_reader(fd, self.unblock)
- else:
- self.block(self.eventloop.remove_writer, fd)
- self.eventloop.add_writer(fd, self.unblock)
-
def wait(self):
"""COROUTINE: Wait until this task is finished."""
current_task = context.current_task
@@ -256,33 +246,46 @@ def sleep(secs):
yield
-def block_r(fd):
- """COROUTINE: Block until a file descriptor is ready for reading."""
- context.current_task.block_io(fd, 'r')
+def block_future(future):
+ """COROUTINE: Block until future is set"""
+ task = context.current_task
+ future.add_done_callback(task.unblock)
+ task.block()
yield
-def block_w(fd):
- """COROUTINE: Block until a file descriptor is ready for writing."""
- context.current_task.block_io(fd, 'w')
- yield
+def block_r(obj):
+ """COROUTINE: Block until object is readable"""
+ # XXX not implemented for IOCP.
+ f = context.eventloop.proactor._readable(obj)
+ if not f.done():
+ yield from block_future(f)
+
+
+def block_w(obj):
+ """COROUTINE: Block until object is writable"""
+ # XXX not implemented for IOCP.
+ f = context.eventloop.proactor._writable(obj)
+ if not f.done():
+ yield from block_future(f)
def call_in_thread(func, *args, executor=None):
"""COROUTINE: Run a function in a thread."""
task = context.current_task
eventloop = context.eventloop
- future = context.threadrunner.submit(func, *args, executor=executor)
+ def reschedule(_):
+ eventloop.call_soon(task.unblock_if_alive)
+ future = context.threadrunner.submit(
+ func, *args, executor=executor, insert_callback=reschedule)
task.block(future.cancel)
# If the thread managed to complete before we get here,
- # add_done_callback() will call the callback right now. Make sure
- # the unblock() call doesn't happen until later. But then, the
- # task may already have been cancelled (and it may have been too
- # late to cancel the Future) so it should be okay if this call
- # finds the task deceased. For that purpose we have
- # unblock_if_alive().
- future.add_done_callback(
- lambda _: eventloop.call_soon(task.unblock_if_alive))
+ # reschedule() have been called. reschedule() must be the *first*
+ # callback added to future. Otherwise the eventloop may exit
+ # before the current task is rescheduled. The task may already
+ # have been cancelled (and it may have been too late to cancel the
+ # Future) so it should be okay if this call finds the task
+ # deceased. For that purpose we use unblock_if_alive().
yield
assert future.done()
return future.result()
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..2754e2b
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[build_ext]
+inplace=1 \ No newline at end of file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..ff70754
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,4 @@
+from distutils.core import setup, Extension
+
+ext = Extension('_overlapped', ['overlapped.c'], libraries=['ws2_32'])
+setup(name='_overlapped', ext_modules=[ext])
diff --git a/sockets.py b/sockets.py
index 4d64ee4..f54f19a 100644
--- a/sockets.py
+++ b/sockets.py
@@ -30,6 +30,8 @@ import ssl
# Local imports.
import scheduling
+from scheduling import context
+from proactor import Future
# Errno values indicating the connection was disconnected.
_DISCONNECTED = frozenset((errno.ECONNRESET,
@@ -61,9 +63,14 @@ class SocketTransport:
returns b''.
"""
assert n >= 0, n
+
while True:
try:
- return self.sock.recv(n)
+ try:
+ return context._eventloop.proactor.recv(self.sock, n)
+ except Future as f:
+ yield from scheduling.block_future(f)
+ return f.result()
except socket.error as err:
if err.errno in _TRYAGAIN:
pass
@@ -71,7 +78,6 @@ class SocketTransport:
return b''
else:
raise # Unexpected, propagate.
- yield from scheduling.block_r(self.sock.fileno())
def send(self, data):
"""COROUTINE; Send data to the socket, blocking until all written.
@@ -80,7 +86,11 @@ class SocketTransport:
"""
while data:
try:
- n = self.sock.send(data)
+ try:
+ n = context._eventloop.proactor.send(self.sock, data)
+ except Future as f:
+ yield from scheduling.block_future(f)
+ n = f.result()
except socket.error as err:
if err.errno in _TRYAGAIN:
pass
@@ -93,8 +103,6 @@ class SocketTransport:
if n == len(data):
break
data = data[n:]
- continue
- yield from scheduling.block_w(self.sock.fileno())
return True
@@ -122,9 +130,9 @@ class SslTransport:
try:
self.sslsock.do_handshake()
except ssl.SSLWantReadError:
- yield from scheduling.block_r(self.sslsock.fileno())
+ yield from scheduling.block_r(self.sslsock)
except ssl.SSLWantWriteError:
- yield from scheduling.block_w(self.sslsock.fileno())
+ yield from scheduling.block_w(self.sslsock)
else:
break
@@ -137,12 +145,12 @@ class SslTransport:
try:
return self.sslsock.recv(n)
except ssl.SSLWantReadError:
- yield from scheduling.block_r(self.sslsock.fileno())
+ yield from scheduling.block_r(self.sslsock)
except ssl.SSLWantWriteError:
- yield from scheduling.block_w(self.sslsock.fileno())
+ yield from scheduling.block_w(self.sslsock)
except socket.error as err:
if err.errno in _TRYAGAIN:
- yield from scheduling.block_r(self.sock.fileno())
+ yield from scheduling.block_r(self.sslsock)
elif err.errno in _DISCONNECTED:
# Can this happen?
return b''
@@ -155,12 +163,12 @@ class SslTransport:
try:
n = self.sslsock.send(data)
except ssl.SSLWantReadError:
- yield from scheduling.block_r(self.sslsock.fileno())
+ yield from scheduling.block_r(self.sslsock)
except ssl.SSLWantWriteError:
- yield from scheduling.block_w(self.sslsock.fileno())
+ yield from scheduling.block_w(self.sslsock)
except socket.error as err:
if err.errno in _TRYAGAIN:
- yield from scheduling.block_w(self.sock.fileno())
+ yield from scheduling.block_w(self.sslsock)
elif err.errno in _DISCONNECTED:
return False
else:
@@ -243,14 +251,10 @@ class BufferedReader:
def connect(sock, address):
"""COROUTINE: Connect a socket to an address."""
try:
- sock.connect(address)
- except socket.error as err:
- if err.errno != errno.EINPROGRESS:
- raise
- yield from scheduling.block_w(sock.fileno())
- err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if err != 0:
- raise IOError(err, 'Connection refused')
+ return context._eventloop.proactor.connect(sock, address)
+ except Future as f:
+ yield from scheduling.block_future(f)
+ return f.result()
def getaddrinfo(host, port, af=0, socktype=0, proto=0):
@@ -310,15 +314,14 @@ class Listener:
"""COROUTINE: Accept a connection."""
while True:
try:
- conn, addr = self.sock.accept()
+ try:
+ return context._eventloop.proactor.accept(self.sock)
+ except Future as f:
+ yield from scheduling.block_future(f)
+ return f.result()
except socket.error as err:
- if err.errno in _TRYAGAIN:
- yield from scheduling.block_r(self.sock.fileno())
- else:
- raise # Unexpected, propagate.
- else:
- conn.setblocking(False)
- return conn, addr
+ if err.errno not in _TRYAGAIN:
+ raise
def create_listener(host, port, af=0, socktype=0, proto=0,