diff options
Diffstat (limited to 'src/libtracker-direct/tracker-direct.c')
-rw-r--r-- | src/libtracker-direct/tracker-direct.c | 888 |
1 files changed, 888 insertions, 0 deletions
diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c new file mode 100644 index 000000000..4b8d74a73 --- /dev/null +++ b/src/libtracker-direct/tracker-direct.c @@ -0,0 +1,888 @@ +/* + * Copyright (C) 2010, Nokia <ivan.frade@nokia.com> + * Copyright (C) 2017, Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "config.h" + +#include "tracker-direct.h" +#include <libtracker-data/tracker-data.h> + +static TrackerDBManagerFlags default_flags = 0; + +typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate; + +struct _TrackerDirectConnectionPrivate +{ + TrackerSparqlConnectionFlags flags; + GFile *store; + GFile *journal; + GFile *ontology; + + TrackerNamespaceManager *namespace_manager; + TrackerDataManager *data_manager; + GMutex mutex; + + GThreadPool *update_thread; /* Contains 1 exclusive thread */ + GThreadPool *select_pool; + + guint initialized : 1; +}; + +enum { + PROP_0, + PROP_FLAGS, + PROP_STORE_LOCATION, + PROP_JOURNAL_LOCATION, + PROP_ONTOLOGY_LOCATION, + N_PROPS +}; + +static GParamSpec *props[N_PROPS] = { NULL }; + +typedef enum { + TASK_TYPE_QUERY, + TASK_TYPE_UPDATE, + TASK_TYPE_UPDATE_BLANK, + TASK_TYPE_TURTLE +} TaskType; + +typedef struct { + TaskType type; + union { + gchar *query; + GFile *turtle_file; + } data; +} TaskData; + +static void tracker_direct_connection_initable_iface_init (GInitableIface *iface); +static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface); + +G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection, + TRACKER_SPARQL_TYPE_CONNECTION, + G_ADD_PRIVATE (TrackerDirectConnection) + G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, + tracker_direct_connection_initable_iface_init) + G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE, + tracker_direct_connection_async_initable_iface_init)) + +static TaskData * +task_data_query_new (TaskType type, + const gchar *sparql) +{ + TaskData *data; + + g_assert (type != TASK_TYPE_TURTLE); + data = g_new0 (TaskData, 1); + data->type = type; + data->data.query = g_strdup (sparql); + + return data; +} + +static TaskData * +task_data_turtle_new (GFile *file) +{ + TaskData *data; + + data = g_new0 (TaskData, 1); + data->type = TASK_TYPE_TURTLE; + g_set_object (&data->data.turtle_file, file); + + return data; +} + +static void +task_data_free (TaskData *task) +{ + if (task->type == TASK_TYPE_TURTLE) + g_object_unref (task->data.turtle_file); + else + g_free (task->data.query); + + g_free (task); +} + +static void +update_thread_func (gpointer data, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task = data; + TaskData *task_data = g_task_get_task_data (task); + TrackerData *tracker_data; + GError *error = NULL; + gpointer retval = NULL; + GDestroyNotify destroy_notify = NULL; + + conn = user_data; + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + tracker_data = tracker_data_manager_get_data (priv->data_manager); + + switch (task_data->type) { + case TASK_TYPE_QUERY: + g_warning ("Queries don't go through this thread"); + break; + case TASK_TYPE_UPDATE: + tracker_data_update_sparql (tracker_data, task_data->data.query, &error); + break; + case TASK_TYPE_UPDATE_BLANK: + retval = tracker_data_update_sparql_blank (tracker_data, task_data->data.query, &error); + destroy_notify = (GDestroyNotify) g_variant_unref; + break; + case TASK_TYPE_TURTLE: + tracker_data_load_turtle_file (tracker_data, task_data->data.turtle_file, &error); + break; + } + + if (error) + g_task_return_error (task, error); + else if (retval) + g_task_return_pointer (task, retval, destroy_notify); + else + g_task_return_boolean (task, TRUE); + + g_mutex_unlock (&priv->mutex); +} + +static void +query_thread_pool_func (gpointer data, + gpointer user_data) +{ + TrackerSparqlCursor *cursor; + GTask *task = data; + TaskData *task_data = g_task_get_task_data (task); + GError *error = NULL; + + g_assert (task_data->type == TASK_TYPE_QUERY); + cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)), + task_data->data.query, + g_task_get_cancellable (task), + &error); + if (cursor) + g_task_return_pointer (task, cursor, g_object_unref); + else + g_task_return_error (task, error); +} + +static void +wal_checkpoint (TrackerDBInterface *iface, + gboolean blocking) +{ + GError *error = NULL; + + g_debug ("Checkpointing database..."); + tracker_db_interface_sqlite_wal_checkpoint (iface, blocking, &error); + + if (error) { + g_warning ("Error in WAL checkpoint: %s", error->message); + g_error_free (error); + } + + g_debug ("Checkpointing complete"); +} + +static gpointer +wal_checkpoint_thread (gpointer data) +{ + TrackerDBInterface *wal_iface = data; + + wal_checkpoint (wal_iface, FALSE); + g_object_unref (wal_iface); + return NULL; +} + +static void +wal_hook (TrackerDBInterface *iface, + gint n_pages) +{ + TrackerDataManager *data_manager = tracker_db_interface_get_user_data (iface); + TrackerDBInterface *wal_iface = tracker_data_manager_get_wal_db_interface (data_manager); + + if (!wal_iface) + return; + + if (n_pages >= 10000) { + /* Do immediate checkpointing (blocking updates) to + * prevent excessive WAL file growth. + */ + wal_checkpoint (wal_iface, TRUE); + } else { + /* Defer non-blocking checkpoint to thread */ + g_thread_try_new ("wal-checkpoint", wal_checkpoint_thread, + g_object_ref (wal_iface), NULL); + } +} + +static gint +task_compare_func (GTask *a, + GTask *b, + gpointer user_data) +{ + return g_task_get_priority (b) - g_task_get_priority (a); +} + +static gboolean +set_up_thread_pools (TrackerDirectConnection *conn, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (conn); + + priv->select_pool = g_thread_pool_new (query_thread_pool_func, + conn, 16, FALSE, error); + if (!priv->select_pool) + return FALSE; + + priv->update_thread = g_thread_pool_new (update_thread_func, + conn, 1, TRUE, error); + if (!priv->update_thread) + return FALSE; + + g_thread_pool_set_sort_function (priv->select_pool, + (GCompareDataFunc) task_compare_func, + conn); + g_thread_pool_set_sort_function (priv->update_thread, + (GCompareDataFunc) task_compare_func, + conn); + return TRUE; +} + +static gboolean +tracker_direct_connection_initable_init (GInitable *initable, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES; + TrackerDBInterface *iface; + GHashTable *namespaces; + GHashTableIter iter; + gchar *prefix, *ns; + + conn = TRACKER_DIRECT_CONNECTION (initable); + priv = tracker_direct_connection_get_instance_private (conn); + + tracker_locale_sanity_check (); + + if (!set_up_thread_pools (conn, error)) + return FALSE; + + /* Init data manager */ + if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) + db_flags |= TRACKER_DB_MANAGER_READONLY; + + priv->data_manager = tracker_data_manager_new (db_flags | default_flags, priv->store, + priv->journal, priv->ontology, + FALSE, FALSE, 100, 100); + if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, error)) + return FALSE; + + /* Set up WAL hook on our connection */ + iface = tracker_data_manager_get_writable_db_interface (priv->data_manager); + tracker_db_interface_sqlite_wal_hook (iface, wal_hook); + + /* Initialize namespace manager */ + priv->namespace_manager = tracker_namespace_manager_new (); + namespaces = tracker_data_manager_get_namespaces (priv->data_manager); + g_hash_table_iter_init (&iter, namespaces); + + while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) { + tracker_namespace_manager_add_prefix (priv->namespace_manager, + prefix, ns); + } + + return TRUE; +} + +static void +tracker_direct_connection_initable_iface_init (GInitableIface *iface) +{ + iface->init = tracker_direct_connection_initable_init; +} + +static void +async_initable_thread_func (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GError *error = NULL; + + if (!g_initable_init (G_INITABLE (source_object), cancellable, &error)) + g_task_return_error (task, error); + else + g_task_return_boolean (task, TRUE); +} + +static void +tracker_direct_connection_async_initable_init_async (GAsyncInitable *async_initable, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + + task = g_task_new (async_initable, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_run_in_thread (task, async_initable_thread_func); +} + +static gboolean +tracker_direct_connection_async_initable_init_finish (GAsyncInitable *async_initable, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_boolean (G_TASK (res), error); +} + +static void +tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface) +{ + iface->init_async = tracker_direct_connection_async_initable_init_async; + iface->init_finish = tracker_direct_connection_async_initable_init_finish; +} + +static void +tracker_direct_connection_init (TrackerDirectConnection *conn) +{ +} + +static void +tracker_direct_connection_finalize (GObject *object) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + if (priv->update_thread) + g_thread_pool_free (priv->update_thread, TRUE, TRUE); + if (priv->select_pool) + g_thread_pool_free (priv->select_pool, TRUE, FALSE); + + if (priv->data_manager) { + TrackerDBInterface *wal_iface; + wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); + if (wal_iface) + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); + } + + g_clear_object (&priv->store); + g_clear_object (&priv->journal); + g_clear_object (&priv->ontology); + g_clear_object (&priv->data_manager); + + G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object); +} + +static void +tracker_direct_connection_set_property (GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + switch (prop_id) { + case PROP_FLAGS: + priv->flags = g_value_get_enum (value); + break; + case PROP_STORE_LOCATION: + priv->store = g_value_dup_object (value); + break; + case PROP_JOURNAL_LOCATION: + priv->journal = g_value_dup_object (value); + break; + case PROP_ONTOLOGY_LOCATION: + priv->ontology = g_value_dup_object (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +tracker_direct_connection_get_property (GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (object); + priv = tracker_direct_connection_get_instance_private (conn); + + switch (prop_id) { + case PROP_FLAGS: + g_value_set_enum (value, priv->flags); + break; + case PROP_STORE_LOCATION: + g_value_set_object (value, priv->store); + break; + case PROP_JOURNAL_LOCATION: + g_value_set_object (value, priv->journal); + break; + case PROP_ONTOLOGY_LOCATION: + g_value_set_object (value, priv->ontology); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static TrackerSparqlCursor * +tracker_direct_connection_query (TrackerSparqlConnection *self, + const gchar *sparql, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerSparqlQuery *query; + TrackerSparqlCursor *cursor; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + query = tracker_sparql_query_new (priv->data_manager, sparql); + cursor = TRACKER_SPARQL_CURSOR (tracker_sparql_query_execute_cursor (query, error)); + if (cursor) + tracker_sparql_cursor_set_connection (cursor, self); + g_mutex_unlock (&priv->mutex); + + return cursor; +} + +static void +tracker_direct_connection_query_async (TrackerSparqlConnection *self, + const gchar *sparql, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GError *error = NULL; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_QUERY, sparql), + (GDestroyNotify) task_data_free); + + if (!g_thread_pool_push (priv->select_pool, task, &error)) + g_task_return_error (task, error); +} + +static TrackerSparqlCursor * +tracker_direct_connection_query_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static void +tracker_direct_connection_update (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + tracker_data_update_sparql (data, sparql, error); + g_mutex_unlock (&priv->mutex); +} + +static void +tracker_direct_connection_update_async (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE, sparql), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static void +tracker_direct_connection_update_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + g_task_propagate_boolean (G_TASK (res), error); +} + +static void +error_free (GError *error) +{ + if (error) + g_error_free (error); +} + +static void +update_array_async_thread_func (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + gchar **updates = task_data; + gchar *concatenated; + GPtrArray *errors; + GError *error = NULL; + gint i; + + errors = g_ptr_array_new_with_free_func ((GDestroyNotify) error_free); + g_ptr_array_set_size (errors, g_strv_length (updates)); + + /* Fast path, perform everything as a single update */ + concatenated = g_strjoinv (" ", updates); + tracker_sparql_connection_update (source_object, concatenated, + g_task_get_priority (task), + cancellable, &error); + + if (!error) { + g_task_return_pointer (task, errors, + (GDestroyNotify) g_ptr_array_unref); + return; + } + + /* Slow path, perform updates one by one */ + for (i = 0; updates[i]; i++) { + GError *err = NULL; + + err = g_ptr_array_index (errors, i); + tracker_sparql_connection_update (source_object, updates[i], + g_task_get_priority (task), + cancellable, &err); + } + + g_task_return_pointer (task, errors, + (GDestroyNotify) g_ptr_array_unref); +} + +static void +tracker_direct_connection_update_array_async (TrackerSparqlConnection *self, + gchar **updates, + gint n_updates, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + gchar **copy; + gint i = 0; + + copy = g_new0 (gchar*, n_updates + 1); + + for (i = 0; i < n_updates; i++) { + g_return_if_fail (updates[i] != NULL); + copy[i] = g_strdup (updates[i]); + } + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, copy, (GDestroyNotify) g_strfreev); + + g_task_run_in_thread (task, update_array_async_thread_func); +} + +static GPtrArray * +tracker_direct_connection_update_array_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static GVariant * +tracker_direct_connection_update_blank (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + GVariant *blank_nodes; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + blank_nodes = tracker_data_update_sparql_blank (data, sparql, error); + g_mutex_unlock (&priv->mutex); + + return blank_nodes; +} + +static void +tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self, + const gchar *sparql, + gint priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_priority (task, priority); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static GVariant * +tracker_direct_connection_update_blank_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + +static void +tracker_direct_connection_load (TrackerSparqlConnection *self, + GFile *file, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + tracker_data_load_turtle_file (data, file, error); + g_mutex_unlock (&priv->mutex); +} + +static void +tracker_direct_connection_load_async (TrackerSparqlConnection *self, + GFile *file, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_turtle_new (file), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static void +tracker_direct_connection_load_finish (TrackerSparqlConnection *self, + GAsyncResult *res, + GError **error) +{ + g_task_propagate_pointer (G_TASK (res), error); +} + +static TrackerNamespaceManager * +tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self)); + + return priv->namespace_manager; +} + +static void +tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) +{ + TrackerSparqlConnectionClass *sparql_connection_class; + GObjectClass *object_class; + + object_class = G_OBJECT_CLASS (klass); + sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass); + + object_class->finalize = tracker_direct_connection_finalize; + object_class->set_property = tracker_direct_connection_set_property; + object_class->get_property = tracker_direct_connection_get_property; + + sparql_connection_class->query = tracker_direct_connection_query; + sparql_connection_class->query_async = tracker_direct_connection_query_async; + sparql_connection_class->query_finish = tracker_direct_connection_query_finish; + sparql_connection_class->update = tracker_direct_connection_update; + sparql_connection_class->update_async = tracker_direct_connection_update_async; + sparql_connection_class->update_finish = tracker_direct_connection_update_finish; + sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async; + sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish; + sparql_connection_class->update_blank = tracker_direct_connection_update_blank; + sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async; + sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish; + sparql_connection_class->load = tracker_direct_connection_load; + sparql_connection_class->load_async = tracker_direct_connection_load_async; + sparql_connection_class->load_finish = tracker_direct_connection_load_finish; + sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager; + + props[PROP_FLAGS] = + g_param_spec_enum ("flags", + "Flags", + "Flags", + TRACKER_SPARQL_TYPE_CONNECTION_FLAGS, + TRACKER_SPARQL_CONNECTION_FLAGS_NONE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_STORE_LOCATION] = + g_param_spec_object ("store-location", + "Store location", + "Store location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_JOURNAL_LOCATION] = + g_param_spec_object ("journal-location", + "Journal location", + "Journal location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + props[PROP_ONTOLOGY_LOCATION] = + g_param_spec_object ("ontology-location", + "Ontology location", + "Ontology location", + G_TYPE_FILE, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY); + + g_object_class_install_properties (object_class, N_PROPS, props); +} + +TrackerDirectConnection * +tracker_direct_connection_new (TrackerSparqlConnectionFlags flags, + GFile *store, + GFile *journal, + GFile *ontology, + GError **error) +{ + g_return_val_if_fail (G_IS_FILE (store), NULL); + g_return_val_if_fail (G_IS_FILE (journal), NULL); + g_return_val_if_fail (G_IS_FILE (ontology), NULL); + g_return_val_if_fail (!error || !*error, NULL); + + return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION, + "flags", flags, + "store-location", store, + "journal-location", journal, + "ontology-location", ontology, + NULL); +} + +TrackerDataManager * +tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn) +{ + TrackerDirectConnectionPrivate *priv; + + priv = tracker_direct_connection_get_instance_private (conn); + return priv->data_manager; +} + +void +tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags) +{ + default_flags = flags; +} + +void +tracker_direct_connection_sync (TrackerDirectConnection *conn) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDBInterface *wal_iface; + + priv = tracker_direct_connection_get_instance_private (conn); + + if (!priv->data_manager) + return; + + /* Wait for pending updates. */ + if (priv->update_thread) + g_thread_pool_free (priv->update_thread, TRUE, TRUE); + /* Selects are less important, readonly interfaces won't be bothersome */ + if (priv->select_pool) + g_thread_pool_free (priv->select_pool, TRUE, FALSE); + + set_up_thread_pools (conn, NULL); + + wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager); + if (wal_iface) + tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL); +} |