summaryrefslogtreecommitdiff
path: root/src/libtracker-sparql/direct/tracker-direct.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libtracker-sparql/direct/tracker-direct.c')
-rw-r--r--src/libtracker-sparql/direct/tracker-direct.c266
1 files changed, 245 insertions, 21 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index 3d867283c..4908c75b5 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -21,6 +21,7 @@
#include "config.h"
#include "tracker-direct.h"
+#include "tracker-direct-batch.h"
#include "tracker-direct-statement.h"
#include "libtracker-sparql/tracker-private.h"
#include <libtracker-data/tracker-data.h>
@@ -54,6 +55,11 @@ struct _TrackerDirectConnectionPrivate
guint closing : 1;
};
+typedef struct {
+ gchar *graph;
+ TrackerResource *resource;
+} UpdateResource;
+
enum {
PROP_0,
PROP_FLAGS,
@@ -68,12 +74,15 @@ typedef enum {
TASK_TYPE_QUERY,
TASK_TYPE_UPDATE,
TASK_TYPE_UPDATE_BLANK,
+ TASK_TYPE_UPDATE_RESOURCE,
+ TASK_TYPE_UPDATE_BATCH,
TASK_TYPE_RELEASE_MEMORY,
} TaskType;
typedef struct {
TaskType type;
- gchar *query;
+ gpointer data;
+ GDestroyNotify destroy;
} TaskData;
static void tracker_direct_connection_initable_iface_init (GInitableIface *iface);
@@ -90,22 +99,25 @@ G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection,
tracker_direct_connection_async_initable_iface_init))
static TaskData *
-task_data_query_new (TaskType type,
- const gchar *sparql)
+task_data_query_new (TaskType type,
+ gpointer data,
+ GDestroyNotify destroy)
{
- TaskData *data;
+ TaskData *task;
- data = g_new0 (TaskData, 1);
- data->type = type;
- data->query = g_strdup (sparql);
+ task = g_new0 (TaskData, 1);
+ task->type = type;
+ task->data = data;
+ task->destroy = destroy;
- return data;
+ return task;
}
static void
task_data_free (TaskData *task)
{
- g_free (task->query);
+ if (task->destroy && task->data)
+ task->destroy (task->data);
g_free (task);
}
@@ -131,7 +143,7 @@ cleanup_timeout_cb (gpointer user_data)
task = g_task_new (conn, NULL, NULL, NULL);
g_task_set_task_data (task,
- task_data_query_new (TASK_TYPE_RELEASE_MEMORY, NULL),
+ task_data_query_new (TASK_TYPE_RELEASE_MEMORY, NULL, NULL),
(GDestroyNotify) task_data_free);
g_thread_pool_push (priv->update_thread, task, NULL);
@@ -139,6 +151,39 @@ cleanup_timeout_cb (gpointer user_data)
return G_SOURCE_CONTINUE;
}
+gboolean
+update_resource (TrackerData *data,
+ const gchar *graph,
+ TrackerResource *resource,
+ GError **error)
+{
+ GError *inner_error = NULL;
+
+ tracker_data_begin_transaction (data, &inner_error);
+ if (inner_error)
+ goto error;
+
+ tracker_data_update_resource (data,
+ graph,
+ resource,
+ NULL,
+ &inner_error);
+
+ if (inner_error) {
+ tracker_data_rollback_transaction (data);
+ goto error;
+ }
+
+ tracker_data_commit_transaction (data, &inner_error);
+ if (inner_error)
+ goto error;
+
+ return TRUE;
+
+error:
+ g_propagate_error (error, inner_error);
+ return FALSE;
+}
static void
update_thread_func (gpointer data,
@@ -165,12 +210,20 @@ update_thread_func (gpointer data,
g_warning ("Queries don't go through this thread");
break;
case TASK_TYPE_UPDATE:
- tracker_data_update_sparql (tracker_data, task_data->query, &error);
+ tracker_data_update_sparql (tracker_data, task_data->data, &error);
break;
case TASK_TYPE_UPDATE_BLANK:
- retval = tracker_data_update_sparql_blank (tracker_data, task_data->query, &error);
+ retval = tracker_data_update_sparql_blank (tracker_data, task_data->data, &error);
destroy_notify = (GDestroyNotify) g_variant_unref;
break;
+ case TASK_TYPE_UPDATE_RESOURCE: {
+ UpdateResource *data = task_data->data;
+ update_resource (tracker_data, data->graph, data->resource, &error);
+ break;
+ }
+ case TASK_TYPE_UPDATE_BATCH:
+ tracker_direct_batch_update (task_data->data, priv->data_manager, &error);
+ break;
case TASK_TYPE_RELEASE_MEMORY:
tracker_data_manager_release_memory (priv->data_manager);
update_timestamp = FALSE;
@@ -217,7 +270,7 @@ query_thread_pool_func (gpointer data,
}
cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
- task_data->query,
+ task_data->data,
g_task_get_cancellable (task),
&error);
if (cursor)
@@ -705,7 +758,9 @@ tracker_direct_connection_query_async (TrackerSparqlConnection *self,
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_task_data (task,
- task_data_query_new (TASK_TYPE_QUERY, sparql),
+ task_data_query_new (TASK_TYPE_QUERY,
+ g_strdup (sparql),
+ g_free),
(GDestroyNotify) task_data_free);
if (!g_thread_pool_push (priv->select_pool, task, &error)) {
@@ -771,7 +826,9 @@ tracker_direct_connection_update_async (TrackerSparqlConnection *self,
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_task_data (task,
- task_data_query_new (TASK_TYPE_UPDATE, sparql),
+ task_data_query_new (TASK_TYPE_UPDATE,
+ g_strdup (sparql),
+ g_free),
(GDestroyNotify) task_data_free);
g_thread_pool_push (priv->update_thread, task, NULL);
@@ -813,8 +870,7 @@ tracker_direct_connection_update_array_async (TrackerSparqlConnection *self,
concatenated = g_strjoinv ("\n", array_copy);
g_free (array_copy);
- task_data = task_data_query_new (TASK_TYPE_UPDATE, NULL);
- task_data->query = concatenated;
+ task_data = task_data_query_new (TASK_TYPE_UPDATE, concatenated, g_free);
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_task_data (task, task_data,
@@ -880,7 +936,9 @@ tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self,
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_task_data (task,
- task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql),
+ task_data_query_new (TASK_TYPE_UPDATE_BLANK,
+ g_strdup (sparql),
+ g_free),
(GDestroyNotify) task_data_free);
g_thread_pool_push (priv->update_thread, task, NULL);
@@ -984,7 +1042,7 @@ tracker_direct_connection_close (TrackerSparqlConnection *self)
}
}
-void
+static void
async_close_thread_func (GTask *task,
gpointer source_object,
gpointer task_data,
@@ -997,7 +1055,7 @@ async_close_thread_func (GTask *task,
g_task_return_boolean (task, TRUE);
}
-void
+static void
tracker_direct_connection_close_async (TrackerSparqlConnection *connection,
GCancellable *cancellable,
GAsyncReadyCallback callback,
@@ -1010,7 +1068,7 @@ tracker_direct_connection_close_async (TrackerSparqlConnection *connection,
g_object_unref (task);
}
-gboolean
+static gboolean
tracker_direct_connection_close_finish (TrackerSparqlConnection *connection,
GAsyncResult *res,
GError **error)
@@ -1018,6 +1076,106 @@ tracker_direct_connection_close_finish (TrackerSparqlConnection *connection,
return g_task_propagate_boolean (G_TASK (res), error);
}
+static UpdateResource *
+update_resource_data_new (const gchar *graph,
+ TrackerResource *resource)
+{
+ UpdateResource *data;
+
+ data = g_new0 (UpdateResource, 1);
+ data->graph = g_strdup (graph);
+ data->resource = g_object_ref (resource);
+
+ return data;
+}
+
+static void
+update_resource_data_free (UpdateResource *data)
+{
+ g_free (data->graph);
+ g_object_unref (data->resource);
+ g_free (data);
+}
+
+static gboolean
+tracker_direct_connection_update_resource (TrackerSparqlConnection *self,
+ const gchar *graph,
+ TrackerResource *resource,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerData *data;
+ GError *inner_error = NULL;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ data = tracker_data_manager_get_data (priv->data_manager);
+ update_resource (data, graph, resource, &inner_error);
+ tracker_direct_connection_update_timestamp (conn);
+ g_mutex_unlock (&priv->mutex);
+
+ if (inner_error) {
+ g_propagate_error (error, inner_error);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static void
+tracker_direct_connection_update_resource_async (TrackerSparqlConnection *self,
+ const gchar *graph,
+ TrackerResource *resource,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TaskData *task_data;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task_data = task_data_query_new (TASK_TYPE_UPDATE_RESOURCE,
+ update_resource_data_new (graph, resource),
+ (GDestroyNotify) update_resource_data_free);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task, task_data,
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static gboolean
+tracker_direct_connection_update_resource_finish (TrackerSparqlConnection *connection,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static TrackerBatch *
+tracker_direct_connection_create_batch (TrackerSparqlConnection *connection)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (connection);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY)
+ return NULL;
+
+ return tracker_direct_batch_new (connection);
+}
+
static void
tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
{
@@ -1048,6 +1206,10 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
sparql_connection_class->close = tracker_direct_connection_close;
sparql_connection_class->close_async = tracker_direct_connection_close_async;
sparql_connection_class->close_finish = tracker_direct_connection_close_finish;
+ sparql_connection_class->update_resource = tracker_direct_connection_update_resource;
+ sparql_connection_class->update_resource_async = tracker_direct_connection_update_resource_async;
+ sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish;
+ sparql_connection_class->create_batch = tracker_direct_connection_create_batch;
props[PROP_FLAGS] =
g_param_spec_flags ("flags",
@@ -1109,3 +1271,65 @@ tracker_direct_connection_update_timestamp (TrackerDirectConnection *conn)
priv = tracker_direct_connection_get_instance_private (conn);
priv->timestamp = g_get_monotonic_time ();
}
+
+gboolean
+tracker_direct_connection_update_batch (TrackerDirectConnection *conn,
+ TrackerBatch *batch,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ GError *inner_error = NULL;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ tracker_direct_batch_update (TRACKER_DIRECT_BATCH (batch),
+ priv->data_manager, &inner_error);
+ tracker_direct_connection_update_timestamp (conn);
+ g_mutex_unlock (&priv->mutex);
+
+ if (inner_error) {
+ g_propagate_error (error, inner_error);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+void
+tracker_direct_connection_update_batch_async (TrackerDirectConnection *conn,
+ TrackerBatch *batch,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ GTask *task;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (batch, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_UPDATE_BATCH,
+ g_object_ref (batch),
+ g_object_unref),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+gboolean
+tracker_direct_connection_update_batch_finish (TrackerDirectConnection *conn,
+ GAsyncResult *res,
+ GError **error)
+{
+ GError *inner_error = NULL;
+
+ g_task_propagate_boolean (G_TASK (res), &inner_error);
+ if (inner_error) {
+ g_propagate_error (error, _translate_internal_error (inner_error));
+ return FALSE;
+ }
+
+ return TRUE;
+}