diff options
author | Richard Oudkerk <shibturn@gmail.com> | 2013-01-25 17:21:15 +0000 |
---|---|---|
committer | Richard Oudkerk <shibturn@gmail.com> | 2013-01-25 17:21:15 +0000 |
commit | 54554e2f6d6108a0eead0b268e86207017d7fcee (patch) | |
tree | 93595165a3261d4d7839d6226d3e79b45ea96287 | |
parent | 347f4e51b7aa79a33a648da07f434d3636e7434a (diff) | |
parent | eaa412a3430f9cb351f56887286a3f0d32d5bd5b (diff) | |
download | trollius-54554e2f6d6108a0eead0b268e86207017d7fcee.tar.gz |
Dummy merge.
-rw-r--r-- | overlapped.c | 998 | ||||
-rw-r--r-- | setup.cfg | 2 | ||||
-rw-r--r-- | setup.py | 4 | ||||
-rw-r--r-- | tulip/events_test.py | 38 | ||||
-rw-r--r-- | tulip/iocp_events.py | 318 | ||||
-rw-r--r-- | tulip/selectors.py | 4 | ||||
-rw-r--r-- | tulip/unix_events.py | 480 |
7 files changed, 1602 insertions, 242 deletions
diff --git a/overlapped.c b/overlapped.c new file mode 100644 index 0000000..a428cf2 --- /dev/null +++ b/overlapped.c @@ -0,0 +1,998 @@ +/* + * Support for overlapped IO + * + * Some code borrowed from Modules/_winapi.c of CPython + */ + +/* XXX check overflow and DWORD <-> Py_ssize_t conversions + Check itemsize */ + +#include "Python.h" +#include "structmember.h" + +#define WINDOWS_LEAN_AND_MEAN +#include <winsock2.h> +#include <ws2tcpip.h> +#include <mswsock.h> + +#if defined(MS_WIN32) && !defined(MS_WIN64) +# define F_POINTER "k" +# define T_POINTER T_ULONG +#else +# define F_POINTER "K" +# define T_POINTER T_ULONGLONG +#endif + +#define F_HANDLE F_POINTER +#define F_ULONG_PTR F_POINTER +#define F_DWORD "k" +#define F_BOOL "i" +#define F_UINT "I" + +#define T_HANDLE T_POINTER + +enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_WRITE, TYPE_ACCEPT, + TYPE_CONNECT, TYPE_DISCONNECT}; + +/* + * Some functions should be loaded at runtime + */ + +static LPFN_ACCEPTEX Py_AcceptEx = NULL; +static LPFN_CONNECTEX Py_ConnectEx = NULL; +static LPFN_DISCONNECTEX Py_DisconnectEx = NULL; +static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED) = NULL; + +#define GET_WSA_POINTER(s, x) \ + (SOCKET_ERROR != WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, \ + &Guid##x, sizeof(Guid##x), &Py_##x, \ + sizeof(Py_##x), &dwBytes, NULL, NULL)) + +static int +initialize_function_pointers(void) +{ + GUID GuidAcceptEx = WSAID_ACCEPTEX; + GUID GuidConnectEx = WSAID_CONNECTEX; + GUID GuidDisconnectEx = WSAID_DISCONNECTEX; + HINSTANCE hKernel32; + SOCKET s; + DWORD dwBytes; + + s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (s == INVALID_SOCKET) { + PyErr_SetFromWindowsErr(WSAGetLastError()); + return -1; + } + + if (!GET_WSA_POINTER(s, AcceptEx) || + !GET_WSA_POINTER(s, ConnectEx) || + !GET_WSA_POINTER(s, DisconnectEx)) + { + closesocket(s); + PyErr_SetFromWindowsErr(WSAGetLastError()); + return -1; + } + + closesocket(s); + + /* On WinXP we will have Py_CancelIoEx == NULL */ + hKernel32 = GetModuleHandle("KERNEL32"); + *(FARPROC *)&Py_CancelIoEx = GetProcAddress(hKernel32, "CancelIoEx"); + return 0; +} + +/* + * Completion port stuff + */ + +PyDoc_STRVAR( + CreateIoCompletionPort_doc, + "CreateIoCompletionPort(handle, port, key, concurrency) -> port\n\n" + "Create a completion port or register a handle with a port."); + +static PyObject * +overlapped_CreateIoCompletionPort(PyObject *self, PyObject *args) +{ + HANDLE FileHandle; + HANDLE ExistingCompletionPort; + ULONG_PTR CompletionKey; + DWORD NumberOfConcurrentThreads; + HANDLE ret; + + if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE F_ULONG_PTR F_DWORD, + &FileHandle, &ExistingCompletionPort, &CompletionKey, + &NumberOfConcurrentThreads)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = CreateIoCompletionPort(FileHandle, ExistingCompletionPort, + CompletionKey, NumberOfConcurrentThreads); + Py_END_ALLOW_THREADS + + if (ret == NULL) + return PyErr_SetFromWindowsErr(0); + return Py_BuildValue(F_HANDLE, ret); +} + +PyDoc_STRVAR( + GetQueuedCompletionStatus_doc, + "GetQueuedCompletionStatus(port, msecs) -> (err, bytes, key, address)\n\n" + "Get a message from completion port. Wait for up to msecs milliseconds."); + +static PyObject * +overlapped_GetQueuedCompletionStatus(PyObject *self, PyObject *args) +{ + HANDLE CompletionPort; + DWORD Milliseconds; + DWORD NumberOfBytes; + ULONG_PTR CompletionKey; + OVERLAPPED *Overlapped = NULL; + DWORD err; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, + &CompletionPort, &Milliseconds)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = GetQueuedCompletionStatus(CompletionPort, &NumberOfBytes, + &CompletionKey, &Overlapped, Milliseconds); + Py_END_ALLOW_THREADS + + err = ret ? ERROR_SUCCESS : GetLastError(); + if (Overlapped == NULL) { + if (err == WAIT_TIMEOUT) + Py_RETURN_NONE; + else + return PyErr_SetFromWindowsErr(err); + } + return Py_BuildValue(F_DWORD F_DWORD F_ULONG_PTR F_POINTER, + err, NumberOfBytes, CompletionKey, Overlapped); +} + +PyDoc_STRVAR( + PostQueuedCompletionStatus_doc, + "PostQueuedCompletionStatus(port, bytes, key, address) -> None\n\n" + "Post a message to completion port."); + +static PyObject * +overlapped_PostQueuedCompletionStatus(PyObject *self, PyObject *args) +{ + HANDLE CompletionPort; + DWORD NumberOfBytes; + ULONG_PTR CompletionKey; + OVERLAPPED *Overlapped; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD F_ULONG_PTR F_POINTER, + &CompletionPort, &NumberOfBytes, &CompletionKey, + &Overlapped)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = PostQueuedCompletionStatus(CompletionPort, NumberOfBytes, + CompletionKey, Overlapped); + Py_END_ALLOW_THREADS + + if (!ret) + return PyErr_SetFromWindowsErr(0); + Py_RETURN_NONE; +} + +/* + * Bind socket handle to local port without doing slow getaddrinfo() + */ + +PyDoc_STRVAR( + BindLocal_doc, + "BindLocal(handle, length_of_address_tuple) -> None\n\n" + "Bind a socket handle to an arbitrary local port.\n" + "If length_of_address_tuple is 2 then an AF_INET address is used.\n" + "If length_of_address_tuple is 4 then an AF_INET6 address is used."); + +static PyObject * +overlapped_BindLocal(PyObject *self, PyObject *args) +{ + SOCKET Socket; + int TupleLength; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE "i", &Socket, &TupleLength)) + return NULL; + + if (TupleLength == 2) { + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = 0; + addr.sin_addr.S_un.S_addr = INADDR_ANY; + ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR; + } else if (TupleLength == 4) { + struct sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_port = 0; + addr.sin6_addr = in6addr_any; + ret = bind(Socket, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR; + } else { + PyErr_SetString(PyExc_ValueError, "expected tuple of length 2 or 4"); + return NULL; + } + + if (!ret) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()); + Py_RETURN_NONE; +} + +/* + * Set notification mode for the handle + */ + +PyDoc_STRVAR( + SetFileCompletionNotificationModes_doc, + "SetFileCompletionNotificationModes(FileHandle, Flags) -> None\n\n" + "Set whether notification happens if operation succeeds without blocking"); + +static PyObject * +overlapped_SetFileCompletionNotificationModes(PyObject *self, PyObject *args) +{ + HANDLE FileHandle; + UCHAR Flags; + + if (!PyArg_ParseTuple(args, F_HANDLE F_BOOL, &FileHandle, &Flags)) + return NULL; + + if (!SetFileCompletionNotificationModes(FileHandle, Flags)) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + + Py_RETURN_NONE; +} + +/* + * A Python object wrapping an OVERLAPPED structure and other useful data + * for overlapped I/O + */ + +PyDoc_STRVAR( + Overlapped_doc, + "Overlapped object"); + +typedef struct { + PyObject_HEAD + OVERLAPPED overlapped; + /* For convenience, we store the file handle too */ + HANDLE handle; + /* Error returned by last method call */ + DWORD error; + /* Type of operation */ + DWORD type; + /* Buffer used for reading (optional) */ + PyObject *read_buffer; + /* Buffer used for writing (optional) */ + Py_buffer write_buffer; +} OverlappedObject; + + +static PyObject * +Overlapped_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + OverlappedObject *self; + HANDLE event = INVALID_HANDLE_VALUE; + static char *kwlist[] = {"event", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|" F_HANDLE, kwlist, &event)) + return NULL; + + if (event == INVALID_HANDLE_VALUE) { + event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (event == NULL) + return PyErr_SetExcFromWindowsErr(PyExc_OSError, 0); + } + + self = PyObject_New(OverlappedObject, type); + if (self == NULL) { + if (event != NULL) + CloseHandle(event); + return NULL; + } + + self->handle = NULL; + self->error = 0; + self->type = TYPE_NONE; + self->read_buffer = NULL; + memset(&self->overlapped, 0, sizeof(OVERLAPPED)); + memset(&self->write_buffer, 0, sizeof(Py_buffer)); + if (event) + self->overlapped.hEvent = event; + return (PyObject *)self; +} + +static void +Overlapped_dealloc(OverlappedObject *self) +{ + DWORD bytes; + DWORD olderr = GetLastError(); + BOOL wait = FALSE; + BOOL ret; + + if (!HasOverlappedIoCompleted(&self->overlapped) && + self->type != TYPE_NOT_STARTED) + { + if (Py_CancelIoEx && Py_CancelIoEx(self->handle, &self->overlapped)) + wait = TRUE; + + Py_BEGIN_ALLOW_THREADS + ret = GetOverlappedResult(self->handle, &self->overlapped, + &bytes, wait); + Py_END_ALLOW_THREADS + + switch (ret ? ERROR_SUCCESS : GetLastError()) { + case ERROR_SUCCESS: + case ERROR_NOT_FOUND: + case ERROR_OPERATION_ABORTED: + break; + default: + PyErr_SetString( + PyExc_RuntimeError, + "I/O operations still in flight while destroying " + "Overlapped object, the process may crash"); + PyErr_WriteUnraisable(NULL); + } + } + + if (self->overlapped.hEvent != NULL) + CloseHandle(self->overlapped.hEvent); + + if (self->write_buffer.obj) + PyBuffer_Release(&self->write_buffer); + + Py_CLEAR(self->read_buffer); + PyObject_Del(self); + SetLastError(olderr); +} + +PyDoc_STRVAR( + Overlapped_cancel_doc, + "cancel() -> None\n\n" + "Cancel overlapped operation"); + +static PyObject * +Overlapped_cancel(OverlappedObject *self) +{ + BOOL ret = TRUE; + + if (self->type == TYPE_NOT_STARTED) + Py_RETURN_NONE; + + if (!HasOverlappedIoCompleted(&self->overlapped)) { + Py_BEGIN_ALLOW_THREADS + if (Py_CancelIoEx) + ret = Py_CancelIoEx(self->handle, &self->overlapped); + else + ret = CancelIo(self->handle); + Py_END_ALLOW_THREADS + } + + /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */ + if (!ret && GetLastError() != ERROR_NOT_FOUND) + return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); + Py_RETURN_NONE; +} + +PyDoc_STRVAR( + Overlapped_getresult_doc, + "getresult(wait=False) -> result\n\n" + "Retrieve result of operation. If wait is true then it blocks\n" + "until the operation is finished. If wait is false and the\n" + "operation is still pending then an error is raised."); + +static PyObject * +Overlapped_getresult(OverlappedObject *self, PyObject *args) +{ + BOOL wait = FALSE; + DWORD transferred = 0; + BOOL ret; + DWORD err; + + if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait)) + return NULL; + + if (self->type == TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation not yet attempted"); + return NULL; + } + + if (self->type == TYPE_NOT_STARTED) { + PyErr_SetString(PyExc_ValueError, "operation failed to start"); + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + ret = GetOverlappedResult(self->handle, &self->overlapped, &transferred, + wait); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : GetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + break; + case ERROR_BROKEN_PIPE: + if (self->read_buffer != NULL) + break; + /* fall through */ + default: + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } + + switch (self->type) { + case TYPE_READ: + assert(PyBytes_CheckExact(self->read_buffer)); + if (transferred != PyBytes_GET_SIZE(self->read_buffer) && + _PyBytes_Resize(&self->read_buffer, transferred)) + return NULL; + Py_INCREF(self->read_buffer); + return self->read_buffer; + case TYPE_ACCEPT: + case TYPE_CONNECT: + case TYPE_DISCONNECT: + Py_RETURN_NONE; + default: + return PyLong_FromUnsignedLong((unsigned long) transferred); + } +} + +PyDoc_STRVAR( + Overlapped_ReadFile_doc, + "ReadFile(handle, size) -> Overlapped[message]\n\n" + "Start overlapped read"); + +static PyObject * +Overlapped_ReadFile(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + DWORD size; + DWORD nread; + PyObject *buf; + BOOL ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &handle, &size)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + +#if SIZEOF_SIZE_T <= SIZEOF_LONG + size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX); +#endif + buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1)); + if (buf == NULL) + return NULL; + + self->type = TYPE_READ; + self->handle = handle; + self->read_buffer = buf; + + Py_BEGIN_ALLOW_THREADS + ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, + &self->overlapped); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : GetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + case ERROR_BROKEN_PIPE: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + +PyDoc_STRVAR( + Overlapped_WSARecv_doc, + "RecvFile(handle, size, flags) -> Overlapped[message]\n\n" + "Start overlapped receive"); + +static PyObject * +Overlapped_WSARecv(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + DWORD size; + DWORD flags = 0; + DWORD nread; + PyObject *buf; + WSABUF wsabuf; + int ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD, + &handle, &size, &flags)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + +#if SIZEOF_SIZE_T <= SIZEOF_LONG + size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX); +#endif + buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1)); + if (buf == NULL) + return NULL; + + self->type = TYPE_READ; + self->handle = handle; + self->read_buffer = buf; + wsabuf.len = size; + wsabuf.buf = PyBytes_AS_STRING(buf); + + Py_BEGIN_ALLOW_THREADS + ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + case ERROR_BROKEN_PIPE: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + +PyDoc_STRVAR( + Overlapped_WriteFile_doc, + "WriteFile(handle, buf) -> Overlapped[bytes_transferred]\n\n" + "Start overlapped write"); + +static PyObject * +Overlapped_WriteFile(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + PyObject *bufobj; + DWORD written; + BOOL ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE "O", &handle, &bufobj)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + if (!PyArg_Parse(bufobj, "y*", &self->write_buffer)) + return NULL; + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) { + PyBuffer_Release(&self->write_buffer); + PyErr_SetString(PyExc_ValueError, "buffer to large"); + return NULL; + } +#endif + + self->type = TYPE_WRITE; + self->handle = handle; + + Py_BEGIN_ALLOW_THREADS + ret = WriteFile(handle, self->write_buffer.buf, self->write_buffer.len, + &written, &self->overlapped); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : GetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + +PyDoc_STRVAR( + Overlapped_WSASend_doc, + "WSASend(handle, buf, flags) -> Overlapped[bytes_transferred]\n\n" + "Start overlapped send"); + +static PyObject * +Overlapped_WSASend(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + PyObject *bufobj; + DWORD flags; + DWORD written; + WSABUF wsabuf; + int ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD, + &handle, &bufobj, &flags)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + if (!PyArg_Parse(bufobj, "y*", &self->write_buffer)) + return NULL; + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if (self->write_buffer.len > (Py_ssize_t)PY_ULONG_MAX) { + PyBuffer_Release(&self->write_buffer); + PyErr_SetString(PyExc_ValueError, "buffer to large"); + return NULL; + } +#endif + + self->type = TYPE_WRITE; + self->handle = handle; + wsabuf.len = (DWORD)self->write_buffer.len; + wsabuf.buf = self->write_buffer.buf; + + Py_BEGIN_ALLOW_THREADS + ret = WSASend((SOCKET)handle, &wsabuf, 1, &written, flags, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + +PyDoc_STRVAR( + Overlapped_AcceptEx_doc, + "AcceptEx(listen_handle, accept_handle) -> Overlapped[address_as_bytes]\n\n" + "Start overlapped wait for client to connect"); + +static PyObject * +Overlapped_AcceptEx(OverlappedObject *self, PyObject *args) +{ + SOCKET ListenSocket; + SOCKET AcceptSocket; + DWORD BytesReceived; + DWORD size; + PyObject *buf; + BOOL ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE, + &ListenSocket, &AcceptSocket)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + size = sizeof(struct sockaddr_in6) + 16; + buf = PyBytes_FromStringAndSize(NULL, size*2); + if (!buf) + return NULL; + + self->type = TYPE_ACCEPT; + self->handle = (HANDLE)ListenSocket; + self->read_buffer = buf; + + Py_BEGIN_ALLOW_THREADS + ret = Py_AcceptEx(ListenSocket, AcceptSocket, PyBytes_AS_STRING(buf), + 0, size, size, &BytesReceived, &self->overlapped); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + + +static int +parse_address(PyObject *obj, SOCKADDR *Address, int Length) +{ + char *Host; + unsigned short Port; + unsigned long FlowInfo; + unsigned long ScopeId; + + memset(Address, 0, Length); + + if (PyArg_ParseTuple(obj, "sH", &Host, &Port)) + { + Address->sa_family = AF_INET; + if (WSAStringToAddressA(Host, AF_INET, NULL, Address, &Length) < 0) { + PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()); + return -1; + } + ((SOCKADDR_IN*)Address)->sin_port = htons(Port); + return Length; + } + else if (PyArg_ParseTuple(obj, "sHkk", &Host, &Port, &FlowInfo, &ScopeId)) + { + PyErr_Clear(); + Address->sa_family = AF_INET6; + if (WSAStringToAddressA(Host, AF_INET6, NULL, Address, &Length) < 0) { + PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError()); + return -1; + } + ((SOCKADDR_IN6*)Address)->sin6_port = htons(Port); + ((SOCKADDR_IN6*)Address)->sin6_flowinfo = FlowInfo; + ((SOCKADDR_IN6*)Address)->sin6_scope_id = ScopeId; + return Length; + } + + return -1; +} + + +PyDoc_STRVAR( + Overlapped_ConnectEx_doc, + "ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n" + "Start overlapped connect. client_handle should be unbound."); + +static PyObject * +Overlapped_ConnectEx(OverlappedObject *self, PyObject *args) +{ + SOCKET ConnectSocket; + PyObject *AddressObj; + char AddressBuf[sizeof(struct sockaddr_in6)]; + SOCKADDR *Address = (SOCKADDR*)AddressBuf; + int Length; + BOOL ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + Length = sizeof(AddressBuf); + Length = parse_address(AddressObj, Address, Length); + if (Length < 0) + return NULL; + + self->type = TYPE_CONNECT; + self->handle = (HANDLE)ConnectSocket; + + Py_BEGIN_ALLOW_THREADS + ret = Py_ConnectEx(ConnectSocket, Address, Length, + NULL, 0, NULL, &self->overlapped); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + +PyDoc_STRVAR( + Overlapped_DisconnectEx_doc, + "DisconnectEx(handle, flags) -> Overlapped[None]\n\n" + "Start overlapped connect. client_handle should be unbound."); + +static PyObject * +Overlapped_DisconnectEx(OverlappedObject *self, PyObject *args) +{ + SOCKET Socket; + DWORD flags; + BOOL ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD, &Socket, &flags)) + return NULL; + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + self->type = TYPE_DISCONNECT; + self->handle = (HANDLE)Socket; + + Py_BEGIN_ALLOW_THREADS + ret = Py_DisconnectEx(Socket, &self->overlapped, flags, 0); + Py_END_ALLOW_THREADS + + self->error = err = ret ? ERROR_SUCCESS : WSAGetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); + } +} + +static PyObject* +Overlapped_getaddress(OverlappedObject *self) +{ + return PyLong_FromVoidPtr(&self->overlapped); +} + +static PyObject* +Overlapped_getpending(OverlappedObject *self) +{ + return PyBool_FromLong(!HasOverlappedIoCompleted(&self->overlapped) && + self->type != TYPE_NOT_STARTED); +} + +static PyMethodDef Overlapped_methods[] = { + {"getresult", (PyCFunction) Overlapped_getresult, + METH_VARARGS, Overlapped_getresult_doc}, + {"cancel", (PyCFunction) Overlapped_cancel, + METH_NOARGS, Overlapped_cancel_doc}, + {"ReadFile", (PyCFunction) Overlapped_ReadFile, + METH_VARARGS, Overlapped_ReadFile_doc}, + {"WSARecv", (PyCFunction) Overlapped_WSARecv, + METH_VARARGS, Overlapped_WSARecv_doc}, + {"WriteFile", (PyCFunction) Overlapped_WriteFile, + METH_VARARGS, Overlapped_WriteFile_doc}, + {"WSASend", (PyCFunction) Overlapped_WSASend, + METH_VARARGS, Overlapped_WSASend_doc}, + {"AcceptEx", (PyCFunction) Overlapped_AcceptEx, + METH_VARARGS, Overlapped_AcceptEx_doc}, + {"ConnectEx", (PyCFunction) Overlapped_ConnectEx, + METH_VARARGS, Overlapped_ConnectEx_doc}, + {"DisconnectEx", (PyCFunction) Overlapped_DisconnectEx, + METH_VARARGS, Overlapped_DisconnectEx_doc}, + {NULL} +}; + +static PyMemberDef Overlapped_members[] = { + {"error", T_ULONG, + offsetof(OverlappedObject, error), + READONLY, "Error from last operation"}, + {"event", T_HANDLE, + offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent), + READONLY, "Overlapped event handle"}, + {NULL} +}; + +static PyGetSetDef Overlapped_getsets[] = { + {"address", (getter)Overlapped_getaddress, NULL, + "Address of overlapped structure"}, + {"pending", (getter)Overlapped_getpending, NULL, + "Whether the operation is pending"}, + {NULL}, +}; + +PyTypeObject OverlappedType = { + PyVarObject_HEAD_INIT(NULL, 0) + /* tp_name */ "_overlapped.Overlapped", + /* tp_basicsize */ sizeof(OverlappedObject), + /* tp_itemsize */ 0, + /* tp_dealloc */ (destructor) Overlapped_dealloc, + /* tp_print */ 0, + /* tp_getattr */ 0, + /* tp_setattr */ 0, + /* tp_reserved */ 0, + /* tp_repr */ 0, + /* tp_as_number */ 0, + /* tp_as_sequence */ 0, + /* tp_as_mapping */ 0, + /* tp_hash */ 0, + /* tp_call */ 0, + /* tp_str */ 0, + /* tp_getattro */ 0, + /* tp_setattro */ 0, + /* tp_as_buffer */ 0, + /* tp_flags */ Py_TPFLAGS_DEFAULT, + /* tp_doc */ "OVERLAPPED structure wrapper", + /* tp_traverse */ 0, + /* tp_clear */ 0, + /* tp_richcompare */ 0, + /* tp_weaklistoffset */ 0, + /* tp_iter */ 0, + /* tp_iternext */ 0, + /* tp_methods */ Overlapped_methods, + /* tp_members */ Overlapped_members, + /* tp_getset */ Overlapped_getsets, + /* tp_base */ 0, + /* tp_dict */ 0, + /* tp_descr_get */ 0, + /* tp_descr_set */ 0, + /* tp_dictoffset */ 0, + /* tp_init */ 0, + /* tp_alloc */ 0, + /* tp_new */ Overlapped_new, +}; + +static PyMethodDef overlapped_functions[] = { + {"CreateIoCompletionPort", overlapped_CreateIoCompletionPort, + METH_VARARGS, CreateIoCompletionPort_doc}, + {"GetQueuedCompletionStatus", overlapped_GetQueuedCompletionStatus, + METH_VARARGS, GetQueuedCompletionStatus_doc}, + {"PostQueuedCompletionStatus", overlapped_PostQueuedCompletionStatus, + METH_VARARGS, PostQueuedCompletionStatus_doc}, + {"BindLocal", overlapped_BindLocal, + METH_VARARGS, BindLocal_doc}, + {"SetFileCompletionNotificationModes", + overlapped_SetFileCompletionNotificationModes, + METH_VARARGS, SetFileCompletionNotificationModes_doc}, + {NULL} +}; + +static struct PyModuleDef overlapped_module = { + PyModuleDef_HEAD_INIT, + "_overlapped", + NULL, + -1, + overlapped_functions, + NULL, + NULL, + NULL, + NULL +}; + +#define WINAPI_CONSTANT(fmt, con) \ + PyDict_SetItemString(d, #con, Py_BuildValue(fmt, con)) + +PyMODINIT_FUNC +PyInit__overlapped(void) +{ + PyObject *m, *d; + + /* Ensure WSAStartup() called before initializing function pointers */ + m = PyImport_ImportModule("_socket"); + if (!m) + return NULL; + Py_DECREF(m); + + if (initialize_function_pointers() < 0) + return NULL; + + if (PyType_Ready(&OverlappedType) < 0) + return NULL; + + m = PyModule_Create(&overlapped_module); + if (PyModule_AddObject(m, "Overlapped", (PyObject *)&OverlappedType) < 0) + return NULL; + + d = PyModule_GetDict(m); + + WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING); + WINAPI_CONSTANT(F_DWORD, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS); + WINAPI_CONSTANT(F_DWORD, INFINITE); + WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE); + WINAPI_CONSTANT(F_HANDLE, NULL); + WINAPI_CONSTANT(F_DWORD, SO_UPDATE_ACCEPT_CONTEXT); + WINAPI_CONSTANT(F_DWORD, SO_UPDATE_CONNECT_CONTEXT); + WINAPI_CONSTANT(F_DWORD, TF_REUSE_SOCKET); + + return m; +} diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..0260f9d --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[build_ext] +build_lib=tulip diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..67b037c --- /dev/null +++ b/setup.py @@ -0,0 +1,4 @@ +from distutils.core import setup, Extension + +ext = Extension('_overlapped', ['overlapped.c'], libraries=['ws2_32']) +setup(name='_overlapped', ext_modules=[ext]) diff --git a/tulip/events_test.py b/tulip/events_test.py index 7680257..0a08480 100644 --- a/tulip/events_test.py +++ b/tulip/events_test.py @@ -46,8 +46,7 @@ class MyProto(protocols.Protocol): class EventLoopTestsMixin: def setUp(self): - self.selector = self.SELECTOR_CLASS() - self.event_loop = unix_events.UnixEventLoop(self.selector) + self.event_loop = self.create_event_loop() events.set_event_loop(self.event_loop) def tearDown(self): @@ -216,7 +215,10 @@ class EventLoopTestsMixin: r, w = unix_events.socketpair() bytes_read = [] def reader(): - data = r.recv(1024) + try: + data = r.recv(1024) + except BlockingIOError: + return if data: bytes_read.append(data) if sum(len(b) for b in bytes_read) >= 6: @@ -278,7 +280,8 @@ class EventLoopTestsMixin: sock = socket.socket() sock.setblocking(False) # TODO: This depends on python.org behavior! - el.run_until_complete(el.sock_connect(sock, ('python.org', 80))) + address = socket.getaddrinfo('python.org', 80, socket.AF_INET)[0][4] + el.run_until_complete(el.sock_connect(sock, address)) el.run_until_complete(el.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) data = el.run_until_complete(el.sock_recv(sock, 1024)) sock.close() @@ -289,18 +292,19 @@ class EventLoopTestsMixin: sock = socket.socket() sock.setblocking(False) # TODO: This depends on python.org behavior! + address = socket.getaddrinfo('python.org', 12345, socket.AF_INET)[0][4] with self.assertRaises(ConnectionRefusedError): - el.run_until_complete(el.sock_connect(sock, ('python.org', 12345))) + el.run_until_complete(el.sock_connect(sock, address)) sock.close() def testSockAccept(self): + el = events.get_event_loop() listener = socket.socket() listener.setblocking(False) listener.bind(('127.0.0.1', 0)) listener.listen(1) client = socket.socket() client.connect(listener.getsockname()) - el = events.get_event_loop() f = el.sock_accept(listener) conn, addr = el.run_until_complete(f) self.assertEqual(conn.gettimeout(), 0) @@ -414,22 +418,32 @@ class EventLoopTestsMixin: if hasattr(selectors, 'KqueueSelector'): class KqueueEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - SELECTOR_CLASS = selectors.KqueueSelector - + def create_event_loop(self): + return unix_events.UnixEventLoop(selectors.KqueueSelector()) if hasattr(selectors, 'EpollSelector'): class EPollEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - SELECTOR_CLASS = selectors.EpollSelector - + def create_event_loop(self): + return unix_events.UnixEventLoop(selectors.EpollSelector()) if hasattr(selectors, 'PollSelector'): class PollEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - SELECTOR_CLASS = selectors.PollSelector + def create_event_loop(self): + return unix_events.UnixEventLoop(selectors.PollSelector()) + +if sys.platform == 'win32': + from . import iocp_events + class IocpEventLoopTests(EventLoopTestsMixin, unittest.TestCase): + def create_event_loop(self): + return iocp_events.IocpEventLoop() + def testCreateSslTransport(self): + raise unittest.SkipTest("IocpEventLoop imcompatible with SSL") # Should always exist. class SelectEventLoopTests(EventLoopTestsMixin, unittest.TestCase): - SELECTOR_CLASS = selectors.SelectSelector + def create_event_loop(self): + return unix_events.UnixEventLoop(selectors.SelectSelector()) class HandlerTests(unittest.TestCase): diff --git a/tulip/iocp_events.py b/tulip/iocp_events.py new file mode 100644 index 0000000..cfa09a2 --- /dev/null +++ b/tulip/iocp_events.py @@ -0,0 +1,318 @@ +# +# Module implementing the Proactor pattern +# +# A proactor is used to initiate asynchronous I/O, and to wait for +# completion of previously initiated operations. +# + +import errno +import logging +import os +import heapq +import sys +import socket +import time +import weakref + +from _winapi import CloseHandle + +from . import transports + +from .futures import Future +from .unix_events import BaseEventLoop, _StopError +from .winsocketpair import socketpair +from ._overlapped import * + + +_TRYAGAIN = frozenset() # XXX + + +class IocpProactor: + + def __init__(self, concurrency=0xffffffff): + self._results = [] + self._iocp = CreateIoCompletionPort( + INVALID_HANDLE_VALUE, NULL, 0, concurrency) + self._cache = {} + self._registered = weakref.WeakSet() + + def registered_count(self): + return len(self._cache) + + def select(self, timeout=None): + if not self._results: + self._poll(timeout) + tmp, self._results = self._results, [] + return tmp + + def recv(self, conn, nbytes, flags=0): + self._register_with_iocp(conn) + ov = Overlapped(NULL) + ov.WSARecv(conn.fileno(), nbytes, flags) + return self._register(ov, conn, ov.getresult) + + def send(self, conn, buf, flags=0): + self._register_with_iocp(conn) + ov = Overlapped(NULL) + ov.WSASend(conn.fileno(), buf, flags) + return self._register(ov, conn, ov.getresult) + + def accept(self, listener): + self._register_with_iocp(listener) + conn = self._get_accept_socket() + ov = Overlapped(NULL) + ov.AcceptEx(listener.fileno(), conn.fileno()) + def finish_accept(): + addr = ov.getresult() + conn.setsockopt(socket.SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, listener.fileno()) + conn.settimeout(listener.gettimeout()) + return conn, conn.getpeername() + return self._register(ov, listener, finish_accept) + + def connect(self, conn, address): + self._register_with_iocp(conn) + BindLocal(conn.fileno(), len(address)) + ov = Overlapped(NULL) + ov.ConnectEx(conn.fileno(), address) + def finish_connect(): + try: + ov.getresult() + except OSError as e: + if e.winerror == 1225: + raise ConnectionRefusedError(errno.ECONNREFUSED, + 'connection refused') + raise + conn.setsockopt(socket.SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, 0) + return conn + return self._register(ov, conn, finish_connect) + + def _register_with_iocp(self, obj): + if obj not in self._registered: + self._registered.add(obj) + CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) + SetFileCompletionNotificationModes(obj.fileno(), + FILE_SKIP_COMPLETION_PORT_ON_SUCCESS) + + def _register(self, ov, obj, callback): + f = Future() + if ov.error == ERROR_IO_PENDING: + # we must prevent ov and obj from being garbage collected + self._cache[ov.address] = (f, ov, obj, callback) + else: + try: + f.set_result(callback()) + except Exception as e: + f.set_exception(e) + return f + + def _get_accept_socket(self): + s = socket.socket() + s.settimeout(0) + return s + + def _poll(self, timeout=None): + if timeout is None: + ms = INFINITE + elif timeout < 0: + raise ValueError("negative timeout") + else: + ms = int(timeout * 1000 + 0.5) + if ms >= INFINITE: + raise ValueError("timeout too big") + while True: + status = GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + return + f, ov, obj, callback = self._cache.pop(status[3]) + try: + value = callback() + except OSError as e: + if f is None: + sys.excepthook(*sys.exc_info()) + continue + f.set_exception(e) + self._results.append(f) + else: + if f is None: + continue + f.set_result(value) + self._results.append(f) + ms = 0 + + def close(self, *, CloseHandle=CloseHandle): + for address, (f, ov, obj, callback) in list(self._cache.items()): + try: + ov.cancel() + except OSError: + pass + + while self._cache: + status = GetQueuedCompletionStatus(self._iocp, 1000) + if status is None: + logging.debug('taking long time to close proactor') + continue + self._cache.pop(status[3]) + + if self._iocp is not None: + CloseHandle(self._iocp) + self._iocp = None + + +class IocpEventLoop(BaseEventLoop): + + @staticmethod + def SocketTransport(*args, **kwds): + return _IocpSocketTransport(*args, **kwds) + + @staticmethod + def SslTransport(*args, **kwds): + raise NotImplementedError + + def __init__(self, proactor=None): + super().__init__() + if proactor is None: + proactor = IocpProactor() + logging.debug('Using proactor: %s', proactor.__class__.__name__) + self._proactor = proactor + self._selector = proactor # convenient alias + self._readers = {} + self._make_self_pipe() + + def close(self): + if self._proactor is not None: + self._proactor.close() + self._proactor = None + self._ssock.close() + self._csock.close() + + def _make_self_pipe(self): + # A self-socket, really. :-) + self._ssock, self._csock = socketpair() + self._ssock.setblocking(False) + self._csock.setblocking(False) + def loop(f=None): + if f and f.exception(): + self.close() + raise f.exception() + f = self._proactor.recv(self._ssock, 4096) + self.call_soon(f.add_done_callback, loop) + self.call_soon(loop) + + def _write_to_self(self): + self._proactor.send(self._csock, b'x') + + def _start_serving(self, protocol_factory, sock): + def loop(f=None): + try: + if f: + conn, addr = f.result() + protocol = protocol_factory() + transport = self.SocketTransport(self, conn, protocol) + f = self._proactor.accept(sock) + self.call_soon(f.add_done_callback, loop) + except OSError as exc: + if exc.errno in _TRYAGAIN: + self.call_soon(loop) + else: + sock.close() + logging.exception('Accept failed') + self.call_soon(loop) + + def sock_recv(self, sock, n): + return self._proactor.recv(sock, n) + + def sock_sendall(self, sock, data): + return self._proactor.send(sock, data) + + def sock_connect(self, sock, address): + return self._proactor.connect(sock, address) + + def sock_accept(self, sock): + return self._proactor.accept(sock) + + def _process_events(self, event_list): + pass # XXX hard work currently done in poll + + +class _IocpSocketTransport(transports.Transport): + + def __init__(self, event_loop, sock, protocol, waiter=None): + self._event_loop = event_loop + self._sock = sock + self._protocol = protocol + self._buffer = [] + self._write_fut = None + self._closing = False # Set when close() called. + self._event_loop.call_soon(self._protocol.connection_made, self) + self._event_loop.call_soon(self._loop_reading) + if waiter is not None: + self._event_loop.call_soon(waiter.set_result, None) + + def _loop_reading(self, f=None): + try: + if f: + data = f.result() + if not data: + self._event_loop.call_soon(self._protocol.eof_received) + return + self._event_loop.call_soon(self._protocol.data_received, data) + f = self._event_loop._proactor.recv(self._sock, 4096) + self._event_loop.call_soon( + f.add_done_callback, self._loop_reading) + except OSError as exc: + self._fatal_error(exc) + + def write(self, data): + assert isinstance(data, bytes) + assert not self._closing + if not data: + return + + if self._write_fut is not None: + self._buffer.append(data) + return + + def callback(f): + if f.exception(): + self._fatal_error(f.exception()) + # XXX should check for partial write + data = b''.join(self._buffer) + if data: + self._buffer = [] + self._write_fut = self._event_loop._proactor.send( + self._sock, data) + assert f is self._write_fut + self._write_fut = None + + self._write_fut = self._event_loop._proactor.send(self._sock, data) + self._write_fut.add_done_callback(callback) + + # TODO: write_eof(), can_write_eof(). + + def abort(self): + self._fatal_error(None) + + def close(self): + self._closing = True + if self._write_fut: + self._write_fut.cancel() + if not self._buffer: + self._event_loop.call_soon(self._call_connection_lost, None) + + def _fatal_error(self, exc): + logging.exception('Fatal error for %s', self) + if self._write_fut: + self._write_fut.cancel() + # if self._read_fut: # XXX + # self._read_fut.cancel() + self._write_fut = self._read_fut = None + self._buffer = [] + self._event_loop.call_soon(self._call_connection_lost, exc) + + def _call_connection_lost(self, exc): + try: + self._protocol.connection_lost(exc) + finally: + self._sock.close() diff --git a/tulip/selectors.py b/tulip/selectors.py index 0543463..2ea92b3 100644 --- a/tulip/selectors.py +++ b/tulip/selectors.py @@ -210,6 +210,10 @@ class _BaseSelector: logging.warn('No key found for fd %r', fd) return None + def wrap_socket(self, sock): + """Return sock or a wrapper for sock compatible with selector""" + return sock + class SelectSelector(_BaseSelector): """Select-based selector.""" diff --git a/tulip/unix_events.py b/tulip/unix_events.py index fc2cb4a..1db6666 100644 --- a/tulip/unix_events.py +++ b/tulip/unix_events.py @@ -72,54 +72,13 @@ def _raise_stop_error(): raise _StopError -class UnixEventLoop(events.EventLoop): - """Unix event loop. - - See events.EventLoop for API specification. - """ +class BaseEventLoop(events.EventLoop): - def __init__(self, selector=None): - super().__init__() - if selector is None: - # pick the best selector class for the platform - selector = selectors.Selector() - logging.debug('Using selector: %s', selector.__class__.__name__) - self._selector = selector + def __init__(self): self._ready = collections.deque() self._scheduled = [] self._default_executor = None self._signal_handlers = {} - self._make_self_pipe() - - def close(self): - if self._selector is not None: - self._selector.close() - self._selector = None - self._ssock.close() - self._csock.close() - - def _make_self_pipe(self): - # A self-socket, really. :-) - self._ssock, self._csock = socketpair() - self._ssock.setblocking(False) - self._csock.setblocking(False) - self.add_reader(self._ssock.fileno(), self._read_from_self) - - def _read_from_self(self): - try: - self._ssock.recv(1) - except socket.error as exc: - if exc.errno in _TRYAGAIN: - return - raise # Halp! - - def _write_to_self(self): - try: - self._csock.send(b'x') - except socket.error as exc: - if exc.errno in _TRYAGAIN: - return - raise # Halp! def run(self): """Run the event loop until nothing left to do or stop() called. @@ -246,16 +205,6 @@ class UnixEventLoop(events.EventLoop): self._write_to_self() return handler - def wrap_future(self, future): - """XXX""" - if isinstance(future, futures.Future): - return future # Don't wrap our own type of Future. - new_future = futures.Future() - future.add_done_callback( - lambda future: - self.call_soon_threadsafe(new_future._copy_state, future)) - return new_future - def run_in_executor(self, executor, callback, *args): if isinstance(callback, events.Handler): assert not args @@ -322,10 +271,10 @@ class UnixEventLoop(events.EventLoop): waiter = futures.Future() if ssl: sslcontext = None if isinstance(ssl, bool) else ssl - transport = _UnixSslTransport(self, sock, protocol, sslcontext, + transport = self.SslTransport(self, sock, protocol, sslcontext, waiter) else: - transport = _UnixSocketTransport(self, sock, protocol, waiter) + transport = self.SocketTransport(self, sock, protocol, waiter) yield from waiter return transport, protocol @@ -358,9 +307,238 @@ class UnixEventLoop(events.EventLoop): raise exceptions[0] sock.listen(backlog) sock.setblocking(False) + self._start_serving(protocol_factory, sock) + return sock + + def add_signal_handler(self, sig, callback, *args): + """Add a handler for a signal. UNIX only. + + Raise ValueError if the signal number is invalid or uncatchable. + Raise RuntimeError if there is a problem setting up the handler. + """ + self._check_signal(sig) + try: + # set_wakeup_fd() raises ValueError if this is not the + # main thread. By calling it early we ensure that an + # event loop running in another thread cannot add a signal + # handler. + signal.set_wakeup_fd(self._csock.fileno()) + except ValueError as exc: + raise RuntimeError(str(exc)) + handler = events.make_handler(None, callback, args) + self._signal_handlers[sig] = handler + try: + signal.signal(sig, self._handle_signal) + except OSError as exc: + del self._signal_handlers[sig] + if not self._signal_handlers: + try: + signal.set_wakeup_fd(-1) + except ValueError as nexc: + logging.info('set_wakeup_fd(-1) failed: %s', nexc) + if exc.errno == errno.EINVAL: + raise RuntimeError('sig {} cannot be caught'.format(sig)) + else: + raise + return handler + + def _handle_signal(self, sig, arg): + """Internal helper that is the actual signal handler.""" + handler = self._signal_handlers.get(sig) + if handler is None: + return # Assume it's some race condition. + if handler.cancelled: + self.remove_signal_handler(sig) # Remove it properly. + else: + self.call_soon_threadsafe(handler.callback, *handler.args) + + def remove_signal_handler(self, sig): + """Remove a handler for a signal. UNIX only. + + Return True if a signal handler was removed, False if not.""" + self._check_signal(sig) + try: + del self._signal_handlers[sig] + except KeyError: + return False + if sig == signal.SIGINT: + handler = signal.default_int_handler + else: + handler = signal.SIG_DFL + try: + signal.signal(sig, handler) + except OSError as exc: + if exc.errno == errno.EINVAL: + raise RuntimeError('sig {} cannot be caught'.format(sig)) + else: + raise + if not self._signal_handlers: + try: + signal.set_wakeup_fd(-1) + except ValueError as exc: + logging.info('set_wakeup_fd(-1) failed: %s', exc) + return True + + def _check_signal(self, sig): + """Internal helper to validate a signal. + + Raise ValueError if the signal number is invalid or uncatchable. + Raise RuntimeError if there is a problem setting up the handler. + """ + if not isinstance(sig, int): + raise TypeError('sig must be an int, not {!r}'.format(sig)) + if signal is None: + raise RuntimeError('Signals are not supported') + if not (1 <= sig < signal.NSIG): + raise ValueError('sig {} out of range(1, {})'.format(sig, + signal.NSIG)) + if sys.platform == 'win32': + raise RuntimeError('Signals are not really supported on Windows') + + def _add_callback(self, handler): + """Add a Handler to ready or scheduled.""" + if handler.cancelled: + return + if handler.when is None: + self._ready.append(handler) + else: + heapq.heappush(self._scheduled, handler) + + def wrap_future(self, future): + """XXX""" + if isinstance(future, futures.Future): + return future # Don't wrap our own type of Future. + new_future = futures.Future() + future.add_done_callback( + lambda future: + self.call_soon_threadsafe(new_future._copy_state, future)) + return new_future + + def _run_once(self, timeout=None): + """Run one full iteration of the event loop. + + This calls all currently ready callbacks, polls for I/O, + schedules the resulting callbacks, and finally schedules + 'call_later' callbacks. + """ + # TODO: Break each of these into smaller pieces. + # TODO: Refactor to separate the callbacks from the readers/writers. + # TODO: An alternative API would be to do the *minimal* amount + # of work, e.g. one callback or one I/O poll. + + # Remove delayed calls that were cancelled from head of queue. + while self._scheduled and self._scheduled[0].cancelled: + heapq.heappop(self._scheduled) + + # Inspect the poll queue. If there's exactly one selectable + # file descriptor, it's the self-pipe, and if there's nothing + # scheduled, we should ignore it. + if self._selector.registered_count() > 1 or self._scheduled: + if self._ready: + timeout = 0 + elif self._scheduled: + # Compute the desired timeout. + when = self._scheduled[0].when + deadline = max(0, when - time.monotonic()) + if timeout is None: + timeout = deadline + else: + timeout = min(timeout, deadline) + + t0 = time.monotonic() + event_list = self._selector.select(timeout) + t1 = time.monotonic() + argstr = '' if timeout is None else ' %.3f' % timeout + if t1-t0 >= 1: + level = logging.INFO + else: + level = logging.DEBUG + logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0) + self._process_events(event_list) + + # Handle 'later' callbacks that are ready. + now = time.monotonic() + while self._scheduled: + handler = self._scheduled[0] + if handler.when > now: + break + handler = heapq.heappop(self._scheduled) + self._ready.append(handler) + + # This is the only place where callbacks are actually *called*. + # All other places just add them to ready. + # Note: We run all currently scheduled callbacks, but not any + # callbacks scheduled by callbacks run this time around -- + # they will be run the next time (after another I/O poll). + # Use an idiom that is threadsafe without using locks. + ntodo = len(self._ready) + for i in range(ntodo): + handler = self._ready.popleft() + if not handler.cancelled: + try: + handler.callback(*handler.args) + except Exception: + logging.exception('Exception in callback %s %r', + handler.callback, handler.args) + + +class UnixEventLoop(BaseEventLoop): + """Unix event loop. + + See events.EventLoop for API specification. + """ + + @staticmethod + def SocketTransport(event_loop, sock, protocol, waiter=None): + return _UnixSocketTransport(event_loop, sock, protocol, waiter) + + @staticmethod + def SslTransport(event_loop, rawsock, protocol, sslcontext, waiter): + return _UnixSslTransport(event_loop, rawsock, protocol, + sslcontext, waiter) + + def __init__(self, selector=None): + super().__init__() + if selector is None: + # pick the best selector class for the platform + selector = selectors.Selector() + logging.debug('Using selector: %s', selector.__class__.__name__) + self._selector = selector + self._make_self_pipe() + + def close(self): + if self._selector is not None: + self._selector.close() + self._selector = None + self._ssock.close() + self._csock.close() + + def _make_self_pipe(self): + # A self-socket, really. :-) + self._ssock, self._csock = socketpair() + self._ssock.setblocking(False) + self._csock.setblocking(False) + self.add_reader(self._ssock.fileno(), self._read_from_self) + + def _read_from_self(self): + try: + self._ssock.recv(1) + except socket.error as exc: + if exc.errno in _TRYAGAIN: + return + raise # Halp! + + def _write_to_self(self): + try: + self._csock.send(b'x') + except socket.error as exc: + if exc.errno in _TRYAGAIN: + return + raise # Halp! + + def _start_serving(self, protocol_factory, sock): self.add_reader(sock.fileno(), self._accept_connection, protocol_factory, sock) - return sock def _accept_connection(self, protocol_factory, sock): try: @@ -593,181 +771,23 @@ class UnixEventLoop(events.EventLoop): else: self.add_reader(fd, self._sock_accept, fut, True, sock) - def add_signal_handler(self, sig, callback, *args): - """Add a handler for a signal. UNIX only. - - Raise ValueError if the signal number is invalid or uncatchable. - Raise RuntimeError if there is a problem setting up the handler. - """ - self._check_signal(sig) - try: - # set_wakeup_fd() raises ValueError if this is not the - # main thread. By calling it early we ensure that an - # event loop running in another thread cannot add a signal - # handler. - signal.set_wakeup_fd(self._csock.fileno()) - except ValueError as exc: - raise RuntimeError(str(exc)) - handler = events.make_handler(None, callback, args) - self._signal_handlers[sig] = handler - try: - signal.signal(sig, self._handle_signal) - except OSError as exc: - del self._signal_handlers[sig] - if not self._signal_handlers: - try: - signal.set_wakeup_fd(-1) - except ValueError as nexc: - logging.info('set_wakeup_fd(-1) failed: %s', nexc) - if exc.errno == errno.EINVAL: - raise RuntimeError('sig {} cannot be caught'.format(sig)) - else: - raise - return handler - - def _handle_signal(self, sig, arg): - """Internal helper that is the actual signal handler.""" - handler = self._signal_handlers.get(sig) - if handler is None: - return # Assume it's some race condition. - if handler.cancelled: - self.remove_signal_handler(sig) # Remove it properly. - else: - self.call_soon_threadsafe(handler.callback, *handler.args) - - def remove_signal_handler(self, sig): - """Remove a handler for a signal. UNIX only. - - Return True if a signal handler was removed, False if not.""" - self._check_signal(sig) - try: - del self._signal_handlers[sig] - except KeyError: - return False - if sig == signal.SIGINT: - handler = signal.default_int_handler - else: - handler = signal.SIG_DFL - try: - signal.signal(sig, handler) - except OSError as exc: - if exc.errno == errno.EINVAL: - raise RuntimeError('sig {} cannot be caught'.format(sig)) - else: - raise - if not self._signal_handlers: - try: - signal.set_wakeup_fd(-1) - except ValueError as exc: - logging.info('set_wakeup_fd(-1) failed: %s', exc) - return True - - def _check_signal(self, sig): - """Internal helper to validate a signal. - - Raise ValueError if the signal number is invalid or uncatchable. - Raise RuntimeError if there is a problem setting up the handler. - """ - if not isinstance(sig, int): - raise TypeError('sig must be an int, not {!r}'.format(sig)) - if signal is None: - raise RuntimeError('Signals are not supported') - if not (1 <= sig < signal.NSIG): - raise ValueError('sig {} out of range(1, {})'.format(sig, - signal.NSIG)) - if sys.platform == 'win32': - raise RuntimeError('Signals are not really supported on Windows') - - def _add_callback(self, handler): - """Add a Handler to ready or scheduled.""" - if handler.cancelled: - return - if handler.when is None: - self._ready.append(handler) - else: - heapq.heappush(self._scheduled, handler) - - def _run_once(self, timeout=None): - """Run one full iteration of the event loop. - - This calls all currently ready callbacks, polls for I/O, - schedules the resulting callbacks, and finally schedules - 'call_later' callbacks. - """ - # TODO: Break each of these into smaller pieces. - # TODO: Refactor to separate the callbacks from the readers/writers. - # TODO: An alternative API would be to do the *minimal* amount - # of work, e.g. one callback or one I/O poll. - - # Remove delayed calls that were cancelled from head of queue. - while self._scheduled and self._scheduled[0].cancelled: - heapq.heappop(self._scheduled) - - # Inspect the poll queue. If there's exactly one selectable - # file descriptor, it's the self-pipe, and if there's nothing - # scheduled, we should ignore it. - if self._selector.registered_count() > 1 or self._scheduled: - if self._ready: - timeout = 0 - elif self._scheduled: - # Compute the desired timeout. - when = self._scheduled[0].when - deadline = max(0, when - time.monotonic()) - if timeout is None: - timeout = deadline + def _process_events(self, event_list): + for fileobj, mask, (reader, writer, connector) in event_list: + if mask & selectors.EVENT_READ and reader is not None: + if reader.cancelled: + self.remove_reader(fileobj) else: - timeout = min(timeout, deadline) - - t0 = time.monotonic() - event_list = self._selector.select(timeout) - t1 = time.monotonic() - argstr = '' if timeout is None else ' %.3f' % timeout - if t1-t0 >= 1: - level = logging.INFO - else: - level = logging.DEBUG - logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0) - for fileobj, mask, (reader, writer, connector) in event_list: - if mask & selectors.EVENT_READ and reader is not None: - if reader.cancelled: - self.remove_reader(fileobj) - else: - self._add_callback(reader) - if mask & selectors.EVENT_WRITE and writer is not None: - if writer.cancelled: - self.remove_writer(fileobj) - else: - self._add_callback(writer) - elif mask & selectors.EVENT_CONNECT and connector is not None: - if connector.cancelled: - self.remove_connector(fileobj) - else: - self._add_callback(connector) - - # Handle 'later' callbacks that are ready. - now = time.monotonic() - while self._scheduled: - handler = self._scheduled[0] - if handler.when > now: - break - handler = heapq.heappop(self._scheduled) - self._ready.append(handler) - - # This is the only place where callbacks are actually *called*. - # All other places just add them to ready. - # Note: We run all currently scheduled callbacks, but not any - # callbacks scheduled by callbacks run this time around -- - # they will be run the next time (after another I/O poll). - # Use an idiom that is threadsafe without using locks. - ntodo = len(self._ready) - for i in range(ntodo): - handler = self._ready.popleft() - if not handler.cancelled: - try: - handler.callback(*handler.args) - except Exception: - logging.exception('Exception in callback %s %r', - handler.callback, handler.args) + self._add_callback(reader) + if mask & selectors.EVENT_WRITE and writer is not None: + if writer.cancelled: + self.remove_writer(fileobj) + else: + self._add_callback(writer) + elif mask & selectors.EVENT_CONNECT and connector is not None: + if connector.cancelled: + self.remove_connector(fileobj) + else: + self._add_callback(connector) class _UnixSocketTransport(transports.Transport): |