diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/tracker-store/tracker-backup.vala | 8 | ||||
-rw-r--r-- | src/tracker-store/tracker-dbus.vala | 8 | ||||
-rw-r--r-- | src/tracker-store/tracker-main.vala | 4 | ||||
-rw-r--r-- | src/tracker-store/tracker-resources.vala | 89 | ||||
-rw-r--r-- | src/tracker-store/tracker-store.vala | 104 |
5 files changed, 116 insertions, 97 deletions
diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala index d589b227a..a9ede1b9a 100644 --- a/src/tracker-store/tracker-backup.vala +++ b/src/tracker-store/tracker-backup.vala @@ -25,7 +25,7 @@ public class Tracker.Backup : Object { public async void save (BusName sender, string destination_uri) throws Error { var resources = (Resources) Tracker.DBus.get_object (typeof (Resources)); if (resources != null) { - resources.disable_signals (); + Tracker.Store.disable_signals (); Tracker.Events.shutdown (); } @@ -58,7 +58,7 @@ public class Tracker.Backup : Object { } finally { if (resources != null) { Tracker.Events.init (); - resources.enable_signals (); + Tracker.Store.enable_signals (); } Tracker.Store.resume (); @@ -68,7 +68,7 @@ public class Tracker.Backup : Object { public async void restore (BusName sender, string journal_uri) throws Error { var resources = (Resources) Tracker.DBus.get_object (typeof (Resources)); if (resources != null) { - resources.disable_signals (); + Tracker.Store.disable_signals (); Tracker.Events.shutdown (); } @@ -96,7 +96,7 @@ public class Tracker.Backup : Object { } finally { if (resources != null) { Tracker.Events.init (); - resources.enable_signals (); + Tracker.Store.enable_signals (); } Tracker.Store.resume (); diff --git a/src/tracker-store/tracker-dbus.vala b/src/tracker-store/tracker-dbus.vala index 35c1542e5..48197f4b9 100644 --- a/src/tracker-store/tracker-dbus.vala +++ b/src/tracker-store/tracker-dbus.vala @@ -34,7 +34,6 @@ public class Tracker.DBus { static uint notifier_id; static Tracker.Backup backup; static uint backup_id; - static Tracker.Config config; static uint domain_watch_id; static MainLoop watch_main_loop; @@ -108,9 +107,8 @@ public class Tracker.DBus { } } - public static bool init (Tracker.Config config_p) { + public static bool init () { /* Don't reinitialize */ - config = config_p; if (connection != null) { return true; } @@ -216,7 +214,7 @@ public class Tracker.DBus { statistics_id = register_object (connection, statistics, Tracker.Statistics.PATH); /* Add org.freedesktop.Tracker1.Resources */ - resources = new Tracker.Resources (connection, config); + resources = new Tracker.Resources (connection); if (resources == null) { critical ("Could not create TrackerResources object to register"); return false; @@ -261,7 +259,7 @@ public class Tracker.DBus { return false; } - resources.enable_signals (); + Tracker.Store.enable_signals (); return true; } diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index d14b12c8e..63df072ff 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -239,7 +239,7 @@ License which can be viewed at: sanity_check_option_values (config); - if (!Tracker.DBus.init (config)) { + if (!Tracker.DBus.init ()) { return 1; } @@ -258,7 +258,7 @@ License which can be viewed at: var notifier = Tracker.DBus.register_notifier (); - Tracker.Store.init (); + Tracker.Store.init (config); /* Make Tracker available for introspection */ if (!Tracker.DBus.register_objects ()) { diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index c6a76e749..7322636d1 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -22,8 +22,6 @@ public class Tracker.Resources : Object { public const string PATH = "/org/freedesktop/Tracker1/Resources"; - const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - /* I *know* that this is some arbitrary number that doesn't seem to * resemble anything. In fact it's what I experimentally measured to * be a good value on a default Debian testing which has @@ -50,15 +48,13 @@ public class Tracker.Resources : Object { const int DBUS_ARBITRARY_MAX_MSG_SIZE = 10000000; DBusConnection connection; - uint signal_timeout; - Tracker.Config config; public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects); public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts); - public Resources (DBusConnection connection, Tracker.Config config_p) { + public Resources (DBusConnection connection) { this.connection = connection; - this.config = config_p; + Tracker.Store.set_signal_callback (on_emit_signals); } public async void load (BusName sender, string uri) throws Error { @@ -246,9 +242,7 @@ public class Tracker.Resources : Object { writeback (builder.end ()); } - bool on_emit_signals () { - var events = Tracker.Events.get_pending (); - + void on_emit_signals (HashTable<Tracker.Class, Tracker.Events.Batch>? events, HashTable<int, GLib.Array<int>>? writebacks) { if (events != null) { var iter = HashTableIter<Tracker.Class, Tracker.Events.Batch> (events); unowned Events.Batch class_events; @@ -259,88 +253,13 @@ public class Tracker.Resources : Object { } } - /* Writeback feature */ - var writebacks = Tracker.Writeback.get_ready (); - if (writebacks != null) { emit_writeback (writebacks); } - - signal_timeout = 0; - return false; - } - - void on_statements_committed () { - Tracker.Events.transact (); - Tracker.Writeback.transact (); - check_graph_updated_signal (); - - if (signal_timeout == 0) { - signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals); - } - } - - void on_statements_rolled_back () { - Tracker.Events.reset_pending (); - Tracker.Writeback.reset_pending (); - } - - void check_graph_updated_signal () { - /* Check for whether we need an immediate emit */ - if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { - // possibly active timeout no longer necessary as signals - // for committed transactions will be emitted by the following on_emit_signals call - // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0 - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; - } - - // immediately emit signals for already committed transaction - Idle.add (() => { - on_emit_signals (); - return false; - }); - } - } - - void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { - Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); - Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - } - - void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { - Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); - Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); - } - - [DBus (visible = false)] - public void enable_signals () { - var data_manager = Tracker.Main.get_data_manager (); - var data = data_manager.get_data (); - data.add_insert_statement_callback (on_statement_inserted); - data.add_delete_statement_callback (on_statement_deleted); - data.add_commit_statement_callback (on_statements_committed); - data.add_rollback_statement_callback (on_statements_rolled_back); - } - - [DBus (visible = false)] - public void disable_signals () { - var data_manager = Tracker.Main.get_data_manager (); - var data = data_manager.get_data (); - data.remove_insert_statement_callback (on_statement_inserted); - data.remove_delete_statement_callback (on_statement_deleted); - data.remove_commit_statement_callback (on_statements_committed); - data.remove_rollback_statement_callback (on_statements_rolled_back); - - if (signal_timeout != 0) { - Source.remove (signal_timeout); - signal_timeout = 0; - } } ~Resources () { - this.disable_signals (); + Tracker.Store.set_signal_callback (null); } [DBus (visible = false)] diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index a555c67b0..d4b4d9646 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -23,6 +23,7 @@ public class Tracker.Store { const int MAX_CONCURRENT_QUERIES = 2; const int MAX_TASK_TIME = 30; + const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; @@ -36,6 +37,10 @@ public class Tracker.Store { static bool active; static SourceFunc active_callback; + static Tracker.Config config; + static uint signal_timeout; + static int n_updates; + public enum Priority { HIGH, LOW, @@ -50,6 +55,9 @@ public class Tracker.Store { TURTLE, } + public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback); + static unowned SignalEmissionFunc signal_callback; + public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; abstract class Task { @@ -265,7 +273,7 @@ public class Tracker.Store { AtomicInt.set (ref checkpointing, 0); } - public static void init () { + public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { max_task_time = int.parse (max_task_time_env); @@ -293,6 +301,8 @@ public class Tracker.Store { are rather random */ ThreadPool.set_max_idle_time (15 * 1000); ThreadPool.set_max_unused_threads (2); + + config = config_p; } public static void shutdown () { @@ -304,6 +314,11 @@ public class Tracker.Store { query_queues[i] = null; update_queues[i] = null; } + + if (signal_timeout != 0) { + Source.remove (signal_timeout); + signal_timeout = 0; + } } public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { @@ -327,7 +342,28 @@ public class Tracker.Store { } } + private static void do_emit_signals () { + signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ()); + } + + private static void ensure_signal_timeout () { + if (signal_timeout == 0) { + signal_timeout = Timeout.add (config.graphupdated_delay, () => { + do_emit_signals (); + if (n_updates == 0) { + signal_timeout = 0; + return false; + } else { + return true; + } + }); + } + } + public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new UpdateTask (); task.type = TaskType.UPDATE; task.query = sparql; @@ -342,12 +378,17 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } } public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new UpdateTask (); task.type = TaskType.UPDATE_BLANK; task.query = sparql; @@ -362,6 +403,8 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } @@ -370,6 +413,9 @@ public class Tracker.Store { } public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { + n_updates++; + ensure_signal_timeout (); + var task = new TurtleTask (); task.type = TaskType.TURTLE; task.path = file.get_path (); @@ -383,6 +429,8 @@ public class Tracker.Store { yield; + n_updates--; + if (task.error != null) { throw task.error; } @@ -473,4 +521,58 @@ public class Tracker.Store { sched (); } + + private static void on_statements_committed () { + Tracker.Events.transact (); + Tracker.Writeback.transact (); + check_graph_updated_signal (); + } + + private static void on_statements_rolled_back () { + Tracker.Events.reset_pending (); + Tracker.Writeback.reset_pending (); + } + + private static void check_graph_updated_signal () { + /* Check for whether we need an immediate emit */ + if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) { + // immediately emit signals for already committed transaction + Idle.add (() => { + do_emit_signals (); + return false; + }); + } + } + + private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } + + private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) { + Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types); + Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types); + } + + public static void enable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.add_insert_statement_callback (on_statement_inserted); + data.add_delete_statement_callback (on_statement_deleted); + data.add_commit_statement_callback (on_statements_committed); + data.add_rollback_statement_callback (on_statements_rolled_back); + } + + public static void disable_signals () { + var data_manager = Tracker.Main.get_data_manager (); + var data = data_manager.get_data (); + data.remove_insert_statement_callback (on_statement_inserted); + data.remove_delete_statement_callback (on_statement_deleted); + data.remove_commit_statement_callback (on_statements_committed); + data.remove_rollback_statement_callback (on_statements_rolled_back); + } + + public static void set_signal_callback (SignalEmissionFunc? func) { + signal_callback = func; + } } |