From d7302bd2c4ae43a962a293b53998bd03423adba4 Mon Sep 17 00:00:00 2001 From: Debarshi Ray Date: Fri, 31 Aug 2012 00:01:54 +0200 Subject: log-walker: Rework get_events ... so that only the caches are filled asynchronously and the rest of the work does not involve the use of a separate thread. This is the first step towards ensuring that we do not run the TplLogEventFilter from a separate thread. NB: This does not solve the actual problem. The TplLogEventFilter is still invoked from a separate thread. However this refactoring lets us move in that direction. Fixes: https://bugs.freedesktop.org/54270 --- telepathy-logger/log-walker.c | 245 ++++++++++++++++++++++++++++++------------ 1 file changed, 174 insertions(+), 71 deletions(-) (limited to 'telepathy-logger') diff --git a/telepathy-logger/log-walker.c b/telepathy-logger/log-walker.c index f5e94d3..a873384 100644 --- a/telepathy-logger/log-walker.c +++ b/telepathy-logger/log-walker.c @@ -195,6 +195,12 @@ static const gsize CACHE_SIZE = 5; typedef struct { GAsyncReadyCallback cb; + GList *fill_cache; + GList *fill_iter; + GList *latest_cache; + GList *latest_event; + GList *latest_iter; + gint64 latest_timestamp; gpointer user_data; guint num_events; } TplLogWalkerAsyncData; @@ -256,92 +262,195 @@ tpl_log_walker_caches_free_func (gpointer data) } -static GList * -tpl_log_walker_get_events (TplLogWalker *walker, - guint num_events, +static void +tpl_log_walker_fill_cache_async_thread (GSimpleAsyncResult *simple, + GObject *object, + GCancellable *cancellable) +{ + GError *error = NULL; + TplLogWalkerAsyncData *async_data; + + async_data = (TplLogWalkerAsyncData *) g_async_result_get_user_data ( + G_ASYNC_RESULT (simple)); + + async_data->fill_cache->data = tpl_log_iter_get_events ( + TPL_LOG_ITER (async_data->fill_iter->data), CACHE_SIZE, &error); + + if (error != NULL) + { + g_simple_async_result_set_from_error (simple, error); + g_error_free (error); + } +} + + +static void +tpl_log_walker_fill_cache_async (TplLogWalker *walker, + GList *cache, + GList *iter, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *simple; + TplLogWalkerAsyncData *async_data; + + g_return_if_fail (TPL_IS_LOG_WALKER (walker)); + + async_data = tpl_log_walker_async_data_new (); + async_data->cb = callback; + async_data->user_data = user_data; + async_data->fill_cache = cache; + async_data->fill_iter = iter; + + simple = g_simple_async_result_new (G_OBJECT (walker), + tpl_log_walker_async_operation_cb, async_data, + tpl_log_walker_fill_cache_async); + + g_simple_async_result_run_in_thread (simple, + tpl_log_walker_fill_cache_async_thread, G_PRIORITY_DEFAULT, + NULL); + + g_object_unref (simple); +} + + +static gboolean +tpl_log_walker_fill_cache_finish (TplLogWalker *walker, + GAsyncResult *result, GError **error) { + GSimpleAsyncResult *simple; + + g_return_val_if_fail (TPL_IS_LOG_WALKER (walker), FALSE); + g_return_val_if_fail (g_simple_async_result_is_valid (result, + G_OBJECT (walker), tpl_log_walker_fill_cache_async), FALSE); + + simple = G_SIMPLE_ASYNC_RESULT (result); + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; + + return TRUE; +} + + +static void +tpl_log_walker_get_events (GObject *source_object, + GAsyncResult *result, + gpointer user_data) +{ + GSimpleAsyncResult *simple; + TplLogWalker *walker; TplLogWalkerPriv *priv; GList *events; + TplLogWalkerAsyncData *async_data; guint i; - g_return_val_if_fail (TPL_IS_LOG_WALKER (walker), NULL); - + walker = TPL_LOG_WALKER (source_object); priv = walker->priv; - events = NULL; - i = 0; - g_mutex_lock (&priv->mutex); + simple = G_SIMPLE_ASYNC_RESULT (user_data); + events = g_simple_async_result_get_op_res_gpointer (simple); + async_data = (TplLogWalkerAsyncData *) g_async_result_get_user_data ( + G_ASYNC_RESULT (simple)); + + /* If we are returning from a prior call to + * tpl_log_walker_fill_cache_async then finish it. + */ + if (result != NULL) + tpl_log_walker_fill_cache_finish (walker, result, NULL); if (priv->is_end == TRUE) goto out; - while (i < num_events && priv->is_end == FALSE) + i = g_list_length (events); + + while (i < async_data->num_events && priv->is_end == FALSE) { - GList *k; - GList *l; - GList **latest_cache; - GList *latest_event; - TplLogIter *latest_iter; - gint64 latest_timestamp; - - latest_cache = NULL; - latest_event = NULL; - latest_iter = NULL; - latest_timestamp = 0; - - for (k = priv->caches, l = priv->iters; - k != NULL && l != NULL; - k = g_list_next (k), l = g_list_next (l)) + GList *cache; + GList *iter; + + /* Continue the loop from where we left, or start from the + * beginning as the case maybe. + */ + + cache = (async_data->fill_cache != NULL) ? + async_data->fill_cache : priv->caches; + + iter = (async_data->fill_iter != NULL) ? + async_data->fill_iter : priv->iters; + + for (; cache != NULL && iter != NULL; + cache = g_list_next (cache), iter = g_list_next (iter)) { - GList **cache; GList *event; - TplLogIter *iter; gint64 timestamp; - cache = (GList **) &k->data; - iter = TPL_LOG_ITER (l->data); - - /* If the cache is empty, try to fill it up. */ - if (*cache == NULL) - *cache = tpl_log_iter_get_events (iter, CACHE_SIZE, error); - - /* If it could not be filled, then the store must be empty. */ - if (*cache == NULL) - continue; + if (cache->data == NULL) + { + /* If the cache could not be filled, then the store + * must be empty. + */ + if (cache == async_data->fill_cache) + continue; + + /* Otherwise, try to fill it up. */ + async_data->fill_cache = cache; + async_data->fill_iter = iter; + g_simple_async_result_set_op_res_gpointer (simple, events, NULL); + tpl_log_walker_fill_cache_async (walker, cache, iter, + tpl_log_walker_get_events, simple); + return; + } - event = g_list_last (*cache); + event = g_list_last (cache->data); timestamp = tpl_event_get_timestamp (TPL_EVENT (event->data)); - if (timestamp > latest_timestamp) + if (timestamp > async_data->latest_timestamp) { - latest_cache = cache; - latest_event = event; - latest_iter = iter; - latest_timestamp = timestamp; + async_data->latest_cache = cache; + async_data->latest_event = event; + async_data->latest_iter = iter; + async_data->latest_timestamp = timestamp; } } - if (latest_event != NULL) + /* These are used to maintain the continuity of the for loop + * which can get interrupted by the calls to + * tpl_log_walker_fill_cache_async(). Now that we are out of the + * loop we should reset them. + */ + async_data->fill_cache = NULL; + async_data->fill_iter = NULL; + async_data->latest_timestamp = 0; + + if (async_data->latest_event != NULL) { GList *h; TplLogWalkerHistoryData *data; - events = g_list_prepend (events, latest_event->data); - *latest_cache = g_list_delete_link (*latest_cache, latest_event); + events = g_list_prepend (events, async_data->latest_event->data); + async_data->latest_cache->data = g_list_delete_link ( + async_data->latest_cache->data, async_data->latest_event); i++; h = priv->history; if (h == NULL || - ((TplLogWalkerHistoryData *) h->data)->iter != latest_iter) + ((TplLogWalkerHistoryData *) h->data)->iter != + async_data->latest_iter->data) { data = tpl_log_walker_history_data_new (); - data->iter = g_object_ref (latest_iter); + data->iter = g_object_ref (async_data->latest_iter->data); priv->history = g_list_prepend (priv->history, data); } else data = (TplLogWalkerHistoryData *) h->data; data->count++; + + /* Now that the event has been inserted into the list we can + * forget about it. + */ + async_data->latest_event = NULL; } else priv->is_end = TRUE; @@ -354,32 +463,23 @@ tpl_log_walker_get_events (TplLogWalker *walker, out: g_mutex_unlock (&priv->mutex); - return events; + g_simple_async_result_set_op_res_gpointer (simple, events, NULL); + g_simple_async_result_complete (simple); + g_object_unref (simple); } -static void -tpl_log_walker_get_events_async_thread (GSimpleAsyncResult *simple, - GObject *object, - GCancellable *cancellable) +static gboolean +tpl_log_walker_get_events_async_idle (gpointer user_data) { - GError *error = NULL; - GList *events; - TplLogWalkerAsyncData *async_data; - - async_data = (TplLogWalkerAsyncData *) g_async_result_get_user_data ( - G_ASYNC_RESULT (simple)); - - events = tpl_log_walker_get_events (TPL_LOG_WALKER (object), - async_data->num_events, &error); + GSimpleAsyncResult *simple; + GObject *source_object; - if (error != NULL) - { - g_simple_async_result_set_from_error (simple, error); - g_error_free (error); - } + simple = G_SIMPLE_ASYNC_RESULT (user_data); + source_object = g_async_result_get_source_object (G_ASYNC_RESULT (simple)); + tpl_log_walker_get_events (source_object, NULL, simple); - g_simple_async_result_set_op_res_gpointer (simple, events, NULL); + return G_SOURCE_REMOVE; } @@ -584,11 +684,14 @@ tpl_log_walker_get_events_async (TplLogWalker *walker, tpl_log_walker_async_operation_cb, async_data, tpl_log_walker_get_events_async); - g_simple_async_result_run_in_thread (simple, - tpl_log_walker_get_events_async_thread, G_PRIORITY_DEFAULT, - NULL); + g_mutex_lock (&walker->priv->mutex); - g_object_unref (simple); + /* We should return to the main loop before doing anything because + * otherwise in certain cases (eg., num_events == 0), we might + * complete the whole operation without ever returning to the main + * loop. + */ + g_idle_add (tpl_log_walker_get_events_async_idle, simple); } -- cgit v1.2.1