summaryrefslogtreecommitdiff
path: root/src/libtracker-sparql/direct/tracker-direct.c
diff options
context:
space:
mode:
authorCarlos Garnacho <carlosg@gnome.org>2021-11-21 00:02:06 +0100
committerCarlos Garnacho <carlosg@gnome.org>2022-02-04 19:22:00 +0100
commit98a5ae8a0bf1ea16a2e1065c7e3a9d86d408afbf (patch)
treec050f778ed603248c3d26ecf670a2f923de5f5a5 /src/libtracker-sparql/direct/tracker-direct.c
parentfbca0ab6321c841fa54239ee367ad3aff6d3b565 (diff)
downloadtracker-98a5ae8a0bf1ea16a2e1065c7e3a9d86d408afbf.tar.gz
libtracker-sparql: Implement serialize_async/finish in direct connection
This allows serialization of RDF into GInputStream, currently only TTL format.
Diffstat (limited to 'src/libtracker-sparql/direct/tracker-direct.c')
-rw-r--r--src/libtracker-sparql/direct/tracker-direct.c163
1 files changed, 151 insertions, 12 deletions
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index a4a69d44c..649da93eb 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -29,6 +29,7 @@
#include <libtracker-data/tracker-sparql.h>
#include <libtracker-sparql/tracker-notifier-private.h>
#include <libtracker-sparql/tracker-private.h>
+#include <libtracker-sparql/tracker-serializer.h>
typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;
@@ -61,6 +62,11 @@ typedef struct {
TrackerResource *resource;
} UpdateResource;
+typedef struct {
+ gchar *query;
+ TrackerRdfFormat format;
+} SerializeRdf;
+
enum {
PROP_0,
PROP_FLAGS,
@@ -78,6 +84,7 @@ typedef enum {
TASK_TYPE_UPDATE_RESOURCE,
TASK_TYPE_UPDATE_BATCH,
TASK_TYPE_RELEASE_MEMORY,
+ TASK_TYPE_SERIALIZE,
} TaskType;
typedef struct {
@@ -208,6 +215,7 @@ update_thread_func (gpointer data,
switch (task_data->type) {
case TASK_TYPE_QUERY:
+ case TASK_TYPE_SERIALIZE:
g_warning ("Queries don't go through this thread");
break;
case TASK_TYPE_UPDATE:
@@ -247,17 +255,85 @@ update_thread_func (gpointer data,
}
static void
+execute_query_in_thread (GTask *task,
+ TaskData *task_data)
+{
+ TrackerSparqlCursor *cursor;
+ GError *error = NULL;
+
+ cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
+ task_data->data,
+ g_task_get_cancellable (task),
+ &error);
+ if (cursor)
+ g_task_return_pointer (task, cursor, g_object_unref);
+ else
+ g_task_return_error (task, error);
+}
+
+static TrackerSerializerFormat
+convert_format (TrackerRdfFormat format)
+{
+ switch (format) {
+ case TRACKER_RDF_FORMAT_TURTLE:
+ return TRACKER_SERIALIZER_FORMAT_TTL;
+ default:
+ g_assert_not_reached ();
+ }
+}
+
+static void
+serialize_in_thread (GTask *task,
+ TaskData *task_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerSparql *query = NULL;
+ TrackerSparqlCursor *cursor = NULL;
+ GInputStream *istream = NULL;
+ SerializeRdf *data = task_data->data;
+ GError *error = NULL;
+
+ conn = g_task_get_source_object (task);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ query = tracker_sparql_new (priv->data_manager, data->query);
+ if (!tracker_sparql_is_serializable (query)) {
+ g_set_error (&error,
+ TRACKER_SPARQL_ERROR,
+ TRACKER_SPARQL_ERROR_PARSE,
+ "Query is not DESCRIBE or CONSTRUCT");
+ goto out;
+ }
+
+ cursor = tracker_sparql_execute_cursor (query, NULL, &error);
+ tracker_direct_connection_update_timestamp (conn);
+ if (!cursor)
+ goto out;
+
+ tracker_sparql_cursor_set_connection (cursor, TRACKER_SPARQL_CONNECTION (conn));
+ istream = tracker_serializer_new (cursor, convert_format (data->format));
+
+ out:
+ g_clear_object (&query);
+ g_clear_object (&cursor);
+ g_mutex_unlock (&priv->mutex);
+
+ if (istream)
+ g_task_return_pointer (task, istream, g_object_unref);
+ else
+ g_task_return_error (task, error);
+}
+
+static void
query_thread_pool_func (gpointer data,
gpointer user_data)
{
TrackerDirectConnection *conn = user_data;
TrackerDirectConnectionPrivate *priv;
- TrackerSparqlCursor *cursor;
GTask *task = data;
TaskData *task_data = g_task_get_task_data (task);
- GError *error = NULL;
-
- g_assert (task_data->type == TASK_TYPE_QUERY);
priv = tracker_direct_connection_get_instance_private (conn);
@@ -270,14 +346,16 @@ query_thread_pool_func (gpointer data,
return;
}
- cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
- task_data->data,
- g_task_get_cancellable (task),
- &error);
- if (cursor)
- g_task_return_pointer (task, cursor, g_object_unref);
- else
- g_task_return_error (task, error);
+ switch (task_data->type) {
+ case TASK_TYPE_QUERY:
+ execute_query_in_thread (task, task_data);
+ break;
+ case TASK_TYPE_SERIALIZE:
+ serialize_in_thread (task, task_data);
+ break;
+ default:
+ g_assert_not_reached ();
+ }
g_object_unref (task);
}
@@ -1227,6 +1305,65 @@ tracker_direct_connection_lookup_dbus_service (TrackerSparqlConnection *connect
return TRUE;
}
+static SerializeRdf *
+serialize_rdf_data_new (const gchar *query,
+ TrackerRdfFormat format)
+{
+ SerializeRdf *data;
+
+ data = g_new0 (SerializeRdf, 1);
+ data->query = g_strdup (query);
+ data->format = format;
+
+ return data;
+}
+
+static void
+serialize_rdf_data_free (gpointer user_data)
+{
+ SerializeRdf *data = user_data;
+
+ g_free (data->query);
+ g_free (data);
+}
+
+static void
+tracker_direct_connection_serialize_async (TrackerSparqlConnection *self,
+ TrackerRdfFormat format,
+ const gchar *query,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GError *error = NULL;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_SERIALIZE,
+ serialize_rdf_data_new (query, format),
+ serialize_rdf_data_free),
+ (GDestroyNotify) task_data_free);
+
+ if (!g_thread_pool_push (priv->select_pool, task, &error)) {
+ g_task_return_error (task, _translate_internal_error (error));
+ g_object_unref (task);
+ }
+}
+
+static GInputStream *
+tracker_direct_connection_serialize_finish (TrackerSparqlConnection *connection,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
static void
tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
{
@@ -1262,6 +1399,8 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish;
sparql_connection_class->create_batch = tracker_direct_connection_create_batch;
sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
+ sparql_connection_class->serialize_async = tracker_direct_connection_serialize_async;
+ sparql_connection_class->serialize_finish = tracker_direct_connection_serialize_finish;
props[PROP_FLAGS] =
g_param_spec_flags ("flags",