summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarlos Garnacho <carlosg@gnome.org>2018-07-21 17:52:36 +0200
committerCarlos Garnacho <carlosg@gnome.org>2018-07-21 17:52:36 +0200
commitda971357f43f1f1168950cbe980e1d95561ca891 (patch)
treeb9d1e6c51b435ecfd7fc9076999ef156eec44b92
parent2f7ade534a452153ab9f0faa10e403b30e845c75 (diff)
parente4e7766e2f0baa9d69de61993200680afa914616 (diff)
downloadtracker-da971357f43f1f1168950cbe980e1d95561ca891.tar.gz
Merge branch 'wip/carlosg/direct-rewrite'
-rw-r--r--src/libtracker-data/libtracker-data.vapi11
-rw-r--r--src/libtracker-data/tracker-class.c279
-rw-r--r--src/libtracker-data/tracker-class.h29
-rw-r--r--src/libtracker-data/tracker-data-update.c15
-rw-r--r--src/libtracker-data/tracker-data-update.h11
-rw-r--r--src/libtracker-data/tracker-db-manager.c23
-rw-r--r--src/libtracker-data/tracker-db-manager.h10
-rw-r--r--src/libtracker-direct/.gitignore3
-rw-r--r--src/libtracker-direct/Makefile.am17
-rw-r--r--src/libtracker-direct/meson.build14
-rw-r--r--src/libtracker-direct/tracker-direct.c888
-rw-r--r--src/libtracker-direct/tracker-direct.h59
-rw-r--r--src/libtracker-direct/tracker-direct.vala405
-rw-r--r--src/libtracker-direct/tracker-direct.vapi12
-rw-r--r--src/libtracker-direct/tracker-namespace.vala29
-rw-r--r--src/libtracker-miner/tracker-miner-fs.c3
-rw-r--r--src/libtracker-sparql-backend/Makefile.am1
-rw-r--r--src/libtracker-sparql-backend/meson.build2
-rw-r--r--src/tracker-store/Makefile.am2
-rw-r--r--src/tracker-store/meson.build3
-rw-r--r--src/tracker-store/tracker-backup.vala12
-rw-r--r--src/tracker-store/tracker-dbus.vala8
-rw-r--r--src/tracker-store/tracker-events.c371
-rw-r--r--src/tracker-store/tracker-events.h25
-rw-r--r--src/tracker-store/tracker-events.vapi16
-rw-r--r--src/tracker-store/tracker-main.vala47
-rw-r--r--src/tracker-store/tracker-resources.vala212
-rw-r--r--src/tracker-store/tracker-steroids.vala28
-rw-r--r--src/tracker-store/tracker-store.vala550
-rw-r--r--src/tracker-store/tracker-writeback.c21
-rw-r--r--src/tracker-store/tracker-writeback.vapi3
-rw-r--r--tests/libtracker-miner/tracker-file-notifier-test.c2
32 files changed, 1586 insertions, 1525 deletions
diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi
index a9e446cd7..f0300ae73 100644
--- a/src/libtracker-data/libtracker-data.vapi
+++ b/src/libtracker-data/libtracker-data.vapi
@@ -78,7 +78,6 @@ namespace Tracker {
[CCode (cprefix = "TRACKER_DB_MANAGER_", cheader_filename = "libtracker-data/tracker-db-manager.h")]
public enum DBManagerFlags {
FORCE_REINDEX,
- REMOVE_CACHE,
REMOVE_ALL,
READONLY,
DO_NOT_CHECK_ONTOLOGY,
@@ -181,17 +180,10 @@ namespace Tracker {
}
public delegate void StatementCallback (int graph_id, string? graph, int subject_id, string subject, int predicate_id, int object_id, string object, GLib.PtrArray rdf_types);
- public delegate void CommitCallback (Data.Update.CommitType commit_type);
+ public delegate void CommitCallback ();
[CCode (lower_case_cprefix="tracker_data_", cname = "TrackerData", cheader_filename = "libtracker-data/tracker-data-query.h,libtracker-data/tracker-data-update.h")]
public class Data.Update : GLib.Object {
- [CCode (cprefix = "TRACKER_DATA_COMMIT_")]
- public enum CommitType {
- REGULAR,
- BATCH,
- BATCH_LAST
- }
-
public void begin_db_transaction ();
public void commit_db_transaction ();
public void begin_transaction () throws DBInterfaceError;
@@ -200,7 +192,6 @@ namespace Tracker {
public void update_sparql (string update) throws Sparql.Error;
public GLib.Variant update_sparql_blank (string update) throws Sparql.Error;
public void load_turtle_file (GLib.File file) throws Sparql.Error;
- public void notify_transaction (CommitType commit_type);
public void delete_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError;
public void update_statement (string? graph, string subject, string predicate, string? object) throws Sparql.Error, DateError;
public void insert_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError;
diff --git a/src/libtracker-data/tracker-class.c b/src/libtracker-data/tracker-class.c
index 6c9754e70..f216c8e94 100644
--- a/src/libtracker-data/tracker-class.c
+++ b/src/libtracker-data/tracker-class.c
@@ -49,27 +49,6 @@ struct _TrackerClassPrivate {
GArray *last_super_classes;
TrackerOntologies *ontologies;
-
- struct {
- struct {
- GArray *sub_pred_ids;
- GArray *obj_graph_ids;
- } pending;
- struct {
- GArray *sub_pred_ids;
- GArray *obj_graph_ids;
- } ready;
- } deletes;
- struct {
- struct {
- GArray *sub_pred_ids;
- GArray *obj_graph_ids;
- } pending;
- struct {
- GArray *sub_pred_ids;
- GArray *obj_graph_ids;
- } ready;
- } inserts;
};
static void class_finalize (GObject *object);
@@ -99,16 +78,6 @@ tracker_class_init (TrackerClass *service)
priv->last_domain_indexes = NULL;
priv->last_super_classes = NULL;
- priv->deletes.pending.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
- priv->deletes.pending.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
- priv->deletes.ready.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
- priv->deletes.ready.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
-
- priv->inserts.pending.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
- priv->inserts.pending.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
- priv->inserts.ready.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
- priv->inserts.ready.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
-
/* Make GET_PRIV working */
service->priv = priv;
}
@@ -126,16 +95,6 @@ class_finalize (GObject *object)
g_array_free (priv->super_classes, TRUE);
g_array_free (priv->domain_indexes, TRUE);
- g_array_free (priv->deletes.pending.sub_pred_ids, TRUE);
- g_array_free (priv->deletes.pending.obj_graph_ids, TRUE);
- g_array_free (priv->deletes.ready.sub_pred_ids, TRUE);
- g_array_free (priv->deletes.ready.obj_graph_ids, TRUE);
-
- g_array_free (priv->inserts.pending.sub_pred_ids, TRUE);
- g_array_free (priv->inserts.pending.obj_graph_ids, TRUE);
- g_array_free (priv->inserts.ready.sub_pred_ids, TRUE);
- g_array_free (priv->inserts.ready.obj_graph_ids, TRUE);
-
if (priv->last_domain_indexes) {
g_array_free (priv->last_domain_indexes, TRUE);
}
@@ -511,244 +470,6 @@ tracker_class_set_db_schema_changed (TrackerClass *service,
priv->db_schema_changed = value;
}
-gboolean
-tracker_class_has_insert_events (TrackerClass *class)
-{
- TrackerClassPrivate *priv;
-
- g_return_val_if_fail (TRACKER_IS_CLASS (class), FALSE);
-
- priv = GET_PRIV (class);
-
- return (priv->inserts.ready.sub_pred_ids->len > 0);
-}
-
-gboolean
-tracker_class_has_delete_events (TrackerClass *class)
-{
- TrackerClassPrivate *priv;
-
- g_return_val_if_fail (TRACKER_IS_CLASS (class), FALSE);
-
- priv = GET_PRIV (class);
-
- return (priv->deletes.ready.sub_pred_ids->len > 0);
-}
-
-void
-tracker_class_foreach_insert_event (TrackerClass *class,
- TrackerEventsForeach foreach,
- gpointer user_data)
-{
- guint i;
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
- g_return_if_fail (foreach != NULL);
-
- priv = GET_PRIV (class);
-
- for (i = 0; i < priv->inserts.ready.sub_pred_ids->len; i++) {
- gint graph_id, subject_id, pred_id, object_id;
- gint64 sub_pred_id;
- gint64 obj_graph_id;
-
- sub_pred_id = g_array_index (priv->inserts.ready.sub_pred_ids, gint64, i);
- obj_graph_id = g_array_index (priv->inserts.ready.obj_graph_ids, gint64, i);
-
- pred_id = sub_pred_id & 0xffffffff;
- subject_id = sub_pred_id >> 32;
- graph_id = obj_graph_id & 0xffffffff;
- object_id = obj_graph_id >> 32;
-
- foreach (graph_id, subject_id, pred_id, object_id, user_data);
- }
-}
-
-void
-tracker_class_foreach_delete_event (TrackerClass *class,
- TrackerEventsForeach foreach,
- gpointer user_data)
-{
- guint i;
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
- g_return_if_fail (foreach != NULL);
-
- priv = GET_PRIV (class);
-
- for (i = 0; i < priv->deletes.ready.sub_pred_ids->len; i++) {
- gint graph_id, subject_id, pred_id, object_id;
- gint64 sub_pred_id;
- gint64 obj_graph_id;
-
- sub_pred_id = g_array_index (priv->deletes.ready.sub_pred_ids, gint64, i);
- obj_graph_id = g_array_index (priv->deletes.ready.obj_graph_ids, gint64, i);
-
- pred_id = sub_pred_id & 0xffffffff;
- subject_id = sub_pred_id >> 32;
- graph_id = obj_graph_id & 0xffffffff;
- object_id = obj_graph_id >> 32;
-
- foreach (graph_id, subject_id, pred_id, object_id, user_data);
- }
-}
-
-void
-tracker_class_reset_ready_events (TrackerClass *class)
-{
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
-
- priv = GET_PRIV (class);
-
- /* Reset */
- g_array_set_size (priv->deletes.ready.sub_pred_ids, 0);
- g_array_set_size (priv->deletes.ready.obj_graph_ids, 0);
-
- g_array_set_size (priv->inserts.ready.sub_pred_ids, 0);
- g_array_set_size (priv->inserts.ready.obj_graph_ids, 0);
-
-}
-
-void
-tracker_class_reset_pending_events (TrackerClass *class)
-{
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
-
- priv = GET_PRIV (class);
-
- /* Reset */
- g_array_set_size (priv->deletes.pending.sub_pred_ids, 0);
- g_array_set_size (priv->deletes.pending.obj_graph_ids, 0);
-
- g_array_set_size (priv->inserts.pending.sub_pred_ids, 0);
- g_array_set_size (priv->inserts.pending.obj_graph_ids, 0);
-
-}
-
-void
-tracker_class_transact_events (TrackerClass *class)
-{
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
- priv = GET_PRIV (class);
-
- /* Move */
- g_array_insert_vals (priv->deletes.ready.obj_graph_ids,
- priv->deletes.ready.obj_graph_ids->len,
- priv->deletes.pending.obj_graph_ids->data,
- priv->deletes.pending.obj_graph_ids->len);
-
- g_array_insert_vals (priv->deletes.ready.sub_pred_ids,
- priv->deletes.ready.sub_pred_ids->len,
- priv->deletes.pending.sub_pred_ids->data,
- priv->deletes.pending.sub_pred_ids->len);
-
- /* Reset */
- g_array_set_size (priv->deletes.pending.sub_pred_ids, 0);
- g_array_set_size (priv->deletes.pending.obj_graph_ids, 0);
-
-
- /* Move */
- g_array_insert_vals (priv->inserts.ready.obj_graph_ids,
- priv->inserts.ready.obj_graph_ids->len,
- priv->inserts.pending.obj_graph_ids->data,
- priv->inserts.pending.obj_graph_ids->len);
-
- g_array_insert_vals (priv->inserts.ready.sub_pred_ids,
- priv->inserts.ready.sub_pred_ids->len,
- priv->inserts.pending.sub_pred_ids->data,
- priv->inserts.pending.sub_pred_ids->len);
-
- /* Reset */
- g_array_set_size (priv->inserts.pending.sub_pred_ids, 0);
- g_array_set_size (priv->inserts.pending.obj_graph_ids, 0);
-
-}
-
-static void
-insert_vals_into_arrays (GArray *sub_pred_ids,
- GArray *obj_graph_ids,
- gint graph_id,
- gint subject_id,
- gint pred_id,
- gint object_id)
-{
- gint i, j, k;
- gint64 tmp;
- gint64 sub_pred_id;
- gint64 obj_graph_id;
-
- sub_pred_id = (gint64) subject_id;
- sub_pred_id = sub_pred_id << 32 | pred_id;
- obj_graph_id = (gint64) object_id;
- obj_graph_id = obj_graph_id << 32 | graph_id;
-
- i = 0;
- j = sub_pred_ids->len - 1;
-
- while (j - i > 0) {
- k = (i + j) / 2;
- tmp = g_array_index (sub_pred_ids, gint64, k);
- if (tmp == sub_pred_id) {
- i = k + 1;
- break;
- } else if (tmp > sub_pred_id)
- j = k;
- else
- i = k + 1;
- }
-
- g_array_insert_val (sub_pred_ids, i, sub_pred_id);
- g_array_insert_val (obj_graph_ids, i, obj_graph_id);
-}
-
-void
-tracker_class_add_insert_event (TrackerClass *class,
- gint graph_id,
- gint subject_id,
- gint pred_id,
- gint object_id)
-{
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
- priv = GET_PRIV (class);
-
- insert_vals_into_arrays (priv->inserts.pending.sub_pred_ids,
- priv->inserts.pending.obj_graph_ids,
- graph_id,
- subject_id,
- pred_id,
- object_id);
-}
-
-void
-tracker_class_add_delete_event (TrackerClass *class,
- gint graph_id,
- gint subject_id,
- gint pred_id,
- gint object_id)
-{
- TrackerClassPrivate *priv;
-
- g_return_if_fail (TRACKER_IS_CLASS (class));
- priv = GET_PRIV (class);
-
- insert_vals_into_arrays (priv->deletes.pending.sub_pred_ids,
- priv->deletes.pending.obj_graph_ids,
- graph_id,
- subject_id,
- pred_id,
- object_id);
-}
-
void
tracker_class_set_ontologies (TrackerClass *class,
TrackerOntologies *ontologies)
diff --git a/src/libtracker-data/tracker-class.h b/src/libtracker-data/tracker-class.h
index c05634102..9ac46acd8 100644
--- a/src/libtracker-data/tracker-class.h
+++ b/src/libtracker-data/tracker-class.h
@@ -51,12 +51,6 @@ struct _TrackerClassClass {
GObjectClass parent_class;
};
-typedef void (*TrackerEventsForeach) (gint graph_id,
- gint subject_id,
- gint pred_id,
- gint object_id,
- gpointer user_data);
-
GType tracker_class_get_type (void) G_GNUC_CONST;
TrackerClass * tracker_class_new (gboolean use_gvdb);
const gchar * tracker_class_get_uri (TrackerClass *service);
@@ -96,29 +90,6 @@ void tracker_class_set_notify (TrackerClass *ser
void tracker_class_set_ontologies (TrackerClass *class,
TrackerOntologies *ontologies);
-/* For signals API */
-void tracker_class_foreach_delete_event (TrackerClass *class,
- TrackerEventsForeach foreach,
- gpointer user_data);
-void tracker_class_foreach_insert_event (TrackerClass *class,
- TrackerEventsForeach foreach,
- gpointer user_data);
-gboolean tracker_class_has_insert_events (TrackerClass *class);
-gboolean tracker_class_has_delete_events (TrackerClass *class);
-void tracker_class_reset_ready_events (TrackerClass *class);
-void tracker_class_reset_pending_events (TrackerClass *class);
-void tracker_class_transact_events (TrackerClass *class);
-void tracker_class_add_delete_event (TrackerClass *class,
- gint graph_id,
- gint subject_id,
- gint pred_id,
- gint object_id);
-void tracker_class_add_insert_event (TrackerClass *class,
- gint graph_id,
- gint subject_id,
- gint pred_id,
- gint object_id);
-
G_END_DECLS
#endif /* __LIBTRACKER_DATA_CLASS_H__ */
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index eb58caafd..444e5efee 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -3624,21 +3624,16 @@ tracker_data_commit_transaction (TrackerData *data,
g_hash_table_remove_all (data->update_buffer.resources_by_id);
g_hash_table_remove_all (data->update_buffer.resource_cache);
- data->in_journal_replay = FALSE;
-}
-
-void
-tracker_data_notify_transaction (TrackerData *data,
- TrackerDataCommitType commit_type)
-{
- if (data->commit_callbacks) {
+ if (!data->in_journal_replay && data->commit_callbacks) {
guint n;
for (n = 0; n < data->commit_callbacks->len; n++) {
TrackerCommitDelegate *delegate;
delegate = g_ptr_array_index (data->commit_callbacks, n);
- delegate->callback (commit_type, delegate->user_data);
+ delegate->callback (delegate->user_data);
}
}
+
+ data->in_journal_replay = FALSE;
}
void
@@ -3679,7 +3674,7 @@ tracker_data_rollback_transaction (TrackerData *data)
for (n = 0; n < data->rollback_callbacks->len; n++) {
TrackerCommitDelegate *delegate;
delegate = g_ptr_array_index (data->rollback_callbacks, n);
- delegate->callback (TRUE, delegate->user_data);
+ delegate->callback (delegate->user_data);
}
}
}
diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h
index 75444875c..97fdcd6a9 100644
--- a/src/libtracker-data/tracker-data-update.h
+++ b/src/libtracker-data/tracker-data-update.h
@@ -47,12 +47,6 @@ typedef struct _TrackerDataClass TrackerDataClass;
typedef struct _TrackerData TrackerData;
typedef struct _TrackerDataClass TrackerDataClass;
-typedef enum {
- TRACKER_DATA_COMMIT_REGULAR,
- TRACKER_DATA_COMMIT_BATCH,
- TRACKER_DATA_COMMIT_BATCH_LAST
-} TrackerDataCommitType;
-
typedef struct _TrackerData TrackerData;
typedef struct _TrackerData TrackerDataUpdate;
@@ -65,8 +59,7 @@ typedef void (*TrackerStatementCallback) (gint graph_id,
const gchar *object,
GPtrArray *rdf_types,
gpointer user_data);
-typedef void (*TrackerCommitCallback) (TrackerDataCommitType commit_type,
- gpointer user_data);
+typedef void (*TrackerCommitCallback) (gpointer user_data);
GQuark tracker_data_error_quark (void);
@@ -110,8 +103,6 @@ void tracker_data_begin_transaction_for_replay (TrackerData *
GError **error);
void tracker_data_commit_transaction (TrackerData *data,
GError **error);
-void tracker_data_notify_transaction (TrackerData *data,
- TrackerDataCommitType commit_type);
void tracker_data_rollback_transaction (TrackerData *data);
void tracker_data_update_sparql (TrackerData *data,
const gchar *update,
diff --git a/src/libtracker-data/tracker-db-manager.c b/src/libtracker-data/tracker-db-manager.c
index 33e178f4f..5e5b7299c 100644
--- a/src/libtracker-data/tracker-db-manager.c
+++ b/src/libtracker-data/tracker-db-manager.c
@@ -1055,14 +1055,20 @@ tracker_db_manager_get_db_interface (TrackerDBManager *db_manager)
interface = tracker_db_manager_create_db_interface (db_manager,
TRUE, &internal_error);
- if (internal_error) {
- g_critical ("Error opening database: %s", internal_error->message);
- g_error_free (internal_error);
- g_async_queue_unlock (db_manager->interfaces);
- return NULL;
+ if (interface) {
+ tracker_data_manager_init_fts (interface, FALSE);
+ } else {
+ if (g_async_queue_length_unlocked (db_manager->interfaces) == 0) {
+ g_critical ("Error opening database: %s", internal_error->message);
+ g_error_free (internal_error);
+ g_async_queue_unlock (db_manager->interfaces);
+ return NULL;
+ } else {
+ g_error_free (internal_error);
+ /* Fetch the first interface back. Oh well */
+ interface = g_async_queue_try_pop_unlocked (db_manager->interfaces);
+ }
}
-
- tracker_data_manager_init_fts (interface, FALSE);
}
g_async_queue_push_unlocked (db_manager->interfaces, interface);
@@ -1102,7 +1108,8 @@ tracker_db_manager_get_writable_db_interface (TrackerDBManager *db_manager)
TrackerDBInterface *
tracker_db_manager_get_wal_db_interface (TrackerDBManager *db_manager)
{
- if (db_manager->db.wal_iface == NULL) {
+ if (db_manager->db.wal_iface == NULL &&
+ (db_manager->flags & TRACKER_DB_MANAGER_READONLY) == 0) {
db_manager->db.wal_iface = init_writable_db_interface (db_manager);
}
diff --git a/src/libtracker-data/tracker-db-manager.h b/src/libtracker-data/tracker-db-manager.h
index 95a71cfdb..a41b2ff53 100644
--- a/src/libtracker-data/tracker-db-manager.h
+++ b/src/libtracker-data/tracker-db-manager.h
@@ -35,12 +35,10 @@ G_BEGIN_DECLS
typedef enum {
TRACKER_DB_MANAGER_FORCE_REINDEX = 1 << 1,
- TRACKER_DB_MANAGER_REMOVE_CACHE = 1 << 2,
- /* 1 << 3 Was low mem mode */
- TRACKER_DB_MANAGER_REMOVE_ALL = 1 << 4,
- TRACKER_DB_MANAGER_READONLY = 1 << 5,
- TRACKER_DB_MANAGER_DO_NOT_CHECK_ONTOLOGY = 1 << 6,
- TRACKER_DB_MANAGER_ENABLE_MUTEXES = 1 << 7,
+ TRACKER_DB_MANAGER_REMOVE_ALL = 1 << 2,
+ TRACKER_DB_MANAGER_READONLY = 1 << 3,
+ TRACKER_DB_MANAGER_DO_NOT_CHECK_ONTOLOGY = 1 << 4,
+ TRACKER_DB_MANAGER_ENABLE_MUTEXES = 1 << 5,
} TrackerDBManagerFlags;
typedef struct _TrackerDBManager TrackerDBManager;
diff --git a/src/libtracker-direct/.gitignore b/src/libtracker-direct/.gitignore
deleted file mode 100644
index b55dd251a..000000000
--- a/src/libtracker-direct/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-tracker-namespace.c
-tracker-direct.[ch]
-tracker-direct*.vapi
diff --git a/src/libtracker-direct/Makefile.am b/src/libtracker-direct/Makefile.am
index 5e9a42f46..7960bae6c 100644
--- a/src/libtracker-direct/Makefile.am
+++ b/src/libtracker-direct/Makefile.am
@@ -1,16 +1,5 @@
noinst_LTLIBRARIES = libtracker-direct.la
-AM_VALAFLAGS = \
- --includedir=libtracker-direct \
- --header tracker-direct.h \
- --vapi tracker-direct.vapi \
- --pkg gio-2.0 \
- $(BUILD_VALAFLAGS) \
- $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \
- $(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \
- $(top_srcdir)/src/libtracker-common/libtracker-common.vapi \
- $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi
-
AM_CPPFLAGS = \
$(BUILD_VALACFLAGS) \
-I$(top_srcdir)/src \
@@ -19,8 +8,7 @@ AM_CPPFLAGS = \
$(LIBTRACKER_DIRECT_CFLAGS)
libtracker_direct_la_SOURCES = \
- tracker-namespace.vala \
- tracker-direct.vala
+ tracker-direct.c
libtracker_direct_la_LIBADD = \
$(top_builddir)/src/libtracker-data/libtracker-data.la \
@@ -30,7 +18,4 @@ libtracker_direct_la_LIBADD = \
noinst_HEADERS = \
tracker-direct.h
-BUILT_SOURCES = \
- libtracker_direct_la_vala.stamp
-
EXTRA_DIST = meson.build
diff --git a/src/libtracker-direct/meson.build b/src/libtracker-direct/meson.build
index 8cf018c69..0c515eaa7 100644
--- a/src/libtracker-direct/meson.build
+++ b/src/libtracker-direct/meson.build
@@ -1,18 +1,6 @@
libtracker_direct = static_library('tracker-direct',
- 'tracker-direct.vala',
- 'tracker-namespace.vala',
- '../libtracker-common/libtracker-common.vapi',
- '../libtracker-data/libtracker-data.vapi',
+ 'tracker-direct.c',
c_args: tracker_c_args,
- vala_args: [
- '--debug',
- '--pkg', 'posix',
- # FIXME: Meson has code to add --target-glib automatically, but it
- # doesn't seem to work here.
- '--target-glib', glib_required,
- ],
- # This doesn't depend on tracker_common_dep because of
- # https://github.com/mesonbuild/meson/issues/671
dependencies: [ glib, gio, tracker_data_dep ],
include_directories: [commoninc, configinc, srcinc],
)
diff --git a/src/libtracker-direct/tracker-direct.c b/src/libtracker-direct/tracker-direct.c
new file mode 100644
index 000000000..4b8d74a73
--- /dev/null
+++ b/src/libtracker-direct/tracker-direct.c
@@ -0,0 +1,888 @@
+/*
+ * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
+ * Copyright (C) 2017, Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "config.h"
+
+#include "tracker-direct.h"
+#include <libtracker-data/tracker-data.h>
+
+static TrackerDBManagerFlags default_flags = 0;
+
+typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;
+
+struct _TrackerDirectConnectionPrivate
+{
+ TrackerSparqlConnectionFlags flags;
+ GFile *store;
+ GFile *journal;
+ GFile *ontology;
+
+ TrackerNamespaceManager *namespace_manager;
+ TrackerDataManager *data_manager;
+ GMutex mutex;
+
+ GThreadPool *update_thread; /* Contains 1 exclusive thread */
+ GThreadPool *select_pool;
+
+ guint initialized : 1;
+};
+
+enum {
+ PROP_0,
+ PROP_FLAGS,
+ PROP_STORE_LOCATION,
+ PROP_JOURNAL_LOCATION,
+ PROP_ONTOLOGY_LOCATION,
+ N_PROPS
+};
+
+static GParamSpec *props[N_PROPS] = { NULL };
+
+typedef enum {
+ TASK_TYPE_QUERY,
+ TASK_TYPE_UPDATE,
+ TASK_TYPE_UPDATE_BLANK,
+ TASK_TYPE_TURTLE
+} TaskType;
+
+typedef struct {
+ TaskType type;
+ union {
+ gchar *query;
+ GFile *turtle_file;
+ } data;
+} TaskData;
+
+static void tracker_direct_connection_initable_iface_init (GInitableIface *iface);
+static void tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (TrackerDirectConnection, tracker_direct_connection,
+ TRACKER_SPARQL_TYPE_CONNECTION,
+ G_ADD_PRIVATE (TrackerDirectConnection)
+ G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
+ tracker_direct_connection_initable_iface_init)
+ G_IMPLEMENT_INTERFACE (G_TYPE_ASYNC_INITABLE,
+ tracker_direct_connection_async_initable_iface_init))
+
+static TaskData *
+task_data_query_new (TaskType type,
+ const gchar *sparql)
+{
+ TaskData *data;
+
+ g_assert (type != TASK_TYPE_TURTLE);
+ data = g_new0 (TaskData, 1);
+ data->type = type;
+ data->data.query = g_strdup (sparql);
+
+ return data;
+}
+
+static TaskData *
+task_data_turtle_new (GFile *file)
+{
+ TaskData *data;
+
+ data = g_new0 (TaskData, 1);
+ data->type = TASK_TYPE_TURTLE;
+ g_set_object (&data->data.turtle_file, file);
+
+ return data;
+}
+
+static void
+task_data_free (TaskData *task)
+{
+ if (task->type == TASK_TYPE_TURTLE)
+ g_object_unref (task->data.turtle_file);
+ else
+ g_free (task->data.query);
+
+ g_free (task);
+}
+
+static void
+update_thread_func (gpointer data,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task = data;
+ TaskData *task_data = g_task_get_task_data (task);
+ TrackerData *tracker_data;
+ GError *error = NULL;
+ gpointer retval = NULL;
+ GDestroyNotify destroy_notify = NULL;
+
+ conn = user_data;
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ tracker_data = tracker_data_manager_get_data (priv->data_manager);
+
+ switch (task_data->type) {
+ case TASK_TYPE_QUERY:
+ g_warning ("Queries don't go through this thread");
+ break;
+ case TASK_TYPE_UPDATE:
+ tracker_data_update_sparql (tracker_data, task_data->data.query, &error);
+ break;
+ case TASK_TYPE_UPDATE_BLANK:
+ retval = tracker_data_update_sparql_blank (tracker_data, task_data->data.query, &error);
+ destroy_notify = (GDestroyNotify) g_variant_unref;
+ break;
+ case TASK_TYPE_TURTLE:
+ tracker_data_load_turtle_file (tracker_data, task_data->data.turtle_file, &error);
+ break;
+ }
+
+ if (error)
+ g_task_return_error (task, error);
+ else if (retval)
+ g_task_return_pointer (task, retval, destroy_notify);
+ else
+ g_task_return_boolean (task, TRUE);
+
+ g_mutex_unlock (&priv->mutex);
+}
+
+static void
+query_thread_pool_func (gpointer data,
+ gpointer user_data)
+{
+ TrackerSparqlCursor *cursor;
+ GTask *task = data;
+ TaskData *task_data = g_task_get_task_data (task);
+ GError *error = NULL;
+
+ g_assert (task_data->type == TASK_TYPE_QUERY);
+ cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
+ task_data->data.query,
+ g_task_get_cancellable (task),
+ &error);
+ if (cursor)
+ g_task_return_pointer (task, cursor, g_object_unref);
+ else
+ g_task_return_error (task, error);
+}
+
+static void
+wal_checkpoint (TrackerDBInterface *iface,
+ gboolean blocking)
+{
+ GError *error = NULL;
+
+ g_debug ("Checkpointing database...");
+ tracker_db_interface_sqlite_wal_checkpoint (iface, blocking, &error);
+
+ if (error) {
+ g_warning ("Error in WAL checkpoint: %s", error->message);
+ g_error_free (error);
+ }
+
+ g_debug ("Checkpointing complete");
+}
+
+static gpointer
+wal_checkpoint_thread (gpointer data)
+{
+ TrackerDBInterface *wal_iface = data;
+
+ wal_checkpoint (wal_iface, FALSE);
+ g_object_unref (wal_iface);
+ return NULL;
+}
+
+static void
+wal_hook (TrackerDBInterface *iface,
+ gint n_pages)
+{
+ TrackerDataManager *data_manager = tracker_db_interface_get_user_data (iface);
+ TrackerDBInterface *wal_iface = tracker_data_manager_get_wal_db_interface (data_manager);
+
+ if (!wal_iface)
+ return;
+
+ if (n_pages >= 10000) {
+ /* Do immediate checkpointing (blocking updates) to
+ * prevent excessive WAL file growth.
+ */
+ wal_checkpoint (wal_iface, TRUE);
+ } else {
+ /* Defer non-blocking checkpoint to thread */
+ g_thread_try_new ("wal-checkpoint", wal_checkpoint_thread,
+ g_object_ref (wal_iface), NULL);
+ }
+}
+
+static gint
+task_compare_func (GTask *a,
+ GTask *b,
+ gpointer user_data)
+{
+ return g_task_get_priority (b) - g_task_get_priority (a);
+}
+
+static gboolean
+set_up_thread_pools (TrackerDirectConnection *conn,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ priv->select_pool = g_thread_pool_new (query_thread_pool_func,
+ conn, 16, FALSE, error);
+ if (!priv->select_pool)
+ return FALSE;
+
+ priv->update_thread = g_thread_pool_new (update_thread_func,
+ conn, 1, TRUE, error);
+ if (!priv->update_thread)
+ return FALSE;
+
+ g_thread_pool_set_sort_function (priv->select_pool,
+ (GCompareDataFunc) task_compare_func,
+ conn);
+ g_thread_pool_set_sort_function (priv->update_thread,
+ (GCompareDataFunc) task_compare_func,
+ conn);
+ return TRUE;
+}
+
+static gboolean
+tracker_direct_connection_initable_init (GInitable *initable,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerDBManagerFlags db_flags = TRACKER_DB_MANAGER_ENABLE_MUTEXES;
+ TrackerDBInterface *iface;
+ GHashTable *namespaces;
+ GHashTableIter iter;
+ gchar *prefix, *ns;
+
+ conn = TRACKER_DIRECT_CONNECTION (initable);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ tracker_locale_sanity_check ();
+
+ if (!set_up_thread_pools (conn, error))
+ return FALSE;
+
+ /* Init data manager */
+ if (priv->flags & TRACKER_SPARQL_CONNECTION_FLAGS_READONLY)
+ db_flags |= TRACKER_DB_MANAGER_READONLY;
+
+ priv->data_manager = tracker_data_manager_new (db_flags | default_flags, priv->store,
+ priv->journal, priv->ontology,
+ FALSE, FALSE, 100, 100);
+ if (!g_initable_init (G_INITABLE (priv->data_manager), cancellable, error))
+ return FALSE;
+
+ /* Set up WAL hook on our connection */
+ iface = tracker_data_manager_get_writable_db_interface (priv->data_manager);
+ tracker_db_interface_sqlite_wal_hook (iface, wal_hook);
+
+ /* Initialize namespace manager */
+ priv->namespace_manager = tracker_namespace_manager_new ();
+ namespaces = tracker_data_manager_get_namespaces (priv->data_manager);
+ g_hash_table_iter_init (&iter, namespaces);
+
+ while (g_hash_table_iter_next (&iter, (gpointer*) &prefix, (gpointer*) &ns)) {
+ tracker_namespace_manager_add_prefix (priv->namespace_manager,
+ prefix, ns);
+ }
+
+ return TRUE;
+}
+
+static void
+tracker_direct_connection_initable_iface_init (GInitableIface *iface)
+{
+ iface->init = tracker_direct_connection_initable_init;
+}
+
+static void
+async_initable_thread_func (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+
+ if (!g_initable_init (G_INITABLE (source_object), cancellable, &error))
+ g_task_return_error (task, error);
+ else
+ g_task_return_boolean (task, TRUE);
+}
+
+static void
+tracker_direct_connection_async_initable_init_async (GAsyncInitable *async_initable,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+
+ task = g_task_new (async_initable, cancellable, callback, user_data);
+ g_task_set_priority (task, priority);
+ g_task_run_in_thread (task, async_initable_thread_func);
+}
+
+static gboolean
+tracker_direct_connection_async_initable_init_finish (GAsyncInitable *async_initable,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static void
+tracker_direct_connection_async_initable_iface_init (GAsyncInitableIface *iface)
+{
+ iface->init_async = tracker_direct_connection_async_initable_init_async;
+ iface->init_finish = tracker_direct_connection_async_initable_init_finish;
+}
+
+static void
+tracker_direct_connection_init (TrackerDirectConnection *conn)
+{
+}
+
+static void
+tracker_direct_connection_finalize (GObject *object)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (object);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ if (priv->update_thread)
+ g_thread_pool_free (priv->update_thread, TRUE, TRUE);
+ if (priv->select_pool)
+ g_thread_pool_free (priv->select_pool, TRUE, FALSE);
+
+ if (priv->data_manager) {
+ TrackerDBInterface *wal_iface;
+ wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager);
+ if (wal_iface)
+ tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL);
+ }
+
+ g_clear_object (&priv->store);
+ g_clear_object (&priv->journal);
+ g_clear_object (&priv->ontology);
+ g_clear_object (&priv->data_manager);
+
+ G_OBJECT_CLASS (tracker_direct_connection_parent_class)->finalize (object);
+}
+
+static void
+tracker_direct_connection_set_property (GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (object);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ switch (prop_id) {
+ case PROP_FLAGS:
+ priv->flags = g_value_get_enum (value);
+ break;
+ case PROP_STORE_LOCATION:
+ priv->store = g_value_dup_object (value);
+ break;
+ case PROP_JOURNAL_LOCATION:
+ priv->journal = g_value_dup_object (value);
+ break;
+ case PROP_ONTOLOGY_LOCATION:
+ priv->ontology = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+tracker_direct_connection_get_property (GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+
+ conn = TRACKER_DIRECT_CONNECTION (object);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ switch (prop_id) {
+ case PROP_FLAGS:
+ g_value_set_enum (value, priv->flags);
+ break;
+ case PROP_STORE_LOCATION:
+ g_value_set_object (value, priv->store);
+ break;
+ case PROP_JOURNAL_LOCATION:
+ g_value_set_object (value, priv->journal);
+ break;
+ case PROP_ONTOLOGY_LOCATION:
+ g_value_set_object (value, priv->ontology);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static TrackerSparqlCursor *
+tracker_direct_connection_query (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerSparqlQuery *query;
+ TrackerSparqlCursor *cursor;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ query = tracker_sparql_query_new (priv->data_manager, sparql);
+ cursor = TRACKER_SPARQL_CURSOR (tracker_sparql_query_execute_cursor (query, error));
+ if (cursor)
+ tracker_sparql_cursor_set_connection (cursor, self);
+ g_mutex_unlock (&priv->mutex);
+
+ return cursor;
+}
+
+static void
+tracker_direct_connection_query_async (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GError *error = NULL;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_QUERY, sparql),
+ (GDestroyNotify) task_data_free);
+
+ if (!g_thread_pool_push (priv->select_pool, task, &error))
+ g_task_return_error (task, error);
+}
+
+static TrackerSparqlCursor *
+tracker_direct_connection_query_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
+static void
+tracker_direct_connection_update (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ gint priority,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerData *data;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ data = tracker_data_manager_get_data (priv->data_manager);
+ tracker_data_update_sparql (data, sparql, error);
+ g_mutex_unlock (&priv->mutex);
+}
+
+static void
+tracker_direct_connection_update_async (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_priority (task, priority);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_UPDATE, sparql),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static void
+tracker_direct_connection_update_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ g_task_propagate_boolean (G_TASK (res), error);
+}
+
+static void
+error_free (GError *error)
+{
+ if (error)
+ g_error_free (error);
+}
+
+static void
+update_array_async_thread_func (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ gchar **updates = task_data;
+ gchar *concatenated;
+ GPtrArray *errors;
+ GError *error = NULL;
+ gint i;
+
+ errors = g_ptr_array_new_with_free_func ((GDestroyNotify) error_free);
+ g_ptr_array_set_size (errors, g_strv_length (updates));
+
+ /* Fast path, perform everything as a single update */
+ concatenated = g_strjoinv (" ", updates);
+ tracker_sparql_connection_update (source_object, concatenated,
+ g_task_get_priority (task),
+ cancellable, &error);
+
+ if (!error) {
+ g_task_return_pointer (task, errors,
+ (GDestroyNotify) g_ptr_array_unref);
+ return;
+ }
+
+ /* Slow path, perform updates one by one */
+ for (i = 0; updates[i]; i++) {
+ GError *err = NULL;
+
+ err = g_ptr_array_index (errors, i);
+ tracker_sparql_connection_update (source_object, updates[i],
+ g_task_get_priority (task),
+ cancellable, &err);
+ }
+
+ g_task_return_pointer (task, errors,
+ (GDestroyNotify) g_ptr_array_unref);
+}
+
+static void
+tracker_direct_connection_update_array_async (TrackerSparqlConnection *self,
+ gchar **updates,
+ gint n_updates,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+ gchar **copy;
+ gint i = 0;
+
+ copy = g_new0 (gchar*, n_updates + 1);
+
+ for (i = 0; i < n_updates; i++) {
+ g_return_if_fail (updates[i] != NULL);
+ copy[i] = g_strdup (updates[i]);
+ }
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_priority (task, priority);
+ g_task_set_task_data (task, copy, (GDestroyNotify) g_strfreev);
+
+ g_task_run_in_thread (task, update_array_async_thread_func);
+}
+
+static GPtrArray *
+tracker_direct_connection_update_array_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
+static GVariant *
+tracker_direct_connection_update_blank (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ gint priority,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerData *data;
+ GVariant *blank_nodes;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ data = tracker_data_manager_get_data (priv->data_manager);
+ blank_nodes = tracker_data_update_sparql_blank (data, sparql, error);
+ g_mutex_unlock (&priv->mutex);
+
+ return blank_nodes;
+}
+
+static void
+tracker_direct_connection_update_blank_async (TrackerSparqlConnection *self,
+ const gchar *sparql,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_priority (task, priority);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_UPDATE_BLANK, sparql),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static GVariant *
+tracker_direct_connection_update_blank_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_pointer (G_TASK (res), error);
+}
+
+static void
+tracker_direct_connection_load (TrackerSparqlConnection *self,
+ GFile *file,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ TrackerData *data;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ g_mutex_lock (&priv->mutex);
+ data = tracker_data_manager_get_data (priv->data_manager);
+ tracker_data_load_turtle_file (data, file, error);
+ g_mutex_unlock (&priv->mutex);
+}
+
+static void
+tracker_direct_connection_load_async (TrackerSparqlConnection *self,
+ GFile *file,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_turtle_new (file),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static void
+tracker_direct_connection_load_finish (TrackerSparqlConnection *self,
+ GAsyncResult *res,
+ GError **error)
+{
+ g_task_propagate_pointer (G_TASK (res), error);
+}
+
+static TrackerNamespaceManager *
+tracker_direct_connection_get_namespace_manager (TrackerSparqlConnection *self)
+{
+ TrackerDirectConnectionPrivate *priv;
+
+ priv = tracker_direct_connection_get_instance_private (TRACKER_DIRECT_CONNECTION (self));
+
+ return priv->namespace_manager;
+}
+
+static void
+tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
+{
+ TrackerSparqlConnectionClass *sparql_connection_class;
+ GObjectClass *object_class;
+
+ object_class = G_OBJECT_CLASS (klass);
+ sparql_connection_class = TRACKER_SPARQL_CONNECTION_CLASS (klass);
+
+ object_class->finalize = tracker_direct_connection_finalize;
+ object_class->set_property = tracker_direct_connection_set_property;
+ object_class->get_property = tracker_direct_connection_get_property;
+
+ sparql_connection_class->query = tracker_direct_connection_query;
+ sparql_connection_class->query_async = tracker_direct_connection_query_async;
+ sparql_connection_class->query_finish = tracker_direct_connection_query_finish;
+ sparql_connection_class->update = tracker_direct_connection_update;
+ sparql_connection_class->update_async = tracker_direct_connection_update_async;
+ sparql_connection_class->update_finish = tracker_direct_connection_update_finish;
+ sparql_connection_class->update_array_async = tracker_direct_connection_update_array_async;
+ sparql_connection_class->update_array_finish = tracker_direct_connection_update_array_finish;
+ sparql_connection_class->update_blank = tracker_direct_connection_update_blank;
+ sparql_connection_class->update_blank_async = tracker_direct_connection_update_blank_async;
+ sparql_connection_class->update_blank_finish = tracker_direct_connection_update_blank_finish;
+ sparql_connection_class->load = tracker_direct_connection_load;
+ sparql_connection_class->load_async = tracker_direct_connection_load_async;
+ sparql_connection_class->load_finish = tracker_direct_connection_load_finish;
+ sparql_connection_class->get_namespace_manager = tracker_direct_connection_get_namespace_manager;
+
+ props[PROP_FLAGS] =
+ g_param_spec_enum ("flags",
+ "Flags",
+ "Flags",
+ TRACKER_SPARQL_TYPE_CONNECTION_FLAGS,
+ TRACKER_SPARQL_CONNECTION_FLAGS_NONE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+ props[PROP_STORE_LOCATION] =
+ g_param_spec_object ("store-location",
+ "Store location",
+ "Store location",
+ G_TYPE_FILE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+ props[PROP_JOURNAL_LOCATION] =
+ g_param_spec_object ("journal-location",
+ "Journal location",
+ "Journal location",
+ G_TYPE_FILE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+ props[PROP_ONTOLOGY_LOCATION] =
+ g_param_spec_object ("ontology-location",
+ "Ontology location",
+ "Ontology location",
+ G_TYPE_FILE,
+ G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT_ONLY);
+
+ g_object_class_install_properties (object_class, N_PROPS, props);
+}
+
+TrackerDirectConnection *
+tracker_direct_connection_new (TrackerSparqlConnectionFlags flags,
+ GFile *store,
+ GFile *journal,
+ GFile *ontology,
+ GError **error)
+{
+ g_return_val_if_fail (G_IS_FILE (store), NULL);
+ g_return_val_if_fail (G_IS_FILE (journal), NULL);
+ g_return_val_if_fail (G_IS_FILE (ontology), NULL);
+ g_return_val_if_fail (!error || !*error, NULL);
+
+ return g_object_new (TRACKER_TYPE_DIRECT_CONNECTION,
+ "flags", flags,
+ "store-location", store,
+ "journal-location", journal,
+ "ontology-location", ontology,
+ NULL);
+}
+
+TrackerDataManager *
+tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn)
+{
+ TrackerDirectConnectionPrivate *priv;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+ return priv->data_manager;
+}
+
+void
+tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags)
+{
+ default_flags = flags;
+}
+
+void
+tracker_direct_connection_sync (TrackerDirectConnection *conn)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDBInterface *wal_iface;
+
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ if (!priv->data_manager)
+ return;
+
+ /* Wait for pending updates. */
+ if (priv->update_thread)
+ g_thread_pool_free (priv->update_thread, TRUE, TRUE);
+ /* Selects are less important, readonly interfaces won't be bothersome */
+ if (priv->select_pool)
+ g_thread_pool_free (priv->select_pool, TRUE, FALSE);
+
+ set_up_thread_pools (conn, NULL);
+
+ wal_iface = tracker_data_manager_get_wal_db_interface (priv->data_manager);
+ if (wal_iface)
+ tracker_db_interface_sqlite_wal_checkpoint (wal_iface, TRUE, NULL);
+}
diff --git a/src/libtracker-direct/tracker-direct.h b/src/libtracker-direct/tracker-direct.h
new file mode 100644
index 000000000..0b66fdb1f
--- /dev/null
+++ b/src/libtracker-direct/tracker-direct.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
+ * Copyright (C) 2017, Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __TRACKER_LOCAL_CONNECTION_H__
+#define __TRACKER_LOCAL_CONNECTION_H__
+
+#include <libtracker-sparql/tracker-sparql.h>
+#include <libtracker-data/tracker-data-manager.h>
+
+#define TRACKER_TYPE_DIRECT_CONNECTION (tracker_direct_connection_get_type())
+#define TRACKER_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnection))
+#define TRACKER_DIRECT_CONNECTION_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnectionClass))
+#define TRACKER_IS_DIRECT_CONNECTION(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TRACKER_TYPE_DIRECT_CONNECTION))
+#define TRACKER_IS_DIRECT_CONNECTION_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), TRACKER_TYPE_DIRECT_CONNECTION))
+#define TRACKER_DIRECT_CONNECTION_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TRACKER_TYPE_DIRECT_CONNECTION, TrackerDirectConnectionClass))
+
+typedef struct _TrackerDirectConnection TrackerDirectConnection;
+typedef struct _TrackerDirectConnectionClass TrackerDirectConnectionClass;
+
+struct _TrackerDirectConnectionClass
+{
+ TrackerSparqlConnectionClass parent_class;
+};
+
+struct _TrackerDirectConnection
+{
+ TrackerSparqlConnection parent_instance;
+};
+
+TrackerDirectConnection *tracker_direct_connection_new (TrackerSparqlConnectionFlags flags,
+ GFile *store,
+ GFile *journal,
+ GFile *ontology,
+ GError **error);
+
+TrackerDataManager *tracker_direct_connection_get_data_manager (TrackerDirectConnection *conn);
+
+void tracker_direct_connection_set_default_flags (TrackerDBManagerFlags flags);
+
+void tracker_direct_connection_sync (TrackerDirectConnection *conn);
+
+#endif /* __TRACKER_LOCAL_CONNECTION_H__ */
diff --git a/src/libtracker-direct/tracker-direct.vala b/src/libtracker-direct/tracker-direct.vala
deleted file mode 100644
index b149b6806..000000000
--- a/src/libtracker-direct/tracker-direct.vala
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Copyright (C) 2010, Nokia <ivan.frade@nokia.com>
- * Copyright (C) 2017, Red Hat, Inc.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-public class Tracker.Direct.Connection : Tracker.Sparql.Connection, AsyncInitable, Initable {
- File? database_loc;
- File? journal_loc;
- File? ontology_loc;
- Sparql.ConnectionFlags flags;
-
- Data.Manager data_manager;
-
- // Mutex to hold datamanager
- private Mutex mutex = Mutex ();
- Thread<void*> thread;
-
- // Initialization stuff, both sync and async
- private Mutex init_mutex = Mutex ();
- private Cond init_cond = Cond ();
- private bool initialized;
- private Error init_error;
- public SourceFunc init_callback;
-
- private AsyncQueue<Task> update_queue;
- private NamespaceManager namespace_manager;
-
- [CCode (cname = "SHAREDIR")]
- extern const string SHAREDIR;
-
- enum TaskType {
- QUERY,
- UPDATE,
- UPDATE_BLANK,
- TURTLE,
- }
-
- abstract class Task {
- public TaskType type;
- public int priority;
- public Cancellable? cancellable;
- public SourceFunc callback;
- public Error error;
- }
-
- private class UpdateTask : Task {
- public string sparql;
- public Variant blank_nodes;
-
- private void set (TaskType type, string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable = null) {
- this.type = type;
- this.sparql = sparql;
- this.priority = priority;
- this.cancellable = cancellable;
- }
-
- public UpdateTask (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) {
- this.set (TaskType.UPDATE, sparql, priority, cancellable);
- }
-
- public UpdateTask.blank (string sparql, int priority = Priority.DEFAULT, Cancellable? cancellable) {
- this.set (TaskType.UPDATE_BLANK, sparql, priority, cancellable);
- }
- }
-
- private class TurtleTask : Task {
- public File file;
-
- public TurtleTask (File file, Cancellable? cancellable) {
- this.type = TaskType.TURTLE;
- this.file = file;
- this.priority = Priority.DEFAULT;
- this.cancellable = cancellable;
- }
- }
-
- static void wal_checkpoint (DBInterface iface, bool blocking) {
- try {
- debug ("Checkpointing database...");
- iface.sqlite_wal_checkpoint (blocking);
- debug ("Checkpointing complete...");
- } catch (Error e) {
- warning (e.message);
- }
- }
-
- static void wal_checkpoint_on_thread (DBInterface iface) {
- new Thread<void*> ("wal-checkpoint", () => {
- wal_checkpoint (iface, false);
- return null;
- });
- }
-
- static void wal_hook (DBInterface iface, int n_pages) {
- var manager = (Data.Manager) iface.get_user_data ();
- var wal_iface = manager.get_wal_db_interface ();
-
- if (n_pages >= 10000) {
- // do immediate checkpointing (blocking updates)
- // to prevent excessive wal file growth
- wal_checkpoint (wal_iface, true);
- } else if (n_pages >= 1000) {
- wal_checkpoint_on_thread (wal_iface);
- }
- }
-
- private void* thread_func () {
- init_mutex.lock ();
-
- try {
- Locale.sanity_check ();
- DBManagerFlags db_flags = DBManagerFlags.ENABLE_MUTEXES;
- if ((flags & Sparql.ConnectionFlags.READONLY) != 0)
- db_flags |= DBManagerFlags.READONLY;
-
- data_manager = new Data.Manager (db_flags,
- database_loc, journal_loc, ontology_loc,
- false, false, 100, 100);
- data_manager.init ();
-
- var iface = data_manager.get_writable_db_interface ();
- iface.sqlite_wal_hook (wal_hook);
- } catch (Error e) {
- init_error = e;
- } finally {
- if (init_callback != null) {
- init_callback ();
- } else {
- initialized = true;
- init_cond.signal ();
- init_mutex.unlock ();
- }
- }
-
- while (true) {
- var task = update_queue.pop();
-
- try {
- switch (task.type) {
- case TaskType.UPDATE:
- UpdateTask update_task = (UpdateTask) task;
- update (update_task.sparql, update_task.priority, update_task.cancellable);
- break;
- case TaskType.UPDATE_BLANK:
- UpdateTask update_task = (UpdateTask) task;
- update_task.blank_nodes = update_blank (update_task.sparql, update_task.priority, update_task.cancellable);
- break;
- case TaskType.TURTLE:
- TurtleTask turtle_task = (TurtleTask) task;
- load (turtle_task.file, turtle_task.cancellable);
- break;
- default:
- break;
- }
- } catch (Error e) {
- task.error = e;
- }
-
- task.callback ();
- }
- }
-
- public async bool init_async (int io_priority, Cancellable? cancellable) throws Error {
- init_callback = init_async.callback;
- thread = new Thread<void*> ("database", thread_func);
-
- return initialized;
- }
-
- public bool init (Cancellable? cancellable) throws Error {
- try {
- thread = new Thread<void*> ("database", thread_func);
-
- init_mutex.lock ();
- while (!initialized)
- init_cond.wait(init_mutex);
- init_mutex.unlock ();
-
- if (init_error != null)
- throw init_error;
- } catch (Error e) {
- throw new Sparql.Error.INTERNAL (e.message);
- }
-
- return true;
- }
-
- public Connection (Sparql.ConnectionFlags connection_flags, File loc, File? journal, File? ontology) throws Sparql.Error, IOError, DBusError {
- database_loc = loc;
- journal_loc = journal;
- ontology_loc = ontology;
- flags = connection_flags;
-
- if (journal_loc == null)
- journal_loc = database_loc;
- if (ontology_loc == null)
- ontology_loc = File.new_for_path (Path.build_filename (SHAREDIR, "tracker", "ontologies", "nepomuk"));
-
- update_queue = new AsyncQueue<Task> ();
- }
-
- public override void dispose () {
- data_manager.shutdown ();
- base.dispose ();
- }
-
- Sparql.Cursor query_unlocked (string sparql) throws Sparql.Error, DBusError {
- try {
- var query_object = new Sparql.Query (data_manager, sparql);
- var cursor = query_object.execute_cursor ();
- cursor.connection = this;
- return cursor;
- } catch (DBInterfaceError e) {
- throw new Sparql.Error.INTERNAL (e.message);
- } catch (DateError e) {
- throw new Sparql.Error.PARSE (e.message);
- }
- }
-
- public override Sparql.Cursor query (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
- // Check here for early cancellation, just in case
- // the operation can be entirely avoided
- if (cancellable != null && cancellable.is_cancelled ()) {
- throw new IOError.CANCELLED ("Operation was cancelled");
- }
-
- mutex.lock ();
- try {
- return query_unlocked (sparql);
- } finally {
- mutex.unlock ();
- }
- }
-
- public async override Sparql.Cursor query_async (string sparql, Cancellable? cancellable) throws Sparql.Error, IOError, DBusError {
- // run in a separate thread
- Sparql.Error sparql_error = null;
- IOError io_error = null;
- DBusError dbus_error = null;
- GLib.Error error = null;
- Sparql.Cursor result = null;
- var context = MainContext.get_thread_default ();
-
- IOSchedulerJob.push ((job, cancellable) => {
- try {
- result = query (sparql, cancellable);
- } catch (IOError e_io) {
- io_error = e_io;
- } catch (Sparql.Error e_spql) {
- sparql_error = e_spql;
- } catch (DBusError e_dbus) {
- dbus_error = e_dbus;
- } catch (GLib.Error e) {
- error = e;
- }
-
- context.invoke (() => {
- query_async.callback ();
- return false;
- });
-
- return false;
- }, Priority.DEFAULT, cancellable);
-
- yield;
-
- if (cancellable != null && cancellable.is_cancelled ()) {
- throw new IOError.CANCELLED ("Operation was cancelled");
- } else if (sparql_error != null) {
- throw sparql_error;
- } else if (io_error != null) {
- throw io_error;
- } else if (dbus_error != null) {
- throw dbus_error;
- } else {
- return result;
- }
- }
-
- public override void update (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
- mutex.lock ();
- try {
- var data = data_manager.get_data ();
- data.update_sparql (sparql);
- } finally {
- mutex.unlock ();
- }
- }
-
- public async override void update_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
- var task = new UpdateTask (sparql, priority, cancellable);
- task.callback = update_async.callback;
- update_queue.push (task);
- yield;
-
- if (task.error != null)
- throw task.error;
- }
-
- public override GLib.Variant? update_blank (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
- GLib.Variant? blank_nodes = null;
- mutex.lock ();
- try {
- var data = data_manager.get_data ();
- blank_nodes = data.update_sparql_blank (sparql);
- } finally {
- mutex.unlock ();
- }
-
- return blank_nodes;
- }
-
- public async override GLib.Variant? update_blank_async (string sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError, GLib.Error {
- var task = new UpdateTask.blank (sparql, priority, cancellable);
- task.callback = update_blank_async.callback;
- update_queue.push (task);
- yield;
-
- if (task.error != null)
- throw task.error;
-
- return task.blank_nodes;
- }
-
- public async override GenericArray<Sparql.Error?>? update_array_async (string[] sparql, int priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, DBusError {
- var combined_query = new StringBuilder ();
- var n_updates = sparql.length;
- int i;
-
- for (i = 0; i < n_updates; i++)
- combined_query.append (sparql[i]);
-
- var task = new UpdateTask (combined_query.str, priority, cancellable);
- task.callback = update_array_async.callback;
- update_queue.push (task);
- yield;
-
- var errors = new GenericArray<Sparql.Error?> (n_updates);
-
- if (task.error == null) {
- for (i = 0; i < n_updates; i++)
- errors.add (null);
- } else {
- // combined query was not successful, try queries one by one
- for (i = 0; i < n_updates; i++) {
- try {
- yield update_async (sparql[i], priority, cancellable);
- errors.add (null);
- } catch (Sparql.Error e) {
- errors.add (e);
- }
- }
- }
-
- return errors;
- }
-
- public override void load (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
- mutex.lock ();
- try {
- var data = data_manager.get_data ();
- data.load_turtle_file (file);
- } finally {
- mutex.unlock ();
- }
- }
-
- public async override void load_async (File file, Cancellable? cancellable = null) throws Sparql.Error, IOError, DBusError {
- var task = new TurtleTask (file, cancellable);
- task.callback = load_async.callback;
- update_queue.push (task);
- yield;
-
- if (task.error != null)
- throw new Sparql.Error.INTERNAL (task.error.message);
- }
-
- public override NamespaceManager? get_namespace_manager () {
- if (namespace_manager == null && data_manager != null) {
- var ht = data_manager.get_namespaces ();
- namespace_manager = new NamespaceManager ();
-
- foreach (var prefix in ht.get_keys ()) {
- namespace_manager.add_prefix (prefix, ht.lookup (prefix));
- }
- }
-
- return namespace_manager;
- }
-}
diff --git a/src/libtracker-direct/tracker-direct.vapi b/src/libtracker-direct/tracker-direct.vapi
new file mode 100644
index 000000000..df15c5890
--- /dev/null
+++ b/src/libtracker-direct/tracker-direct.vapi
@@ -0,0 +1,12 @@
+[CCode (cprefix = "Tracker", gir_namespace = "Tracker", gir_version = "2.0", lower_case_cprefix = "tracker_")]
+namespace Tracker {
+ namespace Direct {
+ [CCode (cheader_filename = "libtracker-direct/tracker-direct.h")]
+ public class Connection : Tracker.Sparql.Connection, GLib.Initable, GLib.AsyncInitable {
+ public Connection (Tracker.Sparql.ConnectionFlags connection_flags, GLib.File loc, GLib.File? journal, GLib.File? ontology) throws Tracker.Sparql.Error, GLib.IOError, GLib.DBusError;
+ public Tracker.Data.Manager get_data_manager ();
+ public void sync ();
+ public static void set_default_flags (Tracker.DBManagerFlags flags);
+ }
+ }
+}
diff --git a/src/libtracker-direct/tracker-namespace.vala b/src/libtracker-direct/tracker-namespace.vala
deleted file mode 100644
index 24d7b2ee8..000000000
--- a/src/libtracker-direct/tracker-namespace.vala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright © 2015 Collabora Ltd.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- * Boston, MA 02110-1301, USA.
- */
-
-/*
- * This file serves as the representation for the Tracker namespace, mostly
- * so that we can set its namespace and version attributes for GIR.
- */
-
-[CCode (cprefix = "TrackerDirect", gir_namespace = "TrackerDirect",
- gir_version = "2.0", lower_case_cprefix = "tracker_direct_")]
-namespace Tracker
-{
-}
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index e8c9de85e..0fbf7c60d 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -775,7 +775,8 @@ queue_event_coalesce (const QueueEvent *first,
*replacement = NULL;
if (first->type == TRACKER_MINER_FS_EVENT_CREATED) {
- if (second->type == TRACKER_MINER_FS_EVENT_UPDATED &&
+ if ((second->type == TRACKER_MINER_FS_EVENT_UPDATED ||
+ second->type == TRACKER_MINER_FS_EVENT_CREATED) &&
first->file == second->file) {
return QUEUE_ACTION_DELETE_SECOND;
} else if (second->type == TRACKER_MINER_FS_EVENT_MOVED &&
diff --git a/src/libtracker-sparql-backend/Makefile.am b/src/libtracker-sparql-backend/Makefile.am
index 9a31d98d6..6fe17dccb 100644
--- a/src/libtracker-sparql-backend/Makefile.am
+++ b/src/libtracker-sparql-backend/Makefile.am
@@ -5,6 +5,7 @@ AM_VALAFLAGS = \
$(BUILD_VALAFLAGS) \
$(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \
$(top_srcdir)/src/libtracker-bus/tracker-bus.vapi \
+ $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \
$(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \
$(top_srcdir)/src/libtracker-remote/tracker-remote.vapi \
$(top_srcdir)/src/libtracker-common/libtracker-common.vapi
diff --git a/src/libtracker-sparql-backend/meson.build b/src/libtracker-sparql-backend/meson.build
index b04cdaf0d..d76143306 100644
--- a/src/libtracker-sparql-backend/meson.build
+++ b/src/libtracker-sparql-backend/meson.build
@@ -1,5 +1,7 @@
libtracker_sparql = library('tracker-sparql-' + tracker_api_version,
'../libtracker-common/libtracker-common.vapi',
+ '../libtracker-data/libtracker-data.vapi',
+ '../libtracker-direct/tracker-direct.vapi',
'tracker-backend.vala',
soversion: soversion,
diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am
index dc5df50c3..2463cb4ab 100644
--- a/src/tracker-store/Makefile.am
+++ b/src/tracker-store/Makefile.am
@@ -39,6 +39,7 @@ tracker_store_VALAFLAGS = \
$(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \
$(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \
$(top_srcdir)/src/libtracker-data/libtracker-data.vapi \
+ $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \
$(top_srcdir)/src/tracker-store/tracker-config.vapi \
$(top_srcdir)/src/tracker-store/tracker-events.vapi \
$(top_srcdir)/src/tracker-store/tracker-locale-change.vapi \
@@ -47,6 +48,7 @@ tracker_store_VALAFLAGS = \
tracker_store_LDADD = \
$(top_builddir)/src/libtracker-data/libtracker-data.la \
+ $(top_builddir)/src/libtracker-direct/libtracker-direct.la \
$(top_builddir)/src/libtracker-common/libtracker-common.la \
$(top_builddir)/src/libtracker-sparql-backend/libtracker-sparql-@TRACKER_API_VERSION@.la \
$(BUILD_LIBS) \
diff --git a/src/tracker-store/meson.build b/src/tracker-store/meson.build
index 91d7a335b..1c42da3a3 100644
--- a/src/tracker-store/meson.build
+++ b/src/tracker-store/meson.build
@@ -16,6 +16,7 @@ tracker_store_sources = [
'tracker-writeback.vapi',
'../libtracker-common/libtracker-common.vapi',
'../libtracker-data/libtracker-data.vapi',
+ '../libtracker-direct/tracker-direct.vapi',
]
tracker_store = executable('tracker-store',
@@ -25,7 +26,7 @@ tracker_store = executable('tracker-store',
],
vala_args: [ '--pkg', 'posix' ],
dependencies: [
- tracker_common_dep, tracker_data_dep,
+ tracker_common_dep, tracker_data_dep, tracker_sparql_direct_dep,
gio_unix
],
install: true,
diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala
index 5db075274..a9ede1b9a 100644
--- a/src/tracker-store/tracker-backup.vala
+++ b/src/tracker-store/tracker-backup.vala
@@ -25,7 +25,7 @@ public class Tracker.Backup : Object {
public async void save (BusName sender, string destination_uri) throws Error {
var resources = (Resources) Tracker.DBus.get_object (typeof (Resources));
if (resources != null) {
- resources.disable_signals ();
+ Tracker.Store.disable_signals ();
Tracker.Events.shutdown ();
}
@@ -57,8 +57,8 @@ public class Tracker.Backup : Object {
throw e;
} finally {
if (resources != null) {
- Tracker.Events.init (Tracker.Main.get_data_manager ());
- resources.enable_signals ();
+ Tracker.Events.init ();
+ Tracker.Store.enable_signals ();
}
Tracker.Store.resume ();
@@ -68,7 +68,7 @@ public class Tracker.Backup : Object {
public async void restore (BusName sender, string journal_uri) throws Error {
var resources = (Resources) Tracker.DBus.get_object (typeof (Resources));
if (resources != null) {
- resources.disable_signals ();
+ Tracker.Store.disable_signals ();
Tracker.Events.shutdown ();
}
@@ -95,8 +95,8 @@ public class Tracker.Backup : Object {
throw e;
} finally {
if (resources != null) {
- Tracker.Events.init (Tracker.Main.get_data_manager ());
- resources.enable_signals ();
+ Tracker.Events.init ();
+ Tracker.Store.enable_signals ();
}
Tracker.Store.resume ();
diff --git a/src/tracker-store/tracker-dbus.vala b/src/tracker-store/tracker-dbus.vala
index 35c1542e5..48197f4b9 100644
--- a/src/tracker-store/tracker-dbus.vala
+++ b/src/tracker-store/tracker-dbus.vala
@@ -34,7 +34,6 @@ public class Tracker.DBus {
static uint notifier_id;
static Tracker.Backup backup;
static uint backup_id;
- static Tracker.Config config;
static uint domain_watch_id;
static MainLoop watch_main_loop;
@@ -108,9 +107,8 @@ public class Tracker.DBus {
}
}
- public static bool init (Tracker.Config config_p) {
+ public static bool init () {
/* Don't reinitialize */
- config = config_p;
if (connection != null) {
return true;
}
@@ -216,7 +214,7 @@ public class Tracker.DBus {
statistics_id = register_object (connection, statistics, Tracker.Statistics.PATH);
/* Add org.freedesktop.Tracker1.Resources */
- resources = new Tracker.Resources (connection, config);
+ resources = new Tracker.Resources (connection);
if (resources == null) {
critical ("Could not create TrackerResources object to register");
return false;
@@ -261,7 +259,7 @@ public class Tracker.DBus {
return false;
}
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
return true;
}
diff --git a/src/tracker-store/tracker-events.c b/src/tracker-store/tracker-events.c
index 199dced29..77de4c256 100644
--- a/src/tracker-store/tracker-events.c
+++ b/src/tracker-store/tracker-events.c
@@ -26,28 +26,230 @@
#include "tracker-events.h"
+typedef struct _TrackerEventBatch TrackerEventBatch;
+
+struct _TrackerEventBatch
+{
+ struct {
+ GArray *sub_pred_ids;
+ GArray *obj_graph_ids;
+ } deletes;
+ struct {
+ GArray *sub_pred_ids;
+ GArray *obj_graph_ids;
+ } inserts;
+};
+
typedef struct {
- gboolean frozen;
+ /* Accessed by updates/dbus threads */
+ GMutex mutex;
+ GHashTable *ready;
+
+ /* Only accessed by updates thread */
+ GHashTable *pending;
guint total;
- GPtrArray *notify_classes;
} EventsPrivate;
static EventsPrivate *private;
-guint
-tracker_events_get_total (gboolean and_reset)
+static TrackerEventBatch *
+tracker_event_batch_new (void)
{
- guint total;
+ TrackerEventBatch *events;
+
+ events = g_new0 (TrackerEventBatch, 1);
+ events->deletes.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
+ events->deletes.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
+ events->inserts.sub_pred_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
+ events->inserts.obj_graph_ids = g_array_new (FALSE, FALSE, sizeof (gint64));
+
+ return events;
+}
+
+static void
+tracker_event_batch_free (TrackerEventBatch *events)
+{
+ g_array_unref (events->deletes.sub_pred_ids);
+ g_array_unref (events->deletes.obj_graph_ids);
+ g_array_unref (events->inserts.sub_pred_ids);
+ g_array_unref (events->inserts.obj_graph_ids);
+ g_free (events);
+}
+
+static void
+insert_vals_into_arrays (GArray *sub_pred_ids,
+ GArray *obj_graph_ids,
+ gint graph_id,
+ gint subject_id,
+ gint pred_id,
+ gint object_id)
+{
+ gint i, j, k;
+ gint64 tmp;
+ gint64 sub_pred_id;
+ gint64 obj_graph_id;
+
+ sub_pred_id = (gint64) subject_id;
+ sub_pred_id = sub_pred_id << 32 | pred_id;
+ obj_graph_id = (gint64) object_id;
+ obj_graph_id = obj_graph_id << 32 | graph_id;
+
+ i = 0;
+ j = sub_pred_ids->len - 1;
+
+ while (j - i > 0) {
+ k = (i + j) / 2;
+ tmp = g_array_index (sub_pred_ids, gint64, k);
+ if (tmp == sub_pred_id) {
+ i = k + 1;
+ break;
+ } else if (tmp > sub_pred_id)
+ j = k;
+ else
+ i = k + 1;
+ }
+
+ g_array_insert_val (sub_pred_ids, i, sub_pred_id);
+ g_array_insert_val (obj_graph_ids, i, obj_graph_id);
+}
+
+static void
+tracker_event_batch_add_insert_event (TrackerEventBatch *events,
+ gint graph_id,
+ gint subject_id,
+ gint pred_id,
+ gint object_id)
+{
+ insert_vals_into_arrays (events->inserts.sub_pred_ids,
+ events->inserts.obj_graph_ids,
+ graph_id,
+ subject_id,
+ pred_id,
+ object_id);
+}
+
+static void
+tracker_event_batch_add_delete_event (TrackerEventBatch *events,
+ gint graph_id,
+ gint subject_id,
+ gint pred_id,
+ gint object_id)
+{
+ insert_vals_into_arrays (events->deletes.sub_pred_ids,
+ events->deletes.obj_graph_ids,
+ graph_id,
+ subject_id,
+ pred_id,
+ object_id);
+}
+
+static void
+foreach_event_in_arrays (GArray *sub_pred_ids,
+ GArray *obj_graph_ids,
+ TrackerEventsForeach foreach,
+ gpointer user_data)
+{
+ guint i;
+
+ g_assert (sub_pred_ids->len == obj_graph_ids->len);
+
+ for (i = 0; i < sub_pred_ids->len; i++) {
+ gint graph_id, subject_id, pred_id, object_id;
+ gint64 sub_pred_id;
+ gint64 obj_graph_id;
+
+ sub_pred_id = g_array_index (sub_pred_ids, gint64, i);
+ obj_graph_id = g_array_index (obj_graph_ids, gint64, i);
+
+ pred_id = sub_pred_id & 0xffffffff;
+ subject_id = sub_pred_id >> 32;
+ graph_id = obj_graph_id & 0xffffffff;
+ object_id = obj_graph_id >> 32;
+
+ foreach (graph_id, subject_id, pred_id, object_id, user_data);
+ }
+}
+
+void
+tracker_event_batch_foreach_insert_event (TrackerEventBatch *events,
+ TrackerEventsForeach foreach,
+ gpointer user_data)
+{
+ g_return_if_fail (events != NULL);
+ g_return_if_fail (foreach != NULL);
+
+ foreach_event_in_arrays (events->inserts.sub_pred_ids,
+ events->inserts.obj_graph_ids,
+ foreach, user_data);
+}
+
+void
+tracker_event_batch_foreach_delete_event (TrackerEventBatch *events,
+ TrackerEventsForeach foreach,
+ gpointer user_data)
+{
+ g_return_if_fail (events != NULL);
+ g_return_if_fail (foreach != NULL);
+
+ foreach_event_in_arrays (events->deletes.sub_pred_ids,
+ events->deletes.obj_graph_ids,
+ foreach, user_data);
+}
+
+static GHashTable *
+tracker_event_batch_hashtable_new (void)
+{
+ return g_hash_table_new_full (NULL, NULL,
+ (GDestroyNotify) g_object_unref,
+ (GDestroyNotify) tracker_event_batch_free);
+}
+
+void
+tracker_event_batch_merge (TrackerEventBatch *dest,
+ TrackerEventBatch *to_copy)
+{
+ g_array_append_vals (dest->deletes.sub_pred_ids,
+ to_copy->deletes.sub_pred_ids->data,
+ to_copy->deletes.sub_pred_ids->len);
+ g_array_append_vals (dest->deletes.obj_graph_ids,
+ to_copy->deletes.obj_graph_ids->data,
+ to_copy->deletes.obj_graph_ids->len);
+ g_array_append_vals (dest->inserts.sub_pred_ids,
+ to_copy->inserts.sub_pred_ids->data,
+ to_copy->inserts.sub_pred_ids->len);
+ g_array_append_vals (dest->inserts.obj_graph_ids,
+ to_copy->inserts.obj_graph_ids->data,
+ to_copy->inserts.obj_graph_ids->len);
+}
+guint
+tracker_events_get_total (void)
+{
g_return_val_if_fail (private != NULL, 0);
- total = private->total;
+ return private->total;
+}
+
+static inline TrackerEventBatch *
+ensure_event_batch (TrackerClass *rdf_type)
+{
+ TrackerEventBatch *events;
+
+ g_assert (private != NULL);
- if (and_reset) {
- private->total = 0;
+ if (!private->pending)
+ private->pending = tracker_event_batch_hashtable_new ();
+
+ events = g_hash_table_lookup (private->pending, rdf_type);
+
+ if (!events) {
+ events = tracker_event_batch_new ();
+ g_hash_table_insert (private->pending,
+ g_object_ref (rdf_type),
+ events);
}
- return total;
+ return events;
}
void
@@ -59,24 +261,23 @@ tracker_events_add_insert (gint graph_id,
const gchar *object,
GPtrArray *rdf_types)
{
+ TrackerEventBatch *events;
guint i;
g_return_if_fail (rdf_types != NULL);
g_return_if_fail (private != NULL);
- if (private->frozen) {
- return;
- }
-
for (i = 0; i < rdf_types->len; i++) {
- if (tracker_class_get_notify (rdf_types->pdata[i])) {
- tracker_class_add_insert_event (rdf_types->pdata[i],
- graph_id,
- subject_id,
- pred_id,
- object_id);
- private->total++;
- }
+ if (!tracker_class_get_notify (rdf_types->pdata[i]))
+ continue;
+
+ events = ensure_event_batch (rdf_types->pdata[i]);
+ tracker_event_batch_add_insert_event (events,
+ graph_id,
+ subject_id,
+ pred_id,
+ object_id);
+ private->total++;
}
}
@@ -89,104 +290,89 @@ tracker_events_add_delete (gint graph_id,
const gchar *object,
GPtrArray *rdf_types)
{
+ TrackerEventBatch *events;
guint i;
g_return_if_fail (rdf_types != NULL);
g_return_if_fail (private != NULL);
- if (private->frozen) {
- return;
- }
-
for (i = 0; i < rdf_types->len; i++) {
- if (tracker_class_get_notify (rdf_types->pdata[i])) {
- tracker_class_add_delete_event (rdf_types->pdata[i],
- graph_id,
- subject_id,
- pred_id,
- object_id);
- private->total++;
- }
+ if (!tracker_class_get_notify (rdf_types->pdata[i]))
+ continue;
+
+ events = ensure_event_batch (rdf_types->pdata[i]);
+ tracker_event_batch_add_delete_event (events,
+ graph_id,
+ subject_id,
+ pred_id,
+ object_id);
+ private->total++;
}
}
void
-tracker_events_reset_pending (void)
+tracker_events_transact (void)
{
- guint i;
+ TrackerEventBatch *prev_events, *events;
+ TrackerClass *rdf_type;
+ GHashTableIter iter;
g_return_if_fail (private != NULL);
- for (i = 0; i < private->notify_classes->len; i++) {
- TrackerClass *class = g_ptr_array_index (private->notify_classes, i);
+ if (!private->pending || g_hash_table_size (private->pending) == 0)
+ return;
+
+ g_mutex_lock (&private->mutex);
- tracker_class_reset_pending_events (class);
+ if (!private->ready) {
+ private->ready = tracker_event_batch_hashtable_new ();
}
- private->frozen = FALSE;
+ g_hash_table_iter_init (&iter, private->pending);
+
+ while (g_hash_table_iter_next (&iter,
+ (gpointer *) &rdf_type,
+ (gpointer *) &events)) {
+ prev_events = g_hash_table_lookup (private->ready,
+ rdf_type);
+ if (prev_events) {
+ tracker_event_batch_merge (prev_events, events);
+ g_hash_table_iter_remove (&iter);
+ } else {
+ g_hash_table_iter_steal (&iter);
+ g_hash_table_insert (private->ready,
+ g_object_ref (rdf_type),
+ events);
+ /* Drop the reference stolen from the pending HT */
+ g_object_unref (rdf_type);
+ }
+ }
+
+ private->total = 0;
+
+ g_mutex_unlock (&private->mutex);
}
void
-tracker_events_freeze (void)
+tracker_events_reset_pending (void)
{
g_return_if_fail (private != NULL);
- private->frozen = TRUE;
+ g_clear_pointer (&private->pending, g_hash_table_unref);
}
static void
free_private (EventsPrivate *private)
{
- guint i;
-
- for (i = 0; i < private->notify_classes->len; i++) {
- TrackerClass *class = g_ptr_array_index (private->notify_classes, i);
-
- tracker_class_reset_pending_events (class);
-
- /* Perhaps hurry an emit of the ready events here? We're shutting down,
- * so I guess we're not required to do that here ... ? */
- tracker_class_reset_ready_events (class);
- }
-
- g_ptr_array_unref (private->notify_classes);
-
+ tracker_events_reset_pending ();
g_free (private);
}
-TrackerClass **
-tracker_events_get_classes (guint *length)
-{
- g_return_val_if_fail (private != NULL, NULL);
-
- *length = private->notify_classes->len;
-
- return (TrackerClass **) (private->notify_classes->pdata);
-}
-
void
-tracker_events_init (TrackerDataManager *data_manager)
+tracker_events_init (void)
{
- TrackerOntologies *ontologies;
- TrackerClass **classes;
- guint length = 0, i;
-
private = g_new0 (EventsPrivate, 1);
-
- ontologies = tracker_data_manager_get_ontologies (data_manager);
- classes = tracker_ontologies_get_classes (ontologies, &length);
-
- private->notify_classes = g_ptr_array_sized_new (length);
- g_ptr_array_set_free_func (private->notify_classes, (GDestroyNotify) g_object_unref);
-
- for (i = 0; i < length; i++) {
- TrackerClass *class = classes[i];
-
- if (tracker_class_get_notify (class)) {
- g_ptr_array_add (private->notify_classes, g_object_ref (class));
- }
- }
-
+ g_mutex_init (&private->mutex);
}
void
@@ -199,3 +385,18 @@ tracker_events_shutdown (void)
g_warning ("tracker_events already shutdown");
}
}
+
+GHashTable *
+tracker_events_get_pending (void)
+{
+ GHashTable *pending;
+
+ g_return_val_if_fail (private != NULL, NULL);
+
+ g_mutex_lock (&private->mutex);
+ pending = private->ready;
+ private->ready = NULL;
+ g_mutex_unlock (&private->mutex);
+
+ return pending;
+}
diff --git a/src/tracker-store/tracker-events.h b/src/tracker-store/tracker-events.h
index df9322ff1..1aea15ab6 100644
--- a/src/tracker-store/tracker-events.h
+++ b/src/tracker-store/tracker-events.h
@@ -28,9 +28,15 @@
G_BEGIN_DECLS
-typedef GStrv (*TrackerNotifyClassGetter) (void);
+typedef struct _TrackerEventBatch TrackerEventBatch;
-void tracker_events_init (TrackerDataManager *data_manager);
+typedef void (*TrackerEventsForeach) (gint graph_id,
+ gint subject_id,
+ gint pred_id,
+ gint object_id,
+ gpointer user_data);
+
+void tracker_events_init (void);
void tracker_events_shutdown (void);
void tracker_events_add_insert (gint graph_id,
gint subject_id,
@@ -46,10 +52,19 @@ void tracker_events_add_delete (gint graph_id,
gint object_id,
const gchar *object,
GPtrArray *rdf_types);
-guint tracker_events_get_total (gboolean and_reset);
+guint tracker_events_get_total (void);
void tracker_events_reset_pending (void);
-void tracker_events_freeze (void);
-TrackerClass** tracker_events_get_classes (guint *length);
+
+void tracker_events_transact (void);
+
+GHashTable * tracker_events_get_pending (void);
+
+void tracker_event_batch_foreach_insert_event (TrackerEventBatch *events,
+ TrackerEventsForeach foreach,
+ gpointer user_data);
+void tracker_event_batch_foreach_delete_event (TrackerEventBatch *events,
+ TrackerEventsForeach foreach,
+ gpointer user_data);
G_END_DECLS
diff --git a/src/tracker-store/tracker-events.vapi b/src/tracker-store/tracker-events.vapi
index c232d8af5..16df92e84 100644
--- a/src/tracker-store/tracker-events.vapi
+++ b/src/tracker-store/tracker-events.vapi
@@ -20,13 +20,21 @@
namespace Tracker {
[CCode (cheader_filename = "tracker-store/tracker-events.h")]
namespace Events {
- public void init (Tracker.Data.Manager data_manager);
+ public void init ();
public void shutdown ();
public void add_insert (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types);
public void add_delete (int graph_id, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types);
- public uint get_total (bool and_reset);
+ public uint get_total ();
public void reset_pending ();
- public void freeze ();
- public unowned Class[] get_classes ();
+
+ public void transact ();
+ public GLib.HashTable<Tracker.Class, Batch> get_pending ();
+
+ [CCode (lower_case_cprefix="tracker_event_batch_", cname = "TrackerEventBatch")]
+ public class Batch {
+ public delegate void EventsForeach (int graph_id, int subject_id, int pred_id, int object_id);
+ public void foreach_delete_event (EventsForeach func);
+ public void foreach_insert_event (EventsForeach func);
+ }
}
}
diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala
index 8e6d8895d..861aed2eb 100644
--- a/src/tracker-store/tracker-main.vala
+++ b/src/tracker-store/tracker-main.vala
@@ -39,6 +39,7 @@ License which can be viewed at:
static bool shutdown;
+ static Tracker.Direct.Connection connection;
static Tracker.Data.Manager data_manager;
/* Private command line parameters */
@@ -175,6 +176,10 @@ License which can be viewed at:
return data_manager;
}
+ public static unowned Tracker.Direct.Connection get_sparql_connection () {
+ return connection;
+ }
+
static int main (string[] args) {
Intl.setlocale (LocaleCategory.ALL, "");
@@ -239,7 +244,7 @@ License which can be viewed at:
sanity_check_option_values (config);
- if (!Tracker.DBus.init (config)) {
+ if (!Tracker.DBus.init ()) {
return 1;
}
@@ -247,7 +252,7 @@ License which can be viewed at:
config_verbosity_changed_cb (config, null);
ulong config_verbosity_id = config.notify["verbosity"].connect (config_verbosity_changed_cb);
- DBManagerFlags flags = DBManagerFlags.REMOVE_CACHE;
+ DBManagerFlags flags = 0;
if (force_reindex) {
/* TODO port backup support
@@ -256,9 +261,11 @@ License which can be viewed at:
flags |= DBManagerFlags.FORCE_REINDEX;
}
+ Tracker.Direct.Connection.set_default_flags (flags);
+
var notifier = Tracker.DBus.register_notifier ();
- Tracker.Store.init ();
+ Tracker.Store.init (config);
/* Make Tracker available for introspection */
if (!Tracker.DBus.register_objects ()) {
@@ -281,45 +288,25 @@ License which can be viewed at:
Tracker.DBJournal.set_rotating (do_rotating, chunk_size, rotate_to);
- int select_cache_size, update_cache_size;
- string cache_size_s;
-
- cache_size_s = Environment.get_variable ("TRACKER_STORE_SELECT_CACHE_SIZE");
- if (cache_size_s != null && cache_size_s != "") {
- select_cache_size = int.parse (cache_size_s);
- } else {
- select_cache_size = SELECT_CACHE_SIZE;
- }
-
- cache_size_s = Environment.get_variable ("TRACKER_STORE_UPDATE_CACHE_SIZE");
- if (cache_size_s != null && cache_size_s != "") {
- update_cache_size = int.parse (cache_size_s);
- } else {
- update_cache_size = UPDATE_CACHE_SIZE;
- }
-
try {
- data_manager = new Tracker.Data.Manager (flags,
- cache_location,
- data_location,
- ontology_location,
- true,
- false,
- select_cache_size,
- update_cache_size);
- data_manager.init (null);
+ connection = new Tracker.Direct.Connection (Sparql.ConnectionFlags.NONE,
+ cache_location,
+ data_location,
+ ontology_location);
+ connection.init (null);
} catch (GLib.Error e) {
critical ("Cannot initialize database: %s", e.message);
return 1;
}
+ data_manager = connection.get_data_manager ();
db_config = null;
notifier = null;
if (!shutdown) {
Tracker.DBus.register_prepare_class_signal ();
- Tracker.Events.init (data_manager);
+ Tracker.Events.init ();
Tracker.Writeback.init (data_manager, get_writeback_predicates);
Tracker.Store.resume ();
diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala
index ed34a5f56..5cfab8bb2 100644
--- a/src/tracker-store/tracker-resources.vala
+++ b/src/tracker-store/tracker-resources.vala
@@ -22,8 +22,6 @@
public class Tracker.Resources : Object {
public const string PATH = "/org/freedesktop/Tracker1/Resources";
- const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
-
/* I *know* that this is some arbitrary number that doesn't seem to
* resemble anything. In fact it's what I experimentally measured to
* be a good value on a default Debian testing which has
@@ -50,25 +48,22 @@ public class Tracker.Resources : Object {
const int DBUS_ARBITRARY_MAX_MSG_SIZE = 10000000;
DBusConnection connection;
- uint signal_timeout;
- bool regular_commit_pending;
- Tracker.Config config;
public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects);
public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts);
- public Resources (DBusConnection connection, Tracker.Config config_p) {
+ public Resources (DBusConnection connection) {
this.connection = connection;
- this.config = config_p;
+ Tracker.Store.set_signal_callback (on_emit_signals);
}
public async void load (BusName sender, string uri) throws Error {
var request = DBusRequest.begin (sender, "Resources.Load (uri: '%s')", uri);
try {
var file = File.new_for_uri (uri);
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.queue_turtle_import (data_manager, file, sender);
+ yield Tracker.Store.queue_turtle_import (sparql_conn, file, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -89,9 +84,9 @@ public class Tracker.Resources : Object {
request.debug ("query: %s", query);
try {
var builder = new VariantBuilder ((VariantType) "aas");
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => {
+ yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => {
while (cursor.next ()) {
builder.open ((VariantType) "as");
@@ -131,8 +126,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.SparqlUpdate");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.HIGH, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ yield Tracker.Store.sparql_update (sparql_conn, update, Priority.HIGH, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -152,8 +147,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.SparqlUpdateBlank");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- var variant = yield Tracker.Store.sparql_update_blank (data_manager, update, Tracker.Store.Priority.HIGH, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, update, Priority.HIGH, sender);
request.end ();
@@ -174,10 +169,10 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.Sync");
var data_manager = Tracker.Main.get_data_manager ();
var data = data_manager.get_data ();
- var iface = data_manager.get_writable_db_interface ();
- // wal checkpoint implies sync
- Tracker.Store.wal_checkpoint (iface, true);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ sparql_conn.sync ();
+
// sync journal if available
data.sync ();
@@ -188,8 +183,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.BatchSparqlUpdate");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.LOW, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ yield Tracker.Store.sparql_update (sparql_conn, update, Priority.LOW, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -208,168 +203,63 @@ public class Tracker.Resources : Object {
/* no longer needed, just return */
}
- bool emit_graph_updated (Class cl) {
- if (cl.has_insert_events () || cl.has_delete_events ()) {
- var builder = new VariantBuilder ((VariantType) "a(iiii)");
- cl.foreach_delete_event ((graph_id, subject_id, pred_id, object_id) => {
- builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id);
- });
- var deletes = builder.end ();
-
- builder = new VariantBuilder ((VariantType) "a(iiii)");
- cl.foreach_insert_event ((graph_id, subject_id, pred_id, object_id) => {
- builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id);
- });
- var inserts = builder.end ();
+ void emit_graph_updated (Class cl, Events.Batch events) {
+ var builder = new VariantBuilder ((VariantType) "a(iiii)");
+ events.foreach_delete_event ((graph_id, subject_id, pred_id, object_id) => {
+ builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id);
+ });
+ var deletes = builder.end ();
- graph_updated (cl.uri, deletes, inserts);
+ builder = new VariantBuilder ((VariantType) "a(iiii)");
+ events.foreach_insert_event ((graph_id, subject_id, pred_id, object_id) => {
+ builder.add ("(iiii)", graph_id, subject_id, pred_id, object_id);
+ });
+ var inserts = builder.end ();
- cl.reset_ready_events ();
-
- return true;
- }
- return false;
+ graph_updated (cl.uri, deletes, inserts);
}
- bool on_emit_signals () {
- foreach (var cl in Tracker.Events.get_classes ()) {
- emit_graph_updated (cl);
- }
-
- /* Reset counter */
- Tracker.Events.get_total (true);
-
- /* Writeback feature */
- var writebacks = Tracker.Writeback.get_ready ();
-
- if (writebacks != null) {
- var builder = new VariantBuilder ((VariantType) "a{iai}");
-
- var wb_iter = HashTableIter<int, GLib.Array<int>> (writebacks);
+ void emit_writeback (HashTable<int, Array<int>> events) {
+ var builder = new VariantBuilder ((VariantType) "a{iai}");
+ var wb_iter = HashTableIter<int, GLib.Array<int>> (events);
- int subject_id;
- unowned Array<int> types;
- while (wb_iter.next (out subject_id, out types)) {
- builder.open ((VariantType) "{iai}");
+ int subject_id;
+ unowned Array<int> types;
+ while (wb_iter.next (out subject_id, out types)) {
+ builder.open ((VariantType) "{iai}");
- builder.add ("i", subject_id);
+ builder.add ("i", subject_id);
- builder.open ((VariantType) "ai");
- for (int i = 0; i < types.length; i++) {
- builder.add ("i", types.index (i));
- }
- builder.close ();
-
- builder.close ();
+ builder.open ((VariantType) "ai");
+ for (int i = 0; i < types.length; i++) {
+ builder.add ("i", types.index (i));
}
+ builder.close ();
- writeback (builder.end ());
+ builder.close ();
}
- Tracker.Writeback.reset_ready ();
-
- regular_commit_pending = false;
- signal_timeout = 0;
- return false;
+ writeback (builder.end ());
}
- void on_statements_committed (Tracker.Data.Update.CommitType commit_type) {
- /* Class signal feature */
+ void on_emit_signals (HashTable<Tracker.Class, Tracker.Events.Batch>? events, HashTable<int, GLib.Array<int>>? writebacks) {
+ if (events != null) {
+ var iter = HashTableIter<Tracker.Class, Tracker.Events.Batch> (events);
+ unowned Events.Batch class_events;
+ unowned Class cl;
- foreach (var cl in Tracker.Events.get_classes ()) {
- cl.transact_events ();
- }
-
- if (!regular_commit_pending) {
- // never cancel timeout for non-batch commits as we want
- // to ensure that the signal corresponding to a certain
- // update arrives within a fixed time limit
-
- // cancel it in all other cases
- // in the BATCH_LAST case, the timeout will be reenabled
- // further down but it's important to cancel it first
- // to reset the timeout to 1 s starting now
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
+ while (iter.next (out cl, out class_events)) {
+ emit_graph_updated (cl, class_events);
}
}
- if (commit_type == Tracker.Data.Update.CommitType.REGULAR) {
- regular_commit_pending = true;
- }
-
- if (regular_commit_pending || commit_type == Tracker.Data.Update.CommitType.BATCH_LAST) {
- // timer wanted for non-batch commits and the last in a series of batch commits
- if (signal_timeout == 0) {
- signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals);
- }
- }
-
- /* Writeback feature */
- Tracker.Writeback.transact ();
- }
-
- void on_statements_rolled_back (Tracker.Data.Update.CommitType commit_type) {
- Tracker.Events.reset_pending ();
- Tracker.Writeback.reset_pending ();
- }
-
- void check_graph_updated_signal () {
- /* Check for whether we need an immediate emit */
- if (Tracker.Events.get_total (false) > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
- // possibly active timeout no longer necessary as signals
- // for committed transactions will be emitted by the following on_emit_signals call
- // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
- }
-
- // immediately emit signals for already committed transaction
- on_emit_signals ();
- }
- }
-
- void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
- Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
- Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
- check_graph_updated_signal ();
- }
-
- void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
- Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
- Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
- check_graph_updated_signal ();
- }
-
- [DBus (visible = false)]
- public void enable_signals () {
- var data_manager = Tracker.Main.get_data_manager ();
- var data = data_manager.get_data ();
- data.add_insert_statement_callback (on_statement_inserted);
- data.add_delete_statement_callback (on_statement_deleted);
- data.add_commit_statement_callback (on_statements_committed);
- data.add_rollback_statement_callback (on_statements_rolled_back);
- }
-
- [DBus (visible = false)]
- public void disable_signals () {
- var data_manager = Tracker.Main.get_data_manager ();
- var data = data_manager.get_data ();
- data.remove_insert_statement_callback (on_statement_inserted);
- data.remove_delete_statement_callback (on_statement_deleted);
- data.remove_commit_statement_callback (on_statements_committed);
- data.remove_rollback_statement_callback (on_statements_rolled_back);
-
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
+ if (writebacks != null) {
+ emit_writeback (writebacks);
}
}
~Resources () {
- this.disable_signals ();
+ Tracker.Store.set_signal_callback (null);
}
[DBus (visible = false)]
diff --git a/src/tracker-store/tracker-steroids.vala b/src/tracker-store/tracker-steroids.vala
index 40679bf14..1eb7bef05 100644
--- a/src/tracker-store/tracker-steroids.vala
+++ b/src/tracker-store/tracker-steroids.vala
@@ -29,9 +29,9 @@ public class Tracker.Steroids : Object {
request.debug ("query: %s", query);
try {
string[] variable_names = null;
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => {
+ yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => {
var data_output_stream = new DataOutputStream (new BufferedOutputStream.sized (output_stream, BUFFER_SIZE));
data_output_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN);
@@ -90,10 +90,10 @@ public class Tracker.Steroids : Object {
}
}
- async Variant? update_internal (BusName sender, Tracker.Store.Priority priority, bool blank, UnixInputStream input_stream) throws Error {
+ async Variant? update_internal (BusName sender, int priority, bool blank, UnixInputStream input_stream) throws Error {
var request = DBusRequest.begin (sender,
"Steroids.%sUpdate%s",
- priority != Tracker.Store.Priority.HIGH ? "Batch" : "",
+ priority != Priority.HIGH ? "Batch" : "",
blank ? "Blank" : "");
try {
size_t bytes_read;
@@ -112,16 +112,16 @@ public class Tracker.Steroids : Object {
data_input_stream = null;
request.debug ("query: %s", (string) query);
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
if (!blank) {
- yield Tracker.Store.sparql_update (data_manager, (string) query, priority, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, (string) query, priority, sender);
request.end ();
return null;
} else {
- var variant = yield Tracker.Store.sparql_update_blank (data_manager, (string) query, priority, sender);
+ var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, (string) query, priority, sender);
request.end ();
@@ -140,21 +140,21 @@ public class Tracker.Steroids : Object {
}
public async void update (BusName sender, UnixInputStream input_stream) throws Error {
- yield update_internal (sender, Tracker.Store.Priority.HIGH, false, input_stream);
+ yield update_internal (sender, Priority.HIGH, false, input_stream);
}
public async void batch_update (BusName sender, UnixInputStream input_stream) throws Error {
- yield update_internal (sender, Tracker.Store.Priority.LOW, false, input_stream);
+ yield update_internal (sender, Priority.LOW, false, input_stream);
}
[DBus (signature = "aaa{ss}")]
public async Variant update_blank (BusName sender, UnixInputStream input_stream) throws Error {
- return yield update_internal (sender, Tracker.Store.Priority.HIGH, true, input_stream);
+ return yield update_internal (sender, Priority.HIGH, true, input_stream);
}
[DBus (signature = "aaa{ss}")]
public async Variant batch_update_blank (BusName sender, UnixInputStream input_stream) throws Error {
- return yield update_internal (sender, Tracker.Store.Priority.LOW, true, input_stream);
+ return yield update_internal (sender, Priority.LOW, true, input_stream);
}
[DBus (signature = "as")]
@@ -188,11 +188,11 @@ public class Tracker.Steroids : Object {
data_input_stream = null;
var builder = new VariantBuilder ((VariantType) "as");
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
// first try combined query for best possible performance
try {
- yield Tracker.Store.sparql_update (data_manager, combined_query.str, Tracker.Store.Priority.LOW, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, combined_query.str, Priority.LOW, sender);
// combined query was successful
for (i = 0; i < query_count; i++) {
@@ -213,7 +213,7 @@ public class Tracker.Steroids : Object {
request.debug ("query: %s", query_array[i]);
try {
- yield Tracker.Store.sparql_update (data_manager, query_array[i], Tracker.Store.Priority.LOW, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, query_array[i], Priority.LOW, sender);
builder.add ("s", "");
builder.add ("s", "");
} catch (Error e1) {
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index 1f58256e8..03cc8078a 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -23,277 +23,49 @@ public class Tracker.Store {
const int MAX_CONCURRENT_QUERIES = 2;
const int MAX_TASK_TIME = 30;
+ const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
- static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
- static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
- static int n_queries_running;
- static bool update_running;
- static ThreadPool<Task> update_pool;
- static ThreadPool<Task> query_pool;
- static ThreadPool<DBInterface> checkpoint_pool;
- static GenericArray<Task> running_tasks;
static int max_task_time;
static bool active;
- static SourceFunc active_callback;
- public enum Priority {
- HIGH,
- LOW,
- TURTLE,
- N_PRIORITIES
- }
-
- enum TaskType {
- QUERY,
- UPDATE,
- UPDATE_BLANK,
- TURTLE,
- }
-
- public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
-
- abstract class Task {
- public TaskType type;
- public string client_id;
- public Error error;
- public SourceFunc callback;
- public Tracker.Data.Manager data_manager;
- }
-
- class QueryTask : Task {
- public string query;
- public Cancellable cancellable;
- public uint watchdog_id;
- public unowned SparqlQueryInThread in_thread;
+ static Tracker.Config config;
+ static uint signal_timeout;
+ static int n_updates;
- ~QueryTask () {
- if (watchdog_id > 0) {
- Source.remove (watchdog_id);
- }
- }
- }
-
- class UpdateTask : Task {
- public string query;
- public Variant blank_nodes;
- public Priority priority;
- }
+ static HashTable<string, Cancellable> client_cancellables;
- class TurtleTask : Task {
- public string path;
- }
-
- static void sched () {
- Task task = null;
-
- if (!active) {
- return;
- }
+ public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
+ static unowned SignalEmissionFunc signal_callback;
- while (n_queries_running < MAX_CONCURRENT_QUERIES) {
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- task = query_queues[i].pop_head ();
- if (task != null) {
- break;
- }
- }
- if (task == null) {
- /* no pending query */
- break;
- }
- running_tasks.add (task);
-
- if (max_task_time != 0) {
- var query_task = (QueryTask) task;
- query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => {
- query_task.cancellable.cancel ();
- query_task.watchdog_id = 0;
- return false;
- });
- }
-
- n_queries_running++;
- try {
- query_pool.add (task);
- } catch (Error e) {
- // ignore harmless thread creation error
- }
- }
+ public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error;
- if (!update_running) {
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- task = update_queues[i].pop_head ();
- if (task != null) {
- break;
- }
- }
- if (task != null) {
- update_running = true;
- try {
- update_pool.add (task);
- } catch (Error e) {
- // ignore harmless thread creation error
- }
- }
- }
- }
+ class CursorTask {
+ public Sparql.Cursor cursor;
+ public unowned SourceFunc callback;
+ public unowned SparqlQueryInThread thread_func;
+ public Error error;
- static Tracker.Data.Update.CommitType commit_type (Task task) {
- switch (task.type) {
- case TaskType.UPDATE:
- case TaskType.UPDATE_BLANK:
- if (((UpdateTask) task).priority == Priority.HIGH) {
- return Tracker.Data.Update.CommitType.REGULAR;
- } else if (update_queues[Priority.LOW].get_length () > 0) {
- return Tracker.Data.Update.CommitType.BATCH;
- } else {
- return Tracker.Data.Update.CommitType.BATCH_LAST;
- }
- case TaskType.TURTLE:
- if (update_queues[Priority.TURTLE].get_length () > 0) {
- return Tracker.Data.Update.CommitType.BATCH;
- } else {
- return Tracker.Data.Update.CommitType.BATCH_LAST;
- }
- default:
- warn_if_reached ();
- return Tracker.Data.Update.CommitType.REGULAR;
+ public CursorTask (Sparql.Cursor cursor) {
+ this.cursor = cursor;
}
}
- static bool task_finish_cb (Task task) {
- var data = task.data_manager.get_data ();
-
- if (task.type == TaskType.QUERY) {
- var query_task = (QueryTask) task;
-
- if (task.error == null &&
- query_task.cancellable != null &&
- query_task.cancellable.is_cancelled ()) {
- task.error = new IOError.CANCELLED ("Operation was cancelled");
- }
-
- task.callback ();
- task.error = null;
+ static ThreadPool<CursorTask> cursor_pool;
- running_tasks.remove (task);
- n_queries_running--;
- } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) {
- if (task.error == null) {
- data.notify_transaction (commit_type (task));
- }
-
- task.callback ();
- task.error = null;
-
- update_running = false;
- } else if (task.type == TaskType.TURTLE) {
- if (task.error == null) {
- data.notify_transaction (commit_type (task));
- }
-
- task.callback ();
- task.error = null;
-
- update_running = false;
- }
-
- if (n_queries_running == 0 && !update_running && active_callback != null) {
- active_callback ();
- }
-
- sched ();
-
- return false;
- }
-
- static void pool_dispatch_cb (owned Task task) {
+ private static void cursor_dispatch_cb (owned CursorTask task) {
try {
- if (task.type == TaskType.QUERY) {
- var query_task = (QueryTask) task;
-
- var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query);
-
- query_task.in_thread (cursor);
- } else {
- var data = task.data_manager.get_data ();
- var iface = task.data_manager.get_writable_db_interface ();
- iface.sqlite_wal_hook (wal_hook);
-
- if (task.type == TaskType.UPDATE) {
- var update_task = (UpdateTask) task;
-
- data.update_sparql (update_task.query);
- } else if (task.type == TaskType.UPDATE_BLANK) {
- var update_task = (UpdateTask) task;
-
- update_task.blank_nodes = data.update_sparql_blank (update_task.query);
- } else if (task.type == TaskType.TURTLE) {
- var turtle_task = (TurtleTask) task;
-
- var file = File.new_for_path (turtle_task.path);
-
- Tracker.Events.freeze ();
- try {
- data.load_turtle_file (file);
- } finally {
- Tracker.Events.reset_pending ();
- }
- }
- }
+ task.thread_func (task.cursor);
} catch (Error e) {
task.error = e;
}
Idle.add (() => {
- task_finish_cb (task);
+ task.callback ();
return false;
});
}
- public static void wal_checkpoint (DBInterface iface, bool blocking) {
- try {
- debug ("Checkpointing database...");
- iface.sqlite_wal_checkpoint (blocking);
- debug ("Checkpointing complete...");
- } catch (Error e) {
- warning (e.message);
- }
- }
-
- static int checkpointing;
-
- static void wal_hook (DBInterface iface, int n_pages) {
- // run in update thread
- var manager = (Data.Manager) iface.get_user_data ();
- var wal_iface = manager.get_wal_db_interface ();
-
- debug ("WAL: %d pages", n_pages);
-
- if (n_pages >= 10000) {
- // do immediate checkpointing (blocking updates)
- // to prevent excessive wal file growth
- wal_checkpoint (wal_iface, true);
- } else if (n_pages >= 1000 && checkpoint_pool != null) {
- if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) {
- // initiate asynchronous checkpointing (not blocking updates)
- try {
- checkpoint_pool.push (wal_iface);
- } catch (Error e) {
- warning (e.message);
- AtomicInt.set (ref checkpointing, 0);
- }
- }
- }
- }
-
- static void checkpoint_dispatch_cb (DBInterface iface) {
- // run in checkpoint thread
- wal_checkpoint (iface, false);
- AtomicInt.set (ref checkpointing, 0);
- }
-
- public static void init () {
+ public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
max_task_time = int.parse (max_task_time_env);
@@ -301,19 +73,12 @@ public class Tracker.Store {
max_task_time = MAX_TASK_TIME;
}
- running_tasks = new GenericArray<Task> ();
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- query_queues[i] = new Queue<Task> ();
- update_queues[i] = new Queue<Task> ();
- }
+ client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal);
try {
- update_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, 1, true);
- query_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true);
- checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true);
+ cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false);
} catch (Error e) {
- warning (e.message);
+ // Ignore harmless error
}
/* as the following settings are global for unknown reasons,
@@ -321,184 +86,187 @@ public class Tracker.Store {
are rather random */
ThreadPool.set_max_idle_time (15 * 1000);
ThreadPool.set_max_unused_threads (2);
+
+ config = config_p;
}
public static void shutdown () {
- query_pool = null;
- update_pool = null;
- checkpoint_pool = null;
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- query_queues[i] = null;
- update_queues[i] = null;
+ if (signal_timeout != 0) {
+ Source.remove (signal_timeout);
+ signal_timeout = 0;
}
}
- public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
- var task = new QueryTask ();
- task.type = TaskType.QUERY;
- task.query = sparql;
- task.cancellable = new Cancellable ();
- task.in_thread = in_thread;
- task.callback = sparql_query.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- query_queues[priority].push_tail (task);
+ private static Cancellable create_cancellable (string client_id) {
+ var client_cancellable = client_cancellables.lookup (client_id);
- sched ();
-
- yield;
-
- if (task.error != null) {
- throw task.error;
+ if (client_cancellable == null) {
+ client_cancellable = new Cancellable ();
+ client_cancellables.insert (client_id, client_cancellable);
}
- }
-
- public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
- var task = new UpdateTask ();
- task.type = TaskType.UPDATE;
- task.query = sparql;
- task.priority = priority;
- task.callback = sparql_update.callback;
- task.client_id = client_id;
- task.data_manager = manager;
- update_queues[priority].push_tail (task);
+ var task_cancellable = new Cancellable ();
+ client_cancellable.connect (() => {
+ task_cancellable.cancel ();
+ });
- sched ();
+ return task_cancellable;
+ }
- yield;
+ private static void do_emit_signals () {
+ signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ());
+ }
- if (task.error != null) {
- throw task.error;
+ private static void ensure_signal_timeout () {
+ if (signal_timeout == 0) {
+ signal_timeout = Timeout.add (config.graphupdated_delay, () => {
+ do_emit_signals ();
+ if (n_updates == 0) {
+ signal_timeout = 0;
+ return false;
+ } else {
+ return true;
+ }
+ });
}
}
- public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
- var task = new UpdateTask ();
- task.type = TaskType.UPDATE_BLANK;
- task.query = sparql;
- task.priority = priority;
- task.callback = sparql_update_blank.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- update_queues[priority].push_tail (task);
-
- sched ();
+ public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error {
+ var cancellable = create_cancellable (client_id);
+ uint timeout_id = 0;
- yield;
-
- if (task.error != null) {
- throw task.error;
+ if (max_task_time != 0) {
+ timeout_id = Timeout.add_seconds (max_task_time, () => {
+ cancellable.cancel ();
+ timeout_id = 0;
+ return false;
+ });
}
- return task.blank_nodes;
- }
+ var cursor = yield conn.query_async (sparql, cancellable);
- public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
- var task = new TurtleTask ();
- task.type = TaskType.TURTLE;
- task.path = file.get_path ();
- task.callback = queue_turtle_import.callback;
- task.client_id = client_id;
- task.data_manager = manager;
+ if (timeout_id != 0)
+ GLib.Source.remove (timeout_id);
- update_queues[Priority.TURTLE].push_tail (task);
+ var task = new CursorTask (cursor);
+ task.thread_func = in_thread;
+ task.callback = sparql_query.callback;
- sched ();
+ try {
+ cursor_pool.add (task);
+ } catch (Error e) {
+ // Ignore harmless error
+ }
yield;
- if (task.error != null) {
+ if (task.error != null)
throw task.error;
- }
}
- public uint get_queue_size () {
- uint result = 0;
+ public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
+ n_updates++;
+ ensure_signal_timeout ();
+ var cancellable = create_cancellable (client_id);
+ yield conn.update_async (sparql, priority, cancellable);
+ n_updates--;
+ }
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- result += query_queues[i].get_length ();
- result += update_queues[i].get_length ();
- }
- return result;
+ public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
+ n_updates++;
+ ensure_signal_timeout ();
+ var cancellable = create_cancellable (client_id);
+ var nodes = yield conn.update_blank_async (sparql, priority, cancellable);
+ n_updates--;
+
+ return nodes;
+ }
+
+ public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
+ n_updates++;
+ ensure_signal_timeout ();
+ var cancellable = create_cancellable (client_id);
+ yield conn.load_async (file, cancellable);
+ n_updates--;
}
public static void unreg_batches (string client_id) {
- unowned List<Task> list, cur;
- unowned Queue<Task> queue;
-
- for (int i = 0; i < running_tasks.length; i++) {
- unowned QueryTask task = running_tasks[i] as QueryTask;
- if (task != null && task.client_id == client_id && task.cancellable != null) {
- task.cancellable.cancel ();
- }
+ Cancellable cancellable = client_cancellables.lookup (client_id);
+
+ if (cancellable != null) {
+ cancellable.cancel ();
+ client_cancellables.remove (client_id);
}
+ }
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- queue = query_queues[i];
- list = queue.head;
- while (list != null) {
- cur = list;
- list = list.next;
- unowned Task task = cur.data;
+ public static async void pause () {
+ Tracker.Store.active = false;
- if (task != null && task.client_id == client_id) {
- queue.delete_link (cur);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ sparql_conn.sync ();
+ }
- task.error = new DBusError.FAILED ("Client disappeared");
- task.callback ();
- }
- }
+ public static void resume () {
+ Tracker.Store.active = true;
+ }
- queue = update_queues[i];
- list = queue.head;
- while (list != null) {
- cur = list;
- list = list.next;
- unowned Task task = cur.data;
+ private static void on_statements_committed () {
+ Tracker.Events.transact ();
+ Tracker.Writeback.transact ();
+ check_graph_updated_signal ();
+ }
- if (task != null && task.client_id == client_id) {
- queue.delete_link (cur);
+ private static void on_statements_rolled_back () {
+ Tracker.Events.reset_pending ();
+ Tracker.Writeback.reset_pending ();
+ }
- task.error = new DBusError.FAILED ("Client disappeared");
- task.callback ();
- }
- }
+ private static void check_graph_updated_signal () {
+ /* Check for whether we need an immediate emit */
+ if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
+ // immediately emit signals for already committed transaction
+ Idle.add (() => {
+ do_emit_signals ();
+ return false;
+ });
}
-
- sched ();
}
- public static async void pause () {
- Tracker.Store.active = false;
-
- if (n_queries_running > 0 || update_running) {
- active_callback = pause.callback;
- yield;
- active_callback = null;
- }
+ private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
- if (AtomicInt.get (ref checkpointing) != 0) {
- // this will wait for checkpointing to finish
- checkpoint_pool = null;
- try {
- checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true);
- } catch (Error e) {
- warning (e.message);
- }
- }
+ private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
- if (active) {
- sched ();
- }
+ public static void enable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.add_insert_statement_callback (on_statement_inserted);
+ data.add_delete_statement_callback (on_statement_deleted);
+ data.add_commit_statement_callback (on_statements_committed);
+ data.add_rollback_statement_callback (on_statements_rolled_back);
}
- public static void resume () {
- Tracker.Store.active = true;
+ public static void disable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.remove_insert_statement_callback (on_statement_inserted);
+ data.remove_delete_statement_callback (on_statement_deleted);
+ data.remove_commit_statement_callback (on_statements_committed);
+ data.remove_rollback_statement_callback (on_statements_rolled_back);
+ }
- sched ();
+ public static void set_signal_callback (SignalEmissionFunc? func) {
+ signal_callback = func;
}
}
diff --git a/src/tracker-store/tracker-writeback.c b/src/tracker-store/tracker-writeback.c
index a183b00a3..1edc3e2dd 100644
--- a/src/tracker-store/tracker-writeback.c
+++ b/src/tracker-store/tracker-writeback.c
@@ -27,8 +27,12 @@
#include "tracker-writeback.h"
typedef struct {
+ /* Accessed by updates thread */
GHashTable *allowances;
GHashTable *pending_events;
+
+ /* Accessed by both updates and dbus threads */
+ GMutex mutex;
GHashTable *ready_events;
} WritebackPrivate;
@@ -115,9 +119,16 @@ tracker_writeback_reset_ready ()
GHashTable *
tracker_writeback_get_ready (void)
{
+ GHashTable *events;
+
g_return_val_if_fail (private != NULL, NULL);
- return private->ready_events;
+ g_mutex_lock (&private->mutex);
+ events = private->ready_events;
+ private->ready_events = NULL;
+ g_mutex_unlock (&private->mutex);
+
+ return events;
}
static void
@@ -145,6 +156,7 @@ tracker_writeback_init (TrackerDataManager *data_manager,
g_return_if_fail (private == NULL);
private = g_new0 (WritebackPrivate, 1);
+ g_mutex_init (&private->mutex);
private->allowances = g_hash_table_new_full (g_direct_hash,
g_direct_equal,
@@ -191,6 +203,8 @@ tracker_writeback_transact (void)
if (!private->pending_events)
return;
+ g_mutex_lock (&private->mutex);
+
if (!private->ready_events) {
private->ready_events = g_hash_table_new_full (g_direct_hash, g_direct_equal,
(GDestroyNotify) NULL,
@@ -203,6 +217,8 @@ tracker_writeback_transact (void)
g_hash_table_insert (private->ready_events, key, value);
g_hash_table_iter_remove (&iter);
}
+
+ g_mutex_unlock (&private->mutex);
}
void
@@ -214,7 +230,8 @@ tracker_writeback_shutdown (void)
/* Perhaps hurry an emit of the ready events here? We're shutting down,
* so I guess we're not required to do that here ... ? */
- tracker_writeback_reset_ready ();
+ g_clear_pointer (&private->ready_events,
+ (GDestroyNotify) g_hash_table_unref);
free_private (private);
private = NULL;
diff --git a/src/tracker-store/tracker-writeback.vapi b/src/tracker-store/tracker-writeback.vapi
index 368de0303..7c48512b8 100644
--- a/src/tracker-store/tracker-writeback.vapi
+++ b/src/tracker-store/tracker-writeback.vapi
@@ -26,9 +26,8 @@ namespace Tracker {
public void init (Tracker.Data.Manager data_manager, WritebackGetPredicatesFunc callback);
public void shutdown ();
public void check (int graph_id, string graph, int subject_id, string subject, int pred_id, int object_id, string object, GLib.PtrArray rdf_types);
- public unowned GLib.HashTable<int, GLib.Array<int>> get_ready ();
+ public GLib.HashTable<int, GLib.Array<int>> get_ready ();
public void reset_pending ();
- public void reset_ready ();
public void transact ();
}
}
diff --git a/tests/libtracker-miner/tracker-file-notifier-test.c b/tests/libtracker-miner/tracker-file-notifier-test.c
index fd8264467..1bdc16085 100644
--- a/tests/libtracker-miner/tracker-file-notifier-test.c
+++ b/tests/libtracker-miner/tracker-file-notifier-test.c
@@ -668,6 +668,7 @@ test_file_notifier_monitor_updates_non_recursive (TestCommonContext *fixture,
{ OPERATION_CREATE, "non-recursive/bbb", NULL }
};
FilesystemOperation expected_results2[] = {
+ { OPERATION_CREATE, "non-recursive", NULL },
{ OPERATION_UPDATE, "non-recursive/bbb", NULL },
{ OPERATION_CREATE, "non-recursive/ccc", NULL },
{ OPERATION_UPDATE, "non-recursive/ccc", NULL }
@@ -716,6 +717,7 @@ test_file_notifier_monitor_updates_recursive (TestCommonContext *fixture,
{ OPERATION_CREATE, "recursive/bbb", NULL }
};
FilesystemOperation expected_results2[] = {
+ { OPERATION_CREATE, "recursive", NULL },
{ OPERATION_CREATE, "recursive/folder", NULL },
{ OPERATION_CREATE, "recursive/folder/aaa", NULL },
{ OPERATION_UPDATE, "recursive/bbb", NULL },