diff options
author | Carlos Garnacho <carlosg@gnome.org> | 2018-07-21 17:52:36 +0200 |
---|---|---|
committer | Carlos Garnacho <carlosg@gnome.org> | 2018-07-21 17:52:36 +0200 |
commit | da971357f43f1f1168950cbe980e1d95561ca891 (patch) | |
tree | b9d1e6c51b435ecfd7fc9076999ef156eec44b92 | |
parent | 2f7ade534a452153ab9f0faa10e403b30e845c75 (diff) | |
parent | e4e7766e2f0baa9d69de61993200680afa914616 (diff) | |
download | tracker-da971357f43f1f1168950cbe980e1d95561ca891.tar.gz |
Merge branch 'wip/carlosg/direct-rewrite'
32 files changed, 1586 insertions, 1525 deletions
diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi index a9e446cd7..f0300ae73 100644 --- a/src/libtracker-data/libtracker-data.vapi +++ b/src/libtracker-data/libtracker-data.vapi @@ -78,7 +78,6 @@ namespace Tracker { [CCode (cprefix = "TRACKER_DB_MANAGER_", cheader_filename = "libtracker-data/tracker-db-manager.h")] public enum DBManagerFlags { FORCE_REINDEX, - REMOVE_CACHE, REMOVE_ALL, READONLY, DO_NOT_CHECK_ONTOLOGY, @@ -181,17 +180,10 @@ namespace Tracker { } public delegate void StatementCallback (int graph_id, string? graph, int subject_id, string subject, int predicate_id, int object_id, string object, GLib.PtrArray rdf_types); - public delegate void CommitCallback (Data.Update.CommitType commit_type); + public delegate void CommitCallback (); [CCode (lower_case_cprefix="tracker_data_", cname = "TrackerData", cheader_filename = "libtracker-data/tracker-data-query.h,libtracker-data/tracker-data-update.h")] public class Data.Update : GLib.Object { - [CCode (cprefix = "TRACKER_DATA_COMMIT_")] - public enum CommitType { - REGULAR, - BATCH, - BATCH_LAST - } - public void begin_db_transaction (); public void commit_db_transaction (); public void begin_transaction () throws DBInterfaceError; @@ -200,7 +192,6 @@ namespace Tracker { public void update_sparql (string update) throws Sparql.Error; public GLib.Variant update_sparql_blank (string update) throws Sparql.Error; public void load_turtle_file (GLib.File file) throws Sparql.Error; - public void notify_transaction (CommitType commit_type); public void delete_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError; public void update_statement (string? graph, string subject, string predicate, string? object) throws Sparql.Error, DateError; public void insert_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError; diff --git a/src/libtracker-data/tracker-class.c b/src/libtracker-data/tracker-class.c index 6c9754e70..f216c8e94 100644 --- a/src/libtracker-data/tracker-class.c +++ b/src/libtracker-data/tracker-class.c @@ -49,27 +49,6 @@ struct _TrackerClassPrivate { GArray *last_super_classes; TrackerOntologies *ontologies; - - struct { - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } pending; - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } ready; - } deletes; - struct { - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } pending; - struct { - GArray *sub_pred_ids; - GArray *obj_graph_ids; - } ready; - } inserts; }; static void class_finalize (GObject *object); @@ -99,16 +78,6 @@ tracker_class_init (TrackerClass *service) priv->last_domain_indexes = NULL; priv->last_super_classes = NULL; - priv->deletes.pending.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->deletes.pending.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->deletes.ready.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->deletes.ready.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - - priv->inserts.pending.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->inserts.pending.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->inserts.ready.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - priv->inserts.ready.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); - /* Make GET_PRIV working */ service->priv = priv; } @@ -126,16 +95,6 @@ class_finalize (GObject *object) g_array_free (priv->super_classes, TRUE); g_array_free (priv->domain_indexes, TRUE); - g_array_free (priv->deletes.pending.sub_pred_ids, TRUE); - g_array_free (priv->deletes.pending.obj_graph_ids, TRUE); - g_array_free (priv->deletes.ready.sub_pred_ids, TRUE); - g_array_free (priv->deletes.ready.obj_graph_ids, TRUE); - - g_array_free (priv->inserts.pending.sub_pred_ids, TRUE); - g_array_free (priv->inserts.pending.obj_graph_ids, TRUE); - g_array_free (priv->inserts.ready.sub_pred_ids, TRUE); - g_array_free (priv->inserts.ready.obj_graph_ids, TRUE); - if (priv->last_domain_indexes) { g_array_free (priv->last_domain_indexes, TRUE); } @@ -511,244 +470,6 @@ tracker_class_set_db_schema_changed (TrackerClass *service, priv->db_schema_changed = value; } -gboolean -tracker_class_has_insert_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_val_if_fail (TRACKER_IS_CLASS (class), FALSE); - - priv = GET_PRIV (class); - - return (priv->inserts.ready.sub_pred_ids->len > 0); -} - -gboolean -tracker_class_has_delete_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_val_if_fail (TRACKER_IS_CLASS (class), FALSE); - - priv = GET_PRIV (class); - - return (priv->deletes.ready.sub_pred_ids->len > 0); -} - -void -tracker_class_foreach_insert_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data) -{ - guint i; - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - g_return_if_fail (foreach != NULL); - - priv = GET_PRIV (class); - - for (i = 0; i < priv->inserts.ready.sub_pred_ids->len; i++) { - gint graph_id, subject_id, pred_id, object_id; - gint64 sub_pred_id; - gint64 obj_graph_id; - - sub_pred_id = g_array_index (priv->inserts.ready.sub_pred_ids, gint64, i); - obj_graph_id = g_array_index (priv->inserts.ready.obj_graph_ids, gint64, i); - - pred_id = sub_pred_id & 0xffffffff; - subject_id = sub_pred_id >> 32; - graph_id = obj_graph_id & 0xffffffff; - object_id = obj_graph_id >> 32; - - foreach (graph_id, subject_id, pred_id, object_id, user_data); - } -} - -void -tracker_class_foreach_delete_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data) -{ - guint i; - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - g_return_if_fail (foreach != NULL); - - priv = GET_PRIV (class); - - for (i = 0; i < priv->deletes.ready.sub_pred_ids->len; i++) { - gint graph_id, subject_id, pred_id, object_id; - gint64 sub_pred_id; - gint64 obj_graph_id; - - sub_pred_id = g_array_index (priv->deletes.ready.sub_pred_ids, gint64, i); - obj_graph_id = g_array_index (priv->deletes.ready.obj_graph_ids, gint64, i); - - pred_id = sub_pred_id & 0xffffffff; - subject_id = sub_pred_id >> 32; - graph_id = obj_graph_id & 0xffffffff; - object_id = obj_graph_id >> 32; - - foreach (graph_id, subject_id, pred_id, object_id, user_data); - } -} - -void -tracker_class_reset_ready_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - - priv = GET_PRIV (class); - - /* Reset */ - g_array_set_size (priv->deletes.ready.sub_pred_ids, 0); - g_array_set_size (priv->deletes.ready.obj_graph_ids, 0); - - g_array_set_size (priv->inserts.ready.sub_pred_ids, 0); - g_array_set_size (priv->inserts.ready.obj_graph_ids, 0); - -} - -void -tracker_class_reset_pending_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - - priv = GET_PRIV (class); - - /* Reset */ - g_array_set_size (priv->deletes.pending.sub_pred_ids, 0); - g_array_set_size (priv->deletes.pending.obj_graph_ids, 0); - - g_array_set_size (priv->inserts.pending.sub_pred_ids, 0); - g_array_set_size (priv->inserts.pending.obj_graph_ids, 0); - -} - -void -tracker_class_transact_events (TrackerClass *class) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - priv = GET_PRIV (class); - - /* Move */ - g_array_insert_vals (priv->deletes.ready.obj_graph_ids, - priv->deletes.ready.obj_graph_ids->len, - priv->deletes.pending.obj_graph_ids->data, - priv->deletes.pending.obj_graph_ids->len); - - g_array_insert_vals (priv->deletes.ready.sub_pred_ids, - priv->deletes.ready.sub_pred_ids->len, - priv->deletes.pending.sub_pred_ids->data, - priv->deletes.pending.sub_pred_ids->len); - - /* Reset */ - g_array_set_size (priv->deletes.pending.sub_pred_ids, 0); - g_array_set_size (priv->deletes.pending.obj_graph_ids, 0); - - - /* Move */ - g_array_insert_vals (priv->inserts.ready.obj_graph_ids, - priv->inserts.ready.obj_graph_ids->len, - priv->inserts.pending.obj_graph_ids->data, - priv->inserts.pending.obj_graph_ids->len); - - g_array_insert_vals (priv->inserts.ready.sub_pred_ids, - priv->inserts.ready.sub_pred_ids->len, - priv->inserts.pending.sub_pred_ids->data, - priv->inserts.pending.sub_pred_ids->len); - - /* Reset */ - g_array_set_size (priv->inserts.pending.sub_pred_ids, 0); - g_array_set_size (priv->inserts.pending.obj_graph_ids, 0); - -} - -static void -insert_vals_into_arrays (GArray *sub_pred_ids, - GArray *obj_graph_ids, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id) -{ - gint i, j, k; - gint64 tmp; - gint64 sub_pred_id; - gint64 obj_graph_id; - - sub_pred_id = (gint64) subject_id; - sub_pred_id = sub_pred_id << 32 | pred_id; - obj_graph_id = (gint64) object_id; - obj_graph_id = obj_graph_id << 32 | graph_id; - - i = 0; - j = sub_pred_ids->len - 1; - - while (j - i > 0) { - k = (i + j) / 2; - tmp = g_array_index (sub_pred_ids, gint64, k); - if (tmp == sub_pred_id) { - i = k + 1; - break; - } else if (tmp > sub_pred_id) - j = k; - else - i = k + 1; - } - - g_array_insert_val (sub_pred_ids, i, sub_pred_id); - g_array_insert_val (obj_graph_ids, i, obj_graph_id); -} - -void -tracker_class_add_insert_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - priv = GET_PRIV (class); - - insert_vals_into_arrays (priv->inserts.pending.sub_pred_ids, - priv->inserts.pending.obj_graph_ids, - graph_id, - subject_id, - pred_id, - object_id); -} - -void -tracker_class_add_delete_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id) -{ - TrackerClassPrivate *priv; - - g_return_if_fail (TRACKER_IS_CLASS (class)); - priv = GET_PRIV (class); - - insert_vals_into_arrays (priv->deletes.pending.sub_pred_ids, - priv->deletes.pending.obj_graph_ids, - graph_id, - subject_id, - pred_id, - object_id); -} - void tracker_class_set_ontologies (TrackerClass *class, TrackerOntologies *ontologies) diff --git a/src/libtracker-data/tracker-class.h b/src/libtracker-data/tracker-class.h index c05634102..9ac46acd8 100644 --- a/src/libtracker-data/tracker-class.h +++ b/src/libtracker-data/tracker-class.h @@ -51,12 +51,6 @@ struct _TrackerClassClass { GObjectClass parent_class; }; -typedef void (*TrackerEventsForeach) (gint graph_id, - gint subject_id, - gint pred_id, - gint object_id, - gpointer user_data); - GType tracker_class_get_type (void) G_GNUC_CONST; TrackerClass * tracker_class_new (gboolean use_gvdb); const gchar * tracker_class_get_uri (TrackerClass *service); @@ -96,29 +90,6 @@ void tracker_class_set_notify (TrackerClass *ser void tracker_class_set_ontologies (TrackerClass *class, TrackerOntologies *ontologies); -/* For signals API */ -void tracker_class_foreach_delete_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data); -void tracker_class_foreach_insert_event (TrackerClass *class, - TrackerEventsForeach foreach, - gpointer user_data); -gboolean tracker_class_has_insert_events (TrackerClass *class); -gboolean tracker_class_has_delete_events (TrackerClass *class); -void tracker_class_reset_ready_events (TrackerClass *class); -void tracker_class_reset_pending_events (TrackerClass *class); -void tracker_class_transact_events (TrackerClass *class); -void tracker_class_add_delete_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id); -void tracker_class_add_insert_event (TrackerClass *class, - gint graph_id, - gint subject_id, - gint pred_id, - gint object_id); - G_END_DECLS #endif /* __LIBTRACKER_DATA_CLASS_H__ */ diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c index eb58caafd..444e5efee 100644 --- a/src/libtracker-data/tracker-data-update.c +++ b/src/libtracker-data/tracker-data-update.c @@ -3624,21 +3624,16 @@ tracker_data_commit_transaction (TrackerData *data, g_hash_table_remove_all (data->update_buffer.resources_by_id); g_hash_table_remove_all (data->update_buffer.resource_cache); - data->in_journal_replay = FALSE; -} - -void -tracker_data_notify_transaction (TrackerData *data, - TrackerDataCommitType commit_type) -{ - if (data->commit_callbacks) { + if (!data->in_journal_replay && data->commit_callbacks) { guint n; for (n = 0; n < data->commit_callbacks->len; n++) { TrackerCommitDelegate *delegate; delegate = g_ptr_array_index (data->commit_callbacks, n); - delegate->callback (commit_type, delegate->user_data); + delegate->callback (delegate->user_data); } } + + data->in_journal_replay = FALSE; } void @@ -3679,7 +3674,7 @@ tracker_data_rollback_transaction (TrackerData *data) for (n = 0; n < data->rollback_callbacks->len; n++) { TrackerCommitDelegate *delegate; delegate = g_ptr_array_index (data->rollback_callbacks, n); - delegate->callback (TRUE, delegate->user_data); + delegate->callback (delegate->user_data); } } } diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h index 75444875c..97fdcd6a9 100644 --- a/src/libtracker-data/tracker-data-update.h +++ b/src/libtracker-data/tracker-data-update.h @@ -47,12 +47,6 @@ typedef struct _TrackerDataClass TrackerDataClass; typedef struct _TrackerData TrackerData; typedef struct _TrackerDataClass TrackerDataClass; -typedef enum { - TRACKER_DATA_COMMIT_REGULAR, - TRACKER_DATA_COMMIT_BATCH, - TRACKER_DATA_COMMIT_BATCH_LAST -} TrackerDataCommitType; - typedef struct _TrackerData TrackerData; typedef struct _TrackerData TrackerDataUpdate; @@ -65,8 +59,7 @@ typedef void (*TrackerStatementCallback) (gint graph_id, const gchar *object, GPtrArray *rdf_types, gpointer user_data); -typedef void (*TrackerCommitCallback) (TrackerDataCommitType commit_type, - gpointer user_data); +typedef void (*TrackerCommitCallback) (gpointer user_data); GQuark tracker_data_error_quark (void); @@ -110,8 +103,6 @@ void tracker_data_begin_transaction_for_replay (TrackerData * GError **error); void tracker_data_commit_transaction (TrackerData *data, GError **error); -void tracker_data_notify_transaction (TrackerData *data, - TrackerDataCommitType commit_type); void tracker_data_rollback_transaction (TrackerData *data); void tracker_data_update_sparql (TrackerData *data, const gchar *update, diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c index 33e178f4f..5e5b7299c 100644 --- a/src/libtracker-data/tracker-db-manager.c +++ b/src/libtracker-data/tracker-db-manager.c @@ -1055,14 +1055,20 @@ tracker_db_manager_get_db_interface (TrackerDBManager *db_manager) interface = tracker_db_manager_create_db_interface (db_manager, TRUE, &internal_error); - if (internal_error) { - g_critical ("Error opening database: %s", internal_error->message); - g_error_free (internal_error); - g_async_queue_unlock (db_manager->interfaces); - return NULL; + if (interface) { + tracker_data_manager_init_fts (interface, FALSE); + } else { + if (g_async_queue_length_unlocked (db_manager->interfaces) == 0) { + g_critical ("Error opening database: %s", internal_error->message); + g_error_free (internal_error); + g_async_queue_unlock (db_manager->interfaces); + return NULL; + } else { + g_error_free (internal_error); + /* Fetch the first interface back. Oh well */ + interface = g_async_queue_try_pop_unlocked (db_manager->interfaces); + } } - - tracker_data_manager_init_fts (interface, FALSE); } g_async_queue_push_unlocked (db_manager->interfaces, interface); @@ -1102,7 +1108,8 @@ tracker_db_manager_get_writable_db_interface (TrackerDBManager *db_manager) TrackerDBInterface * tracker_db_manager_get_wal_db_interface (TrackerDBManager *db_manager) { - if (db_manager->db.wal_iface == NULL) { + if (db_manager->db.wal_iface == NULL && + (db_manager->flags & TRACKER_DB_MANAGER_READONLY) == 0) { db_manager->db.wal_iface = init_writable_db_interface (db_manager); } diff --git a/src/libtracker-data/tracker-db-manager.h b/src/libtracker-data/tracker-db-manager.h index 95a71cfdb..a41b2ff53 100644 --- a/src/libtracker-data/tracker-db-manager.h +++ b/src/libtracker-data/tracker-db-manager.h @@ -35,12 +35,10 @@ G_BEGIN_DECLS typedef enum { TRACKER_DB_MANAGER_FORCE_REINDEX = 1 << 1, - TRACKER_DB_MANAGER_REMOVE_CACHE = 1 << 2, - /* 1 << 3 Was low mem mode */ - TRACKER_DB_MANAGER_REMOVE_ALL = 1 << 4, - TRACKER_DB_MANAGER_READONLY = 1 << 5, - TRACKER_DB_MANAGER_DO_NOT_CHECK_ONTOLOGY = 1 << 6, - TRACKER_DB_MANAGER_ENABLE_MUTEXES = 1 << 7, + TRACKER_DB_MANAGER_REMOVE_ALL = 1 << 2, + TRACKER_DB_MANAGER_READONLY = 1 << 3, + TRACKER_DB_MANAGER_DO_NOT_CHECK_ONTOLOGY = 1 << 4, + TRACKER_DB_MANAGER_ENABLE_MUTEXES = 1 << 5, } TrackerDBManagerFlags; typedef struct _TrackerDBManager TrackerDBManager; diff --git a/src/libtracker-direct/.gitignore b/src/libtracker-direct/.gitignore deleted file mode 100644 index b55dd251a..000000000 --- a/src/libtracker-direct/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -tracker-namespace.c -tracker-direct.[ch] -tracker-direct*.vapi diff --git a/src/libtracker-direct/Makefile.am b/src/libtracker-direct/Makefile.am index 5e9a42f46..7960bae6c 100644 --- a/src/libtracker-direct/Makefile.am +++ b/src/libtracker-direct/Makefile.am @@ -1,16 +1,5 @@ noinst_LTLIBRARIES = libtracker-direct.la -AM_VALAFLAGS = \ - --includedir=libtracker-direct \ - --header tracker-direct.h \ - --vapi tracker-direct.vapi \ - --pkg gio-2.0 \ - $(BUILD_VALAFLAGS) \ - $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ - $(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \ - $(top_srcdir)/src/libtracker-common/libtracker-common.vapi \ - $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi - AM_CPPFLAGS = \ $(BUILD_VALACFLAGS) \ -I$(top_srcdir)/src \ @@ -19,8 +8,7 @@ AM_CPPFLAGS = \ $(LIBTRACKER_DIRECT_CFLAGS) libtracker_direct_la_SOURCES = \ - tracker-namespace.vala \ - tracker-direct.vala + tracker-direct.c libtracker_direct_la_LIBADD = \ $(top_builddir)/src/libtracker-data/libtracker-data.la \ @@ -30,7 +18,4 @@ libtracker_direct_la_LIBADD = \ noinst_HEADERS = \ tracker-direct.h -BUILT_SOURCES = \ - libtracker_direct_la_vala.stamp - EXTRA_DIST = meson.build diff --git a/src/libtracker-direct/meson.build b/src/libtracker-direct/meson.build index 8cf018c69..0c515eaa7 100644 --- a/src/libtracker-direct/meson.build +++ b/src/libtracker-direct/meson.build @@ -1,18 +1,6 @@ libtracker_direct = static_library('tracker-direct', - 'tracker-direct.vala', - 'tracker-namespace.vala', - '../libtracker-common/libtracker-common.vapi', - '../libtracker-data/libtracker-data.vapi', + 'tracker-direct.c', c_args: tracker_c_args, - vala_args: [ - '--debug', - '--pkg', 'posix', - # FIXME: Meson has code to add --target-glib automatically, but it - # doesn't seem to work here. - '--target-glib', glib_required, - ], - # This doesn't depend on tracker_common_dep because of - # https://github.com/mesonbuild/meson/issues/671 dependencies: [ glib, gio, tracker_data_dep ], include_directories: [commoninc, configinc, srcinc], ) diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c new file mode 100644 index 000000000..4b8d74a73 --- /dev/null +++ b/src/libtracker-direct/tracker-direct.c @@ -0,0 +1,888 @@ +/* + * Copyright (C) 2010, Nokia <ivan.frade@nokia.com> + * Copyright (C) 2017, Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "config.h" + +#include "tracker-direct.h" +#include <libtracker-data/tracker-data.h> + +static TrackerDBManagerFlags default_flags = 0; + +typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate; + +struct _TrackerDirectConnectionPrivate +{ + TrackerSparqlConnectionFlags flags; + GFile *store; + GFile *journal; + GFile *ontology; + + TrackerNamespaceManager *namespace_manager; + TrackerDataManager *data_manager; + GMutex mutex; + + GThreadPool *update_thread; /* Contains 1 exclusive thread */ + GThreadPool *select_pool; + + guint initialized : 1; +}; + +enum { + PROP_0, + PROP_FLAGS, + PROP_STORE_LOCATION, + PROP_JOURNAL_LOCATION, + PROP_ONTOLOGY_LOCATION, + N_PROPS +}; + +static GParamSpec *props[N_PROPS] = { NULL }; + +typedef enum { + TASK_TYPE_QUERY, + TASK_TYPE_UPDATE, + TASK_TYPE_UPDATE_BLANK, + TASK_TYPE_TURTLE +} TaskType; + +typedef struct { + TaskType type; + union { + gchar *query; + GFile *turtle_file; + } data; +} TaskData; + +static void tracker_direct_connection_initable_iface_init (GInitableIface *iface); +static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface); + +G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection, + TRACKER_SPARQL_TYPE_CONNECTION, + G_ADD_PRIVATE (TrackerDirectConnection) + G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, + tracker_direct_connection_initable_iface_init) + G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE, + tracker_direct_connection_async_initable_iface_init)) + +static TaskData * +task_data_query_new (TaskType type, + const gchar *sparql) +{ + TaskData *data; + + g_assert (type != TASK_TYPE_TURTLE); + data = g_new0 (TaskData, 1); + data->type = type; + data->data.query = g_strdup (sparql); + + return data; +} + +static TaskData * +task_data_turtle_new (GFile *file) +{ + TaskData *data; + + data = g_new0 (TaskData, 1); + data->type = TASK_TYPE_TURTLE; + g_set_object (&data->data.turtle_file, file); + + return data; +} + +static void +task_data_free (TaskData *task) +{ + if (task->type == TASK_TYPE_TURTLE) + g_object_unref (task->data.turtle_file); + else + g_free (task->data.query); + + g_free (task); +} + +static void +update_thread_func (gpointer data, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task = data; + TaskData *task_data = g_task_get_task_data (task); + TrackerData *tracker_data; + GError *error = NULL; + gpointer retval = NULL; + GDestroyNotify destroy_notify = NULL; + + conn = user_data; + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + tracker_data = tracker_data_manager_get_data (priv->data_manager); + + switch (task_data->type) { + case TASK_TYPE_QUERY: + g_warning ("Queries don't go through this thread"); + break; + case TASK_TYPE_UPDATE: + tracker_data_update_sparql (tracker_data, task_data->data.query, &error); + break; + case TASK_TYPE_UPDATE_BLANK: + retval = tracker_data_update_sparql_blank (tracker_data, task_data->data.query, &error); + destroy_notify = (GDestroyNotify) g_variant_unref; + break; + case TASK_TYPE_TURTLE: + tracker_data_load_turtle_file (tracker_data, task_data->data.turtle_file, &error); + break; + } + + if (error) + g_task_return_error (task, error); + else if (retval) + g_task_return_pointer (task, retval, destroy_notify); + else + g_task_return_boolean (task, TRUE); + + g_mutex_unlock (&priv->mutex); +} + +static void +query_thread_pool_func (gpointer data, + gpointer user_data) +{ + TrackerSparqlCursor *cursor; + GTask *task = data; + TaskData *task_data = g_task_get_task_data (task); + GError *error = NULL; + + g_assert (task_data->type == TASK_TYPE_QUERY); + cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)), + task_data->data.query, + g_task_get_cancellable (task), + &error); + if (cursor) + g_task_return_pointer (task, cursor, g_object_unref); + else + g_task_return_error (task, error); +} + +static void +wal_checkpoint (TrackerDBInterface *iface, + gboolean blocking) +{ + GError *error = NULL; + + g_debug ("Checkpointing database..."); + tracker_db_interface_sqlite_wal_checkpoint (iface, blocking, &error); + + if (error) { + g_warning ("Error in WAL checkpoint: %s", error->message); + g_error_free (error); + } + + g_debug ("Checkpointing complete"); +} + +static gpointer +wal_checkpoint_thread (gpointer data) +{ + TrackerDBInterface *wal_iface = data; + + wal_checkpoint (wal_iface, FALSE); + g_object_unref (wal_iface); + return NULL; +} + +static void +wal_hook (TrackerDBInterface *iface, + gint n_pages) +{ + TrackerDataManager *data_manager = tracker_db_interface_get_user_data (iface); + TrackerDBInterface *wal_iface = tracker_data_manager_get_wal_db_interface (data_manager); + + if (!wal_iface) + return; + + if (n_pages >= 10000) { + /* Do immediate checkpointing (blocking updates) to + * prevent excessive WAL file growth. + */ + wal_checkpoint (wal_iface, TRUE); + } else { + /* Defer non-blocking checkpoint to thread */ + g_thread_try_new ("wal-checkpoint", wal_checkpoint_thread, + g_object_ref (wal_iface), NULL); + } +} + +static gint +task_compare_func (GTask *a, + GTask *b, + gpointer user_data) +{ + return g_task_get_priority (b) - g_task_get_priority (a); +} + +static gboolean +set_up_thread_pools (TrackerDirectConnection *conn, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (conn); + + priv->select_pool = g_thread_pool_new (query_thread_pool_func, + conn, 16, FALSE, error); + if (!priv->select_pool) + return FALSE; + + priv->update_thread = g_thread_pool_new (update_thread_func, + conn, 1, TRUE, error); + if (!priv->update_thread) + return FALSE; + + g_thread_pool_set_sort_function (priv->select_pool, + (GCompareDataFunc) task_compare_func, + conn); + g_thread_pool_set_sort_function (priv->update_thread, + (GCompareDataFunc) task_compare_func, + conn); + return TRUE; +} + +static gboolean +tracker_direct_connection_initable_init (GInitable *initable, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES; + TrackerDBInterface *iface; + GHashTable *namespaces; + GHashTableIter iter; + gchar *prefix, *ns; + + conn = TRACKER_DIRECT_CONNECTION (initable); + priv = tracker_direct_connection_get_instance_private (conn); + + tracker_locale_sanity_check (); + + if (!set_up_thread_pools (conn, error)) + return FALSE; + + /* Init data manager */ + if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) + db_flags |= TRACKER_DB_MANAGER_READONLY; + + priv->data_manager = tracker_data_manager_new (db_flags | default_flags, priv->store, + priv->journal, priv->ontology, + FALSE, FALSE, 100, 100); + if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, error)) + return FALSE; + + /* Set up WAL hook on our connection */ + iface = tracker_data_manager_get_writable_db_interface (priv->data_manager); + tracker_db_interface_sqlite_wal_hook (iface, wal_hook); + + /* Initialize namespace manager */ + priv->namespace_manager = tracker_namespace_manager_new (); + namespaces = tracker_data_manager_get_namespaces (priv->data_manager); + g_hash_table_iter_init (&iter, namespaces); + + while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) { + tracker_namespace_manager_add_prefix (priv->namespace_manager, + prefix, ns); + } + + return TRUE; +} + +static void +tracker_direct_connection_initable_iface_init (GInitableIface *iface) +{ + iface->init = tracker_direct_connection_initable_init; +} + +static void +async_initable_thread_func (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GError *error = NULL; + + if (!g_initable_init (G_INITABLE (source_object), cancellable, &error)) + g_task_return_error (task, error); + else + g_task_return_boolean (task, TRUE); +} + +static void +tracker_direct_connection_async_initable_init_async (GAsyncInitable *async_initable, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + + task = g_task_new (async_initable, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_run_in_thread (task, async_initable_thread_func); +} + +static gboolean +tracker_direct_connection_async_initable_init_finish (GAsyncInitable *async_initable, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_boolean (G_TASK (res), error); +} + +static void +tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface) +{ + iface->init_async = tracker_direct_connection_async_initable_init_async; + iface->init_finish = tracker_direct_connection_async_initable_init_finish; +} + +static void +tracker_direct_connection_init (TrackerDirectConnection *conn) +{ +} + +static void +tracker_direct_connection_finalize (GObject *object) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + if (priv->update_thread) + g_thread_pool_free (priv->update_thread, TRUE, TRUE); + if (priv->select_pool) + g_thread_pool_free (priv->select_pool, TRUE, FALSE); + + if (priv->data_manager) { + TrackerDBInterface *wal_iface; + wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); + if (wal_iface) + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); + } + + g_clear_object (&priv->store); + g_clear_object (&priv->journal); + g_clear_object (&priv->ontology); + g_clear_object (&priv->data_manager); + + G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object); +} + +static void +tracker_direct_connection_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + switch (prop_id) { + case PROP_FLAGS: + priv->flags = g_value_get_enum (value); + break; + case PROP_STORE_LOCATION: + priv->store = g_value_dup_object (value); + break; + case PROP_JOURNAL_LOCATION: + priv->journal = g_value_dup_object (value); + break; + case PROP_ONTOLOGY_LOCATION: + priv->ontology = g_value_dup_object (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +tracker_direct_connection_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + switch (prop_id) { + case PROP_FLAGS: + g_value_set_enum (value, priv->flags); + break; + case PROP_STORE_LOCATION: + g_value_set_object (value, priv->store); + break; + case PROP_JOURNAL_LOCATION: + g_value_set_object (value, priv->journal); + break; + case PROP_ONTOLOGY_LOCATION: + g_value_set_object (value, priv->ontology); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static TrackerSparqlCursor * +tracker_direct_connection_query (TrackerSparqlConnection *self, + const gchar *sparql, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerSparqlQuery *query; + TrackerSparqlCursor *cursor; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + query = tracker_sparql_query_new (priv->data_manager, sparql); + cursor = TRACKER_SPARQL_CURSOR (tracker_sparql_query_execute_cursor (query, error)); + if (cursor) + tracker_sparql_cursor_set_connection (cursor, self); + g_mutex_unlock (&priv->mutex); + + return cursor; +} + +static void +tracker_direct_connection_query_async (TrackerSparqlConnection *self, + const gchar *sparql, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GError *error = NULL; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_QUERY, sparql), + (GDestroyNotify) task_data_free); + + if (!g_thread_pool_push (priv->select_pool, task, &error)) + g_task_return_error (task, error); +} + +static TrackerSparqlCursor * +tracker_direct_connection_query_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static void +tracker_direct_connection_update (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + tracker_data_update_sparql (data, sparql, error); + g_mutex_unlock (&priv->mutex); +} + +static void +tracker_direct_connection_update_async (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE, sparql), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static void +tracker_direct_connection_update_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + g_task_propagate_boolean (G_TASK (res), error); +} + +static void +error_free (GError *error) +{ + if (error) + g_error_free (error); +} + +static void +update_array_async_thread_func (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + gchar **updates = task_data; + gchar *concatenated; + GPtrArray *errors; + GError *error = NULL; + gint i; + + errors = g_ptr_array_new_with_free_func ((GDestroyNotify) error_free); + g_ptr_array_set_size (errors, g_strv_length (updates)); + + /* Fast path, perform everything as a single update */ + concatenated = g_strjoinv (" ", updates); + tracker_sparql_connection_update (source_object, concatenated, + g_task_get_priority (task), + cancellable, &error); + + if (!error) { + g_task_return_pointer (task, errors, + (GDestroyNotify) g_ptr_array_unref); + return; + } + + /* Slow path, perform updates one by one */ + for (i = 0; updates[i]; i++) { + GError *err = NULL; + + err = g_ptr_array_index (errors, i); + tracker_sparql_connection_update (source_object, updates[i], + g_task_get_priority (task), + cancellable, &err); + } + + g_task_return_pointer (task, errors, + (GDestroyNotify) g_ptr_array_unref); +} + +static void +tracker_direct_connection_update_array_async (TrackerSparqlConnection *self, + gchar **updates, + gint n_updates, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + gchar **copy; + gint i = 0; + + copy = g_new0 (gchar*, n_updates + 1); + + for (i = 0; i < n_updates; i++) { + g_return_if_fail (updates[i] != NULL); + copy[i] = g_strdup (updates[i]); + } + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, copy, (GDestroyNotify) g_strfreev); + + g_task_run_in_thread (task, update_array_async_thread_func); +} + +static GPtrArray * +tracker_direct_connection_update_array_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static GVariant * +tracker_direct_connection_update_blank (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + GVariant *blank_nodes; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + blank_nodes = tracker_data_update_sparql_blank (data, sparql, error); + g_mutex_unlock (&priv->mutex); + + return blank_nodes; +} + +static void +tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static GVariant * +tracker_direct_connection_update_blank_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static void +tracker_direct_connection_load (TrackerSparqlConnection *self, + GFile *file, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + tracker_data_load_turtle_file (data, file, error); + g_mutex_unlock (&priv->mutex); +} + +static void +tracker_direct_connection_load_async (TrackerSparqlConnection *self, + GFile *file, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_turtle_new (file), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static void +tracker_direct_connection_load_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + g_task_propagate_pointer (G_TASK (res), error); +} + +static TrackerNamespaceManager * +tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self)); + + return priv->namespace_manager; +} + +static void +tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) +{ + TrackerSparqlConnectionClass *sparql_connection_class; + GObjectClass *object_class; + + object_class = G_OBJECT_CLASS (klass); + sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass); + + object_class->finalize = tracker_direct_connection_finalize; + object_class->set_property = tracker_direct_connection_set_property; + object_class->get_property = tracker_direct_connection_get_property; + + sparql_connection_class->query = tracker_direct_connection_query; + sparql_connection_class->query_async = tracker_direct_connection_query_async; + sparql_connection_class->query_finish = tracker_direct_connection_query_finish; + sparql_connection_class->update = tracker_direct_connection_update; + sparql_connection_class->update_async = tracker_direct_connection_update_async; + sparql_connection_class->update_finish = tracker_direct_connection_update_finish; + sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async; + sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish; + sparql_connection_class->update_blank = tracker_direct_connection_update_blank; + sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async; + sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish; + sparql_connection_class->load = tracker_direct_connection_load; + sparql_connection_class->load_async = tracker_direct_connection_load_async; + sparql_connection_class->load_finish = tracker_direct_connection_load_finish; + sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager; + + props[PROP_FLAGS] = + g_param_spec_enum ("flags", + "Flags", + "Flags", + TRACKER_SPARQL_TYPE_CONNECTION_FLAGS, + TRACKER_SPARQL_CONNECTION_FLAGS_NONE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_STORE_LOCATION] = + g_param_spec_object ("store-location", + "Store location", + "Store location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_JOURNAL_LOCATION] = + g_param_spec_object ("journal-location", + "Journal location", + "Journal location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_ONTOLOGY_LOCATION] = + g_param_spec_object ("ontology-location", + "Ontology location", + "Ontology location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + + g_object_class_install_properties (object_class, N_PROPS, props); +} + +TrackerDirectConnection * +tracker_direct_connection_new (TrackerSparqlConnectionFlags flags, + GFile *store, + GFile *journal, + GFile *ontology, + GError **error) +{ + g_return_val_if_fail (G_IS_FILE (store), NULL); + g_return_val_if_fail (G_IS_FILE (journal), NULL); + g_return_val_if_fail (G_IS_FILE (ontology), NULL); + g_return_val_if_fail (!error || !*error, NULL); + + return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION, + "flags", flags, + "store-location", store, + "journal-location", journal, + "ontology-location", ontology, + NULL); +} + +TrackerDataManager * +tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (conn); + return priv->data_manager; +} + +void +tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags) +{ + default_flags = flags; +} + +void +tracker_direct_connection_sync (TrackerDirectConnection *conn) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDBInterface *wal_iface; + + priv = tracker_direct_connection_get_instance_private (conn); + + if (!priv->data_manager) + return; + + /* Wait for pending updates. */ + if (priv->update_thread) + g_thread_pool_free (priv->update_thread, TRUE, TRUE); + /* Selects are less important, readonly interfaces won't be bothersome */ + if (priv->select_pool) + g_thread_pool_free (priv->select_pool, TRUE, FALSE); + + set_up_thread_pools (conn, NULL); + + wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); + if (wal_iface) + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); +} diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h new file mode 100644 index 000000000..0b66fdb1f --- /dev/null +++ b/src/libtracker-direct/tracker-direct.h @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2010, Nokia <ivan.frade@nokia.com> + * Copyright (C) 2017, Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __TRACKER_LOCAL_CONNECTION_H__ +#define __TRACKER_LOCAL_CONNECTION_H__ + +#include <libtracker-sparql/tracker-sparql.h> +#include <libtracker-data/tracker-data-manager.h> + +#define TRACKER_TYPE_DIRECT_CONNECTION (tracker_direct_connection_get_type()) +#define TRACKER_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnection)) +#define TRACKER_DIRECT_CONNECTION_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnectionClass)) +#define TRACKER_IS_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TRACKER_TYPE_DIRECT_CONNECTION)) +#define TRACKER_IS_DIRECT_CONNECTION_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), TRACKER_TYPE_DIRECT_CONNECTION)) +#define TRACKER_DIRECT_CONNECTION_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnectionClass)) + +typedef struct _TrackerDirectConnection TrackerDirectConnection; +typedef struct _TrackerDirectConnectionClass TrackerDirectConnectionClass; + +struct _TrackerDirectConnectionClass +{ + TrackerSparqlConnectionClass parent_class; +}; + +struct _TrackerDirectConnection +{ + TrackerSparqlConnection parent_instance; +}; + +TrackerDirectConnection *tracker_direct_connection_new (TrackerSparqlConnectionFlags flags, + GFile *store, + GFile *journal, + GFile *ontology, + GError **error); + +TrackerDataManager *tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn); + +void tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags); + +void tracker_direct_connection_sync (TrackerDirectConnection *conn); + +#endif /* __TRACKER_LOCAL_CONNECTION_H__ */ diff --git a/src/libtracker-direct/tracker-direct.vala b/src/libtracker-direct/tracker-direct.vala deleted file mode 100644 index b149b6806..000000000 --- a/src/libtracker-direct/tracker-direct.vala +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Copyright (C) 2010, Nokia <ivan.frade@nokia.com> - * Copyright (C) 2017, Red Hat, Inc. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -public class Tracker.Direct.Connection : Tracker.Sparql.Connection, AsyncInitable, Initable { - File? database_loc; - File? journal_loc; - File? ontology_loc; - Sparql.ConnectionFlags flags; - - Data.Manager data_manager; - - // Mutex to hold datamanager - private Mutex mutex = Mutex (); - Thread<void*> thread; - - // Initialization stuff, both sync and async - private Mutex init_mutex = Mutex (); - private Cond init_cond = Cond (); - private bool initialized; - private Error init_error; - public SourceFunc init_callback; - - private AsyncQueue<Task> update_queue; - private NamespaceManager namespace_manager; - - [CCode (cname = "SHAREDIR")] - extern const string SHAREDIR; - - enum TaskType { - QUERY, - UPDATE, - UPDATE_BLANK, - TURTLE, - } - - abstract class Task { - public TaskType type; - public int priority; - public Cancellable? cancellable; - public SourceFunc callback; - public Error error; - } - - private class UpdateTask : Task { - public string sparql; - public Variant blank_nodes; - - private void set (TaskType type, string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable = null) { - this.type = type; - this.sparql = sparql; - this.priority = priority; - this.cancellable = cancellable; - } - - public UpdateTask (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) { - this.set (TaskType.UPDATE, sparql, priority, cancellable); - } - - public UpdateTask.blank (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) { - this.set (TaskType.UPDATE_BLANK, sparql, priority, cancellable); - } - } - - private class TurtleTask : Task { - public File file; - - public TurtleTask (File file, Cancellable? cancellable) { - this.type = TaskType.TURTLE; - this.file = file; - this.priority = Priority.DEFAULT; - this.cancellable = cancellable; - } - } - - static void wal_checkpoint (DBInterface iface, bool blocking) { - try { - debug ("Checkpointing database..."); - iface.sqlite_wal_checkpoint (blocking); - debug ("Checkpointing complete..."); - } catch (Error e) { - warning (e.message); - } - } - - static void wal_checkpoint_on_thread (DBInterface iface) { - new Thread<void*> ("wal-checkpoint", () => { - wal_checkpoint (iface, false); - return null; - }); - } - - static void wal_hook (DBInterface iface, int n_pages) { - var manager = (Data.Manager) iface.get_user_data (); - var wal_iface = manager.get_wal_db_interface (); - - if (n_pages >= 10000) { - // do immediate checkpointing (blocking updates) - // to prevent excessive wal file growth - wal_checkpoint (wal_iface, true); - } else if (n_pages >= 1000) { - wal_checkpoint_on_thread (wal_iface); - } - } - - private void* thread_func () { - init_mutex.lock (); - - try { - Locale.sanity_check (); - DBManagerFlags db_flags = DBManagerFlags.ENABLE_MUTEXES; - if ((flags & Sparql.ConnectionFlags.READONLY) != 0) - db_flags |= DBManagerFlags.READONLY; - - data_manager = new Data.Manager (db_flags, - database_loc, journal_loc, ontology_loc, - false, false, 100, 100); - data_manager.init (); - - var iface = data_manager.get_writable_db_interface (); - iface.sqlite_wal_hook (wal_hook); - } catch (Error e) { - init_error = e; - } finally { - if (init_callback != null) { - init_callback (); - } else { - initialized = true; - init_cond.signal (); - init_mutex.unlock (); - } - } - - while (true) { - var task = update_queue.pop(); - - try { - switch (task.type) { - case TaskType.UPDATE: - UpdateTask update_task = (UpdateTask) task; - update (update_task.sparql, update_task.priority, update_task.cancellable); - break; - case TaskType.UPDATE_BLANK: - UpdateTask update_task = (UpdateTask) task; - update_task.blank_nodes = update_blank (update_task.sparql, update_task.priority, update_task.cancellable); - break; - case TaskType.TURTLE: - TurtleTask turtle_task = (TurtleTask) task; - load (turtle_task.file, turtle_task.cancellable); - break; - default: - break; - } - } catch (Error e) { - task.error = e; - } - - task.callback (); - } - } - - public async bool init_async (int io_priority, Cancellable? cancellable) throws Error { - init_callback = init_async.callback; - thread = new Thread<void*> ("database", thread_func); - - return initialized; - } - - public bool init (Cancellable? cancellable) throws Error { - try { - thread = new Thread<void*> ("database", thread_func); - - init_mutex.lock (); - while (!initialized) - init_cond.wait(init_mutex); - init_mutex.unlock (); - - if (init_error != null) - throw init_error; - } catch (Error e) { - throw new Sparql.Error.INTERNAL (e.message); - } - - return true; - } - - public Connection (Sparql.ConnectionFlags connection_flags, File loc, File? journal, File? ontology) throws Sparql.Error, IOError, DBusError { - database_loc = loc; - journal_loc = journal; - ontology_loc = ontology; - flags = connection_flags; - - if (journal_loc == null) - journal_loc = database_loc; - if (ontology_loc == null) - ontology_loc = File.new_for_path (Path.build_filename (SHAREDIR, "tracker", "ontologies", "nepomuk")); - - update_queue = new AsyncQueue<Task> (); - } - - public override void dispose () { - data_manager.shutdown (); - base.dispose (); - } - - Sparql.Cursor query_unlocked (string sparql) throws Sparql.Error, DBusError { - try { - var query_object = new Sparql.Query (data_manager, sparql); - var cursor = query_object.execute_cursor (); - cursor.connection = this; - return cursor; - } catch (DBInterfaceError e) { - throw new Sparql.Error.INTERNAL (e.message); - } catch (DateError e) { - throw new Sparql.Error.PARSE (e.message); - } - } - - public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError { - // Check here for early cancellation, just in case - // the operation can be entirely avoided - if (cancellable != null && cancellable.is_cancelled ()) { - throw new IOError.CANCELLED ("Operation was cancelled"); - } - - mutex.lock (); - try { - return query_unlocked (sparql); - } finally { - mutex.unlock (); - } - } - - public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError { - // run in a separate thread - Sparql.Error sparql_error = null; - IOError io_error = null; - DBusError dbus_error = null; - GLib.Error error = null; - Sparql.Cursor result = null; - var context = MainContext.get_thread_default (); - - IOSchedulerJob.push ((job, cancellable) => { - try { - result = query (sparql, cancellable); - } catch (IOError e_io) { - io_error = e_io; - } catch (Sparql.Error e_spql) { - sparql_error = e_spql; - } catch (DBusError e_dbus) { - dbus_error = e_dbus; - } catch (GLib.Error e) { - error = e; - } - - context.invoke (() => { - query_async.callback (); - return false; - }); - - return false; - }, Priority.DEFAULT, cancellable); - - yield; - - if (cancellable != null && cancellable.is_cancelled ()) { - throw new IOError.CANCELLED ("Operation was cancelled"); - } else if (sparql_error != null) { - throw sparql_error; - } else if (io_error != null) { - throw io_error; - } else if (dbus_error != null) { - throw dbus_error; - } else { - return result; - } - } - - public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - mutex.lock (); - try { - var data = data_manager.get_data (); - data.update_sparql (sparql); - } finally { - mutex.unlock (); - } - } - - public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - var task = new UpdateTask (sparql, priority, cancellable); - task.callback = update_async.callback; - update_queue.push (task); - yield; - - if (task.error != null) - throw task.error; - } - - public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - GLib.Variant? blank_nodes = null; - mutex.lock (); - try { - var data = data_manager.get_data (); - blank_nodes = data.update_sparql_blank (sparql); - } finally { - mutex.unlock (); - } - - return blank_nodes; - } - - public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error { - var task = new UpdateTask.blank (sparql, priority, cancellable); - task.callback = update_blank_async.callback; - update_queue.push (task); - yield; - - if (task.error != null) - throw task.error; - - return task.blank_nodes; - } - - public async override GenericArray<Sparql.Error?>? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError { - var combined_query = new StringBuilder (); - var n_updates = sparql.length; - int i; - - for (i = 0; i < n_updates; i++) - combined_query.append (sparql[i]); - - var task = new UpdateTask (combined_query.str, priority, cancellable); - task.callback = update_array_async.callback; - update_queue.push (task); - yield; - - var errors = new GenericArray<Sparql.Error?> (n_updates); - - if (task.error == null) { - for (i = 0; i < n_updates; i++) - errors.add (null); - } else { - // combined query was not successful, try queries one by one - for (i = 0; i < n_updates; i++) { - try { - yield update_async (sparql[i], priority, cancellable); - errors.add (null); - } catch (Sparql.Error e) { - errors.add (e); - } - } - } - - return errors; - } - - public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { - mutex.lock (); - try { - var data = data_manager.get_data (); - data.load_turtle_file (file); - } finally { - mutex.unlock (); - } - } - - public async override void load_async (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError { - var task = new TurtleTask (file, cancellable); - task.callback = load_async.callback; - update_queue.push (task); - yield; - - if (task.error != null) - throw new Sparql.Error.INTERNAL (task.error.message); - } - - public override NamespaceManager? get_namespace_manager () { - if (namespace_manager == null && data_manager != null) { - var ht = data_manager.get_namespaces (); - namespace_manager = new NamespaceManager (); - - foreach (var prefix in ht.get_keys ()) { - namespace_manager.add_prefix (prefix, ht.lookup (prefix)); - } - } - - return namespace_manager; - } -} diff --git a/src/libtracker-direct/tracker-direct.vapi b/src/libtracker-direct/tracker-direct.vapi new file mode 100644 index 000000000..df15c5890 --- /dev/null +++ b/src/libtracker-direct/tracker-direct.vapi @@ -0,0 +1,12 @@ +[CCode (cprefix = "Tracker", gir_namespace = "Tracker", gir_version = "2.0", lower_case_cprefix = "tracker_")] +namespace Tracker { + namespace Direct { + [CCode (cheader_filename = "libtracker-direct/tracker-direct.h")] + public class Connection : Tracker.Sparql.Connection, GLib.Initable, GLib.AsyncInitable { + public Connection (Tracker.Sparql.ConnectionFlags connection_flags, GLib.File loc, GLib.File? journal, GLib.File? ontology) throws Tracker.Sparql.Error, GLib.IOError, GLib.DBusError; + public Tracker.Data.Manager get_data_manager (); + public void sync (); + public static void set_default_flags (Tracker.DBManagerFlags flags); + } + } +} diff --git a/src/libtracker-direct/tracker-namespace.vala b/src/libtracker-direct/tracker-namespace.vala deleted file mode 100644 index 24d7b2ee8..000000000 --- a/src/libtracker-direct/tracker-namespace.vala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright © 2015 Collabora Ltd. - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -/* - * This file serves as the representation for the Tracker namespace, mostly - * so that we can set its namespace and version attributes for GIR. - */ - -[CCode (cprefix = "TrackerDirect", gir_namespace = "TrackerDirect", - gir_version = "2.0", lower_case_cprefix = "tracker_direct_")] -namespace Tracker -{ -} diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c index e8c9de85e..0fbf7c60d 100644 --- a/src/libtracker-miner/tracker-miner-fs.c +++ b/src/libtracker-miner/tracker-miner-fs.c @@ -775,7 +775,8 @@ queue_event_coalesce (const QueueEvent *first, *replacement = NULL; if (first->type == TRACKER_MINER_FS_EVENT_CREATED) { - if (second->type == TRACKER_MINER_FS_EVENT_UPDATED && + if ((second->type == TRACKER_MINER_FS_EVENT_UPDATED || + second->type == TRACKER_MINER_FS_EVENT_CREATED) && first->file == second->file) { return QUEUE_ACTION_DELETE_SECOND; } else if (second->type == TRACKER_MINER_FS_EVENT_MOVED && diff --git a/src/libtracker-sparql-backend/Makefile.am b/src/libtracker-sparql-backend/Makefile.am index 9a31d98d6..6fe17dccb 100644 --- a/src/libtracker-sparql-backend/Makefile.am +++ b/src/libtracker-sparql-backend/Makefile.am @@ -5,6 +5,7 @@ AM_VALAFLAGS = \ $(BUILD_VALAFLAGS) \ $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \ $(top_srcdir)/src/libtracker-bus/tracker-bus.vapi \ + $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \ $(top_srcdir)/src/libtracker-remote/tracker-remote.vapi \ $(top_srcdir)/src/libtracker-common/libtracker-common.vapi diff --git a/src/libtracker-sparql-backend/meson.build b/src/libtracker-sparql-backend/meson.build index b04cdaf0d..d76143306 100644 --- a/src/libtracker-sparql-backend/meson.build +++ b/src/libtracker-sparql-backend/meson.build @@ -1,5 +1,7 @@ libtracker_sparql = library('tracker-sparql-' + tracker_api_version, '../libtracker-common/libtracker-common.vapi', + '../libtracker-data/libtracker-data.vapi', + '../libtracker-direct/tracker-direct.vapi', 'tracker-backend.vala', soversion: soversion, diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am index dc5df50c3..2463cb4ab 100644 --- a/src/tracker-store/Makefile.am +++ b/src/tracker-store/Makefile.am @@ -39,6 +39,7 @@ tracker_store_VALAFLAGS = \ $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \ $(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \ $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ + $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \ $(top_srcdir)/src/tracker-store/tracker-config.vapi \ $(top_srcdir)/src/tracker-store/tracker-events.vapi \ $(top_srcdir)/src/tracker-store/tracker-locale-change.vapi \ @@ -47,6 +48,7 @@ tracker_store_VALAFLAGS = \ tracker_store_LDADD = \ $(top_builddir)/src/libtracker-data/libtracker-data.la \ + $(top_builddir)/src/libtracker-direct/libtracker-direct.la \ $(top_builddir)/src/libtracker-common/libtracker-common.la \ $(top_builddir)/src/libtracker-sparql-backend/libtracker-sparql-@TRACKER_API_VERSION@.la \ $(BUILD_LIBS) \ diff --git a/src/tracker-store/meson.build b/src/tracker-store/meson.build index 91d7a335b..1c42da3a3 100644 --- a/src/tracker-store/meson.build +++ b/src/tracker-store/meson.build @@ -16,6 +16,7 @@ tracker_store_sources = [ 'tracker-writeback.vapi', '../libtracker-common/libtracker-common.vapi', '../libtracker-data/libtracker-data.vapi', + '../libtracker-direct/tracker-direct.vapi', ] tracker_store = executable('tracker-store', @@ -25,7 +26,7 @@ tracker_store = executable('tracker-store', ], vala_args: [ '--pkg', 'posix' ], dependencies: [ - tracker_common_dep, tracker_data_dep, + tracker_common_dep, tracker_data_dep, tracker_sparql_direct_dep, gio_unix ], install: true, diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala index 5db075274..a9ede1b9a 100644 --- a/src/tracker-store/tracker-backup.vala +++ b/src/tracker-store/tracker-backup.vala @@ -25,7 +25,7 @@ public class Tracker.Backup : Object { public async void save (BusName sender, string destination_uri) throws Error { var resources = (Resources) Tracker.DBus.get_object (typeof (Resources)); if (resources != null) { - resources.disable_signals (); + Tracker.Store.disable_signals (); Tracker.Events.shutdown (); } @@ -57,8 +57,8 @@ public class Tracker.Backup : Object { throw e; } finally { if (resources != null) { - Tracker.Events.init (Tracker.Main.get_data_manager ()); - resources.enable_signals (); + Tracker.Events.init (); + Tracker.Store.enable_signals (); } Tracker.Store.resume (); @@ -68,7 +68,7 @@ public class Tracker.Backup : Object { public async void restore (BusName sender, string journal_uri) throws Error { var resources = (Resources) Tracker.DBus.get_object (typeof (Resources)); if (resources != null) { - resources.disable_signals (); + Tracker.Store.disable_signals (); Tracker.Events.shutdown (); } @@ -95,8 +95,8 @@ public class Tracker.Backup : Object { throw e; } finally { if (resources != null) { - Tracker.Events.init (Tracker.Main.get_data_manager ()); - resources.enable_signals (); + Tracker.Events.init (); + Tracker.Store.enable_signals (); } Tracker.Store.resume (); diff --git a/src/tracker-store/tracker-dbus.vala b/src/tracker-store/tracker-dbus.vala index 35c1542e5..48197f4b9 100644 --- a/src/tracker-store/tracker-dbus.vala +++ b/src/tracker-store/tracker-dbus.vala @@ -34,7 +34,6 @@ public class Tracker.DBus { static uint notifier_id; static Tracker.Backup backup; static uint backup_id; - static Tracker.Config config; static uint domain_watch_id; static MainLoop watch_main_loop; @@ -108,9 +107,8 @@ public class Tracker.DBus { } } - public static bool init (Tracker.Config config_p) { + public static bool init () { /* Don't reinitialize */ - config = config_p; if (connection != null) { return true; } @@ -216,7 +214,7 @@ public class Tracker.DBus { statistics_id = register_object (connection, statistics, Tracker.Statistics.PATH); /* Add org.freedesktop.Tracker1.Resources */ - resources = new Tracker.Resources (connection, config); + resources = new Tracker.Resources (connection); if (resources == null) { critical ("Could not create TrackerResources object to register"); return false; @@ -261,7 +259,7 @@ public class Tracker.DBus { return false; } - resources.enable_signals (); + Tracker.Store.enable_signals (); return true; } diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c index 199dced29..77de4c256 100644 --- a/src/tracker-store/tracker-events.c +++ b/src/tracker-store/tracker-events.c @@ -26,28 +26,230 @@ #include "tracker-events.h" +typedef struct _TrackerEventBatch TrackerEventBatch; + +struct _TrackerEventBatch +{ + struct { + GArray *sub_pred_ids; + GArray *obj_graph_ids; + } deletes; + struct { + GArray *sub_pred_ids; + GArray *obj_graph_ids; + } inserts; +}; + typedef struct { - gboolean frozen; + /* Accessed by updates/dbus threads */ + GMutex mutex; + GHashTable *ready; + + /* Only accessed by updates thread */ + GHashTable *pending; guint total; - GPtrArray *notify_classes; } EventsPrivate; static EventsPrivate *private; -guint -tracker_events_get_total (gboolean and_reset) +static TrackerEventBatch * +tracker_event_batch_new (void) { - guint total; + TrackerEventBatch *events; + + events = g_new0 (TrackerEventBatch, 1); + events->deletes.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + events->deletes.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + events->inserts.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + events->inserts.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64)); + + return events; +} + +static void +tracker_event_batch_free (TrackerEventBatch *events) +{ + g_array_unref (events->deletes.sub_pred_ids); + g_array_unref (events->deletes.obj_graph_ids); + g_array_unref (events->inserts.sub_pred_ids); + g_array_unref (events->inserts.obj_graph_ids); + g_free (events); +} + +static void +insert_vals_into_arrays (GArray *sub_pred_ids, + GArray *obj_graph_ids, + gint graph_id, + gint subject_id, + gint pred_id, + gint object_id) +{ + gint i, j, k; + gint64 tmp; + gint64 sub_pred_id; + gint64 obj_graph_id; + + sub_pred_id = (gint64) subject_id; + sub_pred_id = sub_pred_id << 32 | pred_id; + obj_graph_id = (gint64) object_id; + obj_graph_id = obj_graph_id << 32 | graph_id; + + i = 0; + j = sub_pred_ids->len - 1; + + while (j - i > 0) { + k = (i + j) / 2; + tmp = g_array_index (sub_pred_ids, gint64, k); + if (tmp == sub_pred_id) { + i = k + 1; + break; + } else if (tmp > sub_pred_id) + j = k; + else + i = k + 1; + } + + g_array_insert_val (sub_pred_ids, i, sub_pred_id); + g_array_insert_val (obj_graph_ids, i, obj_graph_id); +} + +static void +tracker_event_batch_add_insert_event (TrackerEventBatch *events, + gint graph_id, + gint subject_id, + gint pred_id, + gint object_id) +{ + insert_vals_into_arrays (events->inserts.sub_pred_ids, + events->inserts.obj_graph_ids, + graph_id, + subject_id, + pred_id, + object_id); +} + +static void +tracker_event_batch_add_delete_event (TrackerEventBatch *events, + gint graph_id, + gint subject_id, + gint pred_id, + gint object_id) +{ + insert_vals_into_arrays (events->deletes.sub_pred_ids, + events->deletes.obj_graph_ids, + graph_id, + subject_id, + pred_id, + object_id); +} + +static void +foreach_event_in_arrays (GArray *sub_pred_ids, + GArray *obj_graph_ids, + TrackerEventsForeach foreach, + gpointer user_data) +{ + guint i; + + g_assert (sub_pred_ids->len == obj_graph_ids->len); + + for (i = 0; i < sub_pred_ids->len; i++) { + gint graph_id, subject_id, pred_id, object_id; + gint64 sub_pred_id; + gint64 obj_graph_id; + + sub_pred_id = g_array_index (sub_pred_ids, gint64, i); + obj_graph_id = g_array_index (obj_graph_ids, gint64, i); + + pred_id = sub_pred_id & 0xffffffff; + subject_id = sub_pred_id >> 32; + graph_id = obj_graph_id & 0xffffffff; + object_id = obj_graph_id >> 32; + + foreach (graph_id, subject_id, pred_id, object_id, user_data); + } +} + +void +tracker_event_batch_foreach_insert_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data) +{ + g_return_if_fail (events != NULL); + g_return_if_fail (foreach != NULL); + + foreach_event_in_arrays (events->inserts.sub_pred_ids, + events->inserts.obj_graph_ids, + foreach, user_data); +} + +void +tracker_event_batch_foreach_delete_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data) +{ + g_return_if_fail (events != NULL); + g_return_if_fail (foreach != NULL); + + foreach_event_in_arrays (events->deletes.sub_pred_ids, + events->deletes.obj_graph_ids, + foreach, user_data); +} + +static GHashTable * +tracker_event_batch_hashtable_new (void) +{ + return g_hash_table_new_full (NULL, NULL, + (GDestroyNotify) g_object_unref, + (GDestroyNotify) tracker_event_batch_free); +} + +void +tracker_event_batch_merge (TrackerEventBatch *dest, + TrackerEventBatch *to_copy) +{ + g_array_append_vals (dest->deletes.sub_pred_ids, + to_copy->deletes.sub_pred_ids->data, + to_copy->deletes.sub_pred_ids->len); + g_array_append_vals (dest->deletes.obj_graph_ids, + to_copy->deletes.obj_graph_ids->data, + to_copy->deletes.obj_graph_ids->len); + g_array_append_vals (dest->inserts.sub_pred_ids, + to_copy->inserts.sub_pred_ids->data, + to_copy->inserts.sub_pred_ids->len); + g_array_append_vals (dest->inserts.obj_graph_ids, + to_copy->inserts.obj_graph_ids->data, + to_copy->inserts.obj_graph_ids->len); +} +guint +tracker_events_get_total (void) +{ g_return_val_if_fail (private != NULL, 0); - total = private->total; + return private->total; +} + +static inline TrackerEventBatch * +ensure_event_batch (TrackerClass *rdf_type) +{ + TrackerEventBatch *events; + + g_assert (private != NULL); - if (and_reset) { - private->total = 0; + if (!private->pending) + private->pending = tracker_event_batch_hashtable_new (); + + events = g_hash_table_lookup (private->pending, rdf_type); + + if (!events) { + events = tracker_event_batch_new (); + g_hash_table_insert (private->pending, + g_object_ref (rdf_type), + events); } - return total; + return events; } void @@ -59,24 +261,23 @@ tracker_events_add_insert (gint graph_id, const gchar *object, GPtrArray *rdf_types) { + TrackerEventBatch *events; guint i; g_return_if_fail (rdf_types != NULL); g_return_if_fail (private != NULL); - if (private->frozen) { - return; - } - for (i = 0; i < rdf_types->len; i++) { - if (tracker_class_get_notify (rdf_types->pdata[i])) { - tracker_class_add_insert_event (rdf_types->pdata[i], - graph_id, - subject_id, - pred_id, - object_id); - private->total++; - } + if (!tracker_class_get_notify (rdf_types->pdata[i])) + continue; + + events = ensure_event_batch (rdf_types->pdata[i]); + tracker_event_batch_add_insert_event (events, + graph_id, + subject_id, + pred_id, + object_id); + private->total++; } } @@ -89,104 +290,89 @@ tracker_events_add_delete (gint graph_id, const gchar *object, GPtrArray *rdf_types) { + TrackerEventBatch *events; guint i; g_return_if_fail (rdf_types != NULL); g_return_if_fail (private != NULL); - if (private->frozen) { - return; - } - for (i = 0; i < rdf_types->len; i++) { - if (tracker_class_get_notify (rdf_types->pdata[i])) { - tracker_class_add_delete_event (rdf_types->pdata[i], - graph_id, - subject_id, - pred_id, - object_id); - private->total++; - } + if (!tracker_class_get_notify (rdf_types->pdata[i])) + continue; + + events = ensure_event_batch (rdf_types->pdata[i]); + tracker_event_batch_add_delete_event (events, + graph_id, + subject_id, + pred_id, + object_id); + private->total++; } } void -tracker_events_reset_pending (void) +tracker_events_transact (void) { - guint i; + TrackerEventBatch *prev_events, *events; + TrackerClass *rdf_type; + GHashTableIter iter; g_return_if_fail (private != NULL); - for (i = 0; i < private->notify_classes->len; i++) { - TrackerClass *class = g_ptr_array_index (private->notify_classes, i); + if (!private->pending || g_hash_table_size (private->pending) == 0) + return; + + g_mutex_lock (&private->mutex); - tracker_class_reset_pending_events (class); + if (!private->ready) { + private->ready = tracker_event_batch_hashtable_new (); } - private->frozen = FALSE; + g_hash_table_iter_init (&iter, private->pending); + + while (g_hash_table_iter_next (&iter, + (gpointer *) &rdf_type, + (gpointer *) &events)) { + prev_events = g_hash_table_lookup (private->ready, + rdf_type); + if (prev_events) { + tracker_event_batch_merge (prev_events, events); + g_hash_table_iter_remove (&iter); + } else { + g_hash_table_iter_steal (&iter); + g_hash_table_insert (private->ready, + g_object_ref (rdf_type), + events); + /* Drop the reference stolen from the pending HT */ + g_object_unref (rdf_type); + } + } + + private->total = 0; + + g_mutex_unlock (&private->mutex); } void -tracker_events_freeze (void) +tracker_events_reset_pending (void) { g_return_if_fail (private != NULL); - private->frozen = TRUE; + g_clear_pointer (&private->pending, g_hash_table_unref); } static void free_private (EventsPrivate *private) { - guint i; - - for (i = 0; i < private->notify_classes->len; i++) { - TrackerClass *class = g_ptr_array_index (private->notify_classes, i); - - tracker_class_reset_pending_events (class); - - /* Perhaps hurry an emit of the ready events here? We're shutting down, - * so I guess we're not required to do that here ... ? */ - tracker_class_reset_ready_events (class); - } - - g_ptr_array_unref (private->notify_classes); - + tracker_events_reset_pending (); g_free (private); } -TrackerClass ** -tracker_events_get_classes (guint *length) -{ - g_return_val_if_fail (private != NULL, NULL); - - *length = private->notify_classes->len; - - return (TrackerClass **) (private->notify_classes->pdata); -} - void -tracker_events_init (TrackerDataManager *data_manager) +tracker_events_init (void) { - TrackerOntologies *ontologies; - TrackerClass **classes; - guint length = 0, i; - private = g_new0 (EventsPrivate, 1); - - ontologies = tracker_data_manager_get_ontologies (data_manager); - classes = tracker_ontologies_get_classes (ontologies, &length); - - private->notify_classes = g_ptr_array_sized_new (length); - g_ptr_array_set_free_func (private->notify_classes, (GDestroyNotify) g_object_unref); - - for (i = 0; i < length; i++) { - TrackerClass *class = classes[i]; - - if (tracker_class_get_notify (class)) { - g_ptr_array_add (private->notify_classes, g_object_ref (class)); - } - } - + g_mutex_init (&private->mutex); } void @@ -199,3 +385,18 @@ tracker_events_shutdown (void) g_warning ("tracker_events already shutdown"); } } + +GHashTable * +tracker_events_get_pending (void) +{ + GHashTable *pending; + + g_return_val_if_fail (private != NULL, NULL); + + g_mutex_lock (&private->mutex); + pending = private->ready; + private->ready = NULL; + g_mutex_unlock (&private->mutex); + + return pending; +} diff --git a/src/tracker-store/tracker-events.h b/src/tracker-store/tracker-events.h index df9322ff1..1aea15ab6 100644 --- a/src/tracker-store/tracker-events.h +++ b/src/tracker-store/tracker-events.h @@ -28,9 +28,15 @@ G_BEGIN_DECLS -typedef GStrv (*TrackerNotifyClassGetter) (void); +typedef struct _TrackerEventBatch TrackerEventBatch; -void tracker_events_init (TrackerDataManager *data_manager); +typedef void (*TrackerEventsForeach) (gint graph_id, + gint subject_id, + gint pred_id, + gint object_id, + gpointer user_data); + +void tracker_events_init (void); void tracker_events_shutdown (void); void tracker_events_add_insert (gint graph_id, gint subject_id, @@ -46,10 +52,19 @@ void tracker_events_add_delete (gint graph_id, gint object_id, const gchar *object, GPtrArray *rdf_types); -guint tracker_events_get_total (gboolean and_reset); +guint tracker_events_get_total (void); void tracker_events_reset_pending (void); -void tracker_events_freeze (void); -TrackerClass** tracker_events_get_classes (guint *length); + +void tracker_events_transact (void); + +GHashTable * tracker_events_get_pending (void); + +void tracker_event_batch_foreach_insert_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data); +void tracker_event_batch_foreach_delete_event (TrackerEventBatch *events, + TrackerEventsForeach foreach, + gpointer user_data); G_END_DECLS diff --git a/src/tracker-store/tracker-events.vapi b/src/tracker-store/tracker-events.vapi index c232d8af5..16df92e84 100644 --- a/src/tracker-store/tracker-events.vapi +++ b/src/tracker-store/tracker-events.vapi @@ -20,13 +20,21 @@ namespace Tracker { [CCode (cheader_filename = "tracker-store/tracker-events.h")] namespace Events { - public void init (Tracker.Data.Manager data_manager); + public void init (); public void shutdown (); public void add_insert (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); public void add_delete (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); - public uint get_total (bool and_reset); + public uint get_total (); public void reset_pending (); - public void freeze (); - public unowned Class[] get_classes (); + + public void transact (); + public GLib.HashTable<Tracker.Class, Batch> get_pending (); + + [CCode (lower_case_cprefix="tracker_event_batch_", cname = "TrackerEventBatch")] + public class Batch { + public delegate void EventsForeach (int graph_id, int subject_id, int pred_id, int object_id); + public void foreach_delete_event (EventsForeach func); + public void foreach_insert_event (EventsForeach func); + } } } diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index 8e6d8895d..861aed2eb 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -39,6 +39,7 @@ License which can be viewed at: static bool shutdown; + static Tracker.Direct.Connection connection; static Tracker.Data.Manager data_manager; /* Private command line parameters */ @@ -175,6 +176,10 @@ License which can be viewed at: return data_manager; } + public static unowned Tracker.Direct.Connection get_sparql_connection () { + return connection; + } + static int main (string[] args) { Intl.setlocale (LocaleCategory.ALL, ""); @@ -239,7 +244,7 @@ License which can be viewed at: sanity_check_option_values (config); - if (!Tracker.DBus.init (config)) { + if (!Tracker.DBus.init ()) { return 1; } @@ -247,7 +252,7 @@ License which can be viewed at: config_verbosity_changed_cb (config, null); ulong config_verbosity_id = config.notify["verbosity"].connect (config_verbosity_changed_cb); - DBManagerFlags flags = DBManagerFlags.REMOVE_CACHE; + DBManagerFlags flags = 0; if (force_reindex) { /* TODO port backup support @@ -256,9 +261,11 @@ License which can be viewed at: flags |= DBManagerFlags.FORCE_REINDEX; } + Tracker.Direct.Connection.set_default_flags (flags); + var notifier = Tracker.DBus.register_notifier (); - Tracker.Store.init (); + Tracker.Store.init (config); /* Make Tracker available for introspection */ if (!Tracker.DBus.register_objects ()) { @@ -281,45 +288,25 @@ License which can be viewed at: Tracker.DBJournal.set_rotating (do_rotating, chunk_size, rotate_to); - int select_cache_size, update_cache_size; - string cache_size_s; - - cache_size_s = Environment.get_variable ("TRACKER_STORE_SELECT_CACHE_SIZE"); - if (cache_size_s != null && cache_size_s != "") { - select_cache_size = int.parse (cache_size_s); - } else { - select_cache_size = SELECT_CACHE_SIZE; - } - - cache_size_s = Environment.get_variable ("TRACKER_STORE_UPDATE_CACHE_SIZE"); - if (cache_size_s != null && cache_size_s != "") { - update_cache_size = int.parse (cache_size_s); - } else { - update_cache_size = UPDATE_CACHE_SIZE; - } - try { - data_manager = new Tracker.Data.Manager (flags, - cache_location, - data_location, - ontology_location, - true, - false, - select_cache_size, - update_cache_size); - data_manager.init (null); + connection = new Tracker.Direct.Connection (Sparql.ConnectionFlags.NONE, + cache_location, + data_location, + ontology_location); + connection.init (null); } catch (GLib.Error e) { critical ("Cannot initialize database: %s", e.message); return 1; } + data_manager = connection.get_data_manager (); db_config = null; notifier = null; if (!shutdown) { Tracker.DBus.register_prepare_class_signal (); - Tracker.Events.init (data_manager); + Tracker.Events.init (); Tracker.Writeback.init (data_manager, get_writeback_predicates); Tracker.Store.resume (); diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index ed34a5f56..5cfab8bb2 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -22,8 +22,6 @@ public class Tracker.Resources : Object { public const string PATH = "/org/freedesktop/Tracker1/Resources"; - const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - /* I *know* that this is some arbitrary number that doesn't seem to * resemble anything. In fact it's what I experimentally measured to * be a good value on a default Debian testing which has @@ -50,25 +48,22 @@ public class Tracker.Resources : Object { const int DBUS_ARBITRARY_MAX_MSG_SIZE = 10000000; DBusConnection connection; - uint signal_timeout; - bool regular_commit_pending; - Tracker.Config config; public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects); public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts); - public Resources (DBusConnection connection, Tracker.Config config_p) { + public Resources (DBusConnection connection) { this.connection = connection; - this.config = config_p; + Tracker.Store.set_signal_callback (on_emit_signals); } public async void load (BusName sender, string uri) throws Error { var request = DBusRequest.begin (sender, "Resources.Load (uri: '%s')", uri); try { var file = File.new_for_uri (uri); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.queue_turtle_import (data_manager, file, sender); + yield Tracker.Store.queue_turtle_import (sparql_conn, file, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -89,9 +84,9 @@ public class Tracker.Resources : Object { request.debug ("query: %s", query); try { var builder = new VariantBuilder ((VariantType) "aas"); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => { + yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => { while (cursor.next ()) { builder.open ((VariantType) "as"); @@ -131,8 +126,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.SparqlUpdate"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.HIGH, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + yield Tracker.Store.sparql_update (sparql_conn, update, Priority.HIGH, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -152,8 +147,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.SparqlUpdateBlank"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - var variant = yield Tracker.Store.sparql_update_blank (data_manager, update, Tracker.Store.Priority.HIGH, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, update, Priority.HIGH, sender); request.end (); @@ -174,10 +169,10 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.Sync"); var data_manager = Tracker.Main.get_data_manager (); var data = data_manager.get_data (); - var iface = data_manager.get_writable_db_interface (); - // wal checkpoint implies sync - Tracker.Store.wal_checkpoint (iface, true); + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); + // sync journal if available data.sync (); @@ -188,8 +183,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.BatchSparqlUpdate"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.LOW, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + yield Tracker.Store.sparql_update (sparql_conn, update, Priority.LOW, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -208,168 +203,63 @@ public class Tracker.Resources : Object { /* no longer needed, just return */ } - bool emit_graph_updated (Class cl) { - if (cl.has_insert_events () || cl.has_delete_events ()) { - var builder = new VariantBuilder ((VariantType) "a(iiii)"); - cl.foreach_delete_event ((graph_id, subject_id, pred_id, object_id) => { - builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); - }); - var deletes = builder.end (); - - builder = new VariantBuilder ((VariantType) "a(iiii)"); - cl.foreach_insert_event ((graph_id, subject_id, pred_id, object_id) => { - builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); - }); - var inserts = builder.end (); + void emit_graph_updated (Class cl, Events.Batch events) { + var builder = new VariantBuilder ((VariantType) "a(iiii)"); + events.foreach_delete_event ((graph_id, subject_id, pred_id, object_id) => { + builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); + }); + var deletes = builder.end (); - graph_updated (cl.uri, deletes, inserts); + builder = new VariantBuilder ((VariantType) "a(iiii)"); + events.foreach_insert_event ((graph_id, subject_id, pred_id, object_id) => { + builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id); + }); + var inserts = builder.end (); - cl.reset_ready_events (); - - return true; - } - return false; + graph_updated (cl.uri, deletes, inserts); } - bool on_emit_signals () { - foreach (var cl in Tracker.Events.get_classes ()) { - emit_graph_updated (cl); - } - - /* Reset counter */ - Tracker.Events.get_total (true); - - /* Writeback feature */ - var writebacks = Tracker.Writeback.get_ready (); - - if (writebacks != null) { - var builder = new VariantBuilder ((VariantType) "a{iai}"); - - var wb_iter = HashTableIter<int, GLib.Array<int>> (writebacks); + void emit_writeback (HashTable<int, Array<int>> events) { + var builder = new VariantBuilder ((VariantType) "a{iai}"); + var wb_iter = HashTableIter<int, GLib.Array<int>> (events); - int subject_id; - unowned Array<int> types; - while (wb_iter.next (out subject_id, out types)) { - builder.open ((VariantType) "{iai}"); + int subject_id; + unowned Array<int> types; + while (wb_iter.next (out subject_id, out types)) { + builder.open ((VariantType) "{iai}"); - builder.add ("i", subject_id); + builder.add ("i", subject_id); - builder.open ((VariantType) "ai"); - for (int i = 0; i < types.length; i++) { - builder.add ("i", types.index (i)); - } - builder.close (); - - builder.close (); + builder.open ((VariantType) "ai"); + for (int i = 0; i < types.length; i++) { + builder.add ("i", types.index (i)); } + builder.close (); - writeback (builder.end ()); + builder.close (); } - Tracker.Writeback.reset_ready (); - - regular_commit_pending = false; - signal_timeout = 0; - return false; + writeback (builder.end ()); } - void on_statements_committed (Tracker.Data.Update.CommitType commit_type) { - /* Class signal feature */ + void on_emit_signals (HashTable<Tracker.Class, Tracker.Events.Batch>? events, HashTable<int, GLib.Array<int>>? writebacks) { + if (events != null) { + var iter = HashTableIter<Tracker.Class, Tracker.Events.Batch> (events); + unowned Events.Batch class_events; + unowned Class cl; - foreach (var cl in Tracker.Events.get_classes ()) { - cl.transact_events (); - } - - if (!regular_commit_pending) { - // never cancel timeout for non-batch commits as we want - // to ensure that the signal corresponding to a certain - // update arrives within a fixed time limit - - // cancel it in all other cases - // in the BATCH_LAST case, the timeout will be reenabled - // further down but it's important to cancel it first - // to reset the timeout to 1 s starting now - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; + while (iter.next (out cl, out class_events)) { + emit_graph_updated (cl, class_events); } } - if (commit_type == Tracker.Data.Update.CommitType.REGULAR) { - regular_commit_pending = true; - } - - if (regular_commit_pending || commit_type == Tracker.Data.Update.CommitType.BATCH_LAST) { - // timer wanted for non-batch commits and the last in a series of batch commits - if (signal_timeout == 0) { - signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); - } - } - - /* Writeback feature */ - Tracker.Writeback.transact (); - } - - void on_statements_rolled_back (Tracker.Data.Update.CommitType commit_type) { - Tracker.Events.reset_pending (); - Tracker.Writeback.reset_pending (); - } - - void check_graph_updated_signal () { - /* Check for whether we need an immediate emit */ - if (Tracker.Events.get_total (false) > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { - // possibly active timeout no longer necessary as signals - // for committed transactions will be emitted by the following on_emit_signals call - // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0 - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; - } - - // immediately emit signals for already committed transaction - on_emit_signals (); - } - } - - void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { - Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); - Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - check_graph_updated_signal (); - } - - void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { - Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); - Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - check_graph_updated_signal (); - } - - [DBus (visible = false)] - public void enable_signals () { - var data_manager = Tracker.Main.get_data_manager (); - var data = data_manager.get_data (); - data.add_insert_statement_callback (on_statement_inserted); - data.add_delete_statement_callback (on_statement_deleted); - data.add_commit_statement_callback (on_statements_committed); - data.add_rollback_statement_callback (on_statements_rolled_back); - } - - [DBus (visible = false)] - public void disable_signals () { - var data_manager = Tracker.Main.get_data_manager (); - var data = data_manager.get_data (); - data.remove_insert_statement_callback (on_statement_inserted); - data.remove_delete_statement_callback (on_statement_deleted); - data.remove_commit_statement_callback (on_statements_committed); - data.remove_rollback_statement_callback (on_statements_rolled_back); - - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; + if (writebacks != null) { + emit_writeback (writebacks); } } ~Resources () { - this.disable_signals (); + Tracker.Store.set_signal_callback (null); } [DBus (visible = false)] diff --git a/src/tracker-store/tracker-steroids.vala b/src/tracker-store/tracker-steroids.vala index 40679bf14..1eb7bef05 100644 --- a/src/tracker-store/tracker-steroids.vala +++ b/src/tracker-store/tracker-steroids.vala @@ -29,9 +29,9 @@ public class Tracker.Steroids : Object { request.debug ("query: %s", query); try { string[] variable_names = null; - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => { + yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => { var data_output_stream = new DataOutputStream (new BufferedOutputStream.sized (output_stream, BUFFER_SIZE)); data_output_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN); @@ -90,10 +90,10 @@ public class Tracker.Steroids : Object { } } - async Variant? update_internal (BusName sender, Tracker.Store.Priority priority, bool blank, UnixInputStream input_stream) throws Error { + async Variant? update_internal (BusName sender, int priority, bool blank, UnixInputStream input_stream) throws Error { var request = DBusRequest.begin (sender, "Steroids.%sUpdate%s", - priority != Tracker.Store.Priority.HIGH ? "Batch" : "", + priority != Priority.HIGH ? "Batch" : "", blank ? "Blank" : ""); try { size_t bytes_read; @@ -112,16 +112,16 @@ public class Tracker.Steroids : Object { data_input_stream = null; request.debug ("query: %s", (string) query); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); if (!blank) { - yield Tracker.Store.sparql_update (data_manager, (string) query, priority, sender); + yield Tracker.Store.sparql_update (sparql_conn, (string) query, priority, sender); request.end (); return null; } else { - var variant = yield Tracker.Store.sparql_update_blank (data_manager, (string) query, priority, sender); + var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, (string) query, priority, sender); request.end (); @@ -140,21 +140,21 @@ public class Tracker.Steroids : Object { } public async void update (BusName sender, UnixInputStream input_stream) throws Error { - yield update_internal (sender, Tracker.Store.Priority.HIGH, false, input_stream); + yield update_internal (sender, Priority.HIGH, false, input_stream); } public async void batch_update (BusName sender, UnixInputStream input_stream) throws Error { - yield update_internal (sender, Tracker.Store.Priority.LOW, false, input_stream); + yield update_internal (sender, Priority.LOW, false, input_stream); } [DBus (signature = "aaa{ss}")] public async Variant update_blank (BusName sender, UnixInputStream input_stream) throws Error { - return yield update_internal (sender, Tracker.Store.Priority.HIGH, true, input_stream); + return yield update_internal (sender, Priority.HIGH, true, input_stream); } [DBus (signature = "aaa{ss}")] public async Variant batch_update_blank (BusName sender, UnixInputStream input_stream) throws Error { - return yield update_internal (sender, Tracker.Store.Priority.LOW, true, input_stream); + return yield update_internal (sender, Priority.LOW, true, input_stream); } [DBus (signature = "as")] @@ -188,11 +188,11 @@ public class Tracker.Steroids : Object { data_input_stream = null; var builder = new VariantBuilder ((VariantType) "as"); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); // first try combined query for best possible performance try { - yield Tracker.Store.sparql_update (data_manager, combined_query.str, Tracker.Store.Priority.LOW, sender); + yield Tracker.Store.sparql_update (sparql_conn, combined_query.str, Priority.LOW, sender); // combined query was successful for (i = 0; i < query_count; i++) { @@ -213,7 +213,7 @@ public class Tracker.Steroids : Object { request.debug ("query: %s", query_array[i]); try { - yield Tracker.Store.sparql_update (data_manager, query_array[i], Tracker.Store.Priority.LOW, sender); + yield Tracker.Store.sparql_update (sparql_conn, query_array[i], Priority.LOW, sender); builder.add ("s", ""); builder.add ("s", ""); } catch (Error e1) { diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index 1f58256e8..03cc8078a 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -23,277 +23,49 @@ public class Tracker.Store { const int MAX_CONCURRENT_QUERIES = 2; const int MAX_TASK_TIME = 30; + const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static int n_queries_running; - static bool update_running; - static ThreadPool<Task> update_pool; - static ThreadPool<Task> query_pool; - static ThreadPool<DBInterface> checkpoint_pool; - static GenericArray<Task> running_tasks; static int max_task_time; static bool active; - static SourceFunc active_callback; - public enum Priority { - HIGH, - LOW, - TURTLE, - N_PRIORITIES - } - - enum TaskType { - QUERY, - UPDATE, - UPDATE_BLANK, - TURTLE, - } - - public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; - - abstract class Task { - public TaskType type; - public string client_id; - public Error error; - public SourceFunc callback; - public Tracker.Data.Manager data_manager; - } - - class QueryTask : Task { - public string query; - public Cancellable cancellable; - public uint watchdog_id; - public unowned SparqlQueryInThread in_thread; + static Tracker.Config config; + static uint signal_timeout; + static int n_updates; - ~QueryTask () { - if (watchdog_id > 0) { - Source.remove (watchdog_id); - } - } - } - - class UpdateTask : Task { - public string query; - public Variant blank_nodes; - public Priority priority; - } + static HashTable<string, Cancellable> client_cancellables; - class TurtleTask : Task { - public string path; - } - - static void sched () { - Task task = null; - - if (!active) { - return; - } + public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback); + static unowned SignalEmissionFunc signal_callback; - while (n_queries_running < MAX_CONCURRENT_QUERIES) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = query_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task == null) { - /* no pending query */ - break; - } - running_tasks.add (task); - - if (max_task_time != 0) { - var query_task = (QueryTask) task; - query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => { - query_task.cancellable.cancel (); - query_task.watchdog_id = 0; - return false; - }); - } - - n_queries_running++; - try { - query_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } + public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error; - if (!update_running) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = update_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task != null) { - update_running = true; - try { - update_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } - } - } + class CursorTask { + public Sparql.Cursor cursor; + public unowned SourceFunc callback; + public unowned SparqlQueryInThread thread_func; + public Error error; - static Tracker.Data.Update.CommitType commit_type (Task task) { - switch (task.type) { - case TaskType.UPDATE: - case TaskType.UPDATE_BLANK: - if (((UpdateTask) task).priority == Priority.HIGH) { - return Tracker.Data.Update.CommitType.REGULAR; - } else if (update_queues[Priority.LOW].get_length () > 0) { - return Tracker.Data.Update.CommitType.BATCH; - } else { - return Tracker.Data.Update.CommitType.BATCH_LAST; - } - case TaskType.TURTLE: - if (update_queues[Priority.TURTLE].get_length () > 0) { - return Tracker.Data.Update.CommitType.BATCH; - } else { - return Tracker.Data.Update.CommitType.BATCH_LAST; - } - default: - warn_if_reached (); - return Tracker.Data.Update.CommitType.REGULAR; + public CursorTask (Sparql.Cursor cursor) { + this.cursor = cursor; } } - static bool task_finish_cb (Task task) { - var data = task.data_manager.get_data (); - - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - if (task.error == null && - query_task.cancellable != null && - query_task.cancellable.is_cancelled ()) { - task.error = new IOError.CANCELLED ("Operation was cancelled"); - } - - task.callback (); - task.error = null; + static ThreadPool<CursorTask> cursor_pool; - running_tasks.remove (task); - n_queries_running--; - } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) { - if (task.error == null) { - data.notify_transaction (commit_type (task)); - } - - task.callback (); - task.error = null; - - update_running = false; - } else if (task.type == TaskType.TURTLE) { - if (task.error == null) { - data.notify_transaction (commit_type (task)); - } - - task.callback (); - task.error = null; - - update_running = false; - } - - if (n_queries_running == 0 && !update_running && active_callback != null) { - active_callback (); - } - - sched (); - - return false; - } - - static void pool_dispatch_cb (owned Task task) { + private static void cursor_dispatch_cb (owned CursorTask task) { try { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query); - - query_task.in_thread (cursor); - } else { - var data = task.data_manager.get_data (); - var iface = task.data_manager.get_writable_db_interface (); - iface.sqlite_wal_hook (wal_hook); - - if (task.type == TaskType.UPDATE) { - var update_task = (UpdateTask) task; - - data.update_sparql (update_task.query); - } else if (task.type == TaskType.UPDATE_BLANK) { - var update_task = (UpdateTask) task; - - update_task.blank_nodes = data.update_sparql_blank (update_task.query); - } else if (task.type == TaskType.TURTLE) { - var turtle_task = (TurtleTask) task; - - var file = File.new_for_path (turtle_task.path); - - Tracker.Events.freeze (); - try { - data.load_turtle_file (file); - } finally { - Tracker.Events.reset_pending (); - } - } - } + task.thread_func (task.cursor); } catch (Error e) { task.error = e; } Idle.add (() => { - task_finish_cb (task); + task.callback (); return false; }); } - public static void wal_checkpoint (DBInterface iface, bool blocking) { - try { - debug ("Checkpointing database..."); - iface.sqlite_wal_checkpoint (blocking); - debug ("Checkpointing complete..."); - } catch (Error e) { - warning (e.message); - } - } - - static int checkpointing; - - static void wal_hook (DBInterface iface, int n_pages) { - // run in update thread - var manager = (Data.Manager) iface.get_user_data (); - var wal_iface = manager.get_wal_db_interface (); - - debug ("WAL: %d pages", n_pages); - - if (n_pages >= 10000) { - // do immediate checkpointing (blocking updates) - // to prevent excessive wal file growth - wal_checkpoint (wal_iface, true); - } else if (n_pages >= 1000 && checkpoint_pool != null) { - if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) { - // initiate asynchronous checkpointing (not blocking updates) - try { - checkpoint_pool.push (wal_iface); - } catch (Error e) { - warning (e.message); - AtomicInt.set (ref checkpointing, 0); - } - } - } - } - - static void checkpoint_dispatch_cb (DBInterface iface) { - // run in checkpoint thread - wal_checkpoint (iface, false); - AtomicInt.set (ref checkpointing, 0); - } - - public static void init () { + public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { max_task_time = int.parse (max_task_time_env); @@ -301,19 +73,12 @@ public class Tracker.Store { max_task_time = MAX_TASK_TIME; } - running_tasks = new GenericArray<Task> (); - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = new Queue<Task> (); - update_queues[i] = new Queue<Task> (); - } + client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal); try { - update_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, 1, true); - query_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true); - checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true); + cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false); } catch (Error e) { - warning (e.message); + // Ignore harmless error } /* as the following settings are global for unknown reasons, @@ -321,184 +86,187 @@ public class Tracker.Store { are rather random */ ThreadPool.set_max_idle_time (15 * 1000); ThreadPool.set_max_unused_threads (2); + + config = config_p; } public static void shutdown () { - query_pool = null; - update_pool = null; - checkpoint_pool = null; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = null; - update_queues[i] = null; + if (signal_timeout != 0) { + Source.remove (signal_timeout); + signal_timeout = 0; } } - public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { - var task = new QueryTask (); - task.type = TaskType.QUERY; - task.query = sparql; - task.cancellable = new Cancellable (); - task.in_thread = in_thread; - task.callback = sparql_query.callback; - task.client_id = client_id; - task.data_manager = manager; - - query_queues[priority].push_tail (task); + private static Cancellable create_cancellable (string client_id) { + var client_cancellable = client_cancellables.lookup (client_id); - sched (); - - yield; - - if (task.error != null) { - throw task.error; + if (client_cancellable == null) { + client_cancellable = new Cancellable (); + client_cancellables.insert (client_id, client_cancellable); } - } - - public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { - var task = new UpdateTask (); - task.type = TaskType.UPDATE; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update.callback; - task.client_id = client_id; - task.data_manager = manager; - update_queues[priority].push_tail (task); + var task_cancellable = new Cancellable (); + client_cancellable.connect (() => { + task_cancellable.cancel (); + }); - sched (); + return task_cancellable; + } - yield; + private static void do_emit_signals () { + signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ()); + } - if (task.error != null) { - throw task.error; + private static void ensure_signal_timeout () { + if (signal_timeout == 0) { + signal_timeout = Timeout.add (config.graphupdated_delay, () => { + do_emit_signals (); + if (n_updates == 0) { + signal_timeout = 0; + return false; + } else { + return true; + } + }); } } - public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { - var task = new UpdateTask (); - task.type = TaskType.UPDATE_BLANK; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update_blank.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[priority].push_tail (task); - - sched (); + public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error { + var cancellable = create_cancellable (client_id); + uint timeout_id = 0; - yield; - - if (task.error != null) { - throw task.error; + if (max_task_time != 0) { + timeout_id = Timeout.add_seconds (max_task_time, () => { + cancellable.cancel (); + timeout_id = 0; + return false; + }); } - return task.blank_nodes; - } + var cursor = yield conn.query_async (sparql, cancellable); - public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { - var task = new TurtleTask (); - task.type = TaskType.TURTLE; - task.path = file.get_path (); - task.callback = queue_turtle_import.callback; - task.client_id = client_id; - task.data_manager = manager; + if (timeout_id != 0) + GLib.Source.remove (timeout_id); - update_queues[Priority.TURTLE].push_tail (task); + var task = new CursorTask (cursor); + task.thread_func = in_thread; + task.callback = sparql_query.callback; - sched (); + try { + cursor_pool.add (task); + } catch (Error e) { + // Ignore harmless error + } yield; - if (task.error != null) { + if (task.error != null) throw task.error; - } } - public uint get_queue_size () { - uint result = 0; + public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); + n_updates++; + ensure_signal_timeout (); + var cancellable = create_cancellable (client_id); + yield conn.update_async (sparql, priority, cancellable); + n_updates--; + } - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - result += query_queues[i].get_length (); - result += update_queues[i].get_length (); - } - return result; + public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); + n_updates++; + ensure_signal_timeout (); + var cancellable = create_cancellable (client_id); + var nodes = yield conn.update_blank_async (sparql, priority, cancellable); + n_updates--; + + return nodes; + } + + public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); + n_updates++; + ensure_signal_timeout (); + var cancellable = create_cancellable (client_id); + yield conn.load_async (file, cancellable); + n_updates--; } public static void unreg_batches (string client_id) { - unowned List<Task> list, cur; - unowned Queue<Task> queue; - - for (int i = 0; i < running_tasks.length; i++) { - unowned QueryTask task = running_tasks[i] as QueryTask; - if (task != null && task.client_id == client_id && task.cancellable != null) { - task.cancellable.cancel (); - } + Cancellable cancellable = client_cancellables.lookup (client_id); + + if (cancellable != null) { + cancellable.cancel (); + client_cancellables.remove (client_id); } + } - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - queue = query_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; + public static async void pause () { + Tracker.Store.active = false; - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); + } - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } + public static void resume () { + Tracker.Store.active = true; + } - queue = update_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; + private static void on_statements_committed () { + Tracker.Events.transact (); + Tracker.Writeback.transact (); + check_graph_updated_signal (); + } - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); + private static void on_statements_rolled_back () { + Tracker.Events.reset_pending (); + Tracker.Writeback.reset_pending (); + } - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } + private static void check_graph_updated_signal () { + /* Check for whether we need an immediate emit */ + if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { + // immediately emit signals for already committed transaction + Idle.add (() => { + do_emit_signals (); + return false; + }); } - - sched (); } - public static async void pause () { - Tracker.Store.active = false; - - if (n_queries_running > 0 || update_running) { - active_callback = pause.callback; - yield; - active_callback = null; - } + private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } - if (AtomicInt.get (ref checkpointing) != 0) { - // this will wait for checkpointing to finish - checkpoint_pool = null; - try { - checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true); - } catch (Error e) { - warning (e.message); - } - } + private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } - if (active) { - sched (); - } + public static void enable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.add_insert_statement_callback (on_statement_inserted); + data.add_delete_statement_callback (on_statement_deleted); + data.add_commit_statement_callback (on_statements_committed); + data.add_rollback_statement_callback (on_statements_rolled_back); } - public static void resume () { - Tracker.Store.active = true; + public static void disable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.remove_insert_statement_callback (on_statement_inserted); + data.remove_delete_statement_callback (on_statement_deleted); + data.remove_commit_statement_callback (on_statements_committed); + data.remove_rollback_statement_callback (on_statements_rolled_back); + } - sched (); + public static void set_signal_callback (SignalEmissionFunc? func) { + signal_callback = func; } } diff --git a/src/tracker-store/tracker-writeback.c b/src/tracker-store/tracker-writeback.c index a183b00a3..1edc3e2dd 100644 --- a/src/tracker-store/tracker-writeback.c +++ b/src/tracker-store/tracker-writeback.c @@ -27,8 +27,12 @@ #include "tracker-writeback.h" typedef struct { + /* Accessed by updates thread */ GHashTable *allowances; GHashTable *pending_events; + + /* Accessed by both updates and dbus threads */ + GMutex mutex; GHashTable *ready_events; } WritebackPrivate; @@ -115,9 +119,16 @@ tracker_writeback_reset_ready () GHashTable * tracker_writeback_get_ready (void) { + GHashTable *events; + g_return_val_if_fail (private != NULL, NULL); - return private->ready_events; + g_mutex_lock (&private->mutex); + events = private->ready_events; + private->ready_events = NULL; + g_mutex_unlock (&private->mutex); + + return events; } static void @@ -145,6 +156,7 @@ tracker_writeback_init (TrackerDataManager *data_manager, g_return_if_fail (private == NULL); private = g_new0 (WritebackPrivate, 1); + g_mutex_init (&private->mutex); private->allowances = g_hash_table_new_full (g_direct_hash, g_direct_equal, @@ -191,6 +203,8 @@ tracker_writeback_transact (void) if (!private->pending_events) return; + g_mutex_lock (&private->mutex); + if (!private->ready_events) { private->ready_events = g_hash_table_new_full (g_direct_hash, g_direct_equal, (GDestroyNotify) NULL, @@ -203,6 +217,8 @@ tracker_writeback_transact (void) g_hash_table_insert (private->ready_events, key, value); g_hash_table_iter_remove (&iter); } + + g_mutex_unlock (&private->mutex); } void @@ -214,7 +230,8 @@ tracker_writeback_shutdown (void) /* Perhaps hurry an emit of the ready events here? We're shutting down, * so I guess we're not required to do that here ... ? */ - tracker_writeback_reset_ready (); + g_clear_pointer (&private->ready_events, + (GDestroyNotify) g_hash_table_unref); free_private (private); private = NULL; diff --git a/src/tracker-store/tracker-writeback.vapi b/src/tracker-store/tracker-writeback.vapi index 368de0303..7c48512b8 100644 --- a/src/tracker-store/tracker-writeback.vapi +++ b/src/tracker-store/tracker-writeback.vapi @@ -26,9 +26,8 @@ namespace Tracker { public void init (Tracker.Data.Manager data_manager, WritebackGetPredicatesFunc callback); public void shutdown (); public void check (int graph_id, string graph, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types); - public unowned GLib.HashTable<int, GLib.Array<int>> get_ready (); + public GLib.HashTable<int, GLib.Array<int>> get_ready (); public void reset_pending (); - public void reset_ready (); public void transact (); } } diff --git a/tests/libtracker-miner/tracker-file-notifier-test.c b/tests/libtracker-miner/tracker-file-notifier-test.c index fd8264467..1bdc16085 100644 --- a/tests/libtracker-miner/tracker-file-notifier-test.c +++ b/tests/libtracker-miner/tracker-file-notifier-test.c @@ -668,6 +668,7 @@ test_file_notifier_monitor_updates_non_recursive (TestCommonContext *fixture, { OPERATION_CREATE, "non-recursive/bbb", NULL } }; FilesystemOperation expected_results2[] = { + { OPERATION_CREATE, "non-recursive", NULL }, { OPERATION_UPDATE, "non-recursive/bbb", NULL }, { OPERATION_CREATE, "non-recursive/ccc", NULL }, { OPERATION_UPDATE, "non-recursive/ccc", NULL } @@ -716,6 +717,7 @@ test_file_notifier_monitor_updates_recursive (TestCommonContext *fixture, { OPERATION_CREATE, "recursive/bbb", NULL } }; FilesystemOperation expected_results2[] = { + { OPERATION_CREATE, "recursive", NULL }, { OPERATION_CREATE, "recursive/folder", NULL }, { OPERATION_CREATE, "recursive/folder/aaa", NULL }, { OPERATION_UPDATE, "recursive/bbb", NULL }, |