summaryrefslogtreecommitdiff
path: root/Modules
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2019-05-28 12:52:15 +0300
committerMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>2019-05-28 02:52:15 -0700
commitbafd4b5ac83b6cc0b7455290a04c4bfad34bdc90 (patch)
treebfb330fd3530eec1781d35b4b0c8339f93018951 /Modules
parent9ee2c264c37a71bd1c60f6032c50630b87e3c611 (diff)
downloadcpython-git-bafd4b5ac83b6cc0b7455290a04c4bfad34bdc90.tar.gz
bpo-29883: Asyncio proactor udp (GH-13440)
Follow-up for #1067 https://bugs.python.org/issue29883
Diffstat (limited to 'Modules')
-rw-r--r--Modules/overlapped.c372
1 files changed, 356 insertions, 16 deletions
diff --git a/Modules/overlapped.c b/Modules/overlapped.c
index e5a209bf75..aad531e478 100644
--- a/Modules/overlapped.c
+++ b/Modules/overlapped.c
@@ -39,7 +39,8 @@
enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE,
TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE,
- TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE};
+ TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE, TYPE_READ_FROM,
+ TYPE_WRITE_TO};
typedef struct {
PyObject_HEAD
@@ -53,8 +54,19 @@ typedef struct {
union {
/* Buffer allocated by us: TYPE_READ and TYPE_ACCEPT */
PyObject *allocated_buffer;
- /* Buffer passed by the user: TYPE_WRITE and TYPE_READINTO */
+ /* Buffer passed by the user: TYPE_WRITE, TYPE_WRITE_TO, and TYPE_READINTO */
Py_buffer user_buffer;
+
+ /* Data used for reading from a connectionless socket:
+ TYPE_READ_FROM */
+ struct {
+ // A (buffer, (host, port)) tuple
+ PyObject *result;
+ // The actual read buffer
+ PyObject *allocated_buffer;
+ struct sockaddr_in6 address;
+ int address_length;
+ } read_from;
};
} OverlappedObject;
@@ -570,16 +582,32 @@ static int
Overlapped_clear(OverlappedObject *self)
{
switch (self->type) {
- case TYPE_READ:
- case TYPE_ACCEPT:
- Py_CLEAR(self->allocated_buffer);
- break;
- case TYPE_WRITE:
- case TYPE_READINTO:
- if (self->user_buffer.obj) {
- PyBuffer_Release(&self->user_buffer);
+ case TYPE_READ:
+ case TYPE_ACCEPT: {
+ Py_CLEAR(self->allocated_buffer);
+ break;
+ }
+ case TYPE_READ_FROM: {
+ // An initial call to WSARecvFrom will only allocate the buffer.
+ // The result tuple of (message, address) is only
+ // allocated _after_ a message has been received.
+ if(self->read_from.result) {
+ // We've received a message, free the result tuple.
+ Py_CLEAR(self->read_from.result);
+ }
+ if(self->read_from.allocated_buffer) {
+ Py_CLEAR(self->read_from.allocated_buffer);
+ }
+ break;
+ }
+ case TYPE_WRITE:
+ case TYPE_WRITE_TO:
+ case TYPE_READINTO: {
+ if (self->user_buffer.obj) {
+ PyBuffer_Release(&self->user_buffer);
+ }
+ break;
}
- break;
}
self->type = TYPE_NOT_STARTED;
return 0;
@@ -627,6 +655,73 @@ Overlapped_dealloc(OverlappedObject *self)
SetLastError(olderr);
}
+
+/* Convert IPv4 sockaddr to a Python str. */
+
+static PyObject *
+make_ipv4_addr(const struct sockaddr_in *addr)
+{
+ char buf[INET_ADDRSTRLEN];
+ if (inet_ntop(AF_INET, &addr->sin_addr, buf, sizeof(buf)) == NULL) {
+ PyErr_SetFromErrno(PyExc_OSError);
+ return NULL;
+ }
+ return PyUnicode_FromString(buf);
+}
+
+#ifdef ENABLE_IPV6
+/* Convert IPv6 sockaddr to a Python str. */
+
+static PyObject *
+make_ipv6_addr(const struct sockaddr_in6 *addr)
+{
+ char buf[INET6_ADDRSTRLEN];
+ if (inet_ntop(AF_INET6, &addr->sin6_addr, buf, sizeof(buf)) == NULL) {
+ PyErr_SetFromErrno(PyExc_OSError);
+ return NULL;
+ }
+ return PyUnicode_FromString(buf);
+}
+#endif
+
+static PyObject*
+unparse_address(LPSOCKADDR Address, DWORD Length)
+{
+ /* The function is adopted from mocketmodule.c makesockaddr()*/
+
+ switch(Address->sa_family) {
+ case AF_INET: {
+ const struct sockaddr_in *a = (const struct sockaddr_in *)Address;
+ PyObject *addrobj = make_ipv4_addr(a);
+ PyObject *ret = NULL;
+ if (addrobj) {
+ ret = Py_BuildValue("Oi", addrobj, ntohs(a->sin_port));
+ Py_DECREF(addrobj);
+ }
+ return ret;
+ }
+#ifdef ENABLE_IPV6
+ case AF_INET6: {
+ const struct sockaddr_in6 *a = (const struct sockaddr_in6 *)Address;
+ PyObject *addrobj = make_ipv6_addr(a);
+ PyObject *ret = NULL;
+ if (addrobj) {
+ ret = Py_BuildValue("OiII",
+ addrobj,
+ ntohs(a->sin6_port),
+ ntohl(a->sin6_flowinfo),
+ a->sin6_scope_id);
+ Py_DECREF(addrobj);
+ }
+ return ret;
+ }
+#endif /* ENABLE_IPV6 */
+ default: {
+ return SetFromWindowsErr(ERROR_INVALID_PARAMETER);
+ }
+ }
+}
+
PyDoc_STRVAR(
Overlapped_cancel_doc,
"cancel() -> None\n\n"
@@ -670,6 +765,7 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args)
DWORD transferred = 0;
BOOL ret;
DWORD err;
+ PyObject *addr;
if (!PyArg_ParseTuple(args, "|" F_BOOL, &wait))
return NULL;
@@ -695,8 +791,15 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args)
case ERROR_MORE_DATA:
break;
case ERROR_BROKEN_PIPE:
- if (self->type == TYPE_READ || self->type == TYPE_READINTO)
+ if (self->type == TYPE_READ || self->type == TYPE_READINTO) {
break;
+ }
+ else if (self->type == TYPE_READ_FROM &&
+ (self->read_from.result != NULL ||
+ self->read_from.allocated_buffer != NULL))
+ {
+ break;
+ }
/* fall through */
default:
return SetFromWindowsErr(err);
@@ -708,8 +811,43 @@ Overlapped_getresult(OverlappedObject *self, PyObject *args)
if (transferred != PyBytes_GET_SIZE(self->allocated_buffer) &&
_PyBytes_Resize(&self->allocated_buffer, transferred))
return NULL;
+
Py_INCREF(self->allocated_buffer);
return self->allocated_buffer;
+ case TYPE_READ_FROM:
+ assert(PyBytes_CheckExact(self->read_from.allocated_buffer));
+
+ if (transferred != PyBytes_GET_SIZE(
+ self->read_from.allocated_buffer) &&
+ _PyBytes_Resize(&self->read_from.allocated_buffer, transferred))
+ {
+ return NULL;
+ }
+
+ // unparse the address
+ addr = unparse_address((SOCKADDR*)&self->read_from.address,
+ self->read_from.address_length);
+
+ if (addr == NULL) {
+ return NULL;
+ }
+
+ // The result is a two item tuple: (message, address)
+ self->read_from.result = PyTuple_New(2);
+ if (self->read_from.result == NULL) {
+ Py_CLEAR(addr);
+ return NULL;
+ }
+
+ // first item: message
+ Py_INCREF(self->read_from.allocated_buffer);
+ PyTuple_SET_ITEM(self->read_from.result, 0,
+ self->read_from.allocated_buffer);
+ // second item: address
+ PyTuple_SET_ITEM(self->read_from.result, 1, addr);
+
+ Py_INCREF(self->read_from.result);
+ return self->read_from.result;
default:
return PyLong_FromUnsignedLong((unsigned long) transferred);
}
@@ -1121,7 +1259,6 @@ parse_address(PyObject *obj, SOCKADDR *Address, int Length)
return -1;
}
-
PyDoc_STRVAR(
Overlapped_ConnectEx_doc,
"ConnectEx(client_handle, address_as_bytes) -> Overlapped[None]\n\n"
@@ -1314,7 +1451,7 @@ PyDoc_STRVAR(
"Connect to the pipe for asynchronous I/O (overlapped).");
static PyObject *
-ConnectPipe(OverlappedObject *self, PyObject *args)
+overlapped_ConnectPipe(PyObject *self, PyObject *args)
{
PyObject *AddressObj;
wchar_t *Address;
@@ -1362,15 +1499,213 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg)
Py_VISIT(self->allocated_buffer);
break;
case TYPE_WRITE:
+ case TYPE_WRITE_TO:
case TYPE_READINTO:
if (self->user_buffer.obj) {
Py_VISIT(&self->user_buffer.obj);
}
break;
+ case TYPE_READ_FROM:
+ if(self->read_from.result) {
+ Py_VISIT(self->read_from.result);
+ }
+ if(self->read_from.allocated_buffer) {
+ Py_VISIT(self->read_from.allocated_buffer);
+ }
}
return 0;
}
+// UDP functions
+
+PyDoc_STRVAR(
+ WSAConnect_doc,
+ "WSAConnect(client_handle, address_as_bytes) -> Overlapped[None]\n\n"
+ "Bind a remote address to a connectionless (UDP) socket");
+
+/*
+ * Note: WSAConnect does not support Overlapped I/O so this function should
+ * _only_ be used for connectionless sockets (UDP).
+ */
+static PyObject *
+overlapped_WSAConnect(PyObject *self, PyObject *args)
+{
+ SOCKET ConnectSocket;
+ PyObject *AddressObj;
+ char AddressBuf[sizeof(struct sockaddr_in6)];
+ SOCKADDR *Address = (SOCKADDR*)AddressBuf;
+ int Length;
+ int err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O", &ConnectSocket, &AddressObj)) {
+ return NULL;
+ }
+
+ Length = sizeof(AddressBuf);
+ Length = parse_address(AddressObj, Address, Length);
+ if (Length < 0) {
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ // WSAConnect does not support overlapped I/O so this call will
+ // successfully complete immediately.
+ err = WSAConnect(ConnectSocket, Address, Length,
+ NULL, NULL, NULL, NULL);
+ Py_END_ALLOW_THREADS
+
+ if (err == 0) {
+ Py_RETURN_NONE;
+ }
+ else {
+ return SetFromWindowsErr(WSAGetLastError());
+ }
+}
+
+PyDoc_STRVAR(
+ Overlapped_WSASendTo_doc,
+ "WSASendTo(handle, buf, flags, address_as_bytes) -> "
+ "Overlapped[bytes_transferred]\n\n"
+ "Start overlapped sendto over a connectionless (UDP) socket");
+
+static PyObject *
+Overlapped_WSASendTo(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ PyObject *bufobj;
+ DWORD flags;
+ PyObject *AddressObj;
+ char AddressBuf[sizeof(struct sockaddr_in6)];
+ SOCKADDR *Address = (SOCKADDR*)AddressBuf;
+ int AddressLength;
+ DWORD written;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "O" F_DWORD "O",
+ &handle, &bufobj, &flags, &AddressObj))
+ {
+ return NULL;
+ }
+
+ // Parse the "to" address
+ AddressLength = sizeof(AddressBuf);
+ AddressLength = parse_address(AddressObj, Address, AddressLength);
+ if (AddressLength < 0) {
+ return NULL;
+ }
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+ if (!PyArg_Parse(bufobj, "y*", &self->user_buffer)) {
+ return NULL;
+ }
+
+#if SIZEOF_SIZE_T > SIZEOF_LONG
+ if (self->user_buffer.len > (Py_ssize_t)ULONG_MAX) {
+ PyBuffer_Release(&self->user_buffer);
+ PyErr_SetString(PyExc_ValueError, "buffer too large");
+ return NULL;
+ }
+#endif
+
+ self->type = TYPE_WRITE_TO;
+ self->handle = handle;
+ wsabuf.len = (DWORD)self->user_buffer.len;
+ wsabuf.buf = self->user_buffer.buf;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WSASendTo((SOCKET)handle, &wsabuf, 1, &written, flags,
+ Address, AddressLength, &self->overlapped, NULL);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = (ret == SOCKET_ERROR ? WSAGetLastError() :
+ ERROR_SUCCESS);
+
+ switch(err) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return SetFromWindowsErr(err);
+ }
+}
+
+
+
+PyDoc_STRVAR(
+ Overlapped_WSARecvFrom_doc,
+ "RecvFile(handle, size, flags) -> Overlapped[(message, (host, port))]\n\n"
+ "Start overlapped receive");
+
+static PyObject *
+Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *args)
+{
+ HANDLE handle;
+ DWORD size;
+ DWORD flags = 0;
+ DWORD nread;
+ PyObject *buf;
+ WSABUF wsabuf;
+ int ret;
+ DWORD err;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_DWORD "|" F_DWORD,
+ &handle, &size, &flags))
+ {
+ return NULL;
+ }
+
+ if (self->type != TYPE_NONE) {
+ PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ return NULL;
+ }
+
+#if SIZEOF_SIZE_T <= SIZEOF_LONG
+ size = Py_MIN(size, (DWORD)PY_SSIZE_T_MAX);
+#endif
+ buf = PyBytes_FromStringAndSize(NULL, Py_MAX(size, 1));
+ if (buf == NULL) {
+ return NULL;
+ }
+
+ wsabuf.len = size;
+ wsabuf.buf = PyBytes_AS_STRING(buf);
+
+ self->type = TYPE_READ_FROM;
+ self->handle = handle;
+ self->read_from.allocated_buffer = buf;
+ memset(&self->read_from.address, 0, sizeof(self->read_from.address));
+ self->read_from.address_length = sizeof(self->read_from.address);
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags,
+ (SOCKADDR*)&self->read_from.address,
+ &self->read_from.address_length,
+ &self->overlapped, NULL);
+ Py_END_ALLOW_THREADS
+
+ self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS);
+
+ switch(err) {
+ case ERROR_BROKEN_PIPE:
+ mark_as_completed(&self->overlapped);
+ return SetFromWindowsErr(err);
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_IO_PENDING:
+ Py_RETURN_NONE;
+ default:
+ self->type = TYPE_NOT_STARTED;
+ return SetFromWindowsErr(err);
+ }
+}
+
static PyMethodDef Overlapped_methods[] = {
{"getresult", (PyCFunction) Overlapped_getresult,
@@ -1399,6 +1734,10 @@ static PyMethodDef Overlapped_methods[] = {
METH_VARARGS, Overlapped_TransmitFile_doc},
{"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe,
METH_VARARGS, Overlapped_ConnectNamedPipe_doc},
+ {"WSARecvFrom", (PyCFunction) Overlapped_WSARecvFrom,
+ METH_VARARGS, Overlapped_WSARecvFrom_doc },
+ {"WSASendTo", (PyCFunction) Overlapped_WSASendTo,
+ METH_VARARGS, Overlapped_WSASendTo_doc },
{NULL}
};
@@ -1484,9 +1823,10 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, SetEvent_doc},
{"ResetEvent", overlapped_ResetEvent,
METH_VARARGS, ResetEvent_doc},
- {"ConnectPipe",
- (PyCFunction) ConnectPipe,
+ {"ConnectPipe", overlapped_ConnectPipe,
METH_VARARGS, ConnectPipe_doc},
+ {"WSAConnect", overlapped_WSAConnect,
+ METH_VARARGS, WSAConnect_doc},
{NULL}
};