diff options
Diffstat (limited to 'src/tracker-store/tracker-store.vala')
-rw-r--r-- | src/tracker-store/tracker-store.vala | 104 |
1 files changed, 103 insertions, 1 deletions
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index a555c67b0..d4b4d9646 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -23,6 +23,7 @@ public class Tracker.Store { const int MAX_CONCURRENT_QUERIES = 2; const int MAX_TASK_TIME = 30; + const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; @@ -36,6 +37,10 @@ public class Tracker.Store { static bool active; static SourceFunc active_callback; + static Tracker.Config config; + static uint signal_timeout; + static int n_updates; + public enum Priority { HIGH, LOW, @@ -50,6 +55,9 @@ public class Tracker.Store { TURTLE, } + public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback); + static unowned SignalEmissionFunc signal_callback; + public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; abstract class Task { @@ -265,7 +273,7 @@ public class Tracker.Store { AtomicInt.set (ref checkpointing, 0); } - public static void init () { + public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { max_task_time = int.parse (max_task_time_env); @@ -293,6 +301,8 @@ public class Tracker.Store { are rather random */ ThreadPool.set_max_idle_time (15 * 1000); ThreadPool.set_max_unused_threads (2); + + config = config_p; } public static void shutdown () { @@ -304,6 +314,11 @@ public class Tracker.Store { query_queues[i] = null; update_queues[i] = null; } + + if (signal_timeout != 0) { + Source.remove (signal_timeout); + signal_timeout = 0; + } } public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { @@ -327,7 +342,28 @@ public class Tracker.Store { } } + private static void do_emit_signals () { + signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ()); + } + + private static void ensure_signal_timeout () { + if (signal_timeout == 0) { + signal_timeout = Timeout.add (config.graphupdated_delay, () => { + do_emit_signals (); + if (n_updates == 0) { + signal_timeout = 0; + return false; + } else { + return true; + } + }); + } + } + public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new UpdateTask (); task.type = TaskType.UPDATE; task.query = sparql; @@ -342,12 +378,17 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } } public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new UpdateTask (); task.type = TaskType.UPDATE_BLANK; task.query = sparql; @@ -362,6 +403,8 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } @@ -370,6 +413,9 @@ public class Tracker.Store { } public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new TurtleTask (); task.type = TaskType.TURTLE; task.path = file.get_path (); @@ -383,6 +429,8 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } @@ -473,4 +521,58 @@ public class Tracker.Store { sched (); } + + private static void on_statements_committed () { + Tracker.Events.transact (); + Tracker.Writeback.transact (); + check_graph_updated_signal (); + } + + private static void on_statements_rolled_back () { + Tracker.Events.reset_pending (); + Tracker.Writeback.reset_pending (); + } + + private static void check_graph_updated_signal () { + /* Check for whether we need an immediate emit */ + if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { + // immediately emit signals for already committed transaction + Idle.add (() => { + do_emit_signals (); + return false; + }); + } + } + + private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } + + private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } + + public static void enable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.add_insert_statement_callback (on_statement_inserted); + data.add_delete_statement_callback (on_statement_deleted); + data.add_commit_statement_callback (on_statements_committed); + data.add_rollback_statement_callback (on_statements_rolled_back); + } + + public static void disable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.remove_insert_statement_callback (on_statement_inserted); + data.remove_delete_statement_callback (on_statement_deleted); + data.remove_commit_statement_callback (on_statements_committed); + data.remove_rollback_statement_callback (on_statements_rolled_back); + } + + public static void set_signal_callback (SignalEmissionFunc? func) { + signal_callback = func; + } } |