diff options
author | Carlos Garnacho <carlosg@gnome.org> | 2021-11-21 00:02:06 +0100 |
---|---|---|
committer | Carlos Garnacho <carlosg@gnome.org> | 2022-02-04 19:22:00 +0100 |
commit | 98a5ae8a0bf1ea16a2e1065c7e3a9d86d408afbf (patch) | |
tree | c050f778ed603248c3d26ecf670a2f923de5f5a5 /src/libtracker-sparql/direct/tracker-direct.c | |
parent | fbca0ab6321c841fa54239ee367ad3aff6d3b565 (diff) | |
download | tracker-98a5ae8a0bf1ea16a2e1065c7e3a9d86d408afbf.tar.gz |
libtracker-sparql: Implement serialize_async/finish in direct connection
This allows serialization of RDF into GInputStream, currently only TTL
format.
Diffstat (limited to 'src/libtracker-sparql/direct/tracker-direct.c')
-rw-r--r-- | src/libtracker-sparql/direct/tracker-direct.c | 163 |
1 files changed, 151 insertions, 12 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c index a4a69d44c..649da93eb 100644 --- a/src/libtracker-sparql/direct/tracker-direct.c +++ b/src/libtracker-sparql/direct/tracker-direct.c @@ -29,6 +29,7 @@ #include <libtracker-data/tracker-sparql.h> #include <libtracker-sparql/tracker-notifier-private.h> #include <libtracker-sparql/tracker-private.h> +#include <libtracker-sparql/tracker-serializer.h> typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate; @@ -61,6 +62,11 @@ typedef struct { TrackerResource *resource; } UpdateResource; +typedef struct { + gchar *query; + TrackerRdfFormat format; +} SerializeRdf; + enum { PROP_0, PROP_FLAGS, @@ -78,6 +84,7 @@ typedef enum { TASK_TYPE_UPDATE_RESOURCE, TASK_TYPE_UPDATE_BATCH, TASK_TYPE_RELEASE_MEMORY, + TASK_TYPE_SERIALIZE, } TaskType; typedef struct { @@ -208,6 +215,7 @@ update_thread_func (gpointer data, switch (task_data->type) { case TASK_TYPE_QUERY: + case TASK_TYPE_SERIALIZE: g_warning ("Queries don't go through this thread"); break; case TASK_TYPE_UPDATE: @@ -247,17 +255,85 @@ update_thread_func (gpointer data, } static void +execute_query_in_thread (GTask *task, + TaskData *task_data) +{ + TrackerSparqlCursor *cursor; + GError *error = NULL; + + cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)), + task_data->data, + g_task_get_cancellable (task), + &error); + if (cursor) + g_task_return_pointer (task, cursor, g_object_unref); + else + g_task_return_error (task, error); +} + +static TrackerSerializerFormat +convert_format (TrackerRdfFormat format) +{ + switch (format) { + case TRACKER_RDF_FORMAT_TURTLE: + return TRACKER_SERIALIZER_FORMAT_TTL; + default: + g_assert_not_reached (); + } +} + +static void +serialize_in_thread (GTask *task, + TaskData *task_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + TrackerSparql *query = NULL; + TrackerSparqlCursor *cursor = NULL; + GInputStream *istream = NULL; + SerializeRdf *data = task_data->data; + GError *error = NULL; + + conn = g_task_get_source_object (task); + priv = tracker_direct_connection_get_instance_private (conn); + + g_mutex_lock (&priv->mutex); + query = tracker_sparql_new (priv->data_manager, data->query); + if (!tracker_sparql_is_serializable (query)) { + g_set_error (&error, + TRACKER_SPARQL_ERROR, + TRACKER_SPARQL_ERROR_PARSE, + "Query is not DESCRIBE or CONSTRUCT"); + goto out; + } + + cursor = tracker_sparql_execute_cursor (query, NULL, &error); + tracker_direct_connection_update_timestamp (conn); + if (!cursor) + goto out; + + tracker_sparql_cursor_set_connection (cursor, TRACKER_SPARQL_CONNECTION (conn)); + istream = tracker_serializer_new (cursor, convert_format (data->format)); + + out: + g_clear_object (&query); + g_clear_object (&cursor); + g_mutex_unlock (&priv->mutex); + + if (istream) + g_task_return_pointer (task, istream, g_object_unref); + else + g_task_return_error (task, error); +} + +static void query_thread_pool_func (gpointer data, gpointer user_data) { TrackerDirectConnection *conn = user_data; TrackerDirectConnectionPrivate *priv; - TrackerSparqlCursor *cursor; GTask *task = data; TaskData *task_data = g_task_get_task_data (task); - GError *error = NULL; - - g_assert (task_data->type == TASK_TYPE_QUERY); priv = tracker_direct_connection_get_instance_private (conn); @@ -270,14 +346,16 @@ query_thread_pool_func (gpointer data, return; } - cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)), - task_data->data, - g_task_get_cancellable (task), - &error); - if (cursor) - g_task_return_pointer (task, cursor, g_object_unref); - else - g_task_return_error (task, error); + switch (task_data->type) { + case TASK_TYPE_QUERY: + execute_query_in_thread (task, task_data); + break; + case TASK_TYPE_SERIALIZE: + serialize_in_thread (task, task_data); + break; + default: + g_assert_not_reached (); + } g_object_unref (task); } @@ -1227,6 +1305,65 @@ tracker_direct_connection_lookup_dbus_service (TrackerSparqlConnection *connect return TRUE; } +static SerializeRdf * +serialize_rdf_data_new (const gchar *query, + TrackerRdfFormat format) +{ + SerializeRdf *data; + + data = g_new0 (SerializeRdf, 1); + data->query = g_strdup (query); + data->format = format; + + return data; +} + +static void +serialize_rdf_data_free (gpointer user_data) +{ + SerializeRdf *data = user_data; + + g_free (data->query); + g_free (data); +} + +static void +tracker_direct_connection_serialize_async (TrackerSparqlConnection *self, + TrackerRdfFormat format, + const gchar *query, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + TrackerDirectConnectionPrivate *priv; + TrackerDirectConnection *conn; + GError *error = NULL; + GTask *task; + + conn = TRACKER_DIRECT_CONNECTION (self); + priv = tracker_direct_connection_get_instance_private (conn); + + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_task_data (task, + task_data_query_new (TASK_TYPE_SERIALIZE, + serialize_rdf_data_new (query, format), + serialize_rdf_data_free), + (GDestroyNotify) task_data_free); + + if (!g_thread_pool_push (priv->select_pool, task, &error)) { + g_task_return_error (task, _translate_internal_error (error)); + g_object_unref (task); + } +} + +static GInputStream * +tracker_direct_connection_serialize_finish (TrackerSparqlConnection *connection, + GAsyncResult *res, + GError **error) +{ + return g_task_propagate_pointer (G_TASK (res), error); +} + static void tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) { @@ -1262,6 +1399,8 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass) sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish; sparql_connection_class->create_batch = tracker_direct_connection_create_batch; sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service; + sparql_connection_class->serialize_async = tracker_direct_connection_serialize_async; + sparql_connection_class->serialize_finish = tracker_direct_connection_serialize_finish; props[PROP_FLAGS] = g_param_spec_flags ("flags", |