summaryrefslogtreecommitdiff
path: root/Modules/_asynciomodule.c
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2017-12-17 20:19:47 -0500
committerGitHub <noreply@github.com>2017-12-17 20:19:47 -0500
commit1b7c11ff0ee3efafbf5b38c3c6f37de5d63efb81 (patch)
tree5e6752a06d700ac910fcba551cf25d975c615f1d /Modules/_asynciomodule.c
parent4c72bc4a38eced10a55ba7071e084b26a2b5ed4b (diff)
downloadcpython-git-1b7c11ff0ee3efafbf5b38c3c6f37de5d63efb81.tar.gz
bpo-32348: Optimize asyncio.Future schedule/add/remove callback. (#4907)
Diffstat (limited to 'Modules/_asynciomodule.c')
-rw-r--r--Modules/_asynciomodule.c397
1 files changed, 322 insertions, 75 deletions
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index 378bd08b0c..5030a40b87 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -59,6 +59,7 @@ typedef enum {
#define FutureObj_HEAD(prefix) \
PyObject_HEAD \
PyObject *prefix##_loop; \
+ PyObject *prefix##_callback0; \
PyObject *prefix##_callbacks; \
PyObject *prefix##_exception; \
PyObject *prefix##_result; \
@@ -93,6 +94,16 @@ typedef struct {
} TaskWakeupMethWrapper;
+static PyTypeObject FutureType;
+static PyTypeObject TaskType;
+
+
+#define Future_CheckExact(obj) (Py_TYPE(obj) == &FutureType)
+#define Task_CheckExact(obj) (Py_TYPE(obj) == &TaskType)
+
+#define Future_Check(obj) PyObject_TypeCheck(obj, &FutureType)
+#define Task_Check(obj) PyObject_TypeCheck(obj, &TaskType)
+
#include "clinic/_asynciomodule.c.h"
@@ -101,6 +112,7 @@ class _asyncio.Future "FutureObj *" "&Future_Type"
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=00d3e4abca711e0f]*/
+
/* Get FutureIter from Future */
static PyObject* future_new_iter(PyObject *);
static inline int future_call_schedule_callbacks(FutureObj *);
@@ -234,46 +246,94 @@ get_event_loop(void)
static int
+call_soon(PyObject *loop, PyObject *func, PyObject *arg)
+{
+ PyObject *handle;
+ handle = _PyObject_CallMethodIdObjArgs(
+ loop, &PyId_call_soon, func, arg, NULL);
+ if (handle == NULL) {
+ return -1;
+ }
+ Py_DECREF(handle);
+ return 0;
+}
+
+
+static inline int
+future_is_alive(FutureObj *fut)
+{
+ return fut->fut_loop != NULL;
+}
+
+
+static inline int
+future_ensure_alive(FutureObj *fut)
+{
+ if (!future_is_alive(fut)) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "Future object is not initialized.");
+ return -1;
+ }
+ return 0;
+}
+
+
+#define ENSURE_FUTURE_ALIVE(fut) \
+ do { \
+ assert(Future_Check(fut) || Task_Check(fut)); \
+ if (future_ensure_alive((FutureObj*)fut)) { \
+ return NULL; \
+ } \
+ } while(0);
+
+
+static int
future_schedule_callbacks(FutureObj *fut)
{
Py_ssize_t len;
- PyObject *callbacks;
- int i;
+ Py_ssize_t i;
+
+ if (fut->fut_callback0 != NULL) {
+ /* There's a 1st callback */
+
+ int ret = call_soon(
+ fut->fut_loop, fut->fut_callback0, (PyObject *)fut);
+ Py_CLEAR(fut->fut_callback0);
+ if (ret) {
+ /* If an error occurs in pure-Python implementation,
+ all callbacks are cleared. */
+ Py_CLEAR(fut->fut_callbacks);
+ return ret;
+ }
+
+ /* we called the first callback, now try calling
+ callbacks from the 'fut_callbacks' list. */
+ }
if (fut->fut_callbacks == NULL) {
- PyErr_SetString(PyExc_RuntimeError, "uninitialized Future object");
- return -1;
+ /* No more callbacks, return. */
+ return 0;
}
len = PyList_GET_SIZE(fut->fut_callbacks);
if (len == 0) {
+ /* The list of callbacks was empty; clear it and return. */
+ Py_CLEAR(fut->fut_callbacks);
return 0;
}
- callbacks = PyList_GetSlice(fut->fut_callbacks, 0, len);
- if (callbacks == NULL) {
- return -1;
- }
- if (PyList_SetSlice(fut->fut_callbacks, 0, len, NULL) < 0) {
- Py_DECREF(callbacks);
- return -1;
- }
-
for (i = 0; i < len; i++) {
- PyObject *handle;
- PyObject *cb = PyList_GET_ITEM(callbacks, i);
+ PyObject *cb = PyList_GET_ITEM(fut->fut_callbacks, i);
- handle = _PyObject_CallMethodIdObjArgs(fut->fut_loop, &PyId_call_soon,
- cb, fut, NULL);
-
- if (handle == NULL) {
- Py_DECREF(callbacks);
+ if (call_soon(fut->fut_loop, cb, (PyObject *)fut)) {
+ /* If an error occurs in pure-Python implementation,
+ all callbacks are cleared. */
+ Py_CLEAR(fut->fut_callbacks);
return -1;
}
- Py_DECREF(handle);
}
- Py_DECREF(callbacks);
+ Py_CLEAR(fut->fut_callbacks);
return 0;
}
@@ -311,10 +371,8 @@ future_init(FutureObj *fut, PyObject *loop)
}
}
- Py_XSETREF(fut->fut_callbacks, PyList_New(0));
- if (fut->fut_callbacks == NULL) {
- return -1;
- }
+ fut->fut_callback0 = NULL;
+ fut->fut_callbacks = NULL;
return 0;
}
@@ -322,6 +380,10 @@ future_init(FutureObj *fut, PyObject *loop)
static PyObject *
future_set_result(FutureObj *fut, PyObject *res)
{
+ if (future_ensure_alive(fut)) {
+ return NULL;
+ }
+
if (fut->fut_state != STATE_PENDING) {
PyErr_SetString(asyncio_InvalidStateError, "invalid state");
return NULL;
@@ -416,25 +478,61 @@ future_get_result(FutureObj *fut, PyObject **result)
static PyObject *
future_add_done_callback(FutureObj *fut, PyObject *arg)
{
+ if (!future_is_alive(fut)) {
+ PyErr_SetString(PyExc_RuntimeError, "uninitialized Future object");
+ return NULL;
+ }
+
if (fut->fut_state != STATE_PENDING) {
- PyObject *handle = _PyObject_CallMethodIdObjArgs(fut->fut_loop,
- &PyId_call_soon,
- arg, fut, NULL);
- if (handle == NULL) {
+ /* The future is done/cancelled, so schedule the callback
+ right away. */
+ if (call_soon(fut->fut_loop, arg, (PyObject*) fut)) {
return NULL;
}
- Py_DECREF(handle);
}
else {
- if (fut->fut_callbacks == NULL) {
- PyErr_SetString(PyExc_RuntimeError, "uninitialized Future object");
- return NULL;
+ /* The future is pending, add a callback.
+
+ Callbacks in the future object are stored as follows:
+
+ callback0 -- a pointer to the first callback
+ callbacks -- a list of 2nd, 3rd, ... callbacks
+
+ Invariants:
+
+ * callbacks != NULL:
+ There are some callbacks in in the list. Just
+ add the new callback to it.
+
+ * callbacks == NULL and callback0 == NULL:
+ This is the first callback. Set it to callback0.
+
+ * callbacks == NULL and callback0 != NULL:
+ This is a second callback. Initialize callbacks
+ with a new list and add the new callback to it.
+ */
+
+ if (fut->fut_callbacks != NULL) {
+ int err = PyList_Append(fut->fut_callbacks, arg);
+ if (err != 0) {
+ return NULL;
+ }
}
- int err = PyList_Append(fut->fut_callbacks, arg);
- if (err != 0) {
- return NULL;
+ else if (fut->fut_callback0 == NULL) {
+ Py_INCREF(arg);
+ fut->fut_callback0 = arg;
+ }
+ else {
+ fut->fut_callbacks = PyList_New(1);
+ if (fut->fut_callbacks == NULL) {
+ return NULL;
+ }
+
+ Py_INCREF(arg);
+ PyList_SET_ITEM(fut->fut_callbacks, 0, arg);
}
}
+
Py_RETURN_NONE;
}
@@ -487,6 +585,7 @@ static int
FutureObj_clear(FutureObj *fut)
{
Py_CLEAR(fut->fut_loop);
+ Py_CLEAR(fut->fut_callback0);
Py_CLEAR(fut->fut_callbacks);
Py_CLEAR(fut->fut_result);
Py_CLEAR(fut->fut_exception);
@@ -499,6 +598,7 @@ static int
FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg)
{
Py_VISIT(fut->fut_loop);
+ Py_VISIT(fut->fut_callback0);
Py_VISIT(fut->fut_callbacks);
Py_VISIT(fut->fut_result);
Py_VISIT(fut->fut_exception);
@@ -522,6 +622,13 @@ _asyncio_Future_result_impl(FutureObj *self)
/*[clinic end generated code: output=f35f940936a4b1e5 input=49ecf9cf5ec50dc5]*/
{
PyObject *result;
+
+ if (!future_is_alive(self)) {
+ PyErr_SetString(asyncio_InvalidStateError,
+ "Future object is not initialized.");
+ return NULL;
+ }
+
int res = future_get_result(self, &result);
if (res == -1) {
@@ -554,6 +661,12 @@ static PyObject *
_asyncio_Future_exception_impl(FutureObj *self)
/*[clinic end generated code: output=88b20d4f855e0710 input=733547a70c841c68]*/
{
+ if (!future_is_alive(self)) {
+ PyErr_SetString(asyncio_InvalidStateError,
+ "Future object is not initialized.");
+ return NULL;
+ }
+
if (self->fut_state == STATE_CANCELLED) {
PyErr_SetNone(asyncio_CancelledError);
return NULL;
@@ -589,6 +702,7 @@ static PyObject *
_asyncio_Future_set_result(FutureObj *self, PyObject *res)
/*[clinic end generated code: output=a620abfc2796bfb6 input=5b9dc180f1baa56d]*/
{
+ ENSURE_FUTURE_ALIVE(self)
return future_set_result(self, res);
}
@@ -608,6 +722,7 @@ static PyObject *
_asyncio_Future_set_exception(FutureObj *self, PyObject *exception)
/*[clinic end generated code: output=f1c1b0cd321be360 input=e45b7d7aa71cc66d]*/
{
+ ENSURE_FUTURE_ALIVE(self)
return future_set_exception(self, exception);
}
@@ -648,15 +763,45 @@ _asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn)
{
PyObject *newlist;
Py_ssize_t len, i, j=0;
+ Py_ssize_t cleared_callback0 = 0;
+
+ ENSURE_FUTURE_ALIVE(self)
+
+ if (self->fut_callback0 != NULL) {
+ int cmp = PyObject_RichCompareBool(fn, self->fut_callback0, Py_EQ);
+ if (cmp == -1) {
+ return NULL;
+ }
+ if (cmp == 1) {
+ /* callback0 == fn */
+ Py_CLEAR(self->fut_callback0);
+ cleared_callback0 = 1;
+ }
+ }
if (self->fut_callbacks == NULL) {
- PyErr_SetString(PyExc_RuntimeError, "uninitialized Future object");
- return NULL;
+ return PyLong_FromSsize_t(cleared_callback0);
}
len = PyList_GET_SIZE(self->fut_callbacks);
if (len == 0) {
- return PyLong_FromSsize_t(0);
+ Py_CLEAR(self->fut_callbacks);
+ return PyLong_FromSsize_t(cleared_callback0);
+ }
+
+ if (len == 1) {
+ int cmp = PyObject_RichCompareBool(
+ fn, PyList_GET_ITEM(self->fut_callbacks, 0), Py_EQ);
+ if (cmp == -1) {
+ return NULL;
+ }
+ if (cmp == 1) {
+ /* callbacks[0] == fn */
+ Py_CLEAR(self->fut_callbacks);
+ return PyLong_FromSsize_t(1 + cleared_callback0);
+ }
+ /* callbacks[0] != fn and len(callbacks) == 1 */
+ return PyLong_FromSsize_t(cleared_callback0);
}
newlist = PyList_New(len);
@@ -683,6 +828,12 @@ _asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn)
}
}
+ if (j == 0) {
+ Py_CLEAR(self->fut_callbacks);
+ Py_DECREF(newlist);
+ return PyLong_FromSsize_t(len + cleared_callback0);
+ }
+
if (j < len) {
Py_SIZE(newlist) = j;
}
@@ -694,7 +845,7 @@ _asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn)
}
}
Py_DECREF(newlist);
- return PyLong_FromSsize_t(len - j);
+ return PyLong_FromSsize_t(len - j + cleared_callback0);
fail:
Py_DECREF(newlist);
@@ -715,6 +866,7 @@ static PyObject *
_asyncio_Future_cancel_impl(FutureObj *self)
/*[clinic end generated code: output=e45b932ba8bd68a1 input=515709a127995109]*/
{
+ ENSURE_FUTURE_ALIVE(self)
return future_cancel(self);
}
@@ -728,7 +880,7 @@ static PyObject *
_asyncio_Future_cancelled_impl(FutureObj *self)
/*[clinic end generated code: output=145197ced586357d input=943ab8b7b7b17e45]*/
{
- if (self->fut_state == STATE_CANCELLED) {
+ if (future_is_alive(self) && self->fut_state == STATE_CANCELLED) {
Py_RETURN_TRUE;
}
else {
@@ -749,7 +901,7 @@ static PyObject *
_asyncio_Future_done_impl(FutureObj *self)
/*[clinic end generated code: output=244c5ac351145096 input=28d7b23fdb65d2ac]*/
{
- if (self->fut_state == STATE_PENDING) {
+ if (!future_is_alive(self) || self->fut_state == STATE_PENDING) {
Py_RETURN_FALSE;
}
else {
@@ -760,7 +912,7 @@ _asyncio_Future_done_impl(FutureObj *self)
static PyObject *
FutureObj_get_blocking(FutureObj *fut)
{
- if (fut->fut_blocking) {
+ if (future_is_alive(fut) && fut->fut_blocking) {
Py_RETURN_TRUE;
}
else {
@@ -771,6 +923,10 @@ FutureObj_get_blocking(FutureObj *fut)
static int
FutureObj_set_blocking(FutureObj *fut, PyObject *val)
{
+ if (future_ensure_alive(fut)) {
+ return -1;
+ }
+
int is_true = PyObject_IsTrue(val);
if (is_true < 0) {
return -1;
@@ -782,6 +938,7 @@ FutureObj_set_blocking(FutureObj *fut, PyObject *val)
static PyObject *
FutureObj_get_log_traceback(FutureObj *fut)
{
+ ENSURE_FUTURE_ALIVE(fut)
if (fut->fut_log_tb) {
Py_RETURN_TRUE;
}
@@ -804,7 +961,7 @@ FutureObj_set_log_traceback(FutureObj *fut, PyObject *val)
static PyObject *
FutureObj_get_loop(FutureObj *fut)
{
- if (fut->fut_loop == NULL) {
+ if (!future_is_alive(fut)) {
Py_RETURN_NONE;
}
Py_INCREF(fut->fut_loop);
@@ -814,16 +971,57 @@ FutureObj_get_loop(FutureObj *fut)
static PyObject *
FutureObj_get_callbacks(FutureObj *fut)
{
+ Py_ssize_t i;
+ Py_ssize_t len;
+ PyObject *new_list;
+
+ ENSURE_FUTURE_ALIVE(fut)
+
if (fut->fut_callbacks == NULL) {
- Py_RETURN_NONE;
+ if (fut->fut_callback0 == NULL) {
+ Py_RETURN_NONE;
+ }
+ else {
+ new_list = PyList_New(1);
+ if (new_list == NULL) {
+ return NULL;
+ }
+ Py_INCREF(fut->fut_callback0);
+ PyList_SET_ITEM(new_list, 0, fut->fut_callback0);
+ return new_list;
+ }
+ }
+
+ assert(fut->fut_callbacks != NULL);
+
+ if (fut->fut_callback0 == NULL) {
+ Py_INCREF(fut->fut_callbacks);
+ return fut->fut_callbacks;
+ }
+
+ assert(fut->fut_callback0 != NULL);
+
+ len = PyList_GET_SIZE(fut->fut_callbacks);
+ new_list = PyList_New(len + 1);
+ if (new_list == NULL) {
+ return NULL;
+ }
+
+ Py_INCREF(fut->fut_callback0);
+ PyList_SET_ITEM(new_list, 0, fut->fut_callback0);
+ for (i = 0; i < len; i++) {
+ PyObject *cb = PyList_GET_ITEM(fut->fut_callbacks, i);
+ Py_INCREF(cb);
+ PyList_SET_ITEM(new_list, i + 1, cb);
}
- Py_INCREF(fut->fut_callbacks);
- return fut->fut_callbacks;
+
+ return new_list;
}
static PyObject *
FutureObj_get_result(FutureObj *fut)
{
+ ENSURE_FUTURE_ALIVE(fut)
if (fut->fut_result == NULL) {
Py_RETURN_NONE;
}
@@ -834,6 +1032,7 @@ FutureObj_get_result(FutureObj *fut)
static PyObject *
FutureObj_get_exception(FutureObj *fut)
{
+ ENSURE_FUTURE_ALIVE(fut)
if (fut->fut_exception == NULL) {
Py_RETURN_NONE;
}
@@ -844,7 +1043,7 @@ FutureObj_get_exception(FutureObj *fut)
static PyObject *
FutureObj_get_source_traceback(FutureObj *fut)
{
- if (fut->fut_source_tb == NULL) {
+ if (!future_is_alive(fut) || fut->fut_source_tb == NULL) {
Py_RETURN_NONE;
}
Py_INCREF(fut->fut_source_tb);
@@ -859,6 +1058,8 @@ FutureObj_get_state(FutureObj *fut)
_Py_IDENTIFIER(FINISHED);
PyObject *ret = NULL;
+ ENSURE_FUTURE_ALIVE(fut)
+
switch (fut->fut_state) {
case STATE_PENDING:
ret = _PyUnicode_FromId(&PyId_PENDING);
@@ -896,6 +1097,8 @@ static PyObject *
_asyncio_Future__schedule_callbacks_impl(FutureObj *self)
/*[clinic end generated code: output=5e8958d89ea1c5dc input=4f5f295f263f4a88]*/
{
+ ENSURE_FUTURE_ALIVE(self)
+
int ret = future_schedule_callbacks(self);
if (ret == -1) {
return NULL;
@@ -908,6 +1111,8 @@ FutureObj_repr(FutureObj *fut)
{
_Py_IDENTIFIER(_repr_info);
+ ENSURE_FUTURE_ALIVE(fut)
+
PyObject *rinfo = _PyObject_CallMethodIdObjArgs((PyObject*)fut,
&PyId__repr_info,
NULL);
@@ -1068,12 +1273,10 @@ static PyTypeObject FutureType = {
.tp_finalize = (destructor)FutureObj_finalize,
};
-#define Future_CheckExact(obj) (Py_TYPE(obj) == &FutureType)
-
static inline int
future_call_schedule_callbacks(FutureObj *fut)
{
- if (Future_CheckExact(fut)) {
+ if (Future_CheckExact(fut) || Task_CheckExact(fut)) {
return future_schedule_callbacks(fut);
}
else {
@@ -1122,12 +1325,26 @@ typedef struct {
FutureObj *future;
} futureiterobject;
+
+#define FI_FREELIST_MAXLEN 255
+static futureiterobject *fi_freelist = NULL;
+static Py_ssize_t fi_freelist_len = 0;
+
+
static void
FutureIter_dealloc(futureiterobject *it)
{
PyObject_GC_UnTrack(it);
- Py_XDECREF(it->future);
- PyObject_GC_Del(it);
+ Py_CLEAR(it->future);
+
+ if (fi_freelist_len < FI_FREELIST_MAXLEN) {
+ fi_freelist_len++;
+ it->future = (FutureObj*) fi_freelist;
+ fi_freelist = it;
+ }
+ else {
+ PyObject_GC_Del(it);
+ }
}
static PyObject *
@@ -1272,10 +1489,23 @@ future_new_iter(PyObject *fut)
PyErr_BadInternalCall();
return NULL;
}
- it = PyObject_GC_New(futureiterobject, &FutureIterType);
- if (it == NULL) {
- return NULL;
+
+ ENSURE_FUTURE_ALIVE(fut)
+
+ if (fi_freelist_len) {
+ fi_freelist_len--;
+ it = fi_freelist;
+ fi_freelist = (futureiterobject*) it->future;
+ it->future = NULL;
+ _Py_NewReference((PyObject*) it);
+ }
+ else {
+ it = PyObject_GC_New(futureiterobject, &FutureIterType);
+ if (it == NULL) {
+ return NULL;
+ }
}
+
Py_INCREF(fut);
it->future = (FutureObj*)fut;
PyObject_GC_Track(it);
@@ -1549,20 +1779,25 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
/*[clinic end generated code: output=9f24774c2287fc2f input=8d132974b049593e]*/
{
PyObject *res;
- int tmp;
+
if (future_init((FutureObj*)self, loop)) {
return -1;
}
if (!PyCoro_CheckExact(coro)) {
- // fastpath failed, perfom slow check
- // raise after Future.__init__(), attrs are required for __del__
- res = PyObject_CallFunctionObjArgs(asyncio_iscoroutine_func,
- coro, NULL);
+ /* 'coro' is not a native coroutine, call asyncio.iscoroutine()
+ to check if it's another coroutine flavour.
+
+ Do this check after 'future_init()'; in case we need to raise
+ an error, __del__ needs a properly initialized object.
+ */
+ res = PyObject_CallFunctionObjArgs(
+ asyncio_iscoroutine_func, coro, NULL);
if (res == NULL) {
return -1;
}
- tmp = PyObject_Not(res);
+
+ int tmp = PyObject_Not(res);
Py_DECREF(res);
if (tmp < 0) {
return -1;
@@ -2023,8 +2258,6 @@ static PyTypeObject TaskType = {
.tp_finalize = (destructor)TaskObj_finalize,
};
-#define Task_CheckExact(obj) (Py_TYPE(obj) == &TaskType)
-
static void
TaskObj_dealloc(PyObject *self)
{
@@ -2079,22 +2312,14 @@ task_call_step(TaskObj *task, PyObject *arg)
static int
task_call_step_soon(TaskObj *task, PyObject *arg)
{
- PyObject *handle;
-
PyObject *cb = TaskStepMethWrapper_new(task, arg);
if (cb == NULL) {
return -1;
}
- handle = _PyObject_CallMethodIdObjArgs(task->task_loop, &PyId_call_soon,
- cb, NULL);
+ int ret = call_soon(task->task_loop, cb, NULL);
Py_DECREF(cb);
- if (handle == NULL) {
- return -1;
- }
-
- Py_DECREF(handle);
- return 0;
+ return ret;
}
static PyObject *
@@ -2747,6 +2972,26 @@ _asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task)
static void
+module_free_freelists()
+{
+ PyObject *next;
+ PyObject *current;
+
+ next = (PyObject*) fi_freelist;
+ while (next != NULL) {
+ assert(fi_freelist_len > 0);
+ fi_freelist_len--;
+
+ current = next;
+ next = (PyObject*) ((futureiterobject*) current)->future;
+ PyObject_GC_Del(current);
+ }
+ assert(fi_freelist_len == 0);
+ fi_freelist = NULL;
+}
+
+
+static void
module_free(void *m)
{
Py_CLEAR(asyncio_mod);
@@ -2764,6 +3009,8 @@ module_free(void *m)
Py_CLEAR(current_tasks);
Py_CLEAR(all_tasks);
+
+ module_free_freelists();
}
static int