summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2013-01-25 17:21:15 +0000
committerRichard Oudkerk <shibturn@gmail.com>2013-01-25 17:21:15 +0000
commit54554e2f6d6108a0eead0b268e86207017d7fcee (patch)
tree93595165a3261d4d7839d6226d3e79b45ea96287
parent347f4e51b7aa79a33a648da07f434d3636e7434a (diff)
parenteaa412a3430f9cb351f56887286a3f0d32d5bd5b (diff)
downloadtrollius-54554e2f6d6108a0eead0b268e86207017d7fcee.tar.gz
Dummy merge.
-rw-r--r--overlapped.c998
-rw-r--r--setup.cfg2
-rw-r--r--setup.py4
-rw-r--r--tulip/events_test.py38
-rw-r--r--tulip/iocp_events.py318
-rw-r--r--tulip/selectors.py4
-rw-r--r--tulip/unix_events.py480
7 files changed, 1602 insertions, 242 deletions
diff --git a/overlapped.c b/overlapped.c
new file mode 100644
index 0000000..a428cf2
--- /dev/null
+++ b/overlapped.c
@@ -0,0 +1,998 @@
+/*
+ * Support for overlapped IO
+ *
+ * Some code borrowed from Modules/_winapi.c of CPython
+ */
+
+/* XXX check overflow and DWORD <-> Py_ssize_t conversions
+ Check itemsize */
+
+#include "Python.h"
+#include "structmember.h"
+
+#define WINDOWS_LEAN_AND_MEAN
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <mswsock.h>
+
+#if defined(MS_WIN32) && !defined(MS_WIN64)
+# define F_POINTER "k"
+# define T_POINTER T_ULONG
+#else
+# define F_POINTER "K"
+# define T_POINTER T_ULONGLONG
+#endif
+
+#define F_HANDLE F_POINTER
+#define F_ULONG_PTR F_POINTER
+#define F_DWORD "k"
+#define F_BOOL "i"
+#define F_UINT "I"
+
+#define T_HANDLE T_POINTER
+
+enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_WRITE, TYPE_ACCEPT,
+ TYPE_CONNECT, TYPE_DISCONNECT};
+
+/*
+ * Some functions should be loaded at runtime
+ */
+
+static LPFN_ACCEPTEX Py_AcceptEx = NULL;
+static LPFN_CONNECTEX Py_ConnectEx = NULL;
+static LPFN_DISCONNECTEX Py_DisconnectEx = NULL;
+static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED) = NULL;
+
+#define GET_WSA_POINTER(s, x) \
+ (SOCKET_ERROR != WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, \
+ &Guid##x, sizeof(Guid##x), &Py_##x, \
+ sizeof(Py_##x), &dwBytes, NULL, NULL))
+
+static int
+initialize_function_pointers(void)
+{
+ GUID GuidAcceptEx = WSAID_ACCEPTEX;
+ GUID GuidConnectEx = WSAID_CONNECTEX;
+ GUID GuidDisconnectEx = WSAID_DISCONNECTEX;
+ HINSTANCE hKernel32;
+ SOCKET s;
+ DWORD dwBytes;
+
+ s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == INVALID_SOCKET) {
+ PyErr_SetFromWindowsErr(WSAGetLastError());
+ return -1;
+ }
+
+ if (!GET_WSA_POINTER(s, AcceptEx) ||
+ !GET_WSA_POINTER(s, ConnectEx) ||
+ !GET_WSA_POINTER(s, DisconnectEx))
+ {
+ closesocket(s);
+ PyErr_SetFromWindowsErr(WSAGetLastError());
+ return -1;
+ }
+
+ closesocket(s);
+
+ /* On WinXP we will have Py_CancelIoEx == NULL */
+ hKernel32 = GetModuleHandle("KERNEL32");
+ *(FARPROC *)&Py_CancelIoEx = GetProcAddress(hKernel32, "CancelIoEx");
+ return 0;
+}
+
+/*
+ * Completion port stuff
+ */
+
+PyDoc_STRVAR(
+ CreateIoCompletionPort_doc,
+ "CreateIoCompletionPort(handle, port, key, concurrency) -> port\n\n"
+ "Create a completion port or register a handle with a port.");
+
+static PyObject *
+overlapped_CreateIoCompletionPort(PyObject *self, PyObject *args)
+{
+ HANDLE FileHandle;
+ HANDLE ExistingCompletionPort;
+ ULONG_PTR CompletionKey;
+ DWORD NumberOfConcurrentThreads;
+ HANDLE ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE F_ULONG_PTR F_DWORD,
+ &FileHandle, &ExistingCompletionPort, &CompletionKey,
+ &NumberOfConcurrentThreads))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = CreateIoCompletionPort(FileHandle, ExistingCompletionPort,
+ CompletionKey, NumberOfConcurrentThreads);
+ Py_END_ALLOW_THREADS
+
+ if (ret == NULL)
+ return PyErr_SetFromWindowsErr(0);
+ return Py_BuildValue(F_HANDLE, ret);
+}
+
+PyDoc_STRVAR(
+ GetQueuedCompletionStatus_doc,
+ "GetQueuedCompletionStatus(port, msecs) -> (err, bytes, key, address)\n\n"
+ "Get a message from completion port. Wait for up to msecs milliseconds.");
+
+static PyObject *
+overlapped_GetQueuedCompletionStatus(PyObject *self, PyObject *args)
+{
+ HANDLE CompletionPort;
+ DWORD Milliseconds;
+ DWORD NumberOfBytes;
+ ULONG_PTR CompletionKey;
+ OVERLAPPED *Overlapped = NULL;
+ DWORD err;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD,
+ &CompletionPort, &Milliseconds))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = GetQueuedCompletionStatus(CompletionPort, &NumberOfBytes,
+ &CompletionKey, &Overlapped, Milliseconds);
+ Py_END_ALLOW_THREADS
+
+ err = ret ? ERROR_SUCCESS : GetLastError();
+ if (Overlapped == NULL) {
+ if (err == WAIT_TIMEOUT)
+ Py_RETURN_NONE;
+ else
+ return PyErr_SetFromWindowsErr(err);
+ }
+ return Py_BuildValue(F_DWORD F_DWORD F_ULONG_PTR F_POINTER,
+ err, NumberOfBytes, CompletionKey, Overlapped);
+}
+
+PyDoc_STRVAR(
+ PostQueuedCompletionStatus_doc,
+ "PostQueuedCompletionStatus(port, bytes, key, address) -> None\n\n"
+ "Post a message to completion port.");
+
+static PyObject *
+overlapped_PostQueuedCompletionStatus(PyObject *self, PyObject *args)
+{
+ HANDLE CompletionPort;
+ DWORD NumberOfBytes;
+ ULONG_PTR CompletionKey;
+ OVERLAPPED *Overlapped;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD F_ULONG_PTR F_POINTER,
+ &CompletionPort, &NumberOfBytes, &CompletionKey,
+ &Overlapped))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = PostQueuedCompletionStatus(CompletionPort, NumberOfBytes,
+ CompletionKey, Overlapped);
+ Py_END_ALLOW_THREADS
+
+ if (!ret)
+ return PyErr_SetFromWindowsErr(0);
+ Py_RETURN_NONE;
+}
+
+/*
+ * Bind socket handle to local port without doing slow getaddrinfo()
+ */
+
+PyDoc_STRVAR(
+ BindLocal_doc,
+ "BindLocal(handle, length_of_address_tuple) -> None\n\n"
+ "Bind a socket handle to an arbitrary local port.\n"
+ "If length_of_address_tuple is 2 then an AF_INET address is used.\n"
+ "If length_of_address_tuple is 4 then an AF_INET6 address is used.");
+
+static PyObject *
+overlapped_BindLocal(PyObject *self, PyObject *args)
+{
+ SOCKET Socket;
+ int TupleLength;
+ BOOL ret;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "i", &Socket, &TupleLength))
+ return NULL;
+
+ if (TupleLength == 2) {
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_port = 0;
+ addr.sin_addr.S_un.S_addr = INADDR_ANY;
+ ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR;
+ } else if (TupleLength == 4) {
+ struct sockaddr_in6 addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin6_family = AF_INET6;
+ addr.sin6_port = 0;
+ addr.sin6_addr = in6addr_any;
+ ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR;
+ } else {
+ PyErr_SetString(PyExc_ValueError, "expected tuple of length 2 or 4");
+ return NULL;
+ }
+
+ if (!ret)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ Py_RETURN_NONE;
+}
+
+/*
+ * Set notification mode for the handle
+ */
+
+PyDoc_STRVAR(
+ SetFileCompletionNotificationModes_doc,
+ "SetFileCompletionNotificationModes(FileHandle, Flags) -> None\n\n"
+ "Set whether notification happens if operation succeeds without blocking");
+
+static PyObject *
+overlapped_SetFileCompletionNotificationModes(PyObject *self, PyObject *args)
+{
+ HANDLE FileHandle;
+ UCHAR Flags;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_BOOL, &FileHandle, &Flags))
+ return NULL;
+
+ if (!SetFileCompletionNotificationModes(FileHandle, Flags))
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+
+ Py_RETURN_NONE;
+}
+
+/*
+ * A Python object wrapping an OVERLAPPED structure and other useful data
+ * for overlapped I/O
+ */
+
+PyDoc_STRVAR(
+ Overlapped_doc,
+ "Overlapped object");
+
+typedef struct {
+ PyObject_HEAD
+ OVERLAPPED overlapped;
+ /* For convenience, we store the file handle too */
+ HANDLE handle;
+ /* Error returned by last method call */
+ DWORD error;
+ /* Type of operation */
+ DWORD type;
+ /* Buffer used for reading (optional) */
+ PyObject *read_buffer;
+ /* Buffer used for writing (optional) */
+ Py_buffer write_buffer;
+} OverlappedObject;
+
+
+static PyObject *
+Overlapped_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ OverlappedObject *self;
+ HANDLE event = INVALID_HANDLE_VALUE;
+ static char *kwlist[] = {"event", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "|" F_HANDLE, kwlist, &event))
+ return NULL;
+
+ if (event == INVALID_HANDLE_VALUE) {
+ event = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (event == NULL)
+ return PyErr_SetExcFromWindowsErr(PyExc_OSError, 0);
+ }
+
+ self = PyObject_New(OverlappedObject, type);
+ if (self == NULL) {
+ if (event != NULL)
+ CloseHandle(event);
+ return NULL;
+ }
+
+ self->handle = NULL;
+ self->error = 0;
+ self->type = TYPE_NONE;
+ self->read_buffer = NULL;
+ memset(&self->overlapped, 0, sizeof(OVERLAPPED));
+ memset(&self->write_buffer, 0, sizeof(Py_buffer));
+ if (event)
+ self->overlapped.hEvent = event;
+ return (PyObject *)self;
+}
+
+static void
+Overlapped_dealloc(OverlappedObject *self)
+{
+ DWORD bytes;
+ DWORD olderr = GetLastError();
+ BOOL wait = FALSE;
+ BOOL ret;
+
+ if (!HasOverlappedIoCompleted(&self->overlapped) &&
+ self->type != TYPE_NOT_STARTED)
+ {
+ if (Py_CancelIoEx && Py_CancelIoEx(self->handle, &self->overlapped))
+ wait = TRUE;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = GetOverlappedResult(self->handle, &self->overlapped,
+ &bytes, wait);
+ Py_END_ALLOW_THREADS
+
+ switch (ret ? ERROR_SUCCESS : GetLastError()) {
+ case ERROR_SUCCESS:
+ case ERROR_NOT_FOUND:
+ case ERROR_OPERATION_ABORTED:
+ break;
+ default:
+ PyErr_SetString(
+ PyExc_RuntimeError,
+ "I/O operations still in flight while destroying "
+ "Overlapped object, the process may crash");
+ PyErr_WriteUnraisable(NULL);
+ }
+ }
+
+ if (self->overlapped.hEvent != NULL)
+ CloseHandle(self->overlapped.hEvent);
+
+ if (self->write_buffer.obj)
+ PyBuffer_Release(&self->write_buffer);
+
+ Py_CLEAR(self->read_buffer);
+ PyObject_Del(self);
+ SetLastError(olderr);
+}
+
+PyDoc_STRVAR(
+ Overlapped_cancel_doc,
+ "cancel() -> None\n\n"
+ "Cancel overlapped operation");
+
+static PyObject *
+Overlapped_cancel(OverlappedObject *self)
+{
+ BOOL ret = TRUE;
+
+ if (self->type == TYPE_NOT_STARTED)
+ Py_RETURN_NONE;
+
+ if (!HasOverlappedIoCompleted(&self->overlapped)) {
+ Py_BEGIN_ALLOW_THREADS
+ if (Py_CancelIoEx)
+ ret = Py_CancelIoEx(self->handle, &self->overlapped);
+ else
+ ret = CancelIo(self->handle);
+ Py_END_ALLOW_THREADS
+ }
+
+ /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */
+ if (!ret && GetLastError() != ERROR_NOT_FOUND)
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+ Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(
+ Overlapped_getresult_doc,
+ "getresult(wait=False) -> result\n\n"
+ "Retrieve result of operation. If wait is true then it blocks\n"
+ "until the operation is finished. If wait is false and the\n"
+ "operation is still pending then an error is raised.");
+
+static PyObject *
+Overlapped_getresult(OverlappedObject *self, PyObject *args)
+{
+ BOOL wait = FALSE;
+ DWORD transferred = 0;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait))
+ return NULL;
+
+ if (self->type == TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation not yet attempted");
+ return NULL;
+ }
+
+ if (self->type == TYPE_NOT_STARTED) {
+ PyErr_SetString(PyExc_ValueError, "operation failed to start");
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = GetOverlappedResult(self->handle, &self->overlapped, &transferred,
+ wait);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ break;
+ case ERROR_BROKEN_PIPE:
+ if (self->read_buffer != NULL)
+ break;
+ /* fall through */
+ default:
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+
+ switch (self->type) {
+ case TYPE_READ:
+ assert(PyBytes_CheckExact(self->read_buffer));
+ if (transferred != PyBytes_GET_SIZE(self->read_buffer) &&
+ _PyBytes_Resize(&self->read_buffer, transferred))
+ return NULL;
+ Py_INCREF(self->read_buffer);
+ return self->read_buffer;
+ case TYPE_ACCEPT:
+ case TYPE_CONNECT:
+ case TYPE_DISCONNECT:
+ Py_RETURN_NONE;
+ default:
+ return PyLong_FromUnsignedLong((unsigned long) transferred);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_ReadFile_doc,
+ "ReadFile(handle, size) -> Overlapped[message]\n\n"
+ "Start overlapped read");
+
+static PyObject *
+Overlapped_ReadFile(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ DWORD size;
+ DWORD nread;
+ PyObject *buf;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &handle, &size))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+ size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+ buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+ if (buf == NULL)
+ return NULL;
+
+ self->type = TYPE_READ;
+ self->handle = handle;
+ self->read_buffer = buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread,
+ &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ case ERROR_BROKEN_PIPE:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WSARecv_doc,
+ "RecvFile(handle, size, flags) -> Overlapped[message]\n\n"
+ "Start overlapped receive");
+
+static PyObject *
+Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ DWORD size;
+ DWORD flags = 0;
+ DWORD nread;
+ PyObject *buf;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD,
+ &handle, &size, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+ size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+ buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+ if (buf == NULL)
+ return NULL;
+
+ self->type = TYPE_READ;
+ self->handle = handle;
+ self->read_buffer = buf;
+ wsabuf.len = size;
+ wsabuf.buf = PyBytes_AS_STRING(buf);
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags,
+ &self->overlapped, NULL);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ case ERROR_BROKEN_PIPE:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WriteFile_doc,
+ "WriteFile(handle, buf) -> Overlapped[bytes_transferred]\n\n"
+ "Start overlapped write");
+
+static PyObject *
+Overlapped_WriteFile(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+ DWORD written;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O", &handle, &bufobj))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+ return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) {
+ PyBuffer_Release(&self->write_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer to large");
+ return NULL;
+ }
+#endif
+
+ self->type = TYPE_WRITE;
+ self->handle = handle;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WriteFile(handle, self->write_buffer.buf, self->write_buffer.len,
+ &written, &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WSASend_doc,
+ "WSASend(handle, buf, flags) -> Overlapped[bytes_transferred]\n\n"
+ "Start overlapped send");
+
+static PyObject *
+Overlapped_WSASend(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+ DWORD flags;
+ DWORD written;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD,
+ &handle, &bufobj, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->write_buffer))
+ return NULL;
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) {
+ PyBuffer_Release(&self->write_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer to large");
+ return NULL;
+ }
+#endif
+
+ self->type = TYPE_WRITE;
+ self->handle = handle;
+ wsabuf.len = (DWORD)self->write_buffer.len;
+ wsabuf.buf = self->write_buffer.buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WSASend((SOCKET)handle, &wsabuf, 1, &written, flags,
+ &self->overlapped, NULL);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_AcceptEx_doc,
+ "AcceptEx(listen_handle, accept_handle) -> Overlapped[address_as_bytes]\n\n"
+ "Start overlapped wait for client to connect");
+
+static PyObject *
+Overlapped_AcceptEx(OverlappedObject *self, PyObject *args)
+{
+ SOCKET ListenSocket;
+ SOCKET AcceptSocket;
+ DWORD BytesReceived;
+ DWORD size;
+ PyObject *buf;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE,
+ &ListenSocket, &AcceptSocket))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ size = sizeof(struct sockaddr_in6) + 16;
+ buf = PyBytes_FromStringAndSize(NULL, size*2);
+ if (!buf)
+ return NULL;
+
+ self->type = TYPE_ACCEPT;
+ self->handle = (HANDLE)ListenSocket;
+ self->read_buffer = buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = Py_AcceptEx(ListenSocket, AcceptSocket, PyBytes_AS_STRING(buf),
+ 0, size, size, &BytesReceived, &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+
+static int
+parse_address(PyObject *obj, SOCKADDR *Address, int Length)
+{
+ char *Host;
+ unsigned short Port;
+ unsigned long FlowInfo;
+ unsigned long ScopeId;
+
+ memset(Address, 0, Length);
+
+ if (PyArg_ParseTuple(obj, "sH", &Host, &Port))
+ {
+ Address->sa_family = AF_INET;
+ if (WSAStringToAddressA(Host, AF_INET, NULL, Address, &Length) < 0) {
+ PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ return -1;
+ }
+ ((SOCKADDR_IN*)Address)->sin_port = htons(Port);
+ return Length;
+ }
+ else if (PyArg_ParseTuple(obj, "sHkk", &Host, &Port, &FlowInfo, &ScopeId))
+ {
+ PyErr_Clear();
+ Address->sa_family = AF_INET6;
+ if (WSAStringToAddressA(Host, AF_INET6, NULL, Address, &Length) < 0) {
+ PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+ return -1;
+ }
+ ((SOCKADDR_IN6*)Address)->sin6_port = htons(Port);
+ ((SOCKADDR_IN6*)Address)->sin6_flowinfo = FlowInfo;
+ ((SOCKADDR_IN6*)Address)->sin6_scope_id = ScopeId;
+ return Length;
+ }
+
+ return -1;
+}
+
+
+PyDoc_STRVAR(
+ Overlapped_ConnectEx_doc,
+ "ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n"
+ "Start overlapped connect. client_handle should be unbound.");
+
+static PyObject *
+Overlapped_ConnectEx(OverlappedObject *self, PyObject *args)
+{
+ SOCKET ConnectSocket;
+ PyObject *AddressObj;
+ char AddressBuf[sizeof(struct sockaddr_in6)];
+ SOCKADDR *Address = (SOCKADDR*)AddressBuf;
+ int Length;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ Length = sizeof(AddressBuf);
+ Length = parse_address(AddressObj, Address, Length);
+ if (Length < 0)
+ return NULL;
+
+ self->type = TYPE_CONNECT;
+ self->handle = (HANDLE)ConnectSocket;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = Py_ConnectEx(ConnectSocket, Address, Length,
+ NULL, 0, NULL, &self->overlapped);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_DisconnectEx_doc,
+ "DisconnectEx(handle, flags) -> Overlapped[None]\n\n"
+ "Start overlapped connect. client_handle should be unbound.");
+
+static PyObject *
+Overlapped_DisconnectEx(OverlappedObject *self, PyObject *args)
+{
+ SOCKET Socket;
+ DWORD flags;
+ BOOL ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &Socket, &flags))
+ return NULL;
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ self->type = TYPE_DISCONNECT;
+ self->handle = (HANDLE)Socket;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = Py_DisconnectEx(Socket, &self->overlapped, flags, 0);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+ }
+}
+
+static PyObject*
+Overlapped_getaddress(OverlappedObject *self)
+{
+ return PyLong_FromVoidPtr(&self->overlapped);
+}
+
+static PyObject*
+Overlapped_getpending(OverlappedObject *self)
+{
+ return PyBool_FromLong(!HasOverlappedIoCompleted(&self->overlapped) &&
+ self->type != TYPE_NOT_STARTED);
+}
+
+static PyMethodDef Overlapped_methods[] = {
+ {"getresult", (PyCFunction) Overlapped_getresult,
+ METH_VARARGS, Overlapped_getresult_doc},
+ {"cancel", (PyCFunction) Overlapped_cancel,
+ METH_NOARGS, Overlapped_cancel_doc},
+ {"ReadFile", (PyCFunction) Overlapped_ReadFile,
+ METH_VARARGS, Overlapped_ReadFile_doc},
+ {"WSARecv", (PyCFunction) Overlapped_WSARecv,
+ METH_VARARGS, Overlapped_WSARecv_doc},
+ {"WriteFile", (PyCFunction) Overlapped_WriteFile,
+ METH_VARARGS, Overlapped_WriteFile_doc},
+ {"WSASend", (PyCFunction) Overlapped_WSASend,
+ METH_VARARGS, Overlapped_WSASend_doc},
+ {"AcceptEx", (PyCFunction) Overlapped_AcceptEx,
+ METH_VARARGS, Overlapped_AcceptEx_doc},
+ {"ConnectEx", (PyCFunction) Overlapped_ConnectEx,
+ METH_VARARGS, Overlapped_ConnectEx_doc},
+ {"DisconnectEx", (PyCFunction) Overlapped_DisconnectEx,
+ METH_VARARGS, Overlapped_DisconnectEx_doc},
+ {NULL}
+};
+
+static PyMemberDef Overlapped_members[] = {
+ {"error", T_ULONG,
+ offsetof(OverlappedObject, error),
+ READONLY, "Error from last operation"},
+ {"event", T_HANDLE,
+ offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent),
+ READONLY, "Overlapped event handle"},
+ {NULL}
+};
+
+static PyGetSetDef Overlapped_getsets[] = {
+ {"address", (getter)Overlapped_getaddress, NULL,
+ "Address of overlapped structure"},
+ {"pending", (getter)Overlapped_getpending, NULL,
+ "Whether the operation is pending"},
+ {NULL},
+};
+
+PyTypeObject OverlappedType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ /* tp_name */ "_overlapped.Overlapped",
+ /* tp_basicsize */ sizeof(OverlappedObject),
+ /* tp_itemsize */ 0,
+ /* tp_dealloc */ (destructor) Overlapped_dealloc,
+ /* tp_print */ 0,
+ /* tp_getattr */ 0,
+ /* tp_setattr */ 0,
+ /* tp_reserved */ 0,
+ /* tp_repr */ 0,
+ /* tp_as_number */ 0,
+ /* tp_as_sequence */ 0,
+ /* tp_as_mapping */ 0,
+ /* tp_hash */ 0,
+ /* tp_call */ 0,
+ /* tp_str */ 0,
+ /* tp_getattro */ 0,
+ /* tp_setattro */ 0,
+ /* tp_as_buffer */ 0,
+ /* tp_flags */ Py_TPFLAGS_DEFAULT,
+ /* tp_doc */ "OVERLAPPED structure wrapper",
+ /* tp_traverse */ 0,
+ /* tp_clear */ 0,
+ /* tp_richcompare */ 0,
+ /* tp_weaklistoffset */ 0,
+ /* tp_iter */ 0,
+ /* tp_iternext */ 0,
+ /* tp_methods */ Overlapped_methods,
+ /* tp_members */ Overlapped_members,
+ /* tp_getset */ Overlapped_getsets,
+ /* tp_base */ 0,
+ /* tp_dict */ 0,
+ /* tp_descr_get */ 0,
+ /* tp_descr_set */ 0,
+ /* tp_dictoffset */ 0,
+ /* tp_init */ 0,
+ /* tp_alloc */ 0,
+ /* tp_new */ Overlapped_new,
+};
+
+static PyMethodDef overlapped_functions[] = {
+ {"CreateIoCompletionPort", overlapped_CreateIoCompletionPort,
+ METH_VARARGS, CreateIoCompletionPort_doc},
+ {"GetQueuedCompletionStatus", overlapped_GetQueuedCompletionStatus,
+ METH_VARARGS, GetQueuedCompletionStatus_doc},
+ {"PostQueuedCompletionStatus", overlapped_PostQueuedCompletionStatus,
+ METH_VARARGS, PostQueuedCompletionStatus_doc},
+ {"BindLocal", overlapped_BindLocal,
+ METH_VARARGS, BindLocal_doc},
+ {"SetFileCompletionNotificationModes",
+ overlapped_SetFileCompletionNotificationModes,
+ METH_VARARGS, SetFileCompletionNotificationModes_doc},
+ {NULL}
+};
+
+static struct PyModuleDef overlapped_module = {
+ PyModuleDef_HEAD_INIT,
+ "_overlapped",
+ NULL,
+ -1,
+ overlapped_functions,
+ NULL,
+ NULL,
+ NULL,
+ NULL
+};
+
+#define WINAPI_CONSTANT(fmt, con) \
+ PyDict_SetItemString(d, #con, Py_BuildValue(fmt, con))
+
+PyMODINIT_FUNC
+PyInit__overlapped(void)
+{
+ PyObject *m, *d;
+
+ /* Ensure WSAStartup() called before initializing function pointers */
+ m = PyImport_ImportModule("_socket");
+ if (!m)
+ return NULL;
+ Py_DECREF(m);
+
+ if (initialize_function_pointers() < 0)
+ return NULL;
+
+ if (PyType_Ready(&OverlappedType) < 0)
+ return NULL;
+
+ m = PyModule_Create(&overlapped_module);
+ if (PyModule_AddObject(m, "Overlapped", (PyObject *)&OverlappedType) < 0)
+ return NULL;
+
+ d = PyModule_GetDict(m);
+
+ WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);
+ WINAPI_CONSTANT(F_DWORD, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
+ WINAPI_CONSTANT(F_DWORD, INFINITE);
+ WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
+ WINAPI_CONSTANT(F_HANDLE, NULL);
+ WINAPI_CONSTANT(F_DWORD, SO_UPDATE_ACCEPT_CONTEXT);
+ WINAPI_CONSTANT(F_DWORD, SO_UPDATE_CONNECT_CONTEXT);
+ WINAPI_CONSTANT(F_DWORD, TF_REUSE_SOCKET);
+
+ return m;
+}
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..0260f9d
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[build_ext]
+build_lib=tulip
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..67b037c
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,4 @@
+from distutils.core import setup, Extension
+
+ext = Extension('_overlapped', ['overlapped.c'], libraries=['ws2_32'])
+setup(name='_overlapped', ext_modules=[ext])
diff --git a/tulip/events_test.py b/tulip/events_test.py
index 7680257..0a08480 100644
--- a/tulip/events_test.py
+++ b/tulip/events_test.py
@@ -46,8 +46,7 @@ class MyProto(protocols.Protocol):
class EventLoopTestsMixin:
def setUp(self):
- self.selector = self.SELECTOR_CLASS()
- self.event_loop = unix_events.UnixEventLoop(self.selector)
+ self.event_loop = self.create_event_loop()
events.set_event_loop(self.event_loop)
def tearDown(self):
@@ -216,7 +215,10 @@ class EventLoopTestsMixin:
r, w = unix_events.socketpair()
bytes_read = []
def reader():
- data = r.recv(1024)
+ try:
+ data = r.recv(1024)
+ except BlockingIOError:
+ return
if data:
bytes_read.append(data)
if sum(len(b) for b in bytes_read) >= 6:
@@ -278,7 +280,8 @@ class EventLoopTestsMixin:
sock = socket.socket()
sock.setblocking(False)
# TODO: This depends on python.org behavior!
- el.run_until_complete(el.sock_connect(sock, ('python.org', 80)))
+ address = socket.getaddrinfo('python.org', 80, socket.AF_INET)[0][4]
+ el.run_until_complete(el.sock_connect(sock, address))
el.run_until_complete(el.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
data = el.run_until_complete(el.sock_recv(sock, 1024))
sock.close()
@@ -289,18 +292,19 @@ class EventLoopTestsMixin:
sock = socket.socket()
sock.setblocking(False)
# TODO: This depends on python.org behavior!
+ address = socket.getaddrinfo('python.org', 12345, socket.AF_INET)[0][4]
with self.assertRaises(ConnectionRefusedError):
- el.run_until_complete(el.sock_connect(sock, ('python.org', 12345)))
+ el.run_until_complete(el.sock_connect(sock, address))
sock.close()
def testSockAccept(self):
+ el = events.get_event_loop()
listener = socket.socket()
listener.setblocking(False)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
client = socket.socket()
client.connect(listener.getsockname())
- el = events.get_event_loop()
f = el.sock_accept(listener)
conn, addr = el.run_until_complete(f)
self.assertEqual(conn.gettimeout(), 0)
@@ -414,22 +418,32 @@ class EventLoopTestsMixin:
if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- SELECTOR_CLASS = selectors.KqueueSelector
-
+ def create_event_loop(self):
+ return unix_events.UnixEventLoop(selectors.KqueueSelector())
if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- SELECTOR_CLASS = selectors.EpollSelector
-
+ def create_event_loop(self):
+ return unix_events.UnixEventLoop(selectors.EpollSelector())
if hasattr(selectors, 'PollSelector'):
class PollEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- SELECTOR_CLASS = selectors.PollSelector
+ def create_event_loop(self):
+ return unix_events.UnixEventLoop(selectors.PollSelector())
+
+if sys.platform == 'win32':
+ from . import iocp_events
+ class IocpEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
+ def create_event_loop(self):
+ return iocp_events.IocpEventLoop()
+ def testCreateSslTransport(self):
+ raise unittest.SkipTest("IocpEventLoop imcompatible with SSL")
# Should always exist.
class SelectEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- SELECTOR_CLASS = selectors.SelectSelector
+ def create_event_loop(self):
+ return unix_events.UnixEventLoop(selectors.SelectSelector())
class HandlerTests(unittest.TestCase):
diff --git a/tulip/iocp_events.py b/tulip/iocp_events.py
new file mode 100644
index 0000000..cfa09a2
--- /dev/null
+++ b/tulip/iocp_events.py
@@ -0,0 +1,318 @@
+#
+# Module implementing the Proactor pattern
+#
+# A proactor is used to initiate asynchronous I/O, and to wait for
+# completion of previously initiated operations.
+#
+
+import errno
+import logging
+import os
+import heapq
+import sys
+import socket
+import time
+import weakref
+
+from _winapi import CloseHandle
+
+from . import transports
+
+from .futures import Future
+from .unix_events import BaseEventLoop, _StopError
+from .winsocketpair import socketpair
+from ._overlapped import *
+
+
+_TRYAGAIN = frozenset() # XXX
+
+
+class IocpProactor:
+
+ def __init__(self, concurrency=0xffffffff):
+ self._results = []
+ self._iocp = CreateIoCompletionPort(
+ INVALID_HANDLE_VALUE, NULL, 0, concurrency)
+ self._cache = {}
+ self._registered = weakref.WeakSet()
+
+ def registered_count(self):
+ return len(self._cache)
+
+ def select(self, timeout=None):
+ if not self._results:
+ self._poll(timeout)
+ tmp, self._results = self._results, []
+ return tmp
+
+ def recv(self, conn, nbytes, flags=0):
+ self._register_with_iocp(conn)
+ ov = Overlapped(NULL)
+ ov.WSARecv(conn.fileno(), nbytes, flags)
+ return self._register(ov, conn, ov.getresult)
+
+ def send(self, conn, buf, flags=0):
+ self._register_with_iocp(conn)
+ ov = Overlapped(NULL)
+ ov.WSASend(conn.fileno(), buf, flags)
+ return self._register(ov, conn, ov.getresult)
+
+ def accept(self, listener):
+ self._register_with_iocp(listener)
+ conn = self._get_accept_socket()
+ ov = Overlapped(NULL)
+ ov.AcceptEx(listener.fileno(), conn.fileno())
+ def finish_accept():
+ addr = ov.getresult()
+ conn.setsockopt(socket.SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT, listener.fileno())
+ conn.settimeout(listener.gettimeout())
+ return conn, conn.getpeername()
+ return self._register(ov, listener, finish_accept)
+
+ def connect(self, conn, address):
+ self._register_with_iocp(conn)
+ BindLocal(conn.fileno(), len(address))
+ ov = Overlapped(NULL)
+ ov.ConnectEx(conn.fileno(), address)
+ def finish_connect():
+ try:
+ ov.getresult()
+ except OSError as e:
+ if e.winerror == 1225:
+ raise ConnectionRefusedError(errno.ECONNREFUSED,
+ 'connection refused')
+ raise
+ conn.setsockopt(socket.SOL_SOCKET,
+ SO_UPDATE_CONNECT_CONTEXT, 0)
+ return conn
+ return self._register(ov, conn, finish_connect)
+
+ def _register_with_iocp(self, obj):
+ if obj not in self._registered:
+ self._registered.add(obj)
+ CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
+ SetFileCompletionNotificationModes(obj.fileno(),
+ FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)
+
+ def _register(self, ov, obj, callback):
+ f = Future()
+ if ov.error == ERROR_IO_PENDING:
+ # we must prevent ov and obj from being garbage collected
+ self._cache[ov.address] = (f, ov, obj, callback)
+ else:
+ try:
+ f.set_result(callback())
+ except Exception as e:
+ f.set_exception(e)
+ return f
+
+ def _get_accept_socket(self):
+ s = socket.socket()
+ s.settimeout(0)
+ return s
+
+ def _poll(self, timeout=None):
+ if timeout is None:
+ ms = INFINITE
+ elif timeout < 0:
+ raise ValueError("negative timeout")
+ else:
+ ms = int(timeout * 1000 + 0.5)
+ if ms >= INFINITE:
+ raise ValueError("timeout too big")
+ while True:
+ status = GetQueuedCompletionStatus(self._iocp, ms)
+ if status is None:
+ return
+ f, ov, obj, callback = self._cache.pop(status[3])
+ try:
+ value = callback()
+ except OSError as e:
+ if f is None:
+ sys.excepthook(*sys.exc_info())
+ continue
+ f.set_exception(e)
+ self._results.append(f)
+ else:
+ if f is None:
+ continue
+ f.set_result(value)
+ self._results.append(f)
+ ms = 0
+
+ def close(self, *, CloseHandle=CloseHandle):
+ for address, (f, ov, obj, callback) in list(self._cache.items()):
+ try:
+ ov.cancel()
+ except OSError:
+ pass
+
+ while self._cache:
+ status = GetQueuedCompletionStatus(self._iocp, 1000)
+ if status is None:
+ logging.debug('taking long time to close proactor')
+ continue
+ self._cache.pop(status[3])
+
+ if self._iocp is not None:
+ CloseHandle(self._iocp)
+ self._iocp = None
+
+
+class IocpEventLoop(BaseEventLoop):
+
+ @staticmethod
+ def SocketTransport(*args, **kwds):
+ return _IocpSocketTransport(*args, **kwds)
+
+ @staticmethod
+ def SslTransport(*args, **kwds):
+ raise NotImplementedError
+
+ def __init__(self, proactor=None):
+ super().__init__()
+ if proactor is None:
+ proactor = IocpProactor()
+ logging.debug('Using proactor: %s', proactor.__class__.__name__)
+ self._proactor = proactor
+ self._selector = proactor # convenient alias
+ self._readers = {}
+ self._make_self_pipe()
+
+ def close(self):
+ if self._proactor is not None:
+ self._proactor.close()
+ self._proactor = None
+ self._ssock.close()
+ self._csock.close()
+
+ def _make_self_pipe(self):
+ # A self-socket, really. :-)
+ self._ssock, self._csock = socketpair()
+ self._ssock.setblocking(False)
+ self._csock.setblocking(False)
+ def loop(f=None):
+ if f and f.exception():
+ self.close()
+ raise f.exception()
+ f = self._proactor.recv(self._ssock, 4096)
+ self.call_soon(f.add_done_callback, loop)
+ self.call_soon(loop)
+
+ def _write_to_self(self):
+ self._proactor.send(self._csock, b'x')
+
+ def _start_serving(self, protocol_factory, sock):
+ def loop(f=None):
+ try:
+ if f:
+ conn, addr = f.result()
+ protocol = protocol_factory()
+ transport = self.SocketTransport(self, conn, protocol)
+ f = self._proactor.accept(sock)
+ self.call_soon(f.add_done_callback, loop)
+ except OSError as exc:
+ if exc.errno in _TRYAGAIN:
+ self.call_soon(loop)
+ else:
+ sock.close()
+ logging.exception('Accept failed')
+ self.call_soon(loop)
+
+ def sock_recv(self, sock, n):
+ return self._proactor.recv(sock, n)
+
+ def sock_sendall(self, sock, data):
+ return self._proactor.send(sock, data)
+
+ def sock_connect(self, sock, address):
+ return self._proactor.connect(sock, address)
+
+ def sock_accept(self, sock):
+ return self._proactor.accept(sock)
+
+ def _process_events(self, event_list):
+ pass # XXX hard work currently done in poll
+
+
+class _IocpSocketTransport(transports.Transport):
+
+ def __init__(self, event_loop, sock, protocol, waiter=None):
+ self._event_loop = event_loop
+ self._sock = sock
+ self._protocol = protocol
+ self._buffer = []
+ self._write_fut = None
+ self._closing = False # Set when close() called.
+ self._event_loop.call_soon(self._protocol.connection_made, self)
+ self._event_loop.call_soon(self._loop_reading)
+ if waiter is not None:
+ self._event_loop.call_soon(waiter.set_result, None)
+
+ def _loop_reading(self, f=None):
+ try:
+ if f:
+ data = f.result()
+ if not data:
+ self._event_loop.call_soon(self._protocol.eof_received)
+ return
+ self._event_loop.call_soon(self._protocol.data_received, data)
+ f = self._event_loop._proactor.recv(self._sock, 4096)
+ self._event_loop.call_soon(
+ f.add_done_callback, self._loop_reading)
+ except OSError as exc:
+ self._fatal_error(exc)
+
+ def write(self, data):
+ assert isinstance(data, bytes)
+ assert not self._closing
+ if not data:
+ return
+
+ if self._write_fut is not None:
+ self._buffer.append(data)
+ return
+
+ def callback(f):
+ if f.exception():
+ self._fatal_error(f.exception())
+ # XXX should check for partial write
+ data = b''.join(self._buffer)
+ if data:
+ self._buffer = []
+ self._write_fut = self._event_loop._proactor.send(
+ self._sock, data)
+ assert f is self._write_fut
+ self._write_fut = None
+
+ self._write_fut = self._event_loop._proactor.send(self._sock, data)
+ self._write_fut.add_done_callback(callback)
+
+ # TODO: write_eof(), can_write_eof().
+
+ def abort(self):
+ self._fatal_error(None)
+
+ def close(self):
+ self._closing = True
+ if self._write_fut:
+ self._write_fut.cancel()
+ if not self._buffer:
+ self._event_loop.call_soon(self._call_connection_lost, None)
+
+ def _fatal_error(self, exc):
+ logging.exception('Fatal error for %s', self)
+ if self._write_fut:
+ self._write_fut.cancel()
+ # if self._read_fut: # XXX
+ # self._read_fut.cancel()
+ self._write_fut = self._read_fut = None
+ self._buffer = []
+ self._event_loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._sock.close()
diff --git a/tulip/selectors.py b/tulip/selectors.py
index 0543463..2ea92b3 100644
--- a/tulip/selectors.py
+++ b/tulip/selectors.py
@@ -210,6 +210,10 @@ class _BaseSelector:
logging.warn('No key found for fd %r', fd)
return None
+ def wrap_socket(self, sock):
+ """Return sock or a wrapper for sock compatible with selector"""
+ return sock
+
class SelectSelector(_BaseSelector):
"""Select-based selector."""
diff --git a/tulip/unix_events.py b/tulip/unix_events.py
index fc2cb4a..1db6666 100644
--- a/tulip/unix_events.py
+++ b/tulip/unix_events.py
@@ -72,54 +72,13 @@ def _raise_stop_error():
raise _StopError
-class UnixEventLoop(events.EventLoop):
- """Unix event loop.
-
- See events.EventLoop for API specification.
- """
+class BaseEventLoop(events.EventLoop):
- def __init__(self, selector=None):
- super().__init__()
- if selector is None:
- # pick the best selector class for the platform
- selector = selectors.Selector()
- logging.debug('Using selector: %s', selector.__class__.__name__)
- self._selector = selector
+ def __init__(self):
self._ready = collections.deque()
self._scheduled = []
self._default_executor = None
self._signal_handlers = {}
- self._make_self_pipe()
-
- def close(self):
- if self._selector is not None:
- self._selector.close()
- self._selector = None
- self._ssock.close()
- self._csock.close()
-
- def _make_self_pipe(self):
- # A self-socket, really. :-)
- self._ssock, self._csock = socketpair()
- self._ssock.setblocking(False)
- self._csock.setblocking(False)
- self.add_reader(self._ssock.fileno(), self._read_from_self)
-
- def _read_from_self(self):
- try:
- self._ssock.recv(1)
- except socket.error as exc:
- if exc.errno in _TRYAGAIN:
- return
- raise # Halp!
-
- def _write_to_self(self):
- try:
- self._csock.send(b'x')
- except socket.error as exc:
- if exc.errno in _TRYAGAIN:
- return
- raise # Halp!
def run(self):
"""Run the event loop until nothing left to do or stop() called.
@@ -246,16 +205,6 @@ class UnixEventLoop(events.EventLoop):
self._write_to_self()
return handler
- def wrap_future(self, future):
- """XXX"""
- if isinstance(future, futures.Future):
- return future # Don't wrap our own type of Future.
- new_future = futures.Future()
- future.add_done_callback(
- lambda future:
- self.call_soon_threadsafe(new_future._copy_state, future))
- return new_future
-
def run_in_executor(self, executor, callback, *args):
if isinstance(callback, events.Handler):
assert not args
@@ -322,10 +271,10 @@ class UnixEventLoop(events.EventLoop):
waiter = futures.Future()
if ssl:
sslcontext = None if isinstance(ssl, bool) else ssl
- transport = _UnixSslTransport(self, sock, protocol, sslcontext,
+ transport = self.SslTransport(self, sock, protocol, sslcontext,
waiter)
else:
- transport = _UnixSocketTransport(self, sock, protocol, waiter)
+ transport = self.SocketTransport(self, sock, protocol, waiter)
yield from waiter
return transport, protocol
@@ -358,9 +307,238 @@ class UnixEventLoop(events.EventLoop):
raise exceptions[0]
sock.listen(backlog)
sock.setblocking(False)
+ self._start_serving(protocol_factory, sock)
+ return sock
+
+ def add_signal_handler(self, sig, callback, *args):
+ """Add a handler for a signal. UNIX only.
+
+ Raise ValueError if the signal number is invalid or uncatchable.
+ Raise RuntimeError if there is a problem setting up the handler.
+ """
+ self._check_signal(sig)
+ try:
+ # set_wakeup_fd() raises ValueError if this is not the
+ # main thread. By calling it early we ensure that an
+ # event loop running in another thread cannot add a signal
+ # handler.
+ signal.set_wakeup_fd(self._csock.fileno())
+ except ValueError as exc:
+ raise RuntimeError(str(exc))
+ handler = events.make_handler(None, callback, args)
+ self._signal_handlers[sig] = handler
+ try:
+ signal.signal(sig, self._handle_signal)
+ except OSError as exc:
+ del self._signal_handlers[sig]
+ if not self._signal_handlers:
+ try:
+ signal.set_wakeup_fd(-1)
+ except ValueError as nexc:
+ logging.info('set_wakeup_fd(-1) failed: %s', nexc)
+ if exc.errno == errno.EINVAL:
+ raise RuntimeError('sig {} cannot be caught'.format(sig))
+ else:
+ raise
+ return handler
+
+ def _handle_signal(self, sig, arg):
+ """Internal helper that is the actual signal handler."""
+ handler = self._signal_handlers.get(sig)
+ if handler is None:
+ return # Assume it's some race condition.
+ if handler.cancelled:
+ self.remove_signal_handler(sig) # Remove it properly.
+ else:
+ self.call_soon_threadsafe(handler.callback, *handler.args)
+
+ def remove_signal_handler(self, sig):
+ """Remove a handler for a signal. UNIX only.
+
+ Return True if a signal handler was removed, False if not."""
+ self._check_signal(sig)
+ try:
+ del self._signal_handlers[sig]
+ except KeyError:
+ return False
+ if sig == signal.SIGINT:
+ handler = signal.default_int_handler
+ else:
+ handler = signal.SIG_DFL
+ try:
+ signal.signal(sig, handler)
+ except OSError as exc:
+ if exc.errno == errno.EINVAL:
+ raise RuntimeError('sig {} cannot be caught'.format(sig))
+ else:
+ raise
+ if not self._signal_handlers:
+ try:
+ signal.set_wakeup_fd(-1)
+ except ValueError as exc:
+ logging.info('set_wakeup_fd(-1) failed: %s', exc)
+ return True
+
+ def _check_signal(self, sig):
+ """Internal helper to validate a signal.
+
+ Raise ValueError if the signal number is invalid or uncatchable.
+ Raise RuntimeError if there is a problem setting up the handler.
+ """
+ if not isinstance(sig, int):
+ raise TypeError('sig must be an int, not {!r}'.format(sig))
+ if signal is None:
+ raise RuntimeError('Signals are not supported')
+ if not (1 <= sig < signal.NSIG):
+ raise ValueError('sig {} out of range(1, {})'.format(sig,
+ signal.NSIG))
+ if sys.platform == 'win32':
+ raise RuntimeError('Signals are not really supported on Windows')
+
+ def _add_callback(self, handler):
+ """Add a Handler to ready or scheduled."""
+ if handler.cancelled:
+ return
+ if handler.when is None:
+ self._ready.append(handler)
+ else:
+ heapq.heappush(self._scheduled, handler)
+
+ def wrap_future(self, future):
+ """XXX"""
+ if isinstance(future, futures.Future):
+ return future # Don't wrap our own type of Future.
+ new_future = futures.Future()
+ future.add_done_callback(
+ lambda future:
+ self.call_soon_threadsafe(new_future._copy_state, future))
+ return new_future
+
+ def _run_once(self, timeout=None):
+ """Run one full iteration of the event loop.
+
+ This calls all currently ready callbacks, polls for I/O,
+ schedules the resulting callbacks, and finally schedules
+ 'call_later' callbacks.
+ """
+ # TODO: Break each of these into smaller pieces.
+ # TODO: Refactor to separate the callbacks from the readers/writers.
+ # TODO: An alternative API would be to do the *minimal* amount
+ # of work, e.g. one callback or one I/O poll.
+
+ # Remove delayed calls that were cancelled from head of queue.
+ while self._scheduled and self._scheduled[0].cancelled:
+ heapq.heappop(self._scheduled)
+
+ # Inspect the poll queue. If there's exactly one selectable
+ # file descriptor, it's the self-pipe, and if there's nothing
+ # scheduled, we should ignore it.
+ if self._selector.registered_count() > 1 or self._scheduled:
+ if self._ready:
+ timeout = 0
+ elif self._scheduled:
+ # Compute the desired timeout.
+ when = self._scheduled[0].when
+ deadline = max(0, when - time.monotonic())
+ if timeout is None:
+ timeout = deadline
+ else:
+ timeout = min(timeout, deadline)
+
+ t0 = time.monotonic()
+ event_list = self._selector.select(timeout)
+ t1 = time.monotonic()
+ argstr = '' if timeout is None else ' %.3f' % timeout
+ if t1-t0 >= 1:
+ level = logging.INFO
+ else:
+ level = logging.DEBUG
+ logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
+ self._process_events(event_list)
+
+ # Handle 'later' callbacks that are ready.
+ now = time.monotonic()
+ while self._scheduled:
+ handler = self._scheduled[0]
+ if handler.when > now:
+ break
+ handler = heapq.heappop(self._scheduled)
+ self._ready.append(handler)
+
+ # This is the only place where callbacks are actually *called*.
+ # All other places just add them to ready.
+ # Note: We run all currently scheduled callbacks, but not any
+ # callbacks scheduled by callbacks run this time around --
+ # they will be run the next time (after another I/O poll).
+ # Use an idiom that is threadsafe without using locks.
+ ntodo = len(self._ready)
+ for i in range(ntodo):
+ handler = self._ready.popleft()
+ if not handler.cancelled:
+ try:
+ handler.callback(*handler.args)
+ except Exception:
+ logging.exception('Exception in callback %s %r',
+ handler.callback, handler.args)
+
+
+class UnixEventLoop(BaseEventLoop):
+ """Unix event loop.
+
+ See events.EventLoop for API specification.
+ """
+
+ @staticmethod
+ def SocketTransport(event_loop, sock, protocol, waiter=None):
+ return _UnixSocketTransport(event_loop, sock, protocol, waiter)
+
+ @staticmethod
+ def SslTransport(event_loop, rawsock, protocol, sslcontext, waiter):
+ return _UnixSslTransport(event_loop, rawsock, protocol,
+ sslcontext, waiter)
+
+ def __init__(self, selector=None):
+ super().__init__()
+ if selector is None:
+ # pick the best selector class for the platform
+ selector = selectors.Selector()
+ logging.debug('Using selector: %s', selector.__class__.__name__)
+ self._selector = selector
+ self._make_self_pipe()
+
+ def close(self):
+ if self._selector is not None:
+ self._selector.close()
+ self._selector = None
+ self._ssock.close()
+ self._csock.close()
+
+ def _make_self_pipe(self):
+ # A self-socket, really. :-)
+ self._ssock, self._csock = socketpair()
+ self._ssock.setblocking(False)
+ self._csock.setblocking(False)
+ self.add_reader(self._ssock.fileno(), self._read_from_self)
+
+ def _read_from_self(self):
+ try:
+ self._ssock.recv(1)
+ except socket.error as exc:
+ if exc.errno in _TRYAGAIN:
+ return
+ raise # Halp!
+
+ def _write_to_self(self):
+ try:
+ self._csock.send(b'x')
+ except socket.error as exc:
+ if exc.errno in _TRYAGAIN:
+ return
+ raise # Halp!
+
+ def _start_serving(self, protocol_factory, sock):
self.add_reader(sock.fileno(), self._accept_connection,
protocol_factory, sock)
- return sock
def _accept_connection(self, protocol_factory, sock):
try:
@@ -593,181 +771,23 @@ class UnixEventLoop(events.EventLoop):
else:
self.add_reader(fd, self._sock_accept, fut, True, sock)
- def add_signal_handler(self, sig, callback, *args):
- """Add a handler for a signal. UNIX only.
-
- Raise ValueError if the signal number is invalid or uncatchable.
- Raise RuntimeError if there is a problem setting up the handler.
- """
- self._check_signal(sig)
- try:
- # set_wakeup_fd() raises ValueError if this is not the
- # main thread. By calling it early we ensure that an
- # event loop running in another thread cannot add a signal
- # handler.
- signal.set_wakeup_fd(self._csock.fileno())
- except ValueError as exc:
- raise RuntimeError(str(exc))
- handler = events.make_handler(None, callback, args)
- self._signal_handlers[sig] = handler
- try:
- signal.signal(sig, self._handle_signal)
- except OSError as exc:
- del self._signal_handlers[sig]
- if not self._signal_handlers:
- try:
- signal.set_wakeup_fd(-1)
- except ValueError as nexc:
- logging.info('set_wakeup_fd(-1) failed: %s', nexc)
- if exc.errno == errno.EINVAL:
- raise RuntimeError('sig {} cannot be caught'.format(sig))
- else:
- raise
- return handler
-
- def _handle_signal(self, sig, arg):
- """Internal helper that is the actual signal handler."""
- handler = self._signal_handlers.get(sig)
- if handler is None:
- return # Assume it's some race condition.
- if handler.cancelled:
- self.remove_signal_handler(sig) # Remove it properly.
- else:
- self.call_soon_threadsafe(handler.callback, *handler.args)
-
- def remove_signal_handler(self, sig):
- """Remove a handler for a signal. UNIX only.
-
- Return True if a signal handler was removed, False if not."""
- self._check_signal(sig)
- try:
- del self._signal_handlers[sig]
- except KeyError:
- return False
- if sig == signal.SIGINT:
- handler = signal.default_int_handler
- else:
- handler = signal.SIG_DFL
- try:
- signal.signal(sig, handler)
- except OSError as exc:
- if exc.errno == errno.EINVAL:
- raise RuntimeError('sig {} cannot be caught'.format(sig))
- else:
- raise
- if not self._signal_handlers:
- try:
- signal.set_wakeup_fd(-1)
- except ValueError as exc:
- logging.info('set_wakeup_fd(-1) failed: %s', exc)
- return True
-
- def _check_signal(self, sig):
- """Internal helper to validate a signal.
-
- Raise ValueError if the signal number is invalid or uncatchable.
- Raise RuntimeError if there is a problem setting up the handler.
- """
- if not isinstance(sig, int):
- raise TypeError('sig must be an int, not {!r}'.format(sig))
- if signal is None:
- raise RuntimeError('Signals are not supported')
- if not (1 <= sig < signal.NSIG):
- raise ValueError('sig {} out of range(1, {})'.format(sig,
- signal.NSIG))
- if sys.platform == 'win32':
- raise RuntimeError('Signals are not really supported on Windows')
-
- def _add_callback(self, handler):
- """Add a Handler to ready or scheduled."""
- if handler.cancelled:
- return
- if handler.when is None:
- self._ready.append(handler)
- else:
- heapq.heappush(self._scheduled, handler)
-
- def _run_once(self, timeout=None):
- """Run one full iteration of the event loop.
-
- This calls all currently ready callbacks, polls for I/O,
- schedules the resulting callbacks, and finally schedules
- 'call_later' callbacks.
- """
- # TODO: Break each of these into smaller pieces.
- # TODO: Refactor to separate the callbacks from the readers/writers.
- # TODO: An alternative API would be to do the *minimal* amount
- # of work, e.g. one callback or one I/O poll.
-
- # Remove delayed calls that were cancelled from head of queue.
- while self._scheduled and self._scheduled[0].cancelled:
- heapq.heappop(self._scheduled)
-
- # Inspect the poll queue. If there's exactly one selectable
- # file descriptor, it's the self-pipe, and if there's nothing
- # scheduled, we should ignore it.
- if self._selector.registered_count() > 1 or self._scheduled:
- if self._ready:
- timeout = 0
- elif self._scheduled:
- # Compute the desired timeout.
- when = self._scheduled[0].when
- deadline = max(0, when - time.monotonic())
- if timeout is None:
- timeout = deadline
+ def _process_events(self, event_list):
+ for fileobj, mask, (reader, writer, connector) in event_list:
+ if mask & selectors.EVENT_READ and reader is not None:
+ if reader.cancelled:
+ self.remove_reader(fileobj)
else:
- timeout = min(timeout, deadline)
-
- t0 = time.monotonic()
- event_list = self._selector.select(timeout)
- t1 = time.monotonic()
- argstr = '' if timeout is None else ' %.3f' % timeout
- if t1-t0 >= 1:
- level = logging.INFO
- else:
- level = logging.DEBUG
- logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
- for fileobj, mask, (reader, writer, connector) in event_list:
- if mask & selectors.EVENT_READ and reader is not None:
- if reader.cancelled:
- self.remove_reader(fileobj)
- else:
- self._add_callback(reader)
- if mask & selectors.EVENT_WRITE and writer is not None:
- if writer.cancelled:
- self.remove_writer(fileobj)
- else:
- self._add_callback(writer)
- elif mask & selectors.EVENT_CONNECT and connector is not None:
- if connector.cancelled:
- self.remove_connector(fileobj)
- else:
- self._add_callback(connector)
-
- # Handle 'later' callbacks that are ready.
- now = time.monotonic()
- while self._scheduled:
- handler = self._scheduled[0]
- if handler.when > now:
- break
- handler = heapq.heappop(self._scheduled)
- self._ready.append(handler)
-
- # This is the only place where callbacks are actually *called*.
- # All other places just add them to ready.
- # Note: We run all currently scheduled callbacks, but not any
- # callbacks scheduled by callbacks run this time around --
- # they will be run the next time (after another I/O poll).
- # Use an idiom that is threadsafe without using locks.
- ntodo = len(self._ready)
- for i in range(ntodo):
- handler = self._ready.popleft()
- if not handler.cancelled:
- try:
- handler.callback(*handler.args)
- except Exception:
- logging.exception('Exception in callback %s %r',
- handler.callback, handler.args)
+ self._add_callback(reader)
+ if mask & selectors.EVENT_WRITE and writer is not None:
+ if writer.cancelled:
+ self.remove_writer(fileobj)
+ else:
+ self._add_callback(writer)
+ elif mask & selectors.EVENT_CONNECT and connector is not None:
+ if connector.cancelled:
+ self.remove_connector(fileobj)
+ else:
+ self._add_callback(connector)
class _UnixSocketTransport(transports.Transport):