diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2019-05-28 12:52:15 +0300 |
---|---|---|
committer | Miss Islington (bot) <31488909+miss-islington@users.noreply.github.com> | 2019-05-28 02:52:15 -0700 |
commit | bafd4b5ac83b6cc0b7455290a04c4bfad34bdc90 (patch) | |
tree | bfb330fd3530eec1781d35b4b0c8339f93018951 /Modules | |
parent | 9ee2c264c37a71bd1c60f6032c50630b87e3c611 (diff) | |
download | cpython-git-bafd4b5ac83b6cc0b7455290a04c4bfad34bdc90.tar.gz |
bpo-29883: Asyncio proactor udp (GH-13440)
Follow-up for #1067
https://bugs.python.org/issue29883
Diffstat (limited to 'Modules')
-rw-r--r-- | Modules/overlapped.c | 372 |
1 files changed, 356 insertions, 16 deletions
diff --git a/Modules/overlapped.c b/Modules/overlapped.c index e5a209bf75..aad531e478 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -39,7 +39,8 @@ enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE, TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE, - TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE}; + TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE, TYPE_READ_FROM, + TYPE_WRITE_TO}; typedef struct { PyObject_HEAD @@ -53,8 +54,19 @@ typedef struct { union { /* Buffer allocated by us: TYPE_READ and TYPE_ACCEPT */ PyObject *allocated_buffer; - /* Buffer passed by the user: TYPE_WRITE and TYPE_READINTO */ + /* Buffer passed by the user: TYPE_WRITE, TYPE_WRITE_TO, and TYPE_READINTO */ Py_buffer user_buffer; + + /* Data used for reading from a connectionless socket: + TYPE_READ_FROM */ + struct { + // A (buffer, (host, port)) tuple + PyObject *result; + // The actual read buffer + PyObject *allocated_buffer; + struct sockaddr_in6 address; + int address_length; + } read_from; }; } OverlappedObject; @@ -570,16 +582,32 @@ static int Overlapped_clear(OverlappedObject *self) { switch (self->type) { - case TYPE_READ: - case TYPE_ACCEPT: - Py_CLEAR(self->allocated_buffer); - break; - case TYPE_WRITE: - case TYPE_READINTO: - if (self->user_buffer.obj) { - PyBuffer_Release(&self->user_buffer); + case TYPE_READ: + case TYPE_ACCEPT: { + Py_CLEAR(self->allocated_buffer); + break; + } + case TYPE_READ_FROM: { + // An initial call to WSARecvFrom will only allocate the buffer. + // The result tuple of (message, address) is only + // allocated _after_ a message has been received. + if(self->read_from.result) { + // We've received a message, free the result tuple. + Py_CLEAR(self->read_from.result); + } + if(self->read_from.allocated_buffer) { + Py_CLEAR(self->read_from.allocated_buffer); + } + break; + } + case TYPE_WRITE: + case TYPE_WRITE_TO: + case TYPE_READINTO: { + if (self->user_buffer.obj) { + PyBuffer_Release(&self->user_buffer); + } + break; } - break; } self->type = TYPE_NOT_STARTED; return 0; @@ -627,6 +655,73 @@ Overlapped_dealloc(OverlappedObject *self) SetLastError(olderr); } + +/* Convert IPv4 sockaddr to a Python str. */ + +static PyObject * +make_ipv4_addr(const struct sockaddr_in *addr) +{ + char buf[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf)) == NULL) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + return PyUnicode_FromString(buf); +} + +#ifdef ENABLE_IPV6 +/* Convert IPv6 sockaddr to a Python str. */ + +static PyObject * +make_ipv6_addr(const struct sockaddr_in6 *addr) +{ + char buf[INET6_ADDRSTRLEN]; + if (inet_ntop(AF_INET6, &addr->sin6_addr, buf, sizeof(buf)) == NULL) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + return PyUnicode_FromString(buf); +} +#endif + +static PyObject* +unparse_address(LPSOCKADDR Address, DWORD Length) +{ + /* The function is adopted from mocketmodule.c makesockaddr()*/ + + switch(Address->sa_family) { + case AF_INET: { + const struct sockaddr_in *a = (const struct sockaddr_in *)Address; + PyObject *addrobj = make_ipv4_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port)); + Py_DECREF(addrobj); + } + return ret; + } +#ifdef ENABLE_IPV6 + case AF_INET6: { + const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address; + PyObject *addrobj = make_ipv6_addr(a); + PyObject *ret = NULL; + if (addrobj) { + ret = Py_BuildValue("OiII", + addrobj, + ntohs(a->sin6_port), + ntohl(a->sin6_flowinfo), + a->sin6_scope_id); + Py_DECREF(addrobj); + } + return ret; + } +#endif /* ENABLE_IPV6 */ + default: { + return SetFromWindowsErr(ERROR_INVALID_PARAMETER); + } + } +} + PyDoc_STRVAR( Overlapped_cancel_doc, "cancel() -> None\n\n" @@ -670,6 +765,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) DWORD transferred = 0; BOOL ret; DWORD err; + PyObject *addr; if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait)) return NULL; @@ -695,8 +791,15 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) case ERROR_MORE_DATA: break; case ERROR_BROKEN_PIPE: - if (self->type == TYPE_READ || self->type == TYPE_READINTO) + if (self->type == TYPE_READ || self->type == TYPE_READINTO) { break; + } + else if (self->type == TYPE_READ_FROM && + (self->read_from.result != NULL || + self->read_from.allocated_buffer != NULL)) + { + break; + } /* fall through */ default: return SetFromWindowsErr(err); @@ -708,8 +811,43 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args) if (transferred != PyBytes_GET_SIZE(self->allocated_buffer) && _PyBytes_Resize(&self->allocated_buffer, transferred)) return NULL; + Py_INCREF(self->allocated_buffer); return self->allocated_buffer; + case TYPE_READ_FROM: + assert(PyBytes_CheckExact(self->read_from.allocated_buffer)); + + if (transferred != PyBytes_GET_SIZE( + self->read_from.allocated_buffer) && + _PyBytes_Resize(&self->read_from.allocated_buffer, transferred)) + { + return NULL; + } + + // unparse the address + addr = unparse_address((SOCKADDR*)&self->read_from.address, + self->read_from.address_length); + + if (addr == NULL) { + return NULL; + } + + // The result is a two item tuple: (message, address) + self->read_from.result = PyTuple_New(2); + if (self->read_from.result == NULL) { + Py_CLEAR(addr); + return NULL; + } + + // first item: message + Py_INCREF(self->read_from.allocated_buffer); + PyTuple_SET_ITEM(self->read_from.result, 0, + self->read_from.allocated_buffer); + // second item: address + PyTuple_SET_ITEM(self->read_from.result, 1, addr); + + Py_INCREF(self->read_from.result); + return self->read_from.result; default: return PyLong_FromUnsignedLong((unsigned long) transferred); } @@ -1121,7 +1259,6 @@ parse_address(PyObject *obj, SOCKADDR *Address, int Length) return -1; } - PyDoc_STRVAR( Overlapped_ConnectEx_doc, "ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n" @@ -1314,7 +1451,7 @@ PyDoc_STRVAR( "Connect to the pipe for asynchronous I/O (overlapped)."); static PyObject * -ConnectPipe(OverlappedObject *self, PyObject *args) +overlapped_ConnectPipe(PyObject *self, PyObject *args) { PyObject *AddressObj; wchar_t *Address; @@ -1362,15 +1499,213 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) Py_VISIT(self->allocated_buffer); break; case TYPE_WRITE: + case TYPE_WRITE_TO: case TYPE_READINTO: if (self->user_buffer.obj) { Py_VISIT(&self->user_buffer.obj); } break; + case TYPE_READ_FROM: + if(self->read_from.result) { + Py_VISIT(self->read_from.result); + } + if(self->read_from.allocated_buffer) { + Py_VISIT(self->read_from.allocated_buffer); + } } return 0; } +// UDP functions + +PyDoc_STRVAR( + WSAConnect_doc, + "WSAConnect(client_handle, address_as_bytes) -> Overlapped[None]\n\n" + "Bind a remote address to a connectionless (UDP) socket"); + +/* + * Note: WSAConnect does not support Overlapped I/O so this function should + * _only_ be used for connectionless sockets (UDP). + */ +static PyObject * +overlapped_WSAConnect(PyObject *self, PyObject *args) +{ + SOCKET ConnectSocket; + PyObject *AddressObj; + char AddressBuf[sizeof(struct sockaddr_in6)]; + SOCKADDR *Address = (SOCKADDR*)AddressBuf; + int Length; + int err; + + if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) { + return NULL; + } + + Length = sizeof(AddressBuf); + Length = parse_address(AddressObj, Address, Length); + if (Length < 0) { + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + // WSAConnect does not support overlapped I/O so this call will + // successfully complete immediately. + err = WSAConnect(ConnectSocket, Address, Length, + NULL, NULL, NULL, NULL); + Py_END_ALLOW_THREADS + + if (err == 0) { + Py_RETURN_NONE; + } + else { + return SetFromWindowsErr(WSAGetLastError()); + } +} + +PyDoc_STRVAR( + Overlapped_WSASendTo_doc, + "WSASendTo(handle, buf, flags, address_as_bytes) -> " + "Overlapped[bytes_transferred]\n\n" + "Start overlapped sendto over a connectionless (UDP) socket"); + +static PyObject * +Overlapped_WSASendTo(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + PyObject *bufobj; + DWORD flags; + PyObject *AddressObj; + char AddressBuf[sizeof(struct sockaddr_in6)]; + SOCKADDR *Address = (SOCKADDR*)AddressBuf; + int AddressLength; + DWORD written; + WSABUF wsabuf; + int ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD "O", + &handle, &bufobj, &flags, &AddressObj)) + { + return NULL; + } + + // Parse the "to" address + AddressLength = sizeof(AddressBuf); + AddressLength = parse_address(AddressObj, Address, AddressLength); + if (AddressLength < 0) { + return NULL; + } + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) { + return NULL; + } + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->user_buffer); + PyErr_SetString(PyExc_ValueError, "buffer too large"); + return NULL; + } +#endif + + self->type = TYPE_WRITE_TO; + self->handle = handle; + wsabuf.len = (DWORD)self->user_buffer.len; + wsabuf.buf = self->user_buffer.buf; + + Py_BEGIN_ALLOW_THREADS + ret = WSASendTo((SOCKET)handle, &wsabuf, 1, &written, flags, + Address, AddressLength, &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError() : + ERROR_SUCCESS); + + switch(err) { + case ERROR_SUCCESS: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } +} + + + +PyDoc_STRVAR( + Overlapped_WSARecvFrom_doc, + "RecvFile(handle, size, flags) -> Overlapped[(message, (host, port))]\n\n" + "Start overlapped receive"); + +static PyObject * +Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args) +{ + HANDLE handle; + DWORD size; + DWORD flags = 0; + DWORD nread; + PyObject *buf; + WSABUF wsabuf; + int ret; + DWORD err; + + if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD, + &handle, &size, &flags)) + { + return NULL; + } + + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + +#if SIZEOF_SIZE_T <= SIZEOF_LONG + size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX); +#endif + buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1)); + if (buf == NULL) { + return NULL; + } + + wsabuf.len = size; + wsabuf.buf = PyBytes_AS_STRING(buf); + + self->type = TYPE_READ_FROM; + self->handle = handle; + self->read_from.allocated_buffer = buf; + memset(&self->read_from.address, 0, sizeof(self->read_from.address)); + self->read_from.address_length = sizeof(self->read_from.address); + + Py_BEGIN_ALLOW_THREADS + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from.address, + &self->read_from.address_length, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + + switch(err) { + case ERROR_BROKEN_PIPE: + mark_as_completed(&self->overlapped); + return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } +} + static PyMethodDef Overlapped_methods[] = { {"getresult", (PyCFunction) Overlapped_getresult, @@ -1399,6 +1734,10 @@ static PyMethodDef Overlapped_methods[] = { METH_VARARGS, Overlapped_TransmitFile_doc}, {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe, METH_VARARGS, Overlapped_ConnectNamedPipe_doc}, + {"WSARecvFrom", (PyCFunction) Overlapped_WSARecvFrom, + METH_VARARGS, Overlapped_WSARecvFrom_doc }, + {"WSASendTo", (PyCFunction) Overlapped_WSASendTo, + METH_VARARGS, Overlapped_WSASendTo_doc }, {NULL} }; @@ -1484,9 +1823,10 @@ static PyMethodDef overlapped_functions[] = { METH_VARARGS, SetEvent_doc}, {"ResetEvent", overlapped_ResetEvent, METH_VARARGS, ResetEvent_doc}, - {"ConnectPipe", - (PyCFunction) ConnectPipe, + {"ConnectPipe", overlapped_ConnectPipe, METH_VARARGS, ConnectPipe_doc}, + {"WSAConnect", overlapped_WSAConnect, + METH_VARARGS, WSAConnect_doc}, {NULL} }; |