summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarlos Garnacho <carlosg@gnome.org>2017-11-25 14:39:48 +0100
committerCarlos Garnacho <carlosg@gnome.org>2018-07-20 18:27:32 +0200
commit1d59f03662b91a1ad6475333fbe778a38a38aa2b (patch)
treecc6bb140a0f61a1cd129966dcb801c0f88208035
parentadfa177cb1c38bf3452d16ea66195cf0fade4068 (diff)
downloadtracker-1d59f03662b91a1ad6475333fbe778a38a38aa2b.tar.gz
tracker-store: Push TrackerData hooks down to Tracker.Store
Move this out of the Resources object, which is basically a view of the internal Store object. All event accounting and signaling is now performed by the Store object, to which the Resources DBus object connects to in order to implement GraphUpdated and Writeback signals. Only one handler of these events is possible at the moment, would be nice to consider doing something marginally better on the Steroids interface at some point, at least wrt the amount of data sent through the bus. Instead of trying to schedule the timeout across threads (the TrackerData hooks run in the thread performing the updates, and we want signaling done from the main/dbus thread), the code now just sets up a timeout on the main thread that keeps running as long as there are pending updates. When the task for the last batched update returns, it will be safe for the timeout to do signaling one last time and turn itself down, all of this happening in the main thread.
-rw-r--r--src/tracker-store/tracker-backup.vala8
-rw-r--r--src/tracker-store/tracker-dbus.vala8
-rw-r--r--src/tracker-store/tracker-main.vala4
-rw-r--r--src/tracker-store/tracker-resources.vala89
-rw-r--r--src/tracker-store/tracker-store.vala104
5 files changed, 116 insertions, 97 deletions
diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala
index d589b227a..a9ede1b9a 100644
--- a/src/tracker-store/tracker-backup.vala
+++ b/src/tracker-store/tracker-backup.vala
@@ -25,7 +25,7 @@ public class Tracker.Backup : Object {
public async void save (BusName sender, string destination_uri) throws Error {
var resources = (Resources) Tracker.DBus.get_object (typeof (Resources));
if (resources != null) {
- resources.disable_signals ();
+ Tracker.Store.disable_signals ();
Tracker.Events.shutdown ();
}
@@ -58,7 +58,7 @@ public class Tracker.Backup : Object {
} finally {
if (resources != null) {
Tracker.Events.init ();
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
}
Tracker.Store.resume ();
@@ -68,7 +68,7 @@ public class Tracker.Backup : Object {
public async void restore (BusName sender, string journal_uri) throws Error {
var resources = (Resources) Tracker.DBus.get_object (typeof (Resources));
if (resources != null) {
- resources.disable_signals ();
+ Tracker.Store.disable_signals ();
Tracker.Events.shutdown ();
}
@@ -96,7 +96,7 @@ public class Tracker.Backup : Object {
} finally {
if (resources != null) {
Tracker.Events.init ();
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
}
Tracker.Store.resume ();
diff --git a/src/tracker-store/tracker-dbus.vala b/src/tracker-store/tracker-dbus.vala
index 35c1542e5..48197f4b9 100644
--- a/src/tracker-store/tracker-dbus.vala
+++ b/src/tracker-store/tracker-dbus.vala
@@ -34,7 +34,6 @@ public class Tracker.DBus {
static uint notifier_id;
static Tracker.Backup backup;
static uint backup_id;
- static Tracker.Config config;
static uint domain_watch_id;
static MainLoop watch_main_loop;
@@ -108,9 +107,8 @@ public class Tracker.DBus {
}
}
- public static bool init (Tracker.Config config_p) {
+ public static bool init () {
/* Don't reinitialize */
- config = config_p;
if (connection != null) {
return true;
}
@@ -216,7 +214,7 @@ public class Tracker.DBus {
statistics_id = register_object (connection, statistics, Tracker.Statistics.PATH);
/* Add org.freedesktop.Tracker1.Resources */
- resources = new Tracker.Resources (connection, config);
+ resources = new Tracker.Resources (connection);
if (resources == null) {
critical ("Could not create TrackerResources object to register");
return false;
@@ -261,7 +259,7 @@ public class Tracker.DBus {
return false;
}
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
return true;
}
diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala
index d14b12c8e..63df072ff 100644
--- a/src/tracker-store/tracker-main.vala
+++ b/src/tracker-store/tracker-main.vala
@@ -239,7 +239,7 @@ License which can be viewed at:
sanity_check_option_values (config);
- if (!Tracker.DBus.init (config)) {
+ if (!Tracker.DBus.init ()) {
return 1;
}
@@ -258,7 +258,7 @@ License which can be viewed at:
var notifier = Tracker.DBus.register_notifier ();
- Tracker.Store.init ();
+ Tracker.Store.init (config);
/* Make Tracker available for introspection */
if (!Tracker.DBus.register_objects ()) {
diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala
index c6a76e749..7322636d1 100644
--- a/src/tracker-store/tracker-resources.vala
+++ b/src/tracker-store/tracker-resources.vala
@@ -22,8 +22,6 @@
public class Tracker.Resources : Object {
public const string PATH = "/org/freedesktop/Tracker1/Resources";
- const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
-
/* I *know* that this is some arbitrary number that doesn't seem to
* resemble anything. In fact it's what I experimentally measured to
* be a good value on a default Debian testing which has
@@ -50,15 +48,13 @@ public class Tracker.Resources : Object {
const int DBUS_ARBITRARY_MAX_MSG_SIZE = 10000000;
DBusConnection connection;
- uint signal_timeout;
- Tracker.Config config;
public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects);
public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts);
- public Resources (DBusConnection connection, Tracker.Config config_p) {
+ public Resources (DBusConnection connection) {
this.connection = connection;
- this.config = config_p;
+ Tracker.Store.set_signal_callback (on_emit_signals);
}
public async void load (BusName sender, string uri) throws Error {
@@ -246,9 +242,7 @@ public class Tracker.Resources : Object {
writeback (builder.end ());
}
- bool on_emit_signals () {
- var events = Tracker.Events.get_pending ();
-
+ void on_emit_signals (HashTable<Tracker.Class, Tracker.Events.Batch>? events, HashTable<int, GLib.Array<int>>? writebacks) {
if (events != null) {
var iter = HashTableIter<Tracker.Class, Tracker.Events.Batch> (events);
unowned Events.Batch class_events;
@@ -259,88 +253,13 @@ public class Tracker.Resources : Object {
}
}
- /* Writeback feature */
- var writebacks = Tracker.Writeback.get_ready ();
-
if (writebacks != null) {
emit_writeback (writebacks);
}
-
- signal_timeout = 0;
- return false;
- }
-
- void on_statements_committed () {
- Tracker.Events.transact ();
- Tracker.Writeback.transact ();
- check_graph_updated_signal ();
-
- if (signal_timeout == 0) {
- signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals);
- }
- }
-
- void on_statements_rolled_back () {
- Tracker.Events.reset_pending ();
- Tracker.Writeback.reset_pending ();
- }
-
- void check_graph_updated_signal () {
- /* Check for whether we need an immediate emit */
- if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
- // possibly active timeout no longer necessary as signals
- // for committed transactions will be emitted by the following on_emit_signals call
- // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
- }
-
- // immediately emit signals for already committed transaction
- Idle.add (() => {
- on_emit_signals ();
- return false;
- });
- }
- }
-
- void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
- Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
- Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
- }
-
- void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
- Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
- Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
- }
-
- [DBus (visible = false)]
- public void enable_signals () {
- var data_manager = Tracker.Main.get_data_manager ();
- var data = data_manager.get_data ();
- data.add_insert_statement_callback (on_statement_inserted);
- data.add_delete_statement_callback (on_statement_deleted);
- data.add_commit_statement_callback (on_statements_committed);
- data.add_rollback_statement_callback (on_statements_rolled_back);
- }
-
- [DBus (visible = false)]
- public void disable_signals () {
- var data_manager = Tracker.Main.get_data_manager ();
- var data = data_manager.get_data ();
- data.remove_insert_statement_callback (on_statement_inserted);
- data.remove_delete_statement_callback (on_statement_deleted);
- data.remove_commit_statement_callback (on_statements_committed);
- data.remove_rollback_statement_callback (on_statements_rolled_back);
-
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
- }
}
~Resources () {
- this.disable_signals ();
+ Tracker.Store.set_signal_callback (null);
}
[DBus (visible = false)]
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index a555c67b0..d4b4d9646 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -23,6 +23,7 @@ public class Tracker.Store {
const int MAX_CONCURRENT_QUERIES = 2;
const int MAX_TASK_TIME = 30;
+ const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
@@ -36,6 +37,10 @@ public class Tracker.Store {
static bool active;
static SourceFunc active_callback;
+ static Tracker.Config config;
+ static uint signal_timeout;
+ static int n_updates;
+
public enum Priority {
HIGH,
LOW,
@@ -50,6 +55,9 @@ public class Tracker.Store {
TURTLE,
}
+ public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
+ static unowned SignalEmissionFunc signal_callback;
+
public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
abstract class Task {
@@ -265,7 +273,7 @@ public class Tracker.Store {
AtomicInt.set (ref checkpointing, 0);
}
- public static void init () {
+ public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
max_task_time = int.parse (max_task_time_env);
@@ -293,6 +301,8 @@ public class Tracker.Store {
are rather random */
ThreadPool.set_max_idle_time (15 * 1000);
ThreadPool.set_max_unused_threads (2);
+
+ config = config_p;
}
public static void shutdown () {
@@ -304,6 +314,11 @@ public class Tracker.Store {
query_queues[i] = null;
update_queues[i] = null;
}
+
+ if (signal_timeout != 0) {
+ Source.remove (signal_timeout);
+ signal_timeout = 0;
+ }
}
public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
@@ -327,7 +342,28 @@ public class Tracker.Store {
}
}
+ private static void do_emit_signals () {
+ signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ());
+ }
+
+ private static void ensure_signal_timeout () {
+ if (signal_timeout == 0) {
+ signal_timeout = Timeout.add (config.graphupdated_delay, () => {
+ do_emit_signals ();
+ if (n_updates == 0) {
+ signal_timeout = 0;
+ return false;
+ } else {
+ return true;
+ }
+ });
+ }
+ }
+
public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE;
task.query = sparql;
@@ -342,12 +378,17 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
}
public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE_BLANK;
task.query = sparql;
@@ -362,6 +403,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -370,6 +413,9 @@ public class Tracker.Store {
}
public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new TurtleTask ();
task.type = TaskType.TURTLE;
task.path = file.get_path ();
@@ -383,6 +429,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -473,4 +521,58 @@ public class Tracker.Store {
sched ();
}
+
+ private static void on_statements_committed () {
+ Tracker.Events.transact ();
+ Tracker.Writeback.transact ();
+ check_graph_updated_signal ();
+ }
+
+ private static void on_statements_rolled_back () {
+ Tracker.Events.reset_pending ();
+ Tracker.Writeback.reset_pending ();
+ }
+
+ private static void check_graph_updated_signal () {
+ /* Check for whether we need an immediate emit */
+ if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
+ // immediately emit signals for already committed transaction
+ Idle.add (() => {
+ do_emit_signals ();
+ return false;
+ });
+ }
+ }
+
+ private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ public static void enable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.add_insert_statement_callback (on_statement_inserted);
+ data.add_delete_statement_callback (on_statement_deleted);
+ data.add_commit_statement_callback (on_statements_committed);
+ data.add_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void disable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.remove_insert_statement_callback (on_statement_inserted);
+ data.remove_delete_statement_callback (on_statement_deleted);
+ data.remove_commit_statement_callback (on_statements_committed);
+ data.remove_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void set_signal_callback (SignalEmissionFunc? func) {
+ signal_callback = func;
+ }
}