From a5332e937e25d45cbfb3af90a557776a7280cbdb Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Fri, 17 Nov 2017 00:33:26 +0100 Subject: libtracker-direct: Rewrite in C In today's chapter of gratuituous rewrites. The instigator of this rewrite is vala's bug https://bugzilla.gnome.org/show_bug.cgi?id=789249. One might argue that bugs are bugs and eventually get fixed, but there's two things that make me think it won't happen soon: - Vala behavior of possibly iterating the main loop from the async task until the task is complete is very much deliberate in order to support the Generator pattern without a main loop, as seen at: https://wiki.gnome.org/Projects/Vala/AsyncSamples#Generator_example - OTOH, glib docs specify that a GAsyncReadyCallback must happen at a later iteration of the main context, presumably to ensure the task is not finished before the async function dispatching the task returns. This is precisely what trips Vala into starting the same task again. I don't see either changing anytime soon, and in the mean time Tracker is largely affected by it, both in perceived bugs (All those nie:url constraint warnings out of the blue had a reason, this) and in real bugs (It's sometimes attempting to insert things twice, and it may even succeed if the query does not break any cardinality/unique constraints). This affects Tracker in too fundamental ways to just shrug it away, unlike the Vala code this C/glib code works just as it looks. Now about the code... It's a pretty boring rewrite, there's a thread pool to dispatch select queries, and a single exclusive thread to handle updates. The WAL hook can possibly use an additional thread to perform non-blocking writes. All very much alike the older code. Future commits will make tracker-store use this connection implementation, so there's only this piece of code handling SPARQL updates. --- src/libtracker-direct/.gitignore | 3 - src/libtracker-direct/Makefile.am | 17 +- src/libtracker-direct/tracker-direct.c | 830 +++++++++++++++++++++++++++ src/libtracker-direct/tracker-direct.h | 52 ++ src/libtracker-direct/tracker-direct.vala | 405 ------------- src/libtracker-direct/tracker-direct.vapi | 10 + src/libtracker-direct/tracker-namespace.vala | 29 - 7 files changed, 893 insertions(+), 453 deletions(-) delete mode 100644 src/libtracker-direct/.gitignore create mode 100644 src/libtracker-direct/tracker-direct.c create mode 100644 src/libtracker-direct/tracker-direct.h delete mode 100644 src/libtracker-direct/tracker-direct.vala create mode 100644 src/libtracker-direct/tracker-direct.vapi delete mode 100644 src/libtracker-direct/tracker-namespace.vala diff --git a/src/libtracker-direct/.gitignore b/src/libtracker-direct/.gitignore deleted file mode 100644 index b55dd251a..000000000 --- a/src/libtracker-direct/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -tracker-namespace.c -tracker-direct.[ch] -tracker-direct*.vapi diff --git a/src/libtracker-direct/Makefile.am b/src/libtracker-direct/Makefile.am index 5e9a42f46..7960bae6c 100644 --- a/src/libtracker-direct/Makefile.am +++ b/src/libtracker-direct/Makefile.am @@ -1,16 +1,5 @@ noinst_LTLIBRARIES = libtracker-direct.la -AM_VALAFLAGS = \ - --includedir=libtracker-direct \ - --header tracker-direct.h \ - --vapi tracker-direct.vapi \ - --pkg gio-2.0 \ - $(BUILD_VALAFLAGS) \ - $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ - $(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \ - $(top_srcdir)/src/libtracker-common/libtracker-common.vapi \ - $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi - AM_CPPFLAGS = \ $(BUILD_VALACFLAGS) \ -I$(top_srcdir)/src \ @@ -19,8 +8,7 @@ AM_CPPFLAGS = \ $(LIBTRACKER_DIRECT_CFLAGS) libtracker_direct_la_SOURCES = \ - tracker-namespace.vala \ - tracker-direct.vala + tracker-direct.c libtracker_direct_la_LIBADD = \ $(top_builddir)/src/libtracker-data/libtracker-data.la \ @@ -30,7 +18,4 @@ libtracker_direct_la_LIBADD = \ noinst_HEADERS = \ tracker-direct.h -BUILT_SOURCES = \ - libtracker_direct_la_vala.stamp - EXTRA_DIST = meson.build diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c new file mode 100644 index 000000000..7c3b42568 --- /dev/null +++ b/src/libtracker-direct/tracker-direct.c @@ -0,0 +1,830 @@ +/* + * Copyright (C) 2010, Nokia + * Copyright (C) 2017, Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "config.h" + +#include "tracker-direct.h" +#include + +typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate; + +struct _TrackerDirectConnectionPrivate +{ + TrackerSparqlConnectionFlags flags; + GFile *store; + GFile *journal; + GFile *ontology; + + TrackerNamespaceManager *namespace_manager; + TrackerDataManager *data_manager; + GMutex mutex; + + GThreadPool *update_thread; /* Contains 1 exclusive thread */ + GThreadPool *select_pool; + + guint initialized : 1; +}; + +enum { + PROP_0, + PROP_FLAGS, + PROP_STORE_LOCATION, + PROP_JOURNAL_LOCATION, + PROP_ONTOLOGY_LOCATION, + N_PROPS +}; + +static GParamSpec *props[N_PROPS] = { NULL }; + +typedef enum { + TASK_TYPE_QUERY, + TASK_TYPE_UPDATE, + TASK_TYPE_UPDATE_BLANK, + TASK_TYPE_TURTLE +} TaskType; + +typedef struct { + TaskType type; + union { + gchar *query; + GFile *turtle_file; + } data; +} TaskData; + +static void tracker_direct_connection_initable_iface_init (GInitableIface *iface); +static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface); + +G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection, + TRACKER_SPARQL_TYPE_CONNECTION, + G_ADD_PRIVATE (TrackerDirectConnection) + G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, + tracker_direct_connection_initable_iface_init) + G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE, + tracker_direct_connection_async_initable_iface_init)) + +static TaskData * +task_data_query_new (TaskType type, + const gchar *sparql) +{ + TaskData *data; + + g_assert (type != TASK_TYPE_TURTLE); + data = g_new0 (TaskData, 1); + data->type = type; + data->data.query = g_strdup (sparql); + + return data; +} + +static TaskData * +task_data_turtle_new (GFile *file) +{ + TaskData *data; + + data = g_new0 (TaskData, 1); + data->type = TASK_TYPE_TURTLE; + g_set_object (&data->data.turtle_file, file); + + return data; +} + +static void +task_data_free (TaskData *task) +{ + if (task->type == TASK_TYPE_TURTLE) + g_object_unref (task->data.turtle_file); + else + g_free (task->data.query); + + g_free (task); +} + +static void +update_thread_func (gpointer data, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task = data; + TaskData *task_data = g_task_get_task_data (task); + TrackerData *tracker_data; + GError *error = NULL; + gpointer retval = NULL; + GDestroyNotify destroy_notify = NULL; + + conn = user_data; + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + tracker_data = tracker_data_manager_get_data (priv->data_manager); + + switch (task_data->type) { + case TASK_TYPE_QUERY: + g_warning ("Queries don't go through this thread"); + break; + case TASK_TYPE_UPDATE: + tracker_data_update_sparql (tracker_data, task_data->data.query, &error); + break; + case TASK_TYPE_UPDATE_BLANK: + retval = tracker_data_update_sparql_blank (tracker_data, task_data->data.query, &error); + destroy_notify = (GDestroyNotify) g_variant_unref; + break; + case TASK_TYPE_TURTLE: + tracker_data_load_turtle_file (tracker_data, task_data->data.turtle_file, &error); + break; + } + + if (error) + g_task_return_error (task, error); + else if (retval) + g_task_return_pointer (task, retval, destroy_notify); + else + g_task_return_boolean (task, TRUE); + + g_mutex_unlock (&priv->mutex); +} + +static void +query_thread_pool_func (gpointer data, + gpointer user_data) +{ + TrackerSparqlCursor *cursor; + GTask *task = data; + TaskData *task_data = g_task_get_task_data (task); + GError *error = NULL; + + g_assert (task_data->type == TASK_TYPE_QUERY); + cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)), + task_data->data.query, + g_task_get_cancellable (task), + &error); + if (cursor) + g_task_return_pointer (task, cursor, g_object_unref); + else + g_task_return_error (task, error); +} + +static void +wal_checkpoint (TrackerDBInterface *iface, + gboolean blocking) +{ + GError *error = NULL; + + g_debug ("Checkpointing database..."); + tracker_db_interface_sqlite_wal_checkpoint (iface, blocking, &error); + + if (error) { + g_warning ("Error in WAL checkpoint: %s", error->message); + g_error_free (error); + } + + g_debug ("Checkpointing complete"); +} + +static gpointer +wal_checkpoint_thread (gpointer data) +{ + TrackerDBInterface *wal_iface = data; + + wal_checkpoint (wal_iface, FALSE); + g_object_unref (wal_iface); + return NULL; +} + +static void +wal_hook (TrackerDBInterface *iface, + gint n_pages) +{ + TrackerDataManager *data_manager = tracker_db_interface_get_user_data (iface); + TrackerDBInterface *wal_iface = tracker_data_manager_get_wal_db_interface (data_manager); + + if (n_pages >= 10000) { + /* Do immediate checkpointing (blocking updates) to + * prevent excessive WAL file growth. + */ + wal_checkpoint (wal_iface, TRUE); + } else { + /* Defer non-blocking checkpoint to thread */ + g_thread_try_new ("wal-checkpoint", wal_checkpoint_thread, + g_object_ref (wal_iface), NULL); + } +} + +static gint +task_compare_func (GTask *a, + GTask *b, + gpointer user_data) +{ + return g_task_get_priority (b) - g_task_get_priority (a); +} + +static gboolean +tracker_direct_connection_initable_init (GInitable *initable, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES; + TrackerDBInterface *iface; + GHashTable *namespaces; + GHashTableIter iter; + gchar *prefix, *ns; + + conn = TRACKER_DIRECT_CONNECTION (initable); + priv = tracker_direct_connection_get_instance_private (conn); + + tracker_locale_sanity_check (); + + priv->select_pool = g_thread_pool_new (query_thread_pool_func, + conn, 16, FALSE, error); + if (!priv->select_pool) + return FALSE; + + priv->update_thread = g_thread_pool_new (update_thread_func, + conn, 1, TRUE, error); + if (!priv->update_thread) + return FALSE; + + g_thread_pool_set_sort_function (priv->select_pool, + (GCompareDataFunc) task_compare_func, + conn); + g_thread_pool_set_sort_function (priv->update_thread, + (GCompareDataFunc) task_compare_func, + conn); + + /* Init data manager */ + if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) + db_flags |= TRACKER_DB_MANAGER_READONLY; + + priv->data_manager = tracker_data_manager_new (db_flags, priv->store, + priv->journal, priv->ontology, + FALSE, FALSE, 100, 100); + if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, error)) + return FALSE; + + /* Set up WAL hook on our connection */ + iface = tracker_data_manager_get_writable_db_interface (priv->data_manager); + tracker_db_interface_sqlite_wal_hook (iface, wal_hook); + + /* Initialize namespace manager */ + priv->namespace_manager = tracker_namespace_manager_new (); + namespaces = tracker_data_manager_get_namespaces (priv->data_manager); + g_hash_table_iter_init (&iter, namespaces); + + while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) { + tracker_namespace_manager_add_prefix (priv->namespace_manager, + prefix, ns); + } + + return TRUE; +} + +static void +tracker_direct_connection_initable_iface_init (GInitableIface *iface) +{ + iface->init = tracker_direct_connection_initable_init; +} + +static void +async_initable_thread_func (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GError *error = NULL; + + if (!g_initable_init (G_INITABLE (source_object), cancellable, &error)) + g_task_return_error (task, error); + else + g_task_return_boolean (task, TRUE); +} + +static void +tracker_direct_connection_async_initable_init_async (GAsyncInitable *async_initable, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + + task = g_task_new (async_initable, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_run_in_thread (task, async_initable_thread_func); +} + +static gboolean +tracker_direct_connection_async_initable_init_finish (GAsyncInitable *async_initable, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_boolean (G_TASK (res), error); +} + +static void +tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface) +{ + iface->init_async = tracker_direct_connection_async_initable_init_async; + iface->init_finish = tracker_direct_connection_async_initable_init_finish; +} + +static void +tracker_direct_connection_init (TrackerDirectConnection *conn) +{ +} + +static void +tracker_direct_connection_finalize (GObject *object) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + if (priv->update_thread) + g_thread_pool_free (priv->update_thread, TRUE, TRUE); + if (priv->select_pool) + g_thread_pool_free (priv->select_pool, TRUE, FALSE); + + if (priv->data_manager) { + TrackerDBInterface *wal_iface; + wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); + } + + g_clear_object (&priv->store); + g_clear_object (&priv->journal); + g_clear_object (&priv->ontology); + g_clear_object (&priv->data_manager); + + G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object); +} + +static void +tracker_direct_connection_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + switch (prop_id) { + case PROP_FLAGS: + priv->flags = g_value_get_enum (value); + break; + case PROP_STORE_LOCATION: + priv->store = g_value_dup_object (value); + break; + case PROP_JOURNAL_LOCATION: + priv->journal = g_value_dup_object (value); + break; + case PROP_ONTOLOGY_LOCATION: + priv->ontology = g_value_dup_object (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +tracker_direct_connection_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + switch (prop_id) { + case PROP_FLAGS: + g_value_set_enum (value, priv->flags); + break; + case PROP_STORE_LOCATION: + g_value_set_object (value, priv->store); + break; + case PROP_JOURNAL_LOCATION: + g_value_set_object (value, priv->journal); + break; + case PROP_ONTOLOGY_LOCATION: + g_value_set_object (value, priv->ontology); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static TrackerSparqlCursor * +tracker_direct_connection_query (TrackerSparqlConnection *self, + const gchar *sparql, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerSparqlQuery *query; + TrackerSparqlCursor *cursor; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + query = tracker_sparql_query_new (priv->data_manager, sparql); + cursor = TRACKER_SPARQL_CURSOR (tracker_sparql_query_execute_cursor (query, error)); + if (cursor) + tracker_sparql_cursor_set_connection (cursor, self); + g_mutex_unlock (&priv->mutex); + + return cursor; +} + +static void +tracker_direct_connection_query_async (TrackerSparqlConnection *self, + const gchar *sparql, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GError *error = NULL; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_QUERY, sparql), + (GDestroyNotify) task_data_free); + + if (!g_thread_pool_push (priv->select_pool, task, &error)) + g_task_return_error (task, error); +} + +static TrackerSparqlCursor * +tracker_direct_connection_query_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static void +tracker_direct_connection_update (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + tracker_data_update_sparql (data, sparql, error); + g_mutex_unlock (&priv->mutex); +} + +static void +tracker_direct_connection_update_async (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE, sparql), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static void +tracker_direct_connection_update_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + g_task_propagate_boolean (G_TASK (res), error); +} + +static void +error_free (GError *error) +{ + if (error) + g_error_free (error); +} + +static void +update_array_async_thread_func (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + gchar **updates = task_data; + gchar *concatenated; + GPtrArray *errors; + GError *error = NULL; + gint i; + + errors = g_ptr_array_new_with_free_func ((GDestroyNotify) error_free); + g_ptr_array_set_size (errors, g_strv_length (updates)); + + /* Fast path, perform everything as a single update */ + concatenated = g_strjoinv (" ", updates); + tracker_sparql_connection_update (source_object, concatenated, + g_task_get_priority (task), + cancellable, &error); + + if (!error) { + g_task_return_pointer (task, errors, + (GDestroyNotify) g_ptr_array_unref); + return; + } + + /* Slow path, perform updates one by one */ + for (i = 0; updates[i]; i++) { + GError *err = NULL; + + err = g_ptr_array_index (errors, i); + tracker_sparql_connection_update (source_object, updates[i], + g_task_get_priority (task), + cancellable, &err); + } + + g_task_return_pointer (task, errors, + (GDestroyNotify) g_ptr_array_unref); +} + +static void +tracker_direct_connection_update_array_async (TrackerSparqlConnection *self, + gchar **updates, + gint n_updates, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + gchar **copy; + gint i = 0, idx = 0; + + copy = g_new0 (gchar*, n_updates + 1); + + for (i = 0; i < n_updates; i++) { + g_return_if_fail (updates[i] != NULL); + copy[i] = g_strdup (updates[i]); + } + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, copy, (GDestroyNotify) g_strfreev); + + g_task_run_in_thread (task, update_array_async_thread_func); +} + +static GPtrArray * +tracker_direct_connection_update_array_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static GVariant * +tracker_direct_connection_update_blank (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + GVariant *blank_nodes; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + blank_nodes = tracker_data_update_sparql_blank (data, sparql, error); + g_mutex_unlock (&priv->mutex); + + return blank_nodes; +} + +static void +tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static GVariant * +tracker_direct_connection_update_blank_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static void +tracker_direct_connection_load (TrackerSparqlConnection *self, + GFile *file, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + tracker_data_load_turtle_file (data, file, error); + g_mutex_unlock (&priv->mutex); +} + +static void +tracker_direct_connection_load_async (TrackerSparqlConnection *self, + GFile *file, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_turtle_new (file), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static void +tracker_direct_connection_load_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + g_task_propagate_pointer (G_TASK (res), error); +} + +static TrackerNamespaceManager * +tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self)); + + return priv->namespace_manager; +} + +static void +tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) +{ + TrackerSparqlConnectionClass *sparql_connection_class; + GObjectClass *object_class; + + object_class = G_OBJECT_CLASS (klass); + sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass); + + object_class->finalize = tracker_direct_connection_finalize; + object_class->set_property = tracker_direct_connection_set_property; + object_class->get_property = tracker_direct_connection_get_property; + + sparql_connection_class->query = tracker_direct_connection_query; + sparql_connection_class->query_async = tracker_direct_connection_query_async; + sparql_connection_class->query_finish = tracker_direct_connection_query_finish; + sparql_connection_class->update = tracker_direct_connection_update; + sparql_connection_class->update_async = tracker_direct_connection_update_async; + sparql_connection_class->update_finish = tracker_direct_connection_update_finish; + sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async; + sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish; + sparql_connection_class->update_blank = tracker_direct_connection_update_blank; + sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async; + sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish; + sparql_connection_class->load = tracker_direct_connection_load; + sparql_connection_class->load_async = tracker_direct_connection_load_async; + sparql_connection_class->load_finish = tracker_direct_connection_load_finish; + sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager; + + props[PROP_FLAGS] = + g_param_spec_enum ("flags", + "Flags", + "Flags", + TRACKER_SPARQL_TYPE_CONNECTION_FLAGS, + TRACKER_SPARQL_CONNECTION_FLAGS_NONE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_STORE_LOCATION] = + g_param_spec_object ("store-location", + "Store location", + "Store location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_JOURNAL_LOCATION] = + g_param_spec_object ("journal-location", + "Journal location", + "Journal location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_ONTOLOGY_LOCATION] = + g_param_spec_object ("ontology-location", + "Ontology location", + "Ontology location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + + g_object_class_install_properties (object_class, N_PROPS, props); +} + +TrackerDirectConnection * +tracker_direct_connection_new (TrackerSparqlConnectionFlags flags, + GFile *store, + GFile *journal, + GFile *ontology, + GError **error) +{ + g_return_val_if_fail (G_IS_FILE (store), NULL); + g_return_val_if_fail (G_IS_FILE (journal), NULL); + g_return_val_if_fail (G_IS_FILE (ontology), NULL); + g_return_val_if_fail (!error || !*error, NULL); + + return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION, + "flags", flags, + "store-location", store, + "journal-location", journal, + "ontology-location", ontology, + NULL); +} diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h new file mode 100644 index 000000000..105602b2a --- /dev/null +++ b/src/libtracker-direct/tracker-direct.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2010, Nokia + * Copyright (C) 2017, Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __TRACKER_LOCAL_CONNECTION_H__ +#define __TRACKER_LOCAL_CONNECTION_H__ + +#include + +#define TRACKER_TYPE_DIRECT_CONNECTION (tracker_direct_connection_get_type()) +#define TRACKER_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnection)) +#define TRACKER_DIRECT_CONNECTION_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnectionClass)) +#define TRACKER_IS_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TRACKER_TYPE_DIRECT_CONNECTION)) +#define TRACKER_IS_DIRECT_CONNECTION_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), TRACKER_TYPE_DIRECT_CONNECTION)) +#define TRACKER_DIRECT_CONNECTION_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnectionClass)) + +typedef struct _TrackerDirectConnection TrackerDirectConnection; +typedef struct _TrackerDirectConnectionClass TrackerDirectConnectionClass; + +struct _TrackerDirectConnectionClass +{ + TrackerSparqlConnectionClass parent_class; +}; + +struct _TrackerDirectConnection +{ + TrackerSparqlConnection parent_instance; +}; + +TrackerDirectConnection *tracker_direct_connection_new (TrackerSparqlConnectionFlags flags, + GFile *store, + GFile *journal, + GFile *ontology, + GError **error); + +#endif /* __TRACKER_LOCAL_CONNECTION_H__ */ diff --git a/src/libtracker-direct/tracker-direct.vala b/src/libtracker-direct/tracker-direct.vala deleted file mode 100644 index b149b6806..000000000 --- a/src/libtracker-direct/tracker-direct.vala +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Copyright (C) 2010, Nokia - * Copyright (C) 2017, Red Hat, Inc. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -public class Tracker.Direct.Connection : Tracker.Sparql.Connection, AsyncInitable, Initable { - File? database_loc; - File? journal_loc; - File? ontology_loc; - Sparql.ConnectionFlags flags; - - Data.Manager data_manager; - - // Mutex to hold datamanager - private Mutex mutex = Mutex (); - Thread thread; - - // Initialization stuff, both sync and async - private Mutex init_mutex = Mutex (); - private Cond init_cond = Cond (); - private bool initialized; - private Error init_error; - public SourceFunc init_callback; - - private AsyncQueue update_queue; - private NamespaceManager namespace_manager; - - [CCode (cname = "SHAREDIR")] - extern const string SHAREDIR; - - enum TaskType { - QUERY, - UPDATE, - UPDATE_BLANK, - TURTLE, - } - - abstract class Task { - public TaskType type; - public int priority; - public Cancellable? cancellable; - public SourceFunc callback; - public Error error; - } - - private class UpdateTask : Task { - public string sparql; - public Variant blank_nodes; - - private void set (TaskType type, string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable = null) { - this.type = type; - this.sparql = sparql; - this.priority = priority; - this.cancellable = cancellable; - } - - public UpdateTask (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) { - this.set (TaskType.UPDATE, sparql, priority, cancellable); - } - - public UpdateTask.blank (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) { - this.set (TaskType.UPDATE_BLANK, sparql, priority, cancellable); - } - } - - private class TurtleTask : Task { - public File file; - - public TurtleTask (File file, Cancellable? cancellable) { - this.type = TaskType.TURTLE; - this.file = file; - this.priority = Priority.DEFAULT; - this.cancellable = cancellable; - } - } - - static void wal_checkpoint (DBInterface iface, bool blocking) { - try { - debug ("Checkpointing database..."); - iface.sqlite_wal_checkpoint (blocking); - debug ("Checkpointing complete..."); - } catch (Error e) { - warning (e.message); - } - } - - static void wal_checkpoint_on_thread (DBInterface iface) { - new Thread ("wal-checkpoint", () => { - wal_checkpoint (iface, false); - return null; - }); - } - - static void wal_hook (DBInterface iface, int n_pages) { - var manager = (Data.Manager) iface.get_user_data (); - var wal_iface = manager.get_wal_db_interface (); - - if (n_pages >= 10000) { - // do immediate checkpointing (blocking updates) - // to prevent excessive wal file growth - wal_checkpoint (wal_iface, true); - } else if (n_pages >= 1000) { - wal_checkpoint_on_thread (wal_iface); - } - } - - private void* thread_func () { - init_mutex.lock (); - - try { - Locale.sanity_check (); - DBManagerFlags db_flags = DBManagerFlags.ENABLE_MUTEXES; - if ((flags & Sparql.ConnectionFlags.READONLY) != 0) - db_flags |= DBManagerFlags.READONLY; - - data_manager = new Data.Manager (db_flags, - database_loc, journal_loc, ontology_loc, - false, false, 100, 100); - data_manager.init (); - - var iface = data_manager.get_writable_db_interface (); - iface.sqlite_wal_hook (wal_hook); - } catch (Error e) { - init_error = e; - } finally { - if (init_callback != null) { - init_callback (); - } else { - initialized = true; - init_cond.signal (); - init_mutex.unlock (); - } - } - - while (true) { - var task = update_queue.pop(); - - try { - switch (task.type) { - case TaskType.UPDATE: - UpdateTask update_task = (UpdateTask) task; - update (update_task.sparql, update_task.priority, update_task.cancellable); - break; - case TaskType.UPDATE_BLANK: - UpdateTask update_task = (UpdateTask) task; - update_task.blank_nodes = update_blank (update_task.sparql, update_task.priority, update_task.cancellable); - break; - case TaskType.TURTLE: - TurtleTask turtle_task = (TurtleTask) task; - load (turtle_task.file, turtle_task.cancellable); - break; - default: - break; - } - } catch (Error e) { - task.error = e; - } - - task.callback (); - } - } - - public async bool init_async (int io_priority, Cancellable? cancellable) throws Error { - init_callback = init_async.callback; - thread = new Thread ("database", thread_func); - - return initialized; - } - - public bool init (Cancellable? cancellable) throws Error { - try { - thread = new Thread ("database", thread_func); - - init_mutex.lock (); - while (!initialized) - init_cond.wait(init_mutex); - init_mutex.unlock (); - - if (init_error != null) - throw init_error; - } catch (Error e) { - throw new Sparql.Error.INTERNAL (e.message); - } - - return true; - } - - public Connection (Sparql.ConnectionFlags connection_flags, File loc, File? journal, File? ontology) throws Sparql.Error, IOError, DBusError { - database_loc = loc; - journal_loc = journal; - ontology_loc = ontology; - flags = connection_flags; - - if (journal_loc == null) - journal_loc = database_loc; - if (ontology_loc == null) - ontology_loc = File.new_for_path (Path.build_filename (SHAREDIR, "tracker", "ontologies", "nepomuk")); - - update_queue = new AsyncQueue (); - } - - public override void dispose () { - data_manager.shutdown (); - base.dispose (); - } - - Sparql.Cursor query_unlocked (string sparql) throws Sparql.Error, DBusError { - try { - var query_object = new Sparql.Query (data_manager, sparql); - var cursor = query_object.execute_cursor (); - cursor.connection = this; - return cursor; - } catch (DBInterfaceError e) { - throw new Sparql.Error.INTERNAL (e.message); - } catch (DateError e) { - throw new Sparql.Error.PARSE (e.message); - } - } - - public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError { - // Check here for early cancellation, just in case - // the operation can be entirely avoided - if (cancellable != null && cancellable.is_cancelled ()) { - throw new IOError.CANCELLED ("Operation was cancelled"); - } - - mutex.lock (); - try { - return query_unlocked (sparql); - } finally { - mutex.unlock (); - } - } - - public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError { - // run in a separate thread - Sparql.Error sparql_error = null; - IOError io_error = null; - DBusError dbus_error = null; - GLib.Error error = null; - Sparql.Cursor result = null; - var context = MainContext.get_thread_default (); - - IOSchedulerJob.push ((job, cancellable) => { - try { - result = query (sparql, cancellable); - } catch (IOError e_io) { - io_error = e_io; - } catch (Sparql.Error e_spql) { - sparql_error = e_spql; - } catch (DBusError e_dbus) { - dbus_error = e_dbus; - } catch (GLib.Error e) { - error = e; - } - - context.invoke (() => { - query_async.callback (); - return false; - }); - - return false; - }, Priority.DEFAULT, cancellable); - - yield; - - if (cancellable != null && cancellable.is_cancelled ()) { - throw new IOError.CANCELLED ("Operation was cancelled"); - } else if (sparql_error != null) { - throw sparql_error; - } else if (io_error != null) { - throw io_error; - } else if (dbus_error != null) { - throw dbus_error; - } else { - return result; - } - } - - public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - mutex.lock (); - try { - var data = data_manager.get_data (); - data.update_sparql (sparql); - } finally { - mutex.unlock (); - } - } - - public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - var task = new UpdateTask (sparql, priority, cancellable); - task.callback = update_async.callback; - update_queue.push (task); - yield; - - if (task.error != null) - throw task.error; - } - - public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - GLib.Variant? blank_nodes = null; - mutex.lock (); - try { - var data = data_manager.get_data (); - blank_nodes = data.update_sparql_blank (sparql); - } finally { - mutex.unlock (); - } - - return blank_nodes; - } - - public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - var task = new UpdateTask.blank (sparql, priority, cancellable); - task.callback = update_blank_async.callback; - update_queue.push (task); - yield; - - if (task.error != null) - throw task.error; - - return task.blank_nodes; - } - - public async override GenericArray? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { - var combined_query = new StringBuilder (); - var n_updates = sparql.length; - int i; - - for (i = 0; i < n_updates; i++) - combined_query.append (sparql[i]); - - var task = new UpdateTask (combined_query.str, priority, cancellable); - task.callback = update_array_async.callback; - update_queue.push (task); - yield; - - var errors = new GenericArray (n_updates); - - if (task.error == null) { - for (i = 0; i < n_updates; i++) - errors.add (null); - } else { - // combined query was not successful, try queries one by one - for (i = 0; i < n_updates; i++) { - try { - yield update_async (sparql[i], priority, cancellable); - errors.add (null); - } catch (Sparql.Error e) { - errors.add (e); - } - } - } - - return errors; - } - - public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { - mutex.lock (); - try { - var data = data_manager.get_data (); - data.load_turtle_file (file); - } finally { - mutex.unlock (); - } - } - - public async override void load_async (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { - var task = new TurtleTask (file, cancellable); - task.callback = load_async.callback; - update_queue.push (task); - yield; - - if (task.error != null) - throw new Sparql.Error.INTERNAL (task.error.message); - } - - public override NamespaceManager? get_namespace_manager () { - if (namespace_manager == null && data_manager != null) { - var ht = data_manager.get_namespaces (); - namespace_manager = new NamespaceManager (); - - foreach (var prefix in ht.get_keys ()) { - namespace_manager.add_prefix (prefix, ht.lookup (prefix)); - } - } - - return namespace_manager; - } -} diff --git a/src/libtracker-direct/tracker-direct.vapi b/src/libtracker-direct/tracker-direct.vapi new file mode 100644 index 000000000..d1678f353 --- /dev/null +++ b/src/libtracker-direct/tracker-direct.vapi @@ -0,0 +1,10 @@ +[CCode (cprefix = "Tracker", gir_namespace = "Tracker", gir_version = "2.0", lower_case_cprefix = "tracker_")] +namespace Tracker { + namespace Direct { + [CCode (cheader_filename = "libtracker-direct/tracker-direct.h")] + public class Connection : Tracker.Sparql.Connection, GLib.Initable, GLib.AsyncInitable { + public Connection (Tracker.Sparql.ConnectionFlags connection_flags, GLib.File loc, GLib.File? journal, GLib.File? ontology) throws Tracker.Sparql.Error, GLib.IOError, GLib.DBusError; + public Tracker.Data.Manager get_data_manager (); + } + } +} diff --git a/src/libtracker-direct/tracker-namespace.vala b/src/libtracker-direct/tracker-namespace.vala deleted file mode 100644 index 24d7b2ee8..000000000 --- a/src/libtracker-direct/tracker-namespace.vala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright © 2015 Collabora Ltd. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -/* - * This file serves as the representation for the Tracker namespace, mostly - * so that we can set its namespace and version attributes for GIR. - */ - -[CCode (cprefix = "TrackerDirect", gir_namespace = "TrackerDirect", - gir_version = "2.0", lower_case_cprefix = "tracker_direct_")] -namespace Tracker -{ -} -- cgit v1.2.1 From da5283d94215646d0a8ccdb270b4b4adcb815969 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 18 Nov 2017 15:35:00 +0100 Subject: libtracker-direct: Add internal TrackerDataManager getter This will make internal users able to access all the gory details that TrackerDataManager has to offer. Will help deduplicate code in tracker-store that is essentially the same than this. --- src/libtracker-direct/tracker-direct.c | 9 +++++++++ src/libtracker-direct/tracker-direct.h | 3 +++ src/libtracker-sparql-backend/Makefile.am | 1 + 3 files changed, 13 insertions(+) diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c index 7c3b42568..c32b94f2d 100644 --- a/src/libtracker-direct/tracker-direct.c +++ b/src/libtracker-direct/tracker-direct.c @@ -828,3 +828,12 @@ tracker_direct_connection_new (TrackerSparqlConnectionFlags flags, "ontology-location", ontology, NULL); } + +TrackerDataManager * +tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (conn); + return priv->data_manager; +} diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h index 105602b2a..13da42243 100644 --- a/src/libtracker-direct/tracker-direct.h +++ b/src/libtracker-direct/tracker-direct.h @@ -22,6 +22,7 @@ #define __TRACKER_LOCAL_CONNECTION_H__ #include +#include #define TRACKER_TYPE_DIRECT_CONNECTION (tracker_direct_connection_get_type()) #define TRACKER_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnection)) @@ -49,4 +50,6 @@ TrackerDirectConnection *tracker_direct_connection_new (TrackerSparqlConnectionF GFile *ontology, GError **error); +TrackerDataManager *tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn); + #endif /* __TRACKER_LOCAL_CONNECTION_H__ */ diff --git a/src/libtracker-sparql-backend/Makefile.am b/src/libtracker-sparql-backend/Makefile.am index 9a31d98d6..6fe17dccb 100644 --- a/src/libtracker-sparql-backend/Makefile.am +++ b/src/libtracker-sparql-backend/Makefile.am @@ -5,6 +5,7 @@ AM_VALAFLAGS = \ $(BUILD_VALAFLAGS) \ $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \ $(top_srcdir)/src/libtracker-bus/tracker-bus.vapi \ + $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \ $(top_srcdir)/src/libtracker-remote/tracker-remote.vapi \ $(top_srcdir)/src/libtracker-common/libtracker-common.vapi -- cgit v1.2.1 From 0a4c92134288217341e670db5cf0f10cfb1bf4b6 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sun, 19 Nov 2017 16:45:34 +0100 Subject: tracker-store: Do not freeze events during TTL load --- src/tracker-store/tracker-events.c | 19 ------------------- src/tracker-store/tracker-events.h | 1 - src/tracker-store/tracker-events.vapi | 1 - src/tracker-store/tracker-store.vala | 7 +------ 4 files changed, 1 insertion(+), 27 deletions(-) diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c index 199dced29..870084709 100644 --- a/src/tracker-store/tracker-events.c +++ b/src/tracker-store/tracker-events.c @@ -27,7 +27,6 @@ #include "tracker-events.h" typedef struct { - gboolean frozen; guint total; GPtrArray *notify_classes; } EventsPrivate; @@ -64,10 +63,6 @@ tracker_events_add_insert (gint graph_id, g_return_if_fail (rdf_types != NULL); g_return_if_fail (private != NULL); - if (private->frozen) { - return; - } - for (i = 0; i < rdf_types->len; i++) { if (tracker_class_get_notify (rdf_types->pdata[i])) { tracker_class_add_insert_event (rdf_types->pdata[i], @@ -94,10 +89,6 @@ tracker_events_add_delete (gint graph_id, g_return_if_fail (rdf_types != NULL); g_return_if_fail (private != NULL); - if (private->frozen) { - return; - } - for (i = 0; i < rdf_types->len; i++) { if (tracker_class_get_notify (rdf_types->pdata[i])) { tracker_class_add_delete_event (rdf_types->pdata[i], @@ -122,16 +113,6 @@ tracker_events_reset_pending (void) tracker_class_reset_pending_events (class); } - - private->frozen = FALSE; -} - -void -tracker_events_freeze (void) -{ - g_return_if_fail (private != NULL); - - private->frozen = TRUE; } static void diff --git a/src/tracker-store/tracker-events.h b/src/tracker-store/tracker-events.h index df9322ff1..1e962b90c 100644 --- a/src/tracker-store/tracker-events.h +++ b/src/tracker-store/tracker-events.h @@ -48,7 +48,6 @@ void tracker_events_add_delete (gint graph_id, GPtrArray *rdf_types); guint tracker_events_get_total (gboolean and_reset); void tracker_events_reset_pending (void); -void tracker_events_freeze (void); TrackerClass** tracker_events_get_classes (guint *length); G_END_DECLS diff --git a/src/tracker-store/tracker-events.vapi b/src/tracker-store/tracker-events.vapi index c232d8af5..fdd7b8af4 100644 --- a/src/tracker-store/tracker-events.vapi +++ b/src/tracker-store/tracker-events.vapi @@ -26,7 +26,6 @@ namespace Tracker { public void add_delete (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); public uint get_total (bool and_reset); public void reset_pending (); - public void freeze (); public unowned Class[] get_classes (); } } diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index 1f58256e8..bcc776180 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -233,12 +233,7 @@ public class Tracker.Store { var file = File.new_for_path (turtle_task.path); - Tracker.Events.freeze (); - try { - data.load_turtle_file (file); - } finally { - Tracker.Events.reset_pending (); - } + data.load_turtle_file (file); } } } catch (Error e) { -- cgit v1.2.1 From ed8e9870466591b14a18e4e1e189afbd4ba91ccd Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sun, 19 Nov 2017 20:36:10 +0100 Subject: libtracker-data: Remove TRACKER_DB_MANAGER_REMOVE_CACHE flag It's unused and unhandled. As the TrackerDBManager type is internal API, just remove the flag and shuffle the numbers. --- src/libtracker-data/libtracker-data.vapi | 1 - src/libtracker-data/tracker-db-manager.h | 10 ++++------ src/tracker-store/tracker-main.vala | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi index a9e446cd7..9876d6a7a 100644 --- a/src/libtracker-data/libtracker-data.vapi +++ b/src/libtracker-data/libtracker-data.vapi @@ -78,7 +78,6 @@ namespace Tracker { [CCode (cprefix = "TRACKER_DB_MANAGER_", cheader_filename = "libtracker-data/tracker-db-manager.h")] public enum DBManagerFlags { FORCE_REINDEX, - REMOVE_CACHE, REMOVE_ALL, READONLY, DO_NOT_CHECK_ONTOLOGY, diff --git a/src/libtracker-data/tracker-db-manager.h b/src/libtracker-data/tracker-db-manager.h index 95a71cfdb..a41b2ff53 100644 --- a/src/libtracker-data/tracker-db-manager.h +++ b/src/libtracker-data/tracker-db-manager.h @@ -35,12 +35,10 @@ G_BEGIN_DECLS typedef enum { TRACKER_DB_MANAGER_FORCE_REINDEX = 1 << 1, - TRACKER_DB_MANAGER_REMOVE_CACHE = 1 << 2, - /* 1 << 3 Was low mem mode */ - TRACKER_DB_MANAGER_REMOVE_ALL = 1 << 4, - TRACKER_DB_MANAGER_READONLY = 1 << 5, - TRACKER_DB_MANAGER_DO_NOT_CHECK_ONTOLOGY = 1 << 6, - TRACKER_DB_MANAGER_ENABLE_MUTEXES = 1 << 7, + TRACKER_DB_MANAGER_REMOVE_ALL = 1 << 2, + TRACKER_DB_MANAGER_READONLY = 1 << 3, + TRACKER_DB_MANAGER_DO_NOT_CHECK_ONTOLOGY = 1 << 4, + TRACKER_DB_MANAGER_ENABLE_MUTEXES = 1 << 5, } TrackerDBManagerFlags; typedef struct _TrackerDBManager TrackerDBManager; diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index 8e6d8895d..187ff6908 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -247,7 +247,7 @@ License which can be viewed at: config_verbosity_changed_cb (config, null); ulong config_verbosity_id = config.notify["verbosity"].connect (config_verbosity_changed_cb); - DBManagerFlags flags = DBManagerFlags.REMOVE_CACHE; + DBManagerFlags flags = 0; if (force_reindex) { /* TODO port backup support -- cgit v1.2.1 From cb3ef04c5c7389fafbc0625c4aa89530b21f5add Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sun, 19 Nov 2017 20:59:31 +0100 Subject: libtracker-direct: Add internal function to set additional DBManager flags This will be useful for tracker-store, and the flags that make sense there don't make as much sense to add to the public TrackerSparqlConnectionFlags set. --- src/libtracker-direct/tracker-direct.c | 10 +++++++++- src/libtracker-direct/tracker-direct.h | 2 ++ src/libtracker-direct/tracker-direct.vapi | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c index c32b94f2d..8923b8da3 100644 --- a/src/libtracker-direct/tracker-direct.c +++ b/src/libtracker-direct/tracker-direct.c @@ -23,6 +23,8 @@ #include "tracker-direct.h" #include +static TrackerDBManagerFlags default_flags = 0; + typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate; struct _TrackerDirectConnectionPrivate @@ -274,7 +276,7 @@ tracker_direct_connection_initable_init (GInitable *initable, if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) db_flags |= TRACKER_DB_MANAGER_READONLY; - priv->data_manager = tracker_data_manager_new (db_flags, priv->store, + priv->data_manager = tracker_data_manager_new (db_flags | default_flags, priv->store, priv->journal, priv->ontology, FALSE, FALSE, 100, 100); if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, error)) @@ -837,3 +839,9 @@ tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn) priv = tracker_direct_connection_get_instance_private (conn); return priv->data_manager; } + +void +tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags) +{ + default_flags = flags; +} diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h index 13da42243..b8c1392b8 100644 --- a/src/libtracker-direct/tracker-direct.h +++ b/src/libtracker-direct/tracker-direct.h @@ -52,4 +52,6 @@ TrackerDirectConnection *tracker_direct_connection_new (TrackerSparqlConnectionF TrackerDataManager *tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn); +void tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags); + #endif /* __TRACKER_LOCAL_CONNECTION_H__ */ diff --git a/src/libtracker-direct/tracker-direct.vapi b/src/libtracker-direct/tracker-direct.vapi index d1678f353..79b293a5d 100644 --- a/src/libtracker-direct/tracker-direct.vapi +++ b/src/libtracker-direct/tracker-direct.vapi @@ -5,6 +5,7 @@ namespace Tracker { public class Connection : Tracker.Sparql.Connection, GLib.Initable, GLib.AsyncInitable { public Connection (Tracker.Sparql.ConnectionFlags connection_flags, GLib.File loc, GLib.File? journal, GLib.File? ontology) throws Tracker.Sparql.Error, GLib.IOError, GLib.DBusError; public Tracker.Data.Manager get_data_manager (); + public static void set_default_flags (Tracker.DBManagerFlags flags); } } } -- cgit v1.2.1 From 05383d974bfa9c557171ad1101600abb4ca2ae77 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Tue, 21 Nov 2017 00:30:50 +0100 Subject: libtracker-direct: Add sync() call This will flush all updates and trigger the WAL hook. --- src/libtracker-direct/tracker-direct.c | 61 +++++++++++++++++++++++++------ src/libtracker-direct/tracker-direct.h | 2 + src/libtracker-direct/tracker-direct.vapi | 1 + 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c index 8923b8da3..572fdb0c8 100644 --- a/src/libtracker-direct/tracker-direct.c +++ b/src/libtracker-direct/tracker-direct.c @@ -238,23 +238,13 @@ task_compare_func (GTask *a, } static gboolean -tracker_direct_connection_initable_init (GInitable *initable, - GCancellable *cancellable, - GError **error) +set_up_thread_pools (TrackerDirectConnection *conn, + GError **error) { TrackerDirectConnectionPrivate *priv; - TrackerDirectConnection *conn; - TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES; - TrackerDBInterface *iface; - GHashTable *namespaces; - GHashTableIter iter; - gchar *prefix, *ns; - conn = TRACKER_DIRECT_CONNECTION (initable); priv = tracker_direct_connection_get_instance_private (conn); - tracker_locale_sanity_check (); - priv->select_pool = g_thread_pool_new (query_thread_pool_func, conn, 16, FALSE, error); if (!priv->select_pool) @@ -271,6 +261,29 @@ tracker_direct_connection_initable_init (GInitable *initable, g_thread_pool_set_sort_function (priv->update_thread, (GCompareDataFunc) task_compare_func, conn); + return TRUE; +} + +static gboolean +tracker_direct_connection_initable_init (GInitable *initable, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES; + TrackerDBInterface *iface; + GHashTable *namespaces; + GHashTableIter iter; + gchar *prefix, *ns; + + conn = TRACKER_DIRECT_CONNECTION (initable); + priv = tracker_direct_connection_get_instance_private (conn); + + tracker_locale_sanity_check (); + + if (!set_up_thread_pools (conn, error)) + return FALSE; /* Init data manager */ if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) @@ -845,3 +858,27 @@ tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags) { default_flags = flags; } + +void +tracker_direct_connection_sync (TrackerDirectConnection *conn) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDBInterface *wal_iface; + + priv = tracker_direct_connection_get_instance_private (conn); + + if (!priv->data_manager) + return; + + /* Wait for pending updates. */ + if (priv->update_thread) + g_thread_pool_free (priv->update_thread, TRUE, TRUE); + /* Selects are less important, readonly interfaces won't be bothersome */ + if (priv->select_pool) + g_thread_pool_free (priv->select_pool, TRUE, FALSE); + + set_up_thread_pools (conn, NULL); + + wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); +} diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h index b8c1392b8..d1ebfd6ee 100644 --- a/src/libtracker-direct/tracker-direct.h +++ b/src/libtracker-direct/tracker-direct.h @@ -54,4 +54,6 @@ TrackerDataManager *tracker_direct_connection_get_data_manager (TrackerDirectCon void tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags); +void tracker_direct_connection_sync (TrackerDirectConnection *conn); + #endif /* __TRACKER_LOCAL_CONNECTION_H__ */ diff --git a/src/libtracker-direct/tracker-direct.vapi b/src/libtracker-direct/tracker-direct.vapi index 79b293a5d..df15c5890 100644 --- a/src/libtracker-direct/tracker-direct.vapi +++ b/src/libtracker-direct/tracker-direct.vapi @@ -5,6 +5,7 @@ namespace Tracker { public class Connection : Tracker.Sparql.Connection, GLib.Initable, GLib.AsyncInitable { public Connection (Tracker.Sparql.ConnectionFlags connection_flags, GLib.File loc, GLib.File? journal, GLib.File? ontology) throws Tracker.Sparql.Error, GLib.IOError, GLib.DBusError; public Tracker.Data.Manager get_data_manager (); + public void sync (); public static void set_default_flags (Tracker.DBManagerFlags flags); } } -- cgit v1.2.1 From 7b25b9a25dff1227de91808d8e6e9eafcfaf0603 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Tue, 21 Nov 2017 01:28:03 +0100 Subject: libtracker-data: Drop CommitType This basically exists to allow deferring GraphUpdated signals while there's pending batch updates. This is arguably wrong, the priorities should totally affect the order in which updates are processed, but for the sake of interactivity once the data is in the database it makes sense to let the users know ASAP. Now all commits shall set up a timer for GraphUpdated emission is none is set yet. --- src/libtracker-data/libtracker-data.vapi | 11 ++--------- src/libtracker-data/tracker-data-update.c | 7 +++---- src/libtracker-data/tracker-data-update.h | 12 ++---------- src/tracker-store/tracker-resources.vala | 32 ++++--------------------------- src/tracker-store/tracker-store.vala | 27 ++------------------------ 5 files changed, 13 insertions(+), 76 deletions(-) diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi index 9876d6a7a..2daae8061 100644 --- a/src/libtracker-data/libtracker-data.vapi +++ b/src/libtracker-data/libtracker-data.vapi @@ -180,17 +180,10 @@ namespace Tracker { } public delegate void StatementCallback (int graph_id, string? graph, int subject_id, string subject, int predicate_id, int object_id, string object, GLib.PtrArray rdf_types); - public delegate void CommitCallback (Data.Update.CommitType commit_type); + public delegate void CommitCallback (); [CCode (lower_case_cprefix="tracker_data_", cname = "TrackerData", cheader_filename = "libtracker-data/tracker-data-query.h,libtracker-data/tracker-data-update.h")] public class Data.Update : GLib.Object { - [CCode (cprefix = "TRACKER_DATA_COMMIT_")] - public enum CommitType { - REGULAR, - BATCH, - BATCH_LAST - } - public void begin_db_transaction (); public void commit_db_transaction (); public void begin_transaction () throws DBInterfaceError; @@ -199,7 +192,7 @@ namespace Tracker { public void update_sparql (string update) throws Sparql.Error; public GLib.Variant update_sparql_blank (string update) throws Sparql.Error; public void load_turtle_file (GLib.File file) throws Sparql.Error; - public void notify_transaction (CommitType commit_type); + public void notify_transaction (); public void delete_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError; public void update_statement (string? graph, string subject, string predicate, string? object) throws Sparql.Error, DateError; public void insert_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError; diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c index eb58caafd..e552b10ce 100644 --- a/src/libtracker-data/tracker-data-update.c +++ b/src/libtracker-data/tracker-data-update.c @@ -3628,15 +3628,14 @@ tracker_data_commit_transaction (TrackerData *data, } void -tracker_data_notify_transaction (TrackerData *data, - TrackerDataCommitType commit_type) +tracker_data_notify_transaction (TrackerData *data) { if (data->commit_callbacks) { guint n; for (n = 0; n < data->commit_callbacks->len; n++) { TrackerCommitDelegate *delegate; delegate = g_ptr_array_index (data->commit_callbacks, n); - delegate->callback (commit_type, delegate->user_data); + delegate->callback (delegate->user_data); } } } @@ -3679,7 +3678,7 @@ tracker_data_rollback_transaction (TrackerData *data) for (n = 0; n < data->rollback_callbacks->len; n++) { TrackerCommitDelegate *delegate; delegate = g_ptr_array_index (data->rollback_callbacks, n); - delegate->callback (TRUE, delegate->user_data); + delegate->callback (delegate->user_data); } } } diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h index 75444875c..640df408a 100644 --- a/src/libtracker-data/tracker-data-update.h +++ b/src/libtracker-data/tracker-data-update.h @@ -47,12 +47,6 @@ typedef struct _TrackerDataClass TrackerDataClass; typedef struct _TrackerData TrackerData; typedef struct _TrackerDataClass TrackerDataClass; -typedef enum { - TRACKER_DATA_COMMIT_REGULAR, - TRACKER_DATA_COMMIT_BATCH, - TRACKER_DATA_COMMIT_BATCH_LAST -} TrackerDataCommitType; - typedef struct _TrackerData TrackerData; typedef struct _TrackerData TrackerDataUpdate; @@ -65,8 +59,7 @@ typedef void (*TrackerStatementCallback) (gint graph_id, const gchar *object, GPtrArray *rdf_types, gpointer user_data); -typedef void (*TrackerCommitCallback) (TrackerDataCommitType commit_type, - gpointer user_data); +typedef void (*TrackerCommitCallback) (gpointer user_data); GQuark tracker_data_error_quark (void); @@ -110,8 +103,7 @@ void tracker_data_begin_transaction_for_replay (TrackerData * GError **error); void tracker_data_commit_transaction (TrackerData *data, GError **error); -void tracker_data_notify_transaction (TrackerData *data, - TrackerDataCommitType commit_type); +void tracker_data_notify_transaction (TrackerData *data); void tracker_data_rollback_transaction (TrackerData *data); void tracker_data_update_sparql (TrackerData *data, const gchar *update, diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index ed34a5f56..fe805e196 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -51,7 +51,6 @@ public class Tracker.Resources : Object { DBusConnection connection; uint signal_timeout; - bool regular_commit_pending; Tracker.Config config; public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects); @@ -268,49 +267,26 @@ public class Tracker.Resources : Object { Tracker.Writeback.reset_ready (); - regular_commit_pending = false; signal_timeout = 0; return false; } - void on_statements_committed (Tracker.Data.Update.CommitType commit_type) { + void on_statements_committed () { /* Class signal feature */ foreach (var cl in Tracker.Events.get_classes ()) { cl.transact_events (); } - if (!regular_commit_pending) { - // never cancel timeout for non-batch commits as we want - // to ensure that the signal corresponding to a certain - // update arrives within a fixed time limit - - // cancel it in all other cases - // in the BATCH_LAST case, the timeout will be reenabled - // further down but it's important to cancel it first - // to reset the timeout to 1 s starting now - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; - } - } - - if (commit_type == Tracker.Data.Update.CommitType.REGULAR) { - regular_commit_pending = true; - } - - if (regular_commit_pending || commit_type == Tracker.Data.Update.CommitType.BATCH_LAST) { - // timer wanted for non-batch commits and the last in a series of batch commits - if (signal_timeout == 0) { - signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); - } + if (signal_timeout == 0) { + signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); } /* Writeback feature */ Tracker.Writeback.transact (); } - void on_statements_rolled_back (Tracker.Data.Update.CommitType commit_type) { + void on_statements_rolled_back () { Tracker.Events.reset_pending (); Tracker.Writeback.reset_pending (); } diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index bcc776180..a555c67b0 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -138,29 +138,6 @@ public class Tracker.Store { } } - static Tracker.Data.Update.CommitType commit_type (Task task) { - switch (task.type) { - case TaskType.UPDATE: - case TaskType.UPDATE_BLANK: - if (((UpdateTask) task).priority == Priority.HIGH) { - return Tracker.Data.Update.CommitType.REGULAR; - } else if (update_queues[Priority.LOW].get_length () > 0) { - return Tracker.Data.Update.CommitType.BATCH; - } else { - return Tracker.Data.Update.CommitType.BATCH_LAST; - } - case TaskType.TURTLE: - if (update_queues[Priority.TURTLE].get_length () > 0) { - return Tracker.Data.Update.CommitType.BATCH; - } else { - return Tracker.Data.Update.CommitType.BATCH_LAST; - } - default: - warn_if_reached (); - return Tracker.Data.Update.CommitType.REGULAR; - } - } - static bool task_finish_cb (Task task) { var data = task.data_manager.get_data (); @@ -180,7 +157,7 @@ public class Tracker.Store { n_queries_running--; } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) { if (task.error == null) { - data.notify_transaction (commit_type (task)); + data.notify_transaction (); } task.callback (); @@ -189,7 +166,7 @@ public class Tracker.Store { update_running = false; } else if (task.type == TaskType.TURTLE) { if (task.error == null) { - data.notify_transaction (commit_type (task)); + data.notify_transaction (); } task.callback (); -- cgit v1.2.1 From dffa19e78aa185db3bc3068a43e0035f13a5c83e Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Fri, 24 Nov 2017 01:02:53 +0100 Subject: libtracker-data: Move TrackerClass event maintenance to tracker-store This is solely used by tracker-store to keep the backlog of pending GraphUpdated events. This event tracking can move to tracker-store itself, implemented atop libtracker-data's insert/delete/commit/rollback callbacks. --- src/libtracker-data/tracker-class.c | 279 ------------------------- src/libtracker-data/tracker-class.h | 29 --- src/tracker-store/tracker-backup.vala | 4 +- src/tracker-store/tracker-events.c | 345 +++++++++++++++++++++++++------ src/tracker-store/tracker-events.h | 22 +- src/tracker-store/tracker-events.vapi | 13 +- src/tracker-store/tracker-main.vala | 2 +- src/tracker-store/tracker-resources.vala | 57 +++-- 8 files changed, 339 insertions(+), 412 deletions(-) diff --git a/src/libtracker-data/tracker-class.c b/src/libtracker-data/tracker-class.c index 6c9754e70..f216c8e94 100644 --- a/src/libtracker-data/tracker-class.c +++ b/src/libtracker-data/tracker-class.c @@ -49,27 +49,6 @@ struct _TrackerClassPrivate { GArray *last_super_classes; TrackerOntologies *ontologies; - - struct { - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } pending; - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } ready; - } deletes; - struct { - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } pending; - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } ready; - } inserts; }; static void class_finalize (GObject *object); @@ -99,16 +78,6 @@ tracker_class_init (TrackerClass *service) priv->last_domain_indexes = NULL; priv->last_super_classes = NULL; - priv->deletes.pending.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->deletes.pending.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->deletes.ready.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->deletes.ready.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - - priv->inserts.pending.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->inserts.pending.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->inserts.ready.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->inserts.ready.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - /* Make GET_PRIV working */ service->priv = priv; } @@ -126,16 +95,6 @@ class_finalize (GObject *object) g_array_free (priv->super_classes, TRUE); g_array_free (priv->domain_indexes, TRUE); - g_array_free (priv->deletes.pending.sub_pred_ids, TRUE); - g_array_free (priv->deletes.pending.obj_graph_ids, TRUE); - g_array_free (priv->deletes.ready.sub_pred_ids, TRUE); - g_array_free (priv->deletes.ready.obj_graph_ids, TRUE); - - g_array_free (priv->inserts.pending.sub_pred_ids, TRUE); - g_array_free (priv->inserts.pending.obj_graph_ids, TRUE); - g_array_free (priv->inserts.ready.sub_pred_ids, TRUE); - g_array_free (priv->inserts.ready.obj_graph_ids, TRUE); - if (priv->last_domain_indexes) { g_array_free (priv->last_domain_indexes, TRUE); } @@ -511,244 +470,6 @@ tracker_class_set_db_schema_changed (TrackerClass *service, priv->db_schema_changed = value; } -gboolean -tracker_class_has_insert_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_val_if_fail (TRACKER_IS_CLASS (class), FALSE); - - priv = GET_PRIV (class); - - return (priv->inserts.ready.sub_pred_ids->len > 0); -} - -gboolean -tracker_class_has_delete_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_val_if_fail (TRACKER_IS_CLASS (class), FALSE); - - priv = GET_PRIV (class); - - return (priv->deletes.ready.sub_pred_ids->len > 0); -} - -void -tracker_class_foreach_insert_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data) -{ - guint i; - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - g_return_if_fail (foreach != NULL); - - priv = GET_PRIV (class); - - for (i = 0; i < priv->inserts.ready.sub_pred_ids->len; i++) { - gint graph_id, subject_id, pred_id, object_id; - gint64 sub_pred_id; - gint64 obj_graph_id; - - sub_pred_id = g_array_index (priv->inserts.ready.sub_pred_ids, gint64, i); - obj_graph_id = g_array_index (priv->inserts.ready.obj_graph_ids, gint64, i); - - pred_id = sub_pred_id & 0xffffffff; - subject_id = sub_pred_id >> 32; - graph_id = obj_graph_id & 0xffffffff; - object_id = obj_graph_id >> 32; - - foreach (graph_id, subject_id, pred_id, object_id, user_data); - } -} - -void -tracker_class_foreach_delete_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data) -{ - guint i; - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - g_return_if_fail (foreach != NULL); - - priv = GET_PRIV (class); - - for (i = 0; i < priv->deletes.ready.sub_pred_ids->len; i++) { - gint graph_id, subject_id, pred_id, object_id; - gint64 sub_pred_id; - gint64 obj_graph_id; - - sub_pred_id = g_array_index (priv->deletes.ready.sub_pred_ids, gint64, i); - obj_graph_id = g_array_index (priv->deletes.ready.obj_graph_ids, gint64, i); - - pred_id = sub_pred_id & 0xffffffff; - subject_id = sub_pred_id >> 32; - graph_id = obj_graph_id & 0xffffffff; - object_id = obj_graph_id >> 32; - - foreach (graph_id, subject_id, pred_id, object_id, user_data); - } -} - -void -tracker_class_reset_ready_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - - priv = GET_PRIV (class); - - /* Reset */ - g_array_set_size (priv->deletes.ready.sub_pred_ids, 0); - g_array_set_size (priv->deletes.ready.obj_graph_ids, 0); - - g_array_set_size (priv->inserts.ready.sub_pred_ids, 0); - g_array_set_size (priv->inserts.ready.obj_graph_ids, 0); - -} - -void -tracker_class_reset_pending_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - - priv = GET_PRIV (class); - - /* Reset */ - g_array_set_size (priv->deletes.pending.sub_pred_ids, 0); - g_array_set_size (priv->deletes.pending.obj_graph_ids, 0); - - g_array_set_size (priv->inserts.pending.sub_pred_ids, 0); - g_array_set_size (priv->inserts.pending.obj_graph_ids, 0); - -} - -void -tracker_class_transact_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - priv = GET_PRIV (class); - - /* Move */ - g_array_insert_vals (priv->deletes.ready.obj_graph_ids, - priv->deletes.ready.obj_graph_ids->len, - priv->deletes.pending.obj_graph_ids->data, - priv->deletes.pending.obj_graph_ids->len); - - g_array_insert_vals (priv->deletes.ready.sub_pred_ids, - priv->deletes.ready.sub_pred_ids->len, - priv->deletes.pending.sub_pred_ids->data, - priv->deletes.pending.sub_pred_ids->len); - - /* Reset */ - g_array_set_size (priv->deletes.pending.sub_pred_ids, 0); - g_array_set_size (priv->deletes.pending.obj_graph_ids, 0); - - - /* Move */ - g_array_insert_vals (priv->inserts.ready.obj_graph_ids, - priv->inserts.ready.obj_graph_ids->len, - priv->inserts.pending.obj_graph_ids->data, - priv->inserts.pending.obj_graph_ids->len); - - g_array_insert_vals (priv->inserts.ready.sub_pred_ids, - priv->inserts.ready.sub_pred_ids->len, - priv->inserts.pending.sub_pred_ids->data, - priv->inserts.pending.sub_pred_ids->len); - - /* Reset */ - g_array_set_size (priv->inserts.pending.sub_pred_ids, 0); - g_array_set_size (priv->inserts.pending.obj_graph_ids, 0); - -} - -static void -insert_vals_into_arrays (GArray *sub_pred_ids, - GArray *obj_graph_ids, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id) -{ - gint i, j, k; - gint64 tmp; - gint64 sub_pred_id; - gint64 obj_graph_id; - - sub_pred_id = (gint64) subject_id; - sub_pred_id = sub_pred_id << 32 | pred_id; - obj_graph_id = (gint64) object_id; - obj_graph_id = obj_graph_id << 32 | graph_id; - - i = 0; - j = sub_pred_ids->len - 1; - - while (j - i > 0) { - k = (i + j) / 2; - tmp = g_array_index (sub_pred_ids, gint64, k); - if (tmp == sub_pred_id) { - i = k + 1; - break; - } else if (tmp > sub_pred_id) - j = k; - else - i = k + 1; - } - - g_array_insert_val (sub_pred_ids, i, sub_pred_id); - g_array_insert_val (obj_graph_ids, i, obj_graph_id); -} - -void -tracker_class_add_insert_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - priv = GET_PRIV (class); - - insert_vals_into_arrays (priv->inserts.pending.sub_pred_ids, - priv->inserts.pending.obj_graph_ids, - graph_id, - subject_id, - pred_id, - object_id); -} - -void -tracker_class_add_delete_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - priv = GET_PRIV (class); - - insert_vals_into_arrays (priv->deletes.pending.sub_pred_ids, - priv->deletes.pending.obj_graph_ids, - graph_id, - subject_id, - pred_id, - object_id); -} - void tracker_class_set_ontologies (TrackerClass *class, TrackerOntologies *ontologies) diff --git a/src/libtracker-data/tracker-class.h b/src/libtracker-data/tracker-class.h index c05634102..9ac46acd8 100644 --- a/src/libtracker-data/tracker-class.h +++ b/src/libtracker-data/tracker-class.h @@ -51,12 +51,6 @@ struct _TrackerClassClass { GObjectClass parent_class; }; -typedef void (*TrackerEventsForeach) (gint graph_id, - gint subject_id, - gint pred_id, - gint object_id, - gpointer user_data); - GType tracker_class_get_type (void) G_GNUC_CONST; TrackerClass * tracker_class_new (gboolean use_gvdb); const gchar * tracker_class_get_uri (TrackerClass *service); @@ -96,29 +90,6 @@ void tracker_class_set_notify (TrackerClass *ser void tracker_class_set_ontologies (TrackerClass *class, TrackerOntologies *ontologies); -/* For signals API */ -void tracker_class_foreach_delete_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data); -void tracker_class_foreach_insert_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data); -gboolean tracker_class_has_insert_events (TrackerClass *class); -gboolean tracker_class_has_delete_events (TrackerClass *class); -void tracker_class_reset_ready_events (TrackerClass *class); -void tracker_class_reset_pending_events (TrackerClass *class); -void tracker_class_transact_events (TrackerClass *class); -void tracker_class_add_delete_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id); -void tracker_class_add_insert_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id); - G_END_DECLS #endif /* __LIBTRACKER_DATA_CLASS_H__ */ diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala index 5db075274..d589b227a 100644 --- a/src/tracker-store/tracker-backup.vala +++ b/src/tracker-store/tracker-backup.vala @@ -57,7 +57,7 @@ public class Tracker.Backup : Object { throw e; } finally { if (resources != null) { - Tracker.Events.init (Tracker.Main.get_data_manager ()); + Tracker.Events.init (); resources.enable_signals (); } @@ -95,7 +95,7 @@ public class Tracker.Backup : Object { throw e; } finally { if (resources != null) { - Tracker.Events.init (Tracker.Main.get_data_manager ()); + Tracker.Events.init (); resources.enable_signals (); } diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c index 870084709..05782b2c5 100644 --- a/src/tracker-store/tracker-events.c +++ b/src/tracker-store/tracker-events.c @@ -26,13 +26,198 @@ #include "tracker-events.h" +typedef struct _TrackerEventBatch TrackerEventBatch; + +struct _TrackerEventBatch +{ + struct { + GArray *sub_pred_ids; + GArray *obj_graph_ids; + } deletes; + struct { + GArray *sub_pred_ids; + GArray *obj_graph_ids; + } inserts; +}; + typedef struct { guint total; - GPtrArray *notify_classes; + GHashTable *pending; + GHashTable *ready; } EventsPrivate; static EventsPrivate *private; +static TrackerEventBatch * +tracker_event_batch_new (void) +{ + TrackerEventBatch *events; + + events = g_new0 (TrackerEventBatch, 1); + events->deletes.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + events->deletes.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + events->inserts.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + events->inserts.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + + return events; +} + +static void +tracker_event_batch_free (TrackerEventBatch *events) +{ + g_array_unref (events->deletes.sub_pred_ids); + g_array_unref (events->deletes.obj_graph_ids); + g_array_unref (events->inserts.sub_pred_ids); + g_array_unref (events->inserts.obj_graph_ids); + g_free (events); +} + +static void +insert_vals_into_arrays (GArray *sub_pred_ids, + GArray *obj_graph_ids, + gint graph_id, + gint subject_id, + gint pred_id, + gint object_id) +{ + gint i, j, k; + gint64 tmp; + gint64 sub_pred_id; + gint64 obj_graph_id; + + sub_pred_id = (gint64) subject_id; + sub_pred_id = sub_pred_id << 32 | pred_id; + obj_graph_id = (gint64) object_id; + obj_graph_id = obj_graph_id << 32 | graph_id; + + i = 0; + j = sub_pred_ids->len - 1; + + while (j - i > 0) { + k = (i + j) / 2; + tmp = g_array_index (sub_pred_ids, gint64, k); + if (tmp == sub_pred_id) { + i = k + 1; + break; + } else if (tmp > sub_pred_id) + j = k; + else + i = k + 1; + } + + g_array_insert_val (sub_pred_ids, i, sub_pred_id); + g_array_insert_val (obj_graph_ids, i, obj_graph_id); +} + +static void +tracker_event_batch_add_insert_event (TrackerEventBatch *events, + gint graph_id, + gint subject_id, + gint pred_id, + gint object_id) +{ + insert_vals_into_arrays (events->inserts.sub_pred_ids, + events->inserts.obj_graph_ids, + graph_id, + subject_id, + pred_id, + object_id); +} + +static void +tracker_event_batch_add_delete_event (TrackerEventBatch *events, + gint graph_id, + gint subject_id, + gint pred_id, + gint object_id) +{ + insert_vals_into_arrays (events->deletes.sub_pred_ids, + events->deletes.obj_graph_ids, + graph_id, + subject_id, + pred_id, + object_id); +} + +static void +foreach_event_in_arrays (GArray *sub_pred_ids, + GArray *obj_graph_ids, + TrackerEventsForeach foreach, + gpointer user_data) +{ + guint i; + + g_assert (sub_pred_ids->len == obj_graph_ids->len); + + for (i = 0; i < sub_pred_ids->len; i++) { + gint graph_id, subject_id, pred_id, object_id; + gint64 sub_pred_id; + gint64 obj_graph_id; + + sub_pred_id = g_array_index (sub_pred_ids, gint64, i); + obj_graph_id = g_array_index (obj_graph_ids, gint64, i); + + pred_id = sub_pred_id & 0xffffffff; + subject_id = sub_pred_id >> 32; + graph_id = obj_graph_id & 0xffffffff; + object_id = obj_graph_id >> 32; + + foreach (graph_id, subject_id, pred_id, object_id, user_data); + } +} + +void +tracker_event_batch_foreach_insert_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data) +{ + g_return_if_fail (events != NULL); + g_return_if_fail (foreach != NULL); + + foreach_event_in_arrays (events->inserts.sub_pred_ids, + events->inserts.obj_graph_ids, + foreach, user_data); +} + +void +tracker_event_batch_foreach_delete_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data) +{ + g_return_if_fail (events != NULL); + g_return_if_fail (foreach != NULL); + + foreach_event_in_arrays (events->deletes.sub_pred_ids, + events->deletes.obj_graph_ids, + foreach, user_data); +} + +static GHashTable * +tracker_event_batch_hashtable_new (void) +{ + return g_hash_table_new_full (NULL, NULL, + (GDestroyNotify) g_object_unref, + (GDestroyNotify) tracker_event_batch_free); +} + +void +tracker_event_batch_merge (TrackerEventBatch *dest, + TrackerEventBatch *to_copy) +{ + g_array_append_vals (dest->deletes.sub_pred_ids, + to_copy->deletes.sub_pred_ids->data, + to_copy->deletes.sub_pred_ids->len); + g_array_append_vals (dest->deletes.obj_graph_ids, + to_copy->deletes.obj_graph_ids->data, + to_copy->deletes.obj_graph_ids->len); + g_array_append_vals (dest->inserts.sub_pred_ids, + to_copy->inserts.sub_pred_ids->data, + to_copy->inserts.sub_pred_ids->len); + g_array_append_vals (dest->inserts.obj_graph_ids, + to_copy->inserts.obj_graph_ids->data, + to_copy->inserts.obj_graph_ids->len); +} + guint tracker_events_get_total (gboolean and_reset) { @@ -49,6 +234,28 @@ tracker_events_get_total (gboolean and_reset) return total; } +static inline TrackerEventBatch * +ensure_event_batch (TrackerClass *rdf_type) +{ + TrackerEventBatch *events; + + g_assert (private != NULL); + + if (!private->pending) + private->pending = tracker_event_batch_hashtable_new (); + + events = g_hash_table_lookup (private->pending, rdf_type); + + if (!events) { + events = tracker_event_batch_new (); + g_hash_table_insert (private->pending, + g_object_ref (rdf_type), + events); + } + + return events; +} + void tracker_events_add_insert (gint graph_id, gint subject_id, @@ -58,20 +265,23 @@ tracker_events_add_insert (gint graph_id, const gchar *object, GPtrArray *rdf_types) { + TrackerEventBatch *events; guint i; g_return_if_fail (rdf_types != NULL); g_return_if_fail (private != NULL); for (i = 0; i < rdf_types->len; i++) { - if (tracker_class_get_notify (rdf_types->pdata[i])) { - tracker_class_add_insert_event (rdf_types->pdata[i], - graph_id, - subject_id, - pred_id, - object_id); - private->total++; - } + if (!tracker_class_get_notify (rdf_types->pdata[i])) + continue; + + events = ensure_event_batch (rdf_types->pdata[i]); + tracker_event_batch_add_insert_event (events, + graph_id, + subject_id, + pred_id, + object_id); + private->total++; } } @@ -84,90 +294,82 @@ tracker_events_add_delete (gint graph_id, const gchar *object, GPtrArray *rdf_types) { + TrackerEventBatch *events; guint i; g_return_if_fail (rdf_types != NULL); g_return_if_fail (private != NULL); for (i = 0; i < rdf_types->len; i++) { - if (tracker_class_get_notify (rdf_types->pdata[i])) { - tracker_class_add_delete_event (rdf_types->pdata[i], - graph_id, - subject_id, - pred_id, - object_id); - private->total++; - } + if (!tracker_class_get_notify (rdf_types->pdata[i])) + continue; + + events = ensure_event_batch (rdf_types->pdata[i]); + tracker_event_batch_add_delete_event (events, + graph_id, + subject_id, + pred_id, + object_id); + private->total++; } } void -tracker_events_reset_pending (void) +tracker_events_transact (void) { - guint i; + TrackerEventBatch *prev_events, *events; + TrackerClass *rdf_type; + GHashTableIter iter; g_return_if_fail (private != NULL); - for (i = 0; i < private->notify_classes->len; i++) { - TrackerClass *class = g_ptr_array_index (private->notify_classes, i); + if (!private->pending || g_hash_table_size (private->pending) == 0) + return; - tracker_class_reset_pending_events (class); + if (!private->ready) { + private->ready = tracker_event_batch_hashtable_new (); } -} -static void -free_private (EventsPrivate *private) -{ - guint i; - - for (i = 0; i < private->notify_classes->len; i++) { - TrackerClass *class = g_ptr_array_index (private->notify_classes, i); - - tracker_class_reset_pending_events (class); - - /* Perhaps hurry an emit of the ready events here? We're shutting down, - * so I guess we're not required to do that here ... ? */ - tracker_class_reset_ready_events (class); + g_hash_table_iter_init (&iter, private->pending); + + while (g_hash_table_iter_next (&iter, + (gpointer *) &rdf_type, + (gpointer *) &events)) { + prev_events = g_hash_table_lookup (private->ready, + rdf_type); + if (prev_events) { + tracker_event_batch_merge (prev_events, events); + g_hash_table_iter_remove (&iter); + } else { + g_hash_table_iter_steal (&iter); + g_hash_table_insert (private->ready, + g_object_ref (rdf_type), + events); + /* Drop the reference stolen from the pending HT */ + g_object_unref (rdf_type); + } } - - g_ptr_array_unref (private->notify_classes); - - g_free (private); } -TrackerClass ** -tracker_events_get_classes (guint *length) +void +tracker_events_reset_pending (void) { - g_return_val_if_fail (private != NULL, NULL); + g_return_if_fail (private != NULL); - *length = private->notify_classes->len; + g_clear_pointer (&private->pending, g_hash_table_unref); +} - return (TrackerClass **) (private->notify_classes->pdata); +static void +free_private (EventsPrivate *private) +{ + tracker_events_reset_pending (); + g_free (private); } void -tracker_events_init (TrackerDataManager *data_manager) +tracker_events_init (void) { - TrackerOntologies *ontologies; - TrackerClass **classes; - guint length = 0, i; - private = g_new0 (EventsPrivate, 1); - - ontologies = tracker_data_manager_get_ontologies (data_manager); - classes = tracker_ontologies_get_classes (ontologies, &length); - - private->notify_classes = g_ptr_array_sized_new (length); - g_ptr_array_set_free_func (private->notify_classes, (GDestroyNotify) g_object_unref); - - for (i = 0; i < length; i++) { - TrackerClass *class = classes[i]; - - if (tracker_class_get_notify (class)) { - g_ptr_array_add (private->notify_classes, g_object_ref (class)); - } - } - } void @@ -180,3 +382,16 @@ tracker_events_shutdown (void) g_warning ("tracker_events already shutdown"); } } + +GHashTable * +tracker_events_get_pending (void) +{ + GHashTable *pending; + + g_return_val_if_fail (private != NULL, NULL); + + pending = private->ready; + private->ready = NULL; + + return pending; +} diff --git a/src/tracker-store/tracker-events.h b/src/tracker-store/tracker-events.h index 1e962b90c..4d6ff4e1e 100644 --- a/src/tracker-store/tracker-events.h +++ b/src/tracker-store/tracker-events.h @@ -28,9 +28,15 @@ G_BEGIN_DECLS -typedef GStrv (*TrackerNotifyClassGetter) (void); +typedef struct _TrackerEventBatch TrackerEventBatch; -void tracker_events_init (TrackerDataManager *data_manager); +typedef void (*TrackerEventsForeach) (gint graph_id, + gint subject_id, + gint pred_id, + gint object_id, + gpointer user_data); + +void tracker_events_init (void); void tracker_events_shutdown (void); void tracker_events_add_insert (gint graph_id, gint subject_id, @@ -48,7 +54,17 @@ void tracker_events_add_delete (gint graph_id, GPtrArray *rdf_types); guint tracker_events_get_total (gboolean and_reset); void tracker_events_reset_pending (void); -TrackerClass** tracker_events_get_classes (guint *length); + +void tracker_events_transact (void); + +GHashTable * tracker_events_get_pending (void); + +void tracker_event_batch_foreach_insert_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data); +void tracker_event_batch_foreach_delete_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data); G_END_DECLS diff --git a/src/tracker-store/tracker-events.vapi b/src/tracker-store/tracker-events.vapi index fdd7b8af4..da47267b9 100644 --- a/src/tracker-store/tracker-events.vapi +++ b/src/tracker-store/tracker-events.vapi @@ -20,12 +20,21 @@ namespace Tracker { [CCode (cheader_filename = "tracker-store/tracker-events.h")] namespace Events { - public void init (Tracker.Data.Manager data_manager); + public void init (); public void shutdown (); public void add_insert (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); public void add_delete (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); public uint get_total (bool and_reset); public void reset_pending (); - public unowned Class[] get_classes (); + + public void transact (); + public GLib.HashTable get_pending (); + + [CCode (lower_case_cprefix="tracker_event_batch_", cname = "TrackerEventBatch")] + public class Batch { + public delegate void EventsForeach (int graph_id, int subject_id, int pred_id, int object_id); + public void foreach_delete_event (EventsForeach func); + public void foreach_insert_event (EventsForeach func); + } } } diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index 187ff6908..d14b12c8e 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -319,7 +319,7 @@ License which can be viewed at: if (!shutdown) { Tracker.DBus.register_prepare_class_signal (); - Tracker.Events.init (data_manager); + Tracker.Events.init (); Tracker.Writeback.init (data_manager, get_writeback_predicates); Tracker.Store.resume (); diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index fe805e196..9b6069b6b 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -207,32 +207,33 @@ public class Tracker.Resources : Object { /* no longer needed, just return */ } - bool emit_graph_updated (Class cl) { - if (cl.has_insert_events () || cl.has_delete_events ()) { - var builder = new VariantBuilder ((VariantType) "a(iiii)"); - cl.foreach_delete_event ((graph_id, subject_id, pred_id, object_id) => { - builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); - }); - var deletes = builder.end (); - - builder = new VariantBuilder ((VariantType) "a(iiii)"); - cl.foreach_insert_event ((graph_id, subject_id, pred_id, object_id) => { - builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); - }); - var inserts = builder.end (); - - graph_updated (cl.uri, deletes, inserts); - - cl.reset_ready_events (); - - return true; - } - return false; + void emit_graph_updated (Class cl, Events.Batch events) { + var builder = new VariantBuilder ((VariantType) "a(iiii)"); + events.foreach_delete_event ((graph_id, subject_id, pred_id, object_id) => { + builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); + }); + var deletes = builder.end (); + + builder = new VariantBuilder ((VariantType) "a(iiii)"); + events.foreach_insert_event ((graph_id, subject_id, pred_id, object_id) => { + builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); + }); + var inserts = builder.end (); + + graph_updated (cl.uri, deletes, inserts); } bool on_emit_signals () { - foreach (var cl in Tracker.Events.get_classes ()) { - emit_graph_updated (cl); + var events = Tracker.Events.get_pending (); + + if (events != null) { + var iter = HashTableIter (events); + unowned Events.Batch class_events; + unowned Class cl; + + while (iter.next (out cl, out class_events)) { + emit_graph_updated (cl, class_events); + } } /* Reset counter */ @@ -272,18 +273,12 @@ public class Tracker.Resources : Object { } void on_statements_committed () { - /* Class signal feature */ - - foreach (var cl in Tracker.Events.get_classes ()) { - cl.transact_events (); - } + Tracker.Events.transact (); + Tracker.Writeback.transact (); if (signal_timeout == 0) { signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); } - - /* Writeback feature */ - Tracker.Writeback.transact (); } void on_statements_rolled_back () { -- cgit v1.2.1 From 0d8083510296dd2996a60a62a00579481c9aa7fb Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 12:39:22 +0100 Subject: tracker-store: Do immediate GraphUpdated emission checks on commit hook Triggering those on insert/delete callbacks isn't right for two reasons: there could still be a rollback of the just notified data, and it's done from the wrong thread (the one performing updates instead of the main thread). To fix the first, only call this from the commit hook, we can only notify of data that was successfully stored. To fix the second, do the call on an idle that will ensure the main thread running the main loop and doing the DBus dispatching is the one handling the actual emission. At the moment the commit hook is actually executed on that same thread, but that won't stay as-is. --- src/tracker-store/tracker-resources.vala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index 9b6069b6b..0635b4f23 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -275,6 +275,7 @@ public class Tracker.Resources : Object { void on_statements_committed () { Tracker.Events.transact (); Tracker.Writeback.transact (); + check_graph_updated_signal (); if (signal_timeout == 0) { signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); @@ -298,20 +299,21 @@ public class Tracker.Resources : Object { } // immediately emit signals for already committed transaction - on_emit_signals (); + Idle.add (() => { + on_emit_signals (); + return false; + }); } } void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - check_graph_updated_signal (); } void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - check_graph_updated_signal (); } [DBus (visible = false)] -- cgit v1.2.1 From 683035a5fd4585ae8cfb30ed9acbcefd43fd0ba1 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 13:19:44 +0100 Subject: tracker-store: Protect event batches with a mutex While the pending data and event counter are only accessed by the updates thread, the ready events will be potentially accessed by both the updates and the dbus thread. That said, chances of locking will be minimal, since the get_pending() call only happens once a second (by default) or after the pending buffer grew big enough. --- src/tracker-store/tracker-events.c | 29 +++++++++++++++++------------ src/tracker-store/tracker-events.h | 2 +- src/tracker-store/tracker-events.vapi | 2 +- src/tracker-store/tracker-resources.vala | 5 +---- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c index 05782b2c5..77de4c256 100644 --- a/src/tracker-store/tracker-events.c +++ b/src/tracker-store/tracker-events.c @@ -41,9 +41,13 @@ struct _TrackerEventBatch }; typedef struct { - guint total; - GHashTable *pending; + /* Accessed by updates/dbus threads */ + GMutex mutex; GHashTable *ready; + + /* Only accessed by updates thread */ + GHashTable *pending; + guint total; } EventsPrivate; static EventsPrivate *private; @@ -219,19 +223,11 @@ tracker_event_batch_merge (TrackerEventBatch *dest, } guint -tracker_events_get_total (gboolean and_reset) +tracker_events_get_total (void) { - guint total; - g_return_val_if_fail (private != NULL, 0); - total = private->total; - - if (and_reset) { - private->total = 0; - } - - return total; + return private->total; } static inline TrackerEventBatch * @@ -326,6 +322,8 @@ tracker_events_transact (void) if (!private->pending || g_hash_table_size (private->pending) == 0) return; + g_mutex_lock (&private->mutex); + if (!private->ready) { private->ready = tracker_event_batch_hashtable_new (); } @@ -349,6 +347,10 @@ tracker_events_transact (void) g_object_unref (rdf_type); } } + + private->total = 0; + + g_mutex_unlock (&private->mutex); } void @@ -370,6 +372,7 @@ void tracker_events_init (void) { private = g_new0 (EventsPrivate, 1); + g_mutex_init (&private->mutex); } void @@ -390,8 +393,10 @@ tracker_events_get_pending (void) g_return_val_if_fail (private != NULL, NULL); + g_mutex_lock (&private->mutex); pending = private->ready; private->ready = NULL; + g_mutex_unlock (&private->mutex); return pending; } diff --git a/src/tracker-store/tracker-events.h b/src/tracker-store/tracker-events.h index 4d6ff4e1e..1aea15ab6 100644 --- a/src/tracker-store/tracker-events.h +++ b/src/tracker-store/tracker-events.h @@ -52,7 +52,7 @@ void tracker_events_add_delete (gint graph_id, gint object_id, const gchar *object, GPtrArray *rdf_types); -guint tracker_events_get_total (gboolean and_reset); +guint tracker_events_get_total (void); void tracker_events_reset_pending (void); void tracker_events_transact (void); diff --git a/src/tracker-store/tracker-events.vapi b/src/tracker-store/tracker-events.vapi index da47267b9..16df92e84 100644 --- a/src/tracker-store/tracker-events.vapi +++ b/src/tracker-store/tracker-events.vapi @@ -24,7 +24,7 @@ namespace Tracker { public void shutdown (); public void add_insert (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); public void add_delete (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); - public uint get_total (bool and_reset); + public uint get_total (); public void reset_pending (); public void transact (); diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index 0635b4f23..9d9b83c18 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -236,9 +236,6 @@ public class Tracker.Resources : Object { } } - /* Reset counter */ - Tracker.Events.get_total (true); - /* Writeback feature */ var writebacks = Tracker.Writeback.get_ready (); @@ -289,7 +286,7 @@ public class Tracker.Resources : Object { void check_graph_updated_signal () { /* Check for whether we need an immediate emit */ - if (Tracker.Events.get_total (false) > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { + if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { // possibly active timeout no longer necessary as signals // for committed transactions will be emitted by the following on_emit_signals call // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0 -- cgit v1.2.1 From 10915fd2ab785c75f9ef28fa35e24689db7faf37 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 13:53:22 +0100 Subject: tracker-store: Give ownership of writeback events on get_ready() Instead of doing get_ready() and then reset_ready(), just give ownership of the events hashtable on get_ready() while resetting the internal one. --- src/tracker-store/tracker-resources.vala | 2 -- src/tracker-store/tracker-writeback.c | 10 ++++++++-- src/tracker-store/tracker-writeback.vapi | 3 +-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index 9d9b83c18..2dbd1c00b 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -263,8 +263,6 @@ public class Tracker.Resources : Object { writeback (builder.end ()); } - Tracker.Writeback.reset_ready (); - signal_timeout = 0; return false; } diff --git a/src/tracker-store/tracker-writeback.c b/src/tracker-store/tracker-writeback.c index a183b00a3..5bba19020 100644 --- a/src/tracker-store/tracker-writeback.c +++ b/src/tracker-store/tracker-writeback.c @@ -115,9 +115,14 @@ tracker_writeback_reset_ready () GHashTable * tracker_writeback_get_ready (void) { + GHashTable *events; + g_return_val_if_fail (private != NULL, NULL); - return private->ready_events; + events = private->ready_events; + private->ready_events = NULL; + + return events; } static void @@ -214,7 +219,8 @@ tracker_writeback_shutdown (void) /* Perhaps hurry an emit of the ready events here? We're shutting down, * so I guess we're not required to do that here ... ? */ - tracker_writeback_reset_ready (); + g_clear_pointer (&private->ready_events, + (GDestroyNotify) g_hash_table_unref); free_private (private); private = NULL; diff --git a/src/tracker-store/tracker-writeback.vapi b/src/tracker-store/tracker-writeback.vapi index 368de0303..7c48512b8 100644 --- a/src/tracker-store/tracker-writeback.vapi +++ b/src/tracker-store/tracker-writeback.vapi @@ -26,9 +26,8 @@ namespace Tracker { public void init (Tracker.Data.Manager data_manager, WritebackGetPredicatesFunc callback); public void shutdown (); public void check (int graph_id, string graph, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); - public unowned GLib.HashTable> get_ready (); + public GLib.HashTable> get_ready (); public void reset_pending (); - public void reset_ready (); public void transact (); } } -- cgit v1.2.1 From 38e362da82eef861a07b464cfc2d91f9539a74f8 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 13:57:06 +0100 Subject: tracker-store: Refactor writeback signal emission into separate function Purely cosmetic. --- src/tracker-store/tracker-resources.vala | 45 +++++++++++++++++--------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index 2dbd1c00b..c6a76e749 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -223,6 +223,29 @@ public class Tracker.Resources : Object { graph_updated (cl.uri, deletes, inserts); } + void emit_writeback (HashTable> events) { + var builder = new VariantBuilder ((VariantType) "a{iai}"); + var wb_iter = HashTableIter> (events); + + int subject_id; + unowned Array types; + while (wb_iter.next (out subject_id, out types)) { + builder.open ((VariantType) "{iai}"); + + builder.add ("i", subject_id); + + builder.open ((VariantType) "ai"); + for (int i = 0; i < types.length; i++) { + builder.add ("i", types.index (i)); + } + builder.close (); + + builder.close (); + } + + writeback (builder.end ()); + } + bool on_emit_signals () { var events = Tracker.Events.get_pending (); @@ -240,27 +263,7 @@ public class Tracker.Resources : Object { var writebacks = Tracker.Writeback.get_ready (); if (writebacks != null) { - var builder = new VariantBuilder ((VariantType) "a{iai}"); - - var wb_iter = HashTableIter> (writebacks); - - int subject_id; - unowned Array types; - while (wb_iter.next (out subject_id, out types)) { - builder.open ((VariantType) "{iai}"); - - builder.add ("i", subject_id); - - builder.open ((VariantType) "ai"); - for (int i = 0; i < types.length; i++) { - builder.add ("i", types.index (i)); - } - builder.close (); - - builder.close (); - } - - writeback (builder.end ()); + emit_writeback (writebacks); } signal_timeout = 0; -- cgit v1.2.1 From adfa177cb1c38bf3452d16ea66195cf0fade4068 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 14:14:32 +0100 Subject: tracker-store: Protect ready writeback events with mutex Just like with ready GraphUpdated events, this will be potentially accessed by both the thread performing updates, and the thread doing the DBus dispatching and signaling. Just like there, the chances of contention are rather low, since emission is checked just once per second by default. --- src/tracker-store/tracker-writeback.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/tracker-store/tracker-writeback.c b/src/tracker-store/tracker-writeback.c index 5bba19020..1edc3e2dd 100644 --- a/src/tracker-store/tracker-writeback.c +++ b/src/tracker-store/tracker-writeback.c @@ -27,8 +27,12 @@ #include "tracker-writeback.h" typedef struct { + /* Accessed by updates thread */ GHashTable *allowances; GHashTable *pending_events; + + /* Accessed by both updates and dbus threads */ + GMutex mutex; GHashTable *ready_events; } WritebackPrivate; @@ -119,8 +123,10 @@ tracker_writeback_get_ready (void) g_return_val_if_fail (private != NULL, NULL); + g_mutex_lock (&private->mutex); events = private->ready_events; private->ready_events = NULL; + g_mutex_unlock (&private->mutex); return events; } @@ -150,6 +156,7 @@ tracker_writeback_init (TrackerDataManager *data_manager, g_return_if_fail (private == NULL); private = g_new0 (WritebackPrivate, 1); + g_mutex_init (&private->mutex); private->allowances = g_hash_table_new_full (g_direct_hash, g_direct_equal, @@ -196,6 +203,8 @@ tracker_writeback_transact (void) if (!private->pending_events) return; + g_mutex_lock (&private->mutex); + if (!private->ready_events) { private->ready_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, (GDestroyNotify) NULL, @@ -208,6 +217,8 @@ tracker_writeback_transact (void) g_hash_table_insert (private->ready_events, key, value); g_hash_table_iter_remove (&iter); } + + g_mutex_unlock (&private->mutex); } void -- cgit v1.2.1 From 1d59f03662b91a1ad6475333fbe778a38a38aa2b Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 14:39:48 +0100 Subject: tracker-store: Push TrackerData hooks down to Tracker.Store Move this out of the Resources object, which is basically a view of the internal Store object. All event accounting and signaling is now performed by the Store object, to which the Resources DBus object connects to in order to implement GraphUpdated and Writeback signals. Only one handler of these events is possible at the moment, would be nice to consider doing something marginally better on the Steroids interface at some point, at least wrt the amount of data sent through the bus. Instead of trying to schedule the timeout across threads (the TrackerData hooks run in the thread performing the updates, and we want signaling done from the main/dbus thread), the code now just sets up a timeout on the main thread that keeps running as long as there are pending updates. When the task for the last batched update returns, it will be safe for the timeout to do signaling one last time and turn itself down, all of this happening in the main thread. --- src/tracker-store/tracker-backup.vala | 8 +-- src/tracker-store/tracker-dbus.vala | 8 +-- src/tracker-store/tracker-main.vala | 4 +- src/tracker-store/tracker-resources.vala | 89 ++------------------------ src/tracker-store/tracker-store.vala | 104 ++++++++++++++++++++++++++++++- 5 files changed, 116 insertions(+), 97 deletions(-) diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala index d589b227a..a9ede1b9a 100644 --- a/src/tracker-store/tracker-backup.vala +++ b/src/tracker-store/tracker-backup.vala @@ -25,7 +25,7 @@ public class Tracker.Backup : Object { public async void save (BusName sender, string destination_uri) throws Error { var resources = (Resources) Tracker.DBus.get_object (typeof (Resources)); if (resources != null) { - resources.disable_signals (); + Tracker.Store.disable_signals (); Tracker.Events.shutdown (); } @@ -58,7 +58,7 @@ public class Tracker.Backup : Object { } finally { if (resources != null) { Tracker.Events.init (); - resources.enable_signals (); + Tracker.Store.enable_signals (); } Tracker.Store.resume (); @@ -68,7 +68,7 @@ public class Tracker.Backup : Object { public async void restore (BusName sender, string journal_uri) throws Error { var resources = (Resources) Tracker.DBus.get_object (typeof (Resources)); if (resources != null) { - resources.disable_signals (); + Tracker.Store.disable_signals (); Tracker.Events.shutdown (); } @@ -96,7 +96,7 @@ public class Tracker.Backup : Object { } finally { if (resources != null) { Tracker.Events.init (); - resources.enable_signals (); + Tracker.Store.enable_signals (); } Tracker.Store.resume (); diff --git a/src/tracker-store/tracker-dbus.vala b/src/tracker-store/tracker-dbus.vala index 35c1542e5..48197f4b9 100644 --- a/src/tracker-store/tracker-dbus.vala +++ b/src/tracker-store/tracker-dbus.vala @@ -34,7 +34,6 @@ public class Tracker.DBus { static uint notifier_id; static Tracker.Backup backup; static uint backup_id; - static Tracker.Config config; static uint domain_watch_id; static MainLoop watch_main_loop; @@ -108,9 +107,8 @@ public class Tracker.DBus { } } - public static bool init (Tracker.Config config_p) { + public static bool init () { /* Don't reinitialize */ - config = config_p; if (connection != null) { return true; } @@ -216,7 +214,7 @@ public class Tracker.DBus { statistics_id = register_object (connection, statistics, Tracker.Statistics.PATH); /* Add org.freedesktop.Tracker1.Resources */ - resources = new Tracker.Resources (connection, config); + resources = new Tracker.Resources (connection); if (resources == null) { critical ("Could not create TrackerResources object to register"); return false; @@ -261,7 +259,7 @@ public class Tracker.DBus { return false; } - resources.enable_signals (); + Tracker.Store.enable_signals (); return true; } diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index d14b12c8e..63df072ff 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -239,7 +239,7 @@ License which can be viewed at: sanity_check_option_values (config); - if (!Tracker.DBus.init (config)) { + if (!Tracker.DBus.init ()) { return 1; } @@ -258,7 +258,7 @@ License which can be viewed at: var notifier = Tracker.DBus.register_notifier (); - Tracker.Store.init (); + Tracker.Store.init (config); /* Make Tracker available for introspection */ if (!Tracker.DBus.register_objects ()) { diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index c6a76e749..7322636d1 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -22,8 +22,6 @@ public class Tracker.Resources : Object { public const string PATH = "/org/freedesktop/Tracker1/Resources"; - const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - /* I *know* that this is some arbitrary number that doesn't seem to * resemble anything. In fact it's what I experimentally measured to * be a good value on a default Debian testing which has @@ -50,15 +48,13 @@ public class Tracker.Resources : Object { const int DBUS_ARBITRARY_MAX_MSG_SIZE = 10000000; DBusConnection connection; - uint signal_timeout; - Tracker.Config config; public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects); public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts); - public Resources (DBusConnection connection, Tracker.Config config_p) { + public Resources (DBusConnection connection) { this.connection = connection; - this.config = config_p; + Tracker.Store.set_signal_callback (on_emit_signals); } public async void load (BusName sender, string uri) throws Error { @@ -246,9 +242,7 @@ public class Tracker.Resources : Object { writeback (builder.end ()); } - bool on_emit_signals () { - var events = Tracker.Events.get_pending (); - + void on_emit_signals (HashTable? events, HashTable>? writebacks) { if (events != null) { var iter = HashTableIter (events); unowned Events.Batch class_events; @@ -259,88 +253,13 @@ public class Tracker.Resources : Object { } } - /* Writeback feature */ - var writebacks = Tracker.Writeback.get_ready (); - if (writebacks != null) { emit_writeback (writebacks); } - - signal_timeout = 0; - return false; - } - - void on_statements_committed () { - Tracker.Events.transact (); - Tracker.Writeback.transact (); - check_graph_updated_signal (); - - if (signal_timeout == 0) { - signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); - } - } - - void on_statements_rolled_back () { - Tracker.Events.reset_pending (); - Tracker.Writeback.reset_pending (); - } - - void check_graph_updated_signal () { - /* Check for whether we need an immediate emit */ - if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { - // possibly active timeout no longer necessary as signals - // for committed transactions will be emitted by the following on_emit_signals call - // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0 - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; - } - - // immediately emit signals for already committed transaction - Idle.add (() => { - on_emit_signals (); - return false; - }); - } - } - - void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { - Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); - Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - } - - void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { - Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); - Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - } - - [DBus (visible = false)] - public void enable_signals () { - var data_manager = Tracker.Main.get_data_manager (); - var data = data_manager.get_data (); - data.add_insert_statement_callback (on_statement_inserted); - data.add_delete_statement_callback (on_statement_deleted); - data.add_commit_statement_callback (on_statements_committed); - data.add_rollback_statement_callback (on_statements_rolled_back); - } - - [DBus (visible = false)] - public void disable_signals () { - var data_manager = Tracker.Main.get_data_manager (); - var data = data_manager.get_data (); - data.remove_insert_statement_callback (on_statement_inserted); - data.remove_delete_statement_callback (on_statement_deleted); - data.remove_commit_statement_callback (on_statements_committed); - data.remove_rollback_statement_callback (on_statements_rolled_back); - - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; - } } ~Resources () { - this.disable_signals (); + Tracker.Store.set_signal_callback (null); } [DBus (visible = false)] diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index a555c67b0..d4b4d9646 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -23,6 +23,7 @@ public class Tracker.Store { const int MAX_CONCURRENT_QUERIES = 2; const int MAX_TASK_TIME = 30; + const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; static Queue query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; static Queue update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; @@ -36,6 +37,10 @@ public class Tracker.Store { static bool active; static SourceFunc active_callback; + static Tracker.Config config; + static uint signal_timeout; + static int n_updates; + public enum Priority { HIGH, LOW, @@ -50,6 +55,9 @@ public class Tracker.Store { TURTLE, } + public delegate void SignalEmissionFunc (HashTable? graph_updated, HashTable>? writeback); + static unowned SignalEmissionFunc signal_callback; + public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; abstract class Task { @@ -265,7 +273,7 @@ public class Tracker.Store { AtomicInt.set (ref checkpointing, 0); } - public static void init () { + public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { max_task_time = int.parse (max_task_time_env); @@ -293,6 +301,8 @@ public class Tracker.Store { are rather random */ ThreadPool.set_max_idle_time (15 * 1000); ThreadPool.set_max_unused_threads (2); + + config = config_p; } public static void shutdown () { @@ -304,6 +314,11 @@ public class Tracker.Store { query_queues[i] = null; update_queues[i] = null; } + + if (signal_timeout != 0) { + Source.remove (signal_timeout); + signal_timeout = 0; + } } public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { @@ -327,7 +342,28 @@ public class Tracker.Store { } } + private static void do_emit_signals () { + signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ()); + } + + private static void ensure_signal_timeout () { + if (signal_timeout == 0) { + signal_timeout = Timeout.add (config.graphupdated_delay, () => { + do_emit_signals (); + if (n_updates == 0) { + signal_timeout = 0; + return false; + } else { + return true; + } + }); + } + } + public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new UpdateTask (); task.type = TaskType.UPDATE; task.query = sparql; @@ -342,12 +378,17 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } } public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new UpdateTask (); task.type = TaskType.UPDATE_BLANK; task.query = sparql; @@ -362,6 +403,8 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } @@ -370,6 +413,9 @@ public class Tracker.Store { } public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new TurtleTask (); task.type = TaskType.TURTLE; task.path = file.get_path (); @@ -383,6 +429,8 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } @@ -473,4 +521,58 @@ public class Tracker.Store { sched (); } + + private static void on_statements_committed () { + Tracker.Events.transact (); + Tracker.Writeback.transact (); + check_graph_updated_signal (); + } + + private static void on_statements_rolled_back () { + Tracker.Events.reset_pending (); + Tracker.Writeback.reset_pending (); + } + + private static void check_graph_updated_signal () { + /* Check for whether we need an immediate emit */ + if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { + // immediately emit signals for already committed transaction + Idle.add (() => { + do_emit_signals (); + return false; + }); + } + } + + private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } + + private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } + + public static void enable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.add_insert_statement_callback (on_statement_inserted); + data.add_delete_statement_callback (on_statement_deleted); + data.add_commit_statement_callback (on_statements_committed); + data.add_rollback_statement_callback (on_statements_rolled_back); + } + + public static void disable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.remove_insert_statement_callback (on_statement_inserted); + data.remove_delete_statement_callback (on_statement_deleted); + data.remove_commit_statement_callback (on_statements_committed); + data.remove_rollback_statement_callback (on_statements_rolled_back); + } + + public static void set_signal_callback (SignalEmissionFunc? func) { + signal_callback = func; + } } -- cgit v1.2.1 From 09fae200db0e0cba4fc91c7d50e443e1da08e328 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Tue, 21 Nov 2017 12:34:38 +0100 Subject: libtracker-data: Move notify_transaction() down to libtracker-data Now that we don't need upper layers' information to find out the right CommitType, push this call down together with the handling of the insert/delete/rollback callbacks. One particularity is that the commit callbacks will now be called from within the update thread, just like all the other callbacks. And this is fine, with all the preparation work from the previous commits. --- src/libtracker-data/libtracker-data.vapi | 1 - src/libtracker-data/tracker-data-update.c | 10 +++------- src/libtracker-data/tracker-data-update.h | 1 - src/tracker-store/tracker-store.vala | 10 ---------- 4 files changed, 3 insertions(+), 19 deletions(-) diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi index 2daae8061..f0300ae73 100644 --- a/src/libtracker-data/libtracker-data.vapi +++ b/src/libtracker-data/libtracker-data.vapi @@ -192,7 +192,6 @@ namespace Tracker { public void update_sparql (string update) throws Sparql.Error; public GLib.Variant update_sparql_blank (string update) throws Sparql.Error; public void load_turtle_file (GLib.File file) throws Sparql.Error; - public void notify_transaction (); public void delete_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError; public void update_statement (string? graph, string subject, string predicate, string? object) throws Sparql.Error, DateError; public void insert_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError; diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c index e552b10ce..444e5efee 100644 --- a/src/libtracker-data/tracker-data-update.c +++ b/src/libtracker-data/tracker-data-update.c @@ -3624,13 +3624,7 @@ tracker_data_commit_transaction (TrackerData *data, g_hash_table_remove_all (data->update_buffer.resources_by_id); g_hash_table_remove_all (data->update_buffer.resource_cache); - data->in_journal_replay = FALSE; -} - -void -tracker_data_notify_transaction (TrackerData *data) -{ - if (data->commit_callbacks) { + if (!data->in_journal_replay && data->commit_callbacks) { guint n; for (n = 0; n < data->commit_callbacks->len; n++) { TrackerCommitDelegate *delegate; @@ -3638,6 +3632,8 @@ tracker_data_notify_transaction (TrackerData *data) delegate->callback (delegate->user_data); } } + + data->in_journal_replay = FALSE; } void diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h index 640df408a..97fdcd6a9 100644 --- a/src/libtracker-data/tracker-data-update.h +++ b/src/libtracker-data/tracker-data-update.h @@ -103,7 +103,6 @@ void tracker_data_begin_transaction_for_replay (TrackerData * GError **error); void tracker_data_commit_transaction (TrackerData *data, GError **error); -void tracker_data_notify_transaction (TrackerData *data); void tracker_data_rollback_transaction (TrackerData *data); void tracker_data_update_sparql (TrackerData *data, const gchar *update, diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index d4b4d9646..a373e6155 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -147,8 +147,6 @@ public class Tracker.Store { } static bool task_finish_cb (Task task) { - var data = task.data_manager.get_data (); - if (task.type == TaskType.QUERY) { var query_task = (QueryTask) task; @@ -164,19 +162,11 @@ public class Tracker.Store { running_tasks.remove (task); n_queries_running--; } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) { - if (task.error == null) { - data.notify_transaction (); - } - task.callback (); task.error = null; update_running = false; } else if (task.type == TaskType.TURTLE) { - if (task.error == null) { - data.notify_transaction (); - } - task.callback (); task.error = null; -- cgit v1.2.1 From 693f89de16b44efd3b8663c9c493762f51de3a3b Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 25 Nov 2017 15:21:42 +0100 Subject: tracker-store: Use TrackerDirectConnection underneath Instead of the lower level TrackerDataManager object directly. The only additional thing that tracker-store does is signal emission for writeback and GraphUpdated, the internal TrackerDataManager object is still accessed to implement those features. This makes libtracker-direct the only place where queries/updates are queued, performed and dispatched. There's other indirect benefit from this, update queue handling no longer needs to hit the main thread in order to schedule the next update. Besides the very unlikely thread contention situations described in previous commits, this should maximize throughput of the updates queue. --- src/tracker-store/Makefile.am | 2 + src/tracker-store/tracker-main.vala | 39 +-- src/tracker-store/tracker-resources.vala | 26 +- src/tracker-store/tracker-steroids.vala | 28 +- src/tracker-store/tracker-store.vala | 446 ++++++------------------------- 5 files changed, 117 insertions(+), 424 deletions(-) diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am index dc5df50c3..2463cb4ab 100644 --- a/src/tracker-store/Makefile.am +++ b/src/tracker-store/Makefile.am @@ -39,6 +39,7 @@ tracker_store_VALAFLAGS = \ $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \ $(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \ $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ + $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \ $(top_srcdir)/src/tracker-store/tracker-config.vapi \ $(top_srcdir)/src/tracker-store/tracker-events.vapi \ $(top_srcdir)/src/tracker-store/tracker-locale-change.vapi \ @@ -47,6 +48,7 @@ tracker_store_VALAFLAGS = \ tracker_store_LDADD = \ $(top_builddir)/src/libtracker-data/libtracker-data.la \ + $(top_builddir)/src/libtracker-direct/libtracker-direct.la \ $(top_builddir)/src/libtracker-common/libtracker-common.la \ $(top_builddir)/src/libtracker-sparql-backend/libtracker-sparql-@TRACKER_API_VERSION@.la \ $(BUILD_LIBS) \ diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index 63df072ff..861aed2eb 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -39,6 +39,7 @@ License which can be viewed at: static bool shutdown; + static Tracker.Direct.Connection connection; static Tracker.Data.Manager data_manager; /* Private command line parameters */ @@ -175,6 +176,10 @@ License which can be viewed at: return data_manager; } + public static unowned Tracker.Direct.Connection get_sparql_connection () { + return connection; + } + static int main (string[] args) { Intl.setlocale (LocaleCategory.ALL, ""); @@ -256,6 +261,8 @@ License which can be viewed at: flags |= DBManagerFlags.FORCE_REINDEX; } + Tracker.Direct.Connection.set_default_flags (flags); + var notifier = Tracker.DBus.register_notifier (); Tracker.Store.init (config); @@ -281,38 +288,18 @@ License which can be viewed at: Tracker.DBJournal.set_rotating (do_rotating, chunk_size, rotate_to); - int select_cache_size, update_cache_size; - string cache_size_s; - - cache_size_s = Environment.get_variable ("TRACKER_STORE_SELECT_CACHE_SIZE"); - if (cache_size_s != null && cache_size_s != "") { - select_cache_size = int.parse (cache_size_s); - } else { - select_cache_size = SELECT_CACHE_SIZE; - } - - cache_size_s = Environment.get_variable ("TRACKER_STORE_UPDATE_CACHE_SIZE"); - if (cache_size_s != null && cache_size_s != "") { - update_cache_size = int.parse (cache_size_s); - } else { - update_cache_size = UPDATE_CACHE_SIZE; - } - try { - data_manager = new Tracker.Data.Manager (flags, - cache_location, - data_location, - ontology_location, - true, - false, - select_cache_size, - update_cache_size); - data_manager.init (null); + connection = new Tracker.Direct.Connection (Sparql.ConnectionFlags.NONE, + cache_location, + data_location, + ontology_location); + connection.init (null); } catch (GLib.Error e) { critical ("Cannot initialize database: %s", e.message); return 1; } + data_manager = connection.get_data_manager (); db_config = null; notifier = null; diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index 7322636d1..5cfab8bb2 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -61,9 +61,9 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.Load (uri: '%s')", uri); try { var file = File.new_for_uri (uri); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.queue_turtle_import (data_manager, file, sender); + yield Tracker.Store.queue_turtle_import (sparql_conn, file, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -84,9 +84,9 @@ public class Tracker.Resources : Object { request.debug ("query: %s", query); try { var builder = new VariantBuilder ((VariantType) "aas"); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => { + yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => { while (cursor.next ()) { builder.open ((VariantType) "as"); @@ -126,8 +126,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.SparqlUpdate"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.HIGH, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + yield Tracker.Store.sparql_update (sparql_conn, update, Priority.HIGH, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -147,8 +147,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.SparqlUpdateBlank"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - var variant = yield Tracker.Store.sparql_update_blank (data_manager, update, Tracker.Store.Priority.HIGH, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, update, Priority.HIGH, sender); request.end (); @@ -169,10 +169,10 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.Sync"); var data_manager = Tracker.Main.get_data_manager (); var data = data_manager.get_data (); - var iface = data_manager.get_writable_db_interface (); - // wal checkpoint implies sync - Tracker.Store.wal_checkpoint (iface, true); + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); + // sync journal if available data.sync (); @@ -183,8 +183,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.BatchSparqlUpdate"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.LOW, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + yield Tracker.Store.sparql_update (sparql_conn, update, Priority.LOW, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { diff --git a/src/tracker-store/tracker-steroids.vala b/src/tracker-store/tracker-steroids.vala index 40679bf14..1eb7bef05 100644 --- a/src/tracker-store/tracker-steroids.vala +++ b/src/tracker-store/tracker-steroids.vala @@ -29,9 +29,9 @@ public class Tracker.Steroids : Object { request.debug ("query: %s", query); try { string[] variable_names = null; - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => { + yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => { var data_output_stream = new DataOutputStream (new BufferedOutputStream.sized (output_stream, BUFFER_SIZE)); data_output_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN); @@ -90,10 +90,10 @@ public class Tracker.Steroids : Object { } } - async Variant? update_internal (BusName sender, Tracker.Store.Priority priority, bool blank, UnixInputStream input_stream) throws Error { + async Variant? update_internal (BusName sender, int priority, bool blank, UnixInputStream input_stream) throws Error { var request = DBusRequest.begin (sender, "Steroids.%sUpdate%s", - priority != Tracker.Store.Priority.HIGH ? "Batch" : "", + priority != Priority.HIGH ? "Batch" : "", blank ? "Blank" : ""); try { size_t bytes_read; @@ -112,16 +112,16 @@ public class Tracker.Steroids : Object { data_input_stream = null; request.debug ("query: %s", (string) query); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); if (!blank) { - yield Tracker.Store.sparql_update (data_manager, (string) query, priority, sender); + yield Tracker.Store.sparql_update (sparql_conn, (string) query, priority, sender); request.end (); return null; } else { - var variant = yield Tracker.Store.sparql_update_blank (data_manager, (string) query, priority, sender); + var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, (string) query, priority, sender); request.end (); @@ -140,21 +140,21 @@ public class Tracker.Steroids : Object { } public async void update (BusName sender, UnixInputStream input_stream) throws Error { - yield update_internal (sender, Tracker.Store.Priority.HIGH, false, input_stream); + yield update_internal (sender, Priority.HIGH, false, input_stream); } public async void batch_update (BusName sender, UnixInputStream input_stream) throws Error { - yield update_internal (sender, Tracker.Store.Priority.LOW, false, input_stream); + yield update_internal (sender, Priority.LOW, false, input_stream); } [DBus (signature = "aaa{ss}")] public async Variant update_blank (BusName sender, UnixInputStream input_stream) throws Error { - return yield update_internal (sender, Tracker.Store.Priority.HIGH, true, input_stream); + return yield update_internal (sender, Priority.HIGH, true, input_stream); } [DBus (signature = "aaa{ss}")] public async Variant batch_update_blank (BusName sender, UnixInputStream input_stream) throws Error { - return yield update_internal (sender, Tracker.Store.Priority.LOW, true, input_stream); + return yield update_internal (sender, Priority.LOW, true, input_stream); } [DBus (signature = "as")] @@ -188,11 +188,11 @@ public class Tracker.Steroids : Object { data_input_stream = null; var builder = new VariantBuilder ((VariantType) "as"); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); // first try combined query for best possible performance try { - yield Tracker.Store.sparql_update (data_manager, combined_query.str, Tracker.Store.Priority.LOW, sender); + yield Tracker.Store.sparql_update (sparql_conn, combined_query.str, Priority.LOW, sender); // combined query was successful for (i = 0; i < query_count; i++) { @@ -213,7 +213,7 @@ public class Tracker.Steroids : Object { request.debug ("query: %s", query_array[i]); try { - yield Tracker.Store.sparql_update (data_manager, query_array[i], Tracker.Store.Priority.LOW, sender); + yield Tracker.Store.sparql_update (sparql_conn, query_array[i], Priority.LOW, sender); builder.add ("s", ""); builder.add ("s", ""); } catch (Error e1) { diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index a373e6155..03cc8078a 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -25,244 +25,46 @@ public class Tracker.Store { const int MAX_TASK_TIME = 30; const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - static Queue query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static Queue update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static int n_queries_running; - static bool update_running; - static ThreadPool update_pool; - static ThreadPool query_pool; - static ThreadPool checkpoint_pool; - static GenericArray running_tasks; static int max_task_time; static bool active; - static SourceFunc active_callback; static Tracker.Config config; static uint signal_timeout; static int n_updates; - public enum Priority { - HIGH, - LOW, - TURTLE, - N_PRIORITIES - } - - enum TaskType { - QUERY, - UPDATE, - UPDATE_BLANK, - TURTLE, - } + static HashTable client_cancellables; public delegate void SignalEmissionFunc (HashTable? graph_updated, HashTable>? writeback); static unowned SignalEmissionFunc signal_callback; - public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; + public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error; - abstract class Task { - public TaskType type; - public string client_id; + class CursorTask { + public Sparql.Cursor cursor; + public unowned SourceFunc callback; + public unowned SparqlQueryInThread thread_func; public Error error; - public SourceFunc callback; - public Tracker.Data.Manager data_manager; - } - - class QueryTask : Task { - public string query; - public Cancellable cancellable; - public uint watchdog_id; - public unowned SparqlQueryInThread in_thread; - - ~QueryTask () { - if (watchdog_id > 0) { - Source.remove (watchdog_id); - } - } - } - - class UpdateTask : Task { - public string query; - public Variant blank_nodes; - public Priority priority; - } - - class TurtleTask : Task { - public string path; - } - - static void sched () { - Task task = null; - - if (!active) { - return; - } - while (n_queries_running < MAX_CONCURRENT_QUERIES) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = query_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task == null) { - /* no pending query */ - break; - } - running_tasks.add (task); - - if (max_task_time != 0) { - var query_task = (QueryTask) task; - query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => { - query_task.cancellable.cancel (); - query_task.watchdog_id = 0; - return false; - }); - } - - n_queries_running++; - try { - query_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } - - if (!update_running) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = update_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task != null) { - update_running = true; - try { - update_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } + public CursorTask (Sparql.Cursor cursor) { + this.cursor = cursor; } } - static bool task_finish_cb (Task task) { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - if (task.error == null && - query_task.cancellable != null && - query_task.cancellable.is_cancelled ()) { - task.error = new IOError.CANCELLED ("Operation was cancelled"); - } - - task.callback (); - task.error = null; - - running_tasks.remove (task); - n_queries_running--; - } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) { - task.callback (); - task.error = null; - - update_running = false; - } else if (task.type == TaskType.TURTLE) { - task.callback (); - task.error = null; - - update_running = false; - } - - if (n_queries_running == 0 && !update_running && active_callback != null) { - active_callback (); - } - - sched (); - - return false; - } + static ThreadPool cursor_pool; - static void pool_dispatch_cb (owned Task task) { + private static void cursor_dispatch_cb (owned CursorTask task) { try { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query); - - query_task.in_thread (cursor); - } else { - var data = task.data_manager.get_data (); - var iface = task.data_manager.get_writable_db_interface (); - iface.sqlite_wal_hook (wal_hook); - - if (task.type == TaskType.UPDATE) { - var update_task = (UpdateTask) task; - - data.update_sparql (update_task.query); - } else if (task.type == TaskType.UPDATE_BLANK) { - var update_task = (UpdateTask) task; - - update_task.blank_nodes = data.update_sparql_blank (update_task.query); - } else if (task.type == TaskType.TURTLE) { - var turtle_task = (TurtleTask) task; - - var file = File.new_for_path (turtle_task.path); - - data.load_turtle_file (file); - } - } + task.thread_func (task.cursor); } catch (Error e) { task.error = e; } Idle.add (() => { - task_finish_cb (task); + task.callback (); return false; }); } - public static void wal_checkpoint (DBInterface iface, bool blocking) { - try { - debug ("Checkpointing database..."); - iface.sqlite_wal_checkpoint (blocking); - debug ("Checkpointing complete..."); - } catch (Error e) { - warning (e.message); - } - } - - static int checkpointing; - - static void wal_hook (DBInterface iface, int n_pages) { - // run in update thread - var manager = (Data.Manager) iface.get_user_data (); - var wal_iface = manager.get_wal_db_interface (); - - debug ("WAL: %d pages", n_pages); - - if (n_pages >= 10000) { - // do immediate checkpointing (blocking updates) - // to prevent excessive wal file growth - wal_checkpoint (wal_iface, true); - } else if (n_pages >= 1000 && checkpoint_pool != null) { - if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) { - // initiate asynchronous checkpointing (not blocking updates) - try { - checkpoint_pool.push (wal_iface); - } catch (Error e) { - warning (e.message); - AtomicInt.set (ref checkpointing, 0); - } - } - } - } - - static void checkpoint_dispatch_cb (DBInterface iface) { - // run in checkpoint thread - wal_checkpoint (iface, false); - AtomicInt.set (ref checkpointing, 0); - } - public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { @@ -271,19 +73,12 @@ public class Tracker.Store { max_task_time = MAX_TASK_TIME; } - running_tasks = new GenericArray (); - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = new Queue (); - update_queues[i] = new Queue (); - } + client_cancellables = new HashTable (str_hash, str_equal); try { - update_pool = new ThreadPool.with_owned_data (pool_dispatch_cb, 1, true); - query_pool = new ThreadPool.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true); - checkpoint_pool = new ThreadPool (checkpoint_dispatch_cb, 1, true); + cursor_pool = new ThreadPool.with_owned_data (cursor_dispatch_cb, 16, false); } catch (Error e) { - warning (e.message); + // Ignore harmless error } /* as the following settings are global for unknown reasons, @@ -296,40 +91,26 @@ public class Tracker.Store { } public static void shutdown () { - query_pool = null; - update_pool = null; - checkpoint_pool = null; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = null; - update_queues[i] = null; - } - if (signal_timeout != 0) { Source.remove (signal_timeout); signal_timeout = 0; } } - public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { - var task = new QueryTask (); - task.type = TaskType.QUERY; - task.query = sparql; - task.cancellable = new Cancellable (); - task.in_thread = in_thread; - task.callback = sparql_query.callback; - task.client_id = client_id; - task.data_manager = manager; - - query_queues[priority].push_tail (task); + private static Cancellable create_cancellable (string client_id) { + var client_cancellable = client_cancellables.lookup (client_id); - sched (); + if (client_cancellable == null) { + client_cancellable = new Cancellable (); + client_cancellables.insert (client_id, client_cancellable); + } - yield; + var task_cancellable = new Cancellable (); + client_cancellable.connect (() => { + task_cancellable.cancel (); + }); - if (task.error != null) { - throw task.error; - } + return task_cancellable; } private static void do_emit_signals () { @@ -350,166 +131,89 @@ public class Tracker.Store { } } - public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { - n_updates++; - ensure_signal_timeout (); + public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error { + var cancellable = create_cancellable (client_id); + uint timeout_id = 0; - var task = new UpdateTask (); - task.type = TaskType.UPDATE; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update.callback; - task.client_id = client_id; - task.data_manager = manager; + if (max_task_time != 0) { + timeout_id = Timeout.add_seconds (max_task_time, () => { + cancellable.cancel (); + timeout_id = 0; + return false; + }); + } - update_queues[priority].push_tail (task); + var cursor = yield conn.query_async (sparql, cancellable); - sched (); + if (timeout_id != 0) + GLib.Source.remove (timeout_id); - yield; + var task = new CursorTask (cursor); + task.thread_func = in_thread; + task.callback = sparql_query.callback; - n_updates--; + try { + cursor_pool.add (task); + } catch (Error e) { + // Ignore harmless error + } + + yield; - if (task.error != null) { + if (task.error != null) throw task.error; - } } - public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); n_updates++; ensure_signal_timeout (); - - var task = new UpdateTask (); - task.type = TaskType.UPDATE_BLANK; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update_blank.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[priority].push_tail (task); - - sched (); - - yield; - + var cancellable = create_cancellable (client_id); + yield conn.update_async (sparql, priority, cancellable); n_updates--; - - if (task.error != null) { - throw task.error; - } - - return task.blank_nodes; } - public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { + public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); n_updates++; ensure_signal_timeout (); - - var task = new TurtleTask (); - task.type = TaskType.TURTLE; - task.path = file.get_path (); - task.callback = queue_turtle_import.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[Priority.TURTLE].push_tail (task); - - sched (); - - yield; - + var cancellable = create_cancellable (client_id); + var nodes = yield conn.update_blank_async (sparql, priority, cancellable); n_updates--; - if (task.error != null) { - throw task.error; - } + return nodes; } - public uint get_queue_size () { - uint result = 0; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - result += query_queues[i].get_length (); - result += update_queues[i].get_length (); - } - return result; + public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); + n_updates++; + ensure_signal_timeout (); + var cancellable = create_cancellable (client_id); + yield conn.load_async (file, cancellable); + n_updates--; } public static void unreg_batches (string client_id) { - unowned List list, cur; - unowned Queue queue; - - for (int i = 0; i < running_tasks.length; i++) { - unowned QueryTask task = running_tasks[i] as QueryTask; - if (task != null && task.client_id == client_id && task.cancellable != null) { - task.cancellable.cancel (); - } - } + Cancellable cancellable = client_cancellables.lookup (client_id); - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - queue = query_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; - - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); - - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } - - queue = update_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; - - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); - - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } + if (cancellable != null) { + cancellable.cancel (); + client_cancellables.remove (client_id); } - - sched (); } public static async void pause () { Tracker.Store.active = false; - if (n_queries_running > 0 || update_running) { - active_callback = pause.callback; - yield; - active_callback = null; - } - - if (AtomicInt.get (ref checkpointing) != 0) { - // this will wait for checkpointing to finish - checkpoint_pool = null; - try { - checkpoint_pool = new ThreadPool (checkpoint_dispatch_cb, 1, true); - } catch (Error e) { - warning (e.message); - } - } - - if (active) { - sched (); - } + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); } public static void resume () { Tracker.Store.active = true; - - sched (); } private static void on_statements_committed () { -- cgit v1.2.1 From dbce390ac0b645f45b0dd442ccaea6c7b80f480e Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Mon, 16 Jul 2018 21:35:39 +0200 Subject: libtracker-direct: Remove unused variables --- src/libtracker-direct/tracker-direct.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c index 572fdb0c8..c3ff4d004 100644 --- a/src/libtracker-direct/tracker-direct.c +++ b/src/libtracker-direct/tracker-direct.c @@ -522,7 +522,6 @@ tracker_direct_connection_update (TrackerSparqlConnection *self, TrackerDirectConnectionPrivate *priv; TrackerDirectConnection *conn; TrackerData *data; - GTask *task; conn = TRACKER_DIRECT_CONNECTION (self); priv = tracker_direct_connection_get_instance_private (conn); @@ -624,7 +623,7 @@ tracker_direct_connection_update_array_async (TrackerSparqlConnection *self, { GTask *task; gchar **copy; - gint i = 0, idx = 0; + gint i = 0; copy = g_new0 (gchar*, n_updates + 1); -- cgit v1.2.1 From add37bb9e8b79da1d01442037ea155421857465e Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Mon, 16 Jul 2018 21:36:20 +0200 Subject: Update meson build system for libtracker-direct changes --- src/libtracker-direct/meson.build | 14 +------------- src/libtracker-sparql-backend/meson.build | 2 ++ src/tracker-store/meson.build | 3 ++- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/libtracker-direct/meson.build b/src/libtracker-direct/meson.build index 8cf018c69..0c515eaa7 100644 --- a/src/libtracker-direct/meson.build +++ b/src/libtracker-direct/meson.build @@ -1,18 +1,6 @@ libtracker_direct = static_library('tracker-direct', - 'tracker-direct.vala', - 'tracker-namespace.vala', - '../libtracker-common/libtracker-common.vapi', - '../libtracker-data/libtracker-data.vapi', + 'tracker-direct.c', c_args: tracker_c_args, - vala_args: [ - '--debug', - '--pkg', 'posix', - # FIXME: Meson has code to add --target-glib automatically, but it - # doesn't seem to work here. - '--target-glib', glib_required, - ], - # This doesn't depend on tracker_common_dep because of - # https://github.com/mesonbuild/meson/issues/671 dependencies: [ glib, gio, tracker_data_dep ], include_directories: [commoninc, configinc, srcinc], ) diff --git a/src/libtracker-sparql-backend/meson.build b/src/libtracker-sparql-backend/meson.build index b04cdaf0d..d76143306 100644 --- a/src/libtracker-sparql-backend/meson.build +++ b/src/libtracker-sparql-backend/meson.build @@ -1,5 +1,7 @@ libtracker_sparql = library('tracker-sparql-' + tracker_api_version, '../libtracker-common/libtracker-common.vapi', + '../libtracker-data/libtracker-data.vapi', + '../libtracker-direct/tracker-direct.vapi', 'tracker-backend.vala', soversion: soversion, diff --git a/src/tracker-store/meson.build b/src/tracker-store/meson.build index 91d7a335b..1c42da3a3 100644 --- a/src/tracker-store/meson.build +++ b/src/tracker-store/meson.build @@ -16,6 +16,7 @@ tracker_store_sources = [ 'tracker-writeback.vapi', '../libtracker-common/libtracker-common.vapi', '../libtracker-data/libtracker-data.vapi', + '../libtracker-direct/tracker-direct.vapi', ] tracker_store = executable('tracker-store', @@ -25,7 +26,7 @@ tracker_store = executable('tracker-store', ], vala_args: [ '--pkg', 'posix' ], dependencies: [ - tracker_common_dep, tracker_data_dep, + tracker_common_dep, tracker_data_dep, tracker_sparql_direct_dep, gio_unix ], install: true, -- cgit v1.2.1 From 85e58a5219b66f77cbb534c0de39563ff50ec9a9 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Tue, 17 Jul 2018 17:48:07 +0200 Subject: libtracker-data: Fetch shared connection on failure to create one On stress situations (tests/functional-tests/ipc/test-bus-query-cancellation is known to trigger this) there may be too many opened FDs all around (dbus related, fds passed for resultsets, DB connections, ...) to keep it all together. In those cases, we may attempt to create an extra interface to cater for the incoming request, but it will just fail underneath us. In those cases it is preferrable to fetch a connection from the pool and have it shared across threads than tripping into critical warnings and undefined behavior. --- src/libtracker-data/tracker-db-manager.c | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c index 33e178f4f..5e5b7299c 100644 --- a/src/libtracker-data/tracker-db-manager.c +++ b/src/libtracker-data/tracker-db-manager.c @@ -1055,14 +1055,20 @@ tracker_db_manager_get_db_interface (TrackerDBManager *db_manager) interface = tracker_db_manager_create_db_interface (db_manager, TRUE, &internal_error); - if (internal_error) { - g_critical ("Error opening database: %s", internal_error->message); - g_error_free (internal_error); - g_async_queue_unlock (db_manager->interfaces); - return NULL; + if (interface) { + tracker_data_manager_init_fts (interface, FALSE); + } else { + if (g_async_queue_length_unlocked (db_manager->interfaces) == 0) { + g_critical ("Error opening database: %s", internal_error->message); + g_error_free (internal_error); + g_async_queue_unlock (db_manager->interfaces); + return NULL; + } else { + g_error_free (internal_error); + /* Fetch the first interface back. Oh well */ + interface = g_async_queue_try_pop_unlocked (db_manager->interfaces); + } } - - tracker_data_manager_init_fts (interface, FALSE); } g_async_queue_push_unlocked (db_manager->interfaces, interface); @@ -1102,7 +1108,8 @@ tracker_db_manager_get_writable_db_interface (TrackerDBManager *db_manager) TrackerDBInterface * tracker_db_manager_get_wal_db_interface (TrackerDBManager *db_manager) { - if (db_manager->db.wal_iface == NULL) { + if (db_manager->db.wal_iface == NULL && + (db_manager->flags & TRACKER_DB_MANAGER_READONLY) == 0) { db_manager->db.wal_iface = init_writable_db_interface (db_manager); } -- cgit v1.2.1 From d5c79e87bc1284f925e2c155f03312da7fd0a4ab Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Tue, 17 Jul 2018 17:56:25 +0200 Subject: libtracker-direct: Avoid WAL checkpoint on readonly connections There is no point in doing WAL checkpoints from a readonly connection, so avoid it altogether. --- src/libtracker-direct/tracker-direct.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c index c3ff4d004..4b8d74a73 100644 --- a/src/libtracker-direct/tracker-direct.c +++ b/src/libtracker-direct/tracker-direct.c @@ -217,6 +217,9 @@ wal_hook (TrackerDBInterface *iface, TrackerDataManager *data_manager = tracker_db_interface_get_user_data (iface); TrackerDBInterface *wal_iface = tracker_data_manager_get_wal_db_interface (data_manager); + if (!wal_iface) + return; + if (n_pages >= 10000) { /* Do immediate checkpointing (blocking updates) to * prevent excessive WAL file growth. @@ -383,7 +386,8 @@ tracker_direct_connection_finalize (GObject *object) if (priv->data_manager) { TrackerDBInterface *wal_iface; wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); - tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); + if (wal_iface) + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); } g_clear_object (&priv->store); @@ -879,5 +883,6 @@ tracker_direct_connection_sync (TrackerDirectConnection *conn) set_up_thread_pools (conn, NULL); wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); - tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); + if (wal_iface) + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); } -- cgit v1.2.1 From 9329e25472514ebc57dc34ec1351b34966eb6503 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Fri, 20 Jul 2018 17:39:25 +0200 Subject: libtracker-direct: Use specific include This is an internal header, so we don't need pointing to the general tracker-data.h header. Fixes build from scratch on meson as some files pulled from there are unexpectedly in the build directory. --- src/libtracker-direct/tracker-direct.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h index d1ebfd6ee..0b66fdb1f 100644 --- a/src/libtracker-direct/tracker-direct.h +++ b/src/libtracker-direct/tracker-direct.h @@ -22,7 +22,7 @@ #define __TRACKER_LOCAL_CONNECTION_H__ #include -#include +#include #define TRACKER_TYPE_DIRECT_CONNECTION (tracker_direct_connection_get_type()) #define TRACKER_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnection)) -- cgit v1.2.1 From 58cc6255576b1c436545293afbfaf0cb79578348 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 21 Jul 2018 17:22:07 +0200 Subject: libtracker-miner: Coalesce 2 CREATED events It is not even clear this is possible in real life cases, however the standalone tracker-file-notifier tests fall into this (due to IRI not being ever set, still this is an async op). In the case of 2 consecutive CREATED events on the same file, it would be dealt with in TrackerMinerFS as CREATED+ UPDATED. This was already harmless, but we can do better and swallow one of such events. --- src/libtracker-miner/tracker-miner-fs.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c index e8c9de85e..0fbf7c60d 100644 --- a/src/libtracker-miner/tracker-miner-fs.c +++ b/src/libtracker-miner/tracker-miner-fs.c @@ -775,7 +775,8 @@ queue_event_coalesce (const QueueEvent *first, *replacement = NULL; if (first->type == TRACKER_MINER_FS_EVENT_CREATED) { - if (second->type == TRACKER_MINER_FS_EVENT_UPDATED && + if ((second->type == TRACKER_MINER_FS_EVENT_UPDATED || + second->type == TRACKER_MINER_FS_EVENT_CREATED) && first->file == second->file) { return QUEUE_ACTION_DELETE_SECOND; } else if (second->type == TRACKER_MINER_FS_EVENT_MOVED && -- cgit v1.2.1 From e4e7766e2f0baa9d69de61993200680afa914616 Mon Sep 17 00:00:00 2001 From: Carlos Garnacho Date: Sat, 21 Jul 2018 17:26:24 +0200 Subject: tests: Adapt TrackerFileNotifier tests to internal behavioral change Before commit 68381c1dd, ensure_parents() would stop before the indexing root in the assumption that it was already notified upon, that commit made it so those folders are ensured to be notified too. This internal behavioral change is normally evened out by TrackerMinerFS, but shows at the tests for the internal TrackerFileNotifier object as there is nothing there to set the IRI for those files. --- tests/libtracker-miner/tracker-file-notifier-test.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/libtracker-miner/tracker-file-notifier-test.c b/tests/libtracker-miner/tracker-file-notifier-test.c index fd8264467..1bdc16085 100644 --- a/tests/libtracker-miner/tracker-file-notifier-test.c +++ b/tests/libtracker-miner/tracker-file-notifier-test.c @@ -668,6 +668,7 @@ test_file_notifier_monitor_updates_non_recursive (TestCommonContext *fixture, { OPERATION_CREATE, "non-recursive/bbb", NULL } }; FilesystemOperation expected_results2[] = { + { OPERATION_CREATE, "non-recursive", NULL }, { OPERATION_UPDATE, "non-recursive/bbb", NULL }, { OPERATION_CREATE, "non-recursive/ccc", NULL }, { OPERATION_UPDATE, "non-recursive/ccc", NULL } @@ -716,6 +717,7 @@ test_file_notifier_monitor_updates_recursive (TestCommonContext *fixture, { OPERATION_CREATE, "recursive/bbb", NULL } }; FilesystemOperation expected_results2[] = { + { OPERATION_CREATE, "recursive", NULL }, { OPERATION_CREATE, "recursive/folder", NULL }, { OPERATION_CREATE, "recursive/folder/aaa", NULL }, { OPERATION_UPDATE, "recursive/bbb", NULL }, -- cgit v1.2.1