summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/tracker-store/tracker-backup.vala8
-rw-r--r--src/tracker-store/tracker-dbus.vala8
-rw-r--r--src/tracker-store/tracker-main.vala4
-rw-r--r--src/tracker-store/tracker-resources.vala89
-rw-r--r--src/tracker-store/tracker-store.vala104
5 files changed, 116 insertions, 97 deletions
diff --git a/src/tracker-store/tracker-backup.vala b/src/tracker-store/tracker-backup.vala
index d589b227a..a9ede1b9a 100644
--- a/src/tracker-store/tracker-backup.vala
+++ b/src/tracker-store/tracker-backup.vala
@@ -25,7 +25,7 @@ public class Tracker.Backup : Object {
public async void save (BusName sender, string destination_uri) throws Error {
var resources = (Resources) Tracker.DBus.get_object (typeof (Resources));
if (resources != null) {
- resources.disable_signals ();
+ Tracker.Store.disable_signals ();
Tracker.Events.shutdown ();
}
@@ -58,7 +58,7 @@ public class Tracker.Backup : Object {
} finally {
if (resources != null) {
Tracker.Events.init ();
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
}
Tracker.Store.resume ();
@@ -68,7 +68,7 @@ public class Tracker.Backup : Object {
public async void restore (BusName sender, string journal_uri) throws Error {
var resources = (Resources) Tracker.DBus.get_object (typeof (Resources));
if (resources != null) {
- resources.disable_signals ();
+ Tracker.Store.disable_signals ();
Tracker.Events.shutdown ();
}
@@ -96,7 +96,7 @@ public class Tracker.Backup : Object {
} finally {
if (resources != null) {
Tracker.Events.init ();
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
}
Tracker.Store.resume ();
diff --git a/src/tracker-store/tracker-dbus.vala b/src/tracker-store/tracker-dbus.vala
index 35c1542e5..48197f4b9 100644
--- a/src/tracker-store/tracker-dbus.vala
+++ b/src/tracker-store/tracker-dbus.vala
@@ -34,7 +34,6 @@ public class Tracker.DBus {
static uint notifier_id;
static Tracker.Backup backup;
static uint backup_id;
- static Tracker.Config config;
static uint domain_watch_id;
static MainLoop watch_main_loop;
@@ -108,9 +107,8 @@ public class Tracker.DBus {
}
}
- public static bool init (Tracker.Config config_p) {
+ public static bool init () {
/* Don't reinitialize */
- config = config_p;
if (connection != null) {
return true;
}
@@ -216,7 +214,7 @@ public class Tracker.DBus {
statistics_id = register_object (connection, statistics, Tracker.Statistics.PATH);
/* Add org.freedesktop.Tracker1.Resources */
- resources = new Tracker.Resources (connection, config);
+ resources = new Tracker.Resources (connection);
if (resources == null) {
critical ("Could not create TrackerResources object to register");
return false;
@@ -261,7 +259,7 @@ public class Tracker.DBus {
return false;
}
- resources.enable_signals ();
+ Tracker.Store.enable_signals ();
return true;
}
diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala
index d14b12c8e..63df072ff 100644
--- a/src/tracker-store/tracker-main.vala
+++ b/src/tracker-store/tracker-main.vala
@@ -239,7 +239,7 @@ License which can be viewed at:
sanity_check_option_values (config);
- if (!Tracker.DBus.init (config)) {
+ if (!Tracker.DBus.init ()) {
return 1;
}
@@ -258,7 +258,7 @@ License which can be viewed at:
var notifier = Tracker.DBus.register_notifier ();
- Tracker.Store.init ();
+ Tracker.Store.init (config);
/* Make Tracker available for introspection */
if (!Tracker.DBus.register_objects ()) {
diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala
index c6a76e749..7322636d1 100644
--- a/src/tracker-store/tracker-resources.vala
+++ b/src/tracker-store/tracker-resources.vala
@@ -22,8 +22,6 @@
public class Tracker.Resources : Object {
public const string PATH = "/org/freedesktop/Tracker1/Resources";
- const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
-
/* I *know* that this is some arbitrary number that doesn't seem to
* resemble anything. In fact it's what I experimentally measured to
* be a good value on a default Debian testing which has
@@ -50,15 +48,13 @@ public class Tracker.Resources : Object {
const int DBUS_ARBITRARY_MAX_MSG_SIZE = 10000000;
DBusConnection connection;
- uint signal_timeout;
- Tracker.Config config;
public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects);
public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts);
- public Resources (DBusConnection connection, Tracker.Config config_p) {
+ public Resources (DBusConnection connection) {
this.connection = connection;
- this.config = config_p;
+ Tracker.Store.set_signal_callback (on_emit_signals);
}
public async void load (BusName sender, string uri) throws Error {
@@ -246,9 +242,7 @@ public class Tracker.Resources : Object {
writeback (builder.end ());
}
- bool on_emit_signals () {
- var events = Tracker.Events.get_pending ();
-
+ void on_emit_signals (HashTable<Tracker.Class, Tracker.Events.Batch>? events, HashTable<int, GLib.Array<int>>? writebacks) {
if (events != null) {
var iter = HashTableIter<Tracker.Class, Tracker.Events.Batch> (events);
unowned Events.Batch class_events;
@@ -259,88 +253,13 @@ public class Tracker.Resources : Object {
}
}
- /* Writeback feature */
- var writebacks = Tracker.Writeback.get_ready ();
-
if (writebacks != null) {
emit_writeback (writebacks);
}
-
- signal_timeout = 0;
- return false;
- }
-
- void on_statements_committed () {
- Tracker.Events.transact ();
- Tracker.Writeback.transact ();
- check_graph_updated_signal ();
-
- if (signal_timeout == 0) {
- signal_timeout = Timeout.add (config.graphupdated_delay, on_emit_signals);
- }
- }
-
- void on_statements_rolled_back () {
- Tracker.Events.reset_pending ();
- Tracker.Writeback.reset_pending ();
- }
-
- void check_graph_updated_signal () {
- /* Check for whether we need an immediate emit */
- if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
- // possibly active timeout no longer necessary as signals
- // for committed transactions will be emitted by the following on_emit_signals call
- // do this before actually calling on_emit_signals as on_emit_signals sets signal_timeout to 0
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
- }
-
- // immediately emit signals for already committed transaction
- Idle.add (() => {
- on_emit_signals ();
- return false;
- });
- }
- }
-
- void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
- Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
- Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
- }
-
- void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
- Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
- Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
- }
-
- [DBus (visible = false)]
- public void enable_signals () {
- var data_manager = Tracker.Main.get_data_manager ();
- var data = data_manager.get_data ();
- data.add_insert_statement_callback (on_statement_inserted);
- data.add_delete_statement_callback (on_statement_deleted);
- data.add_commit_statement_callback (on_statements_committed);
- data.add_rollback_statement_callback (on_statements_rolled_back);
- }
-
- [DBus (visible = false)]
- public void disable_signals () {
- var data_manager = Tracker.Main.get_data_manager ();
- var data = data_manager.get_data ();
- data.remove_insert_statement_callback (on_statement_inserted);
- data.remove_delete_statement_callback (on_statement_deleted);
- data.remove_commit_statement_callback (on_statements_committed);
- data.remove_rollback_statement_callback (on_statements_rolled_back);
-
- if (signal_timeout != 0) {
- Source.remove (signal_timeout);
- signal_timeout = 0;
- }
}
~Resources () {
- this.disable_signals ();
+ Tracker.Store.set_signal_callback (null);
}
[DBus (visible = false)]
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index a555c67b0..d4b4d9646 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -23,6 +23,7 @@ public class Tracker.Store {
const int MAX_CONCURRENT_QUERIES = 2;
const int MAX_TASK_TIME = 30;
+ const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
@@ -36,6 +37,10 @@ public class Tracker.Store {
static bool active;
static SourceFunc active_callback;
+ static Tracker.Config config;
+ static uint signal_timeout;
+ static int n_updates;
+
public enum Priority {
HIGH,
LOW,
@@ -50,6 +55,9 @@ public class Tracker.Store {
TURTLE,
}
+ public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
+ static unowned SignalEmissionFunc signal_callback;
+
public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
abstract class Task {
@@ -265,7 +273,7 @@ public class Tracker.Store {
AtomicInt.set (ref checkpointing, 0);
}
- public static void init () {
+ public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
max_task_time = int.parse (max_task_time_env);
@@ -293,6 +301,8 @@ public class Tracker.Store {
are rather random */
ThreadPool.set_max_idle_time (15 * 1000);
ThreadPool.set_max_unused_threads (2);
+
+ config = config_p;
}
public static void shutdown () {
@@ -304,6 +314,11 @@ public class Tracker.Store {
query_queues[i] = null;
update_queues[i] = null;
}
+
+ if (signal_timeout != 0) {
+ Source.remove (signal_timeout);
+ signal_timeout = 0;
+ }
}
public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
@@ -327,7 +342,28 @@ public class Tracker.Store {
}
}
+ private static void do_emit_signals () {
+ signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ());
+ }
+
+ private static void ensure_signal_timeout () {
+ if (signal_timeout == 0) {
+ signal_timeout = Timeout.add (config.graphupdated_delay, () => {
+ do_emit_signals ();
+ if (n_updates == 0) {
+ signal_timeout = 0;
+ return false;
+ } else {
+ return true;
+ }
+ });
+ }
+ }
+
public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE;
task.query = sparql;
@@ -342,12 +378,17 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
}
public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE_BLANK;
task.query = sparql;
@@ -362,6 +403,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -370,6 +413,9 @@ public class Tracker.Store {
}
public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new TurtleTask ();
task.type = TaskType.TURTLE;
task.path = file.get_path ();
@@ -383,6 +429,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -473,4 +521,58 @@ public class Tracker.Store {
sched ();
}
+
+ private static void on_statements_committed () {
+ Tracker.Events.transact ();
+ Tracker.Writeback.transact ();
+ check_graph_updated_signal ();
+ }
+
+ private static void on_statements_rolled_back () {
+ Tracker.Events.reset_pending ();
+ Tracker.Writeback.reset_pending ();
+ }
+
+ private static void check_graph_updated_signal () {
+ /* Check for whether we need an immediate emit */
+ if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
+ // immediately emit signals for already committed transaction
+ Idle.add (() => {
+ do_emit_signals ();
+ return false;
+ });
+ }
+ }
+
+ private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ public static void enable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.add_insert_statement_callback (on_statement_inserted);
+ data.add_delete_statement_callback (on_statement_deleted);
+ data.add_commit_statement_callback (on_statements_committed);
+ data.add_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void disable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.remove_insert_statement_callback (on_statement_inserted);
+ data.remove_delete_statement_callback (on_statement_deleted);
+ data.remove_commit_statement_callback (on_statements_committed);
+ data.remove_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void set_signal_callback (SignalEmissionFunc? func) {
+ signal_callback = func;
+ }
}