summaryrefslogtreecommitdiff
path: root/src/tracker-store/tracker-store.vala
diff options
context:
space:
mode:
Diffstat (limited to 'src/tracker-store/tracker-store.vala')
-rw-r--r--src/tracker-store/tracker-store.vala104
1 files changed, 103 insertions, 1 deletions
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index a555c67b0..d4b4d9646 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -23,6 +23,7 @@ public class Tracker.Store {
const int MAX_CONCURRENT_QUERIES = 2;
const int MAX_TASK_TIME = 30;
+ const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
@@ -36,6 +37,10 @@ public class Tracker.Store {
static bool active;
static SourceFunc active_callback;
+ static Tracker.Config config;
+ static uint signal_timeout;
+ static int n_updates;
+
public enum Priority {
HIGH,
LOW,
@@ -50,6 +55,9 @@ public class Tracker.Store {
TURTLE,
}
+ public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
+ static unowned SignalEmissionFunc signal_callback;
+
public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
abstract class Task {
@@ -265,7 +273,7 @@ public class Tracker.Store {
AtomicInt.set (ref checkpointing, 0);
}
- public static void init () {
+ public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
max_task_time = int.parse (max_task_time_env);
@@ -293,6 +301,8 @@ public class Tracker.Store {
are rather random */
ThreadPool.set_max_idle_time (15 * 1000);
ThreadPool.set_max_unused_threads (2);
+
+ config = config_p;
}
public static void shutdown () {
@@ -304,6 +314,11 @@ public class Tracker.Store {
query_queues[i] = null;
update_queues[i] = null;
}
+
+ if (signal_timeout != 0) {
+ Source.remove (signal_timeout);
+ signal_timeout = 0;
+ }
}
public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
@@ -327,7 +342,28 @@ public class Tracker.Store {
}
}
+ private static void do_emit_signals () {
+ signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ());
+ }
+
+ private static void ensure_signal_timeout () {
+ if (signal_timeout == 0) {
+ signal_timeout = Timeout.add (config.graphupdated_delay, () => {
+ do_emit_signals ();
+ if (n_updates == 0) {
+ signal_timeout = 0;
+ return false;
+ } else {
+ return true;
+ }
+ });
+ }
+ }
+
public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE;
task.query = sparql;
@@ -342,12 +378,17 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
}
public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE_BLANK;
task.query = sparql;
@@ -362,6 +403,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -370,6 +413,9 @@ public class Tracker.Store {
}
public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new TurtleTask ();
task.type = TaskType.TURTLE;
task.path = file.get_path ();
@@ -383,6 +429,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -473,4 +521,58 @@ public class Tracker.Store {
sched ();
}
+
+ private static void on_statements_committed () {
+ Tracker.Events.transact ();
+ Tracker.Writeback.transact ();
+ check_graph_updated_signal ();
+ }
+
+ private static void on_statements_rolled_back () {
+ Tracker.Events.reset_pending ();
+ Tracker.Writeback.reset_pending ();
+ }
+
+ private static void check_graph_updated_signal () {
+ /* Check for whether we need an immediate emit */
+ if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
+ // immediately emit signals for already committed transaction
+ Idle.add (() => {
+ do_emit_signals ();
+ return false;
+ });
+ }
+ }
+
+ private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ public static void enable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.add_insert_statement_callback (on_statement_inserted);
+ data.add_delete_statement_callback (on_statement_deleted);
+ data.add_commit_statement_callback (on_statements_committed);
+ data.add_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void disable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.remove_insert_statement_callback (on_statement_inserted);
+ data.remove_delete_statement_callback (on_statement_deleted);
+ data.remove_commit_statement_callback (on_statements_committed);
+ data.remove_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void set_signal_callback (SignalEmissionFunc? func) {
+ signal_callback = func;
+ }
}