diff options
Diffstat (limited to 'src/libtracker-sparql/direct/tracker-direct.c')
-rw-r--r-- | src/libtracker-sparql/direct/tracker-direct.c | 266 |
1 files changed, 245 insertions, 21 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c index 3d867283c..4908c75b5 100644 --- a/src/libtracker-sparql/direct/tracker-direct.c +++ b/src/libtracker-sparql/direct/tracker-direct.c @@ -21,6 +21,7 @@ #include "config.h" #include "tracker-direct.h" +#include "tracker-direct-batch.h" #include "tracker-direct-statement.h" #include "libtracker-sparql/tracker-private.h" #include <libtracker-data/tracker-data.h> @@ -54,6 +55,11 @@ struct _TrackerDirectConnectionPrivate guint closing : 1; }; +typedef struct { + gchar *graph; + TrackerResource *resource; +} UpdateResource; + enum { PROP_0, PROP_FLAGS, @@ -68,12 +74,15 @@ typedef enum { TASK_TYPE_QUERY, TASK_TYPE_UPDATE, TASK_TYPE_UPDATE_BLANK, + TASK_TYPE_UPDATE_RESOURCE, + TASK_TYPE_UPDATE_BATCH, TASK_TYPE_RELEASE_MEMORY, } TaskType; typedef struct { TaskType type; - gchar *query; + gpointer data; + GDestroyNotify destroy; } TaskData; static void tracker_direct_connection_initable_iface_init (GInitableIface *iface); @@ -90,22 +99,25 @@ G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection, tracker_direct_connection_async_initable_iface_init)) static TaskData * -task_data_query_new (TaskType type, - const gchar *sparql) +task_data_query_new (TaskType type, + gpointer data, + GDestroyNotify destroy) { - TaskData *data; + TaskData *task; - data = g_new0 (TaskData, 1); - data->type = type; - data->query = g_strdup (sparql); + task = g_new0 (TaskData, 1); + task->type = type; + task->data = data; + task->destroy = destroy; - return data; + return task; } static void task_data_free (TaskData *task) { - g_free (task->query); + if (task->destroy && task->data) + task->destroy (task->data); g_free (task); } @@ -131,7 +143,7 @@ cleanup_timeout_cb (gpointer user_data) task = g_task_new (conn, NULL, NULL, NULL); g_task_set_task_data (task, - task_data_query_new (TASK_TYPE_RELEASE_MEMORY, NULL), + task_data_query_new (TASK_TYPE_RELEASE_MEMORY, NULL, NULL), (GDestroyNotify) task_data_free); g_thread_pool_push (priv->update_thread, task, NULL); @@ -139,6 +151,39 @@ cleanup_timeout_cb (gpointer user_data) return G_SOURCE_CONTINUE; } +gboolean +update_resource (TrackerData *data, + const gchar *graph, + TrackerResource *resource, + GError **error) +{ + GError *inner_error = NULL; + + tracker_data_begin_transaction (data, &inner_error); + if (inner_error) + goto error; + + tracker_data_update_resource (data, + graph, + resource, + NULL, + &inner_error); + + if (inner_error) { + tracker_data_rollback_transaction (data); + goto error; + } + + tracker_data_commit_transaction (data, &inner_error); + if (inner_error) + goto error; + + return TRUE; + +error: + g_propagate_error (error, inner_error); + return FALSE; +} static void update_thread_func (gpointer data, @@ -165,12 +210,20 @@ update_thread_func (gpointer data, g_warning ("Queries don't go through this thread"); break; case TASK_TYPE_UPDATE: - tracker_data_update_sparql (tracker_data, task_data->query, &error); + tracker_data_update_sparql (tracker_data, task_data->data, &error); break; case TASK_TYPE_UPDATE_BLANK: - retval = tracker_data_update_sparql_blank (tracker_data, task_data->query, &error); + retval = tracker_data_update_sparql_blank (tracker_data, task_data->data, &error); destroy_notify = (GDestroyNotify) g_variant_unref; break; + case TASK_TYPE_UPDATE_RESOURCE: { + UpdateResource *data = task_data->data; + update_resource (tracker_data, data->graph, data->resource, &error); + break; + } + case TASK_TYPE_UPDATE_BATCH: + tracker_direct_batch_update (task_data->data, priv->data_manager, &error); + break; case TASK_TYPE_RELEASE_MEMORY: tracker_data_manager_release_memory (priv->data_manager); update_timestamp = FALSE; @@ -217,7 +270,7 @@ query_thread_pool_func (gpointer data, } cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)), - task_data->query, + task_data->data, g_task_get_cancellable (task), &error); if (cursor) @@ -705,7 +758,9 @@ tracker_direct_connection_query_async (TrackerSparqlConnection *self, task = g_task_new (self, cancellable, callback, user_data); g_task_set_task_data (task, - task_data_query_new (TASK_TYPE_QUERY, sparql), + task_data_query_new (TASK_TYPE_QUERY, + g_strdup (sparql), + g_free), (GDestroyNotify) task_data_free); if (!g_thread_pool_push (priv->select_pool, task, &error)) { @@ -771,7 +826,9 @@ tracker_direct_connection_update_async (TrackerSparqlConnection *self, task = g_task_new (self, cancellable, callback, user_data); g_task_set_task_data (task, - task_data_query_new (TASK_TYPE_UPDATE, sparql), + task_data_query_new (TASK_TYPE_UPDATE, + g_strdup (sparql), + g_free), (GDestroyNotify) task_data_free); g_thread_pool_push (priv->update_thread, task, NULL); @@ -813,8 +870,7 @@ tracker_direct_connection_update_array_async (TrackerSparqlConnection *self, concatenated = g_strjoinv ("\n", array_copy); g_free (array_copy); - task_data = task_data_query_new (TASK_TYPE_UPDATE, NULL); - task_data->query = concatenated; + task_data = task_data_query_new (TASK_TYPE_UPDATE, concatenated, g_free); task = g_task_new (self, cancellable, callback, user_data); g_task_set_task_data (task, task_data, @@ -880,7 +936,9 @@ tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self, task = g_task_new (self, cancellable, callback, user_data); g_task_set_task_data (task, - task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql), + task_data_query_new (TASK_TYPE_UPDATE_BLANK, + g_strdup (sparql), + g_free), (GDestroyNotify) task_data_free); g_thread_pool_push (priv->update_thread, task, NULL); @@ -984,7 +1042,7 @@ tracker_direct_connection_close (TrackerSparqlConnection *self) } } -void +static void async_close_thread_func (GTask *task, gpointer source_object, gpointer task_data, @@ -997,7 +1055,7 @@ async_close_thread_func (GTask *task, g_task_return_boolean (task, TRUE); } -void +static void tracker_direct_connection_close_async (TrackerSparqlConnection *connection, GCancellable *cancellable, GAsyncReadyCallback callback, @@ -1010,7 +1068,7 @@ tracker_direct_connection_close_async (TrackerSparqlConnection *connection, g_object_unref (task); } -gboolean +static gboolean tracker_direct_connection_close_finish (TrackerSparqlConnection *connection, GAsyncResult *res, GError **error) @@ -1018,6 +1076,106 @@ tracker_direct_connection_close_finish (TrackerSparqlConnection *connection, return g_task_propagate_boolean (G_TASK (res), error); } +static UpdateResource * +update_resource_data_new (const gchar *graph, + TrackerResource *resource) +{ + UpdateResource *data; + + data = g_new0 (UpdateResource, 1); + data->graph = g_strdup (graph); + data->resource = g_object_ref (resource); + + return data; +} + +static void +update_resource_data_free (UpdateResource *data) +{ + g_free (data->graph); + g_object_unref (data->resource); + g_free (data); +} + +static gboolean +tracker_direct_connection_update_resource (TrackerSparqlConnection *self, + const gchar *graph, + TrackerResource *resource, + GCancellable *cancellable, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerData *data; + GError *inner_error = NULL; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + data = tracker_data_manager_get_data (priv->data_manager); + update_resource (data, graph, resource, &inner_error); + tracker_direct_connection_update_timestamp (conn); + g_mutex_unlock (&priv->mutex); + + if (inner_error) { + g_propagate_error (error, inner_error); + return FALSE; + } + + return TRUE; +} + +static void +tracker_direct_connection_update_resource_async (TrackerSparqlConnection *self, + const gchar *graph, + TrackerResource *resource, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TaskData *task_data; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task_data = task_data_query_new (TASK_TYPE_UPDATE_RESOURCE, + update_resource_data_new (graph, resource), + (GDestroyNotify) update_resource_data_free); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, task_data, + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +static gboolean +tracker_direct_connection_update_resource_finish (TrackerSparqlConnection *connection, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_boolean (G_TASK (res), error); +} + +static TrackerBatch * +tracker_direct_connection_create_batch (TrackerSparqlConnection *connection) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + + conn = TRACKER_DIRECT_CONNECTION (connection); + priv = tracker_direct_connection_get_instance_private (conn); + + if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY) + return NULL; + + return tracker_direct_batch_new (connection); +} + static void tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) { @@ -1048,6 +1206,10 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) sparql_connection_class->close = tracker_direct_connection_close; sparql_connection_class->close_async = tracker_direct_connection_close_async; sparql_connection_class->close_finish = tracker_direct_connection_close_finish; + sparql_connection_class->update_resource = tracker_direct_connection_update_resource; + sparql_connection_class->update_resource_async = tracker_direct_connection_update_resource_async; + sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish; + sparql_connection_class->create_batch = tracker_direct_connection_create_batch; props[PROP_FLAGS] = g_param_spec_flags ("flags", @@ -1109,3 +1271,65 @@ tracker_direct_connection_update_timestamp (TrackerDirectConnection *conn) priv = tracker_direct_connection_get_instance_private (conn); priv->timestamp = g_get_monotonic_time (); } + +gboolean +tracker_direct_connection_update_batch (TrackerDirectConnection *conn, + TrackerBatch *batch, + GError **error) +{ + TrackerDirectConnectionPrivate *priv; + GError *inner_error = NULL; + + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + tracker_direct_batch_update (TRACKER_DIRECT_BATCH (batch), + priv->data_manager, &inner_error); + tracker_direct_connection_update_timestamp (conn); + g_mutex_unlock (&priv->mutex); + + if (inner_error) { + g_propagate_error (error, inner_error); + return FALSE; + } + + return TRUE; +} + +void +tracker_direct_connection_update_batch_async (TrackerDirectConnection *conn, + TrackerBatch *batch, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + GTask *task; + + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (batch, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_UPDATE_BATCH, + g_object_ref (batch), + g_object_unref), + (GDestroyNotify) task_data_free); + + g_thread_pool_push (priv->update_thread, task, NULL); +} + +gboolean +tracker_direct_connection_update_batch_finish (TrackerDirectConnection *conn, + GAsyncResult *res, + GError **error) +{ + GError *inner_error = NULL; + + g_task_propagate_boolean (G_TASK (res), &inner_error); + if (inner_error) { + g_propagate_error (error, _translate_internal_error (inner_error)); + return FALSE; + } + + return TRUE; +} |