author     Carlos Garnacho <carlosg@gnome.org>  2017-11-25 14:39:48 +0100
committer  Carlos Garnacho <carlosg@gnome.org>  2018-07-20 18:27:32 +0200
commit     1d59f03662b91a1ad6475333fbe778a38a38aa2b (patch)
tree       cc6bb140a0f61a1cd129966dcb801c0f88208035 /src/tracker-store/tracker-store.vala
parent     adfa177cb1c38bf3452d16ea66195cf0fade4068 (diff)
download   tracker-1d59f03662b91a1ad6475333fbe778a38a38aa2b.tar.gz
tracker-store: Push TrackerData hooks down to Tracker.Store
Move this out of the Resources object, which is basically a view of the internal Store object. All event accounting and signaling is now performed by the Store object, to which the Resources DBus object connects in order to implement the GraphUpdated and Writeback signals. Only one handler of these events is possible at the moment; it would be nice to consider doing something marginally better on the Steroids interface at some point, at least with respect to the amount of data sent through the bus.

Instead of trying to schedule the timeout across threads (the TrackerData hooks run in the thread performing the updates, and we want signaling done from the main/DBus thread), the code now just sets up a timeout on the main thread that keeps running as long as there are pending updates. When the task for the last batched update returns, it is safe for the timeout to do signaling one last time and shut itself down, all of this happening in the main thread.
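For illustration only, here is a minimal, self-contained Vala sketch of that scheduling pattern. The names (PendingSignals, update_started, update_finished, the 100 ms delay) are hypothetical and do not exist in tracker-store; the real implementation is the Tracker.Store code added in the diff below.

// Hypothetical, self-contained sketch; PendingSignals and its methods are
// illustrative only and are not part of tracker-store.
class PendingSignals {
    static int n_updates = 0;
    static uint signal_timeout = 0;

    static void emit_signals () {
        // Stand-in for the real GraphUpdated/Writeback emission.
        print ("emit: %d update(s) still pending\n", n_updates);
    }

    static void ensure_signal_timeout (uint delay_ms) {
        if (signal_timeout != 0)
            return;
        signal_timeout = Timeout.add (delay_ms, () => {
            emit_signals ();
            if (n_updates == 0) {
                // The last batched update already returned: the emission
                // above was the final one, so let the source remove itself.
                signal_timeout = 0;
                return false;
            }
            // Updates still pending: keep the timeout running.
            return true;
        });
    }

    public static void update_started (uint delay_ms) {
        n_updates++;
        ensure_signal_timeout (delay_ms);
    }

    public static void update_finished () {
        n_updates--;
    }

    public static int main () {
        var loop = new MainLoop ();
        update_started (100);
        // Pretend the (only) update finishes after 350 ms.
        Timeout.add (350, () => { update_finished (); return false; });
        // Quit once the timeout has had a chance to shut itself down.
        Timeout.add (800, () => { loop.quit (); return false; });
        loop.run ();
        return 0;
    }
}

Built with plain valac, the sketch prints a few emissions and then stops: the timeout callback's boolean return value is what keeps the source alive on the main loop while updates are pending and removes it after one final emission, mirroring the ensure_signal_timeout () added in the diff below.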
Diffstat (limited to 'src/tracker-store/tracker-store.vala')
-rw-r--r--  src/tracker-store/tracker-store.vala  104
1 file changed, 103 insertions, 1 deletion
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index a555c67b0..d4b4d9646 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -23,6 +23,7 @@ public class Tracker.Store {
const int MAX_CONCURRENT_QUERIES = 2;
const int MAX_TASK_TIME = 30;
+ const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
@@ -36,6 +37,10 @@ public class Tracker.Store {
static bool active;
static SourceFunc active_callback;
+ static Tracker.Config config;
+ static uint signal_timeout;
+ static int n_updates;
+
public enum Priority {
HIGH,
LOW,
@@ -50,6 +55,9 @@ public class Tracker.Store {
TURTLE,
}
+ public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
+ static unowned SignalEmissionFunc signal_callback;
+
public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
abstract class Task {
@@ -265,7 +273,7 @@ public class Tracker.Store {
AtomicInt.set (ref checkpointing, 0);
}
- public static void init () {
+ public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
max_task_time = int.parse (max_task_time_env);
@@ -293,6 +301,8 @@ public class Tracker.Store {
are rather random */
ThreadPool.set_max_idle_time (15 * 1000);
ThreadPool.set_max_unused_threads (2);
+
+ config = config_p;
}
public static void shutdown () {
@@ -304,6 +314,11 @@ public class Tracker.Store {
query_queues[i] = null;
update_queues[i] = null;
}
+
+ if (signal_timeout != 0) {
+ Source.remove (signal_timeout);
+ signal_timeout = 0;
+ }
}
public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
@@ -327,7 +342,28 @@ public class Tracker.Store {
}
}
+ private static void do_emit_signals () {
+ signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ());
+ }
+
+ private static void ensure_signal_timeout () {
+ if (signal_timeout == 0) {
+ signal_timeout = Timeout.add (config.graphupdated_delay, () => {
+ do_emit_signals ();
+ if (n_updates == 0) {
+ signal_timeout = 0;
+ return false;
+ } else {
+ return true;
+ }
+ });
+ }
+ }
+
public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE;
task.query = sparql;
@@ -342,12 +378,17 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
}
public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new UpdateTask ();
task.type = TaskType.UPDATE_BLANK;
task.query = sparql;
@@ -362,6 +403,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -370,6 +413,9 @@ public class Tracker.Store {
}
public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
+ n_updates++;
+ ensure_signal_timeout ();
+
var task = new TurtleTask ();
task.type = TaskType.TURTLE;
task.path = file.get_path ();
@@ -383,6 +429,8 @@ public class Tracker.Store {
yield;
+ n_updates--;
+
if (task.error != null) {
throw task.error;
}
@@ -473,4 +521,58 @@ public class Tracker.Store {
sched ();
}
+
+ private static void on_statements_committed () {
+ Tracker.Events.transact ();
+ Tracker.Writeback.transact ();
+ check_graph_updated_signal ();
+ }
+
+ private static void on_statements_rolled_back () {
+ Tracker.Events.reset_pending ();
+ Tracker.Writeback.reset_pending ();
+ }
+
+ private static void check_graph_updated_signal () {
+ /* Check for whether we need an immediate emit */
+ if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
+ // immediately emit signals for already committed transaction
+ Idle.add (() => {
+ do_emit_signals ();
+ return false;
+ });
+ }
+ }
+
+ private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
+ Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
+ Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
+ }
+
+ public static void enable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.add_insert_statement_callback (on_statement_inserted);
+ data.add_delete_statement_callback (on_statement_deleted);
+ data.add_commit_statement_callback (on_statements_committed);
+ data.add_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void disable_signals () {
+ var data_manager = Tracker.Main.get_data_manager ();
+ var data = data_manager.get_data ();
+ data.remove_insert_statement_callback (on_statement_inserted);
+ data.remove_delete_statement_callback (on_statement_deleted);
+ data.remove_commit_statement_callback (on_statements_committed);
+ data.remove_rollback_statement_callback (on_statements_rolled_back);
+ }
+
+ public static void set_signal_callback (SignalEmissionFunc? func) {
+ signal_callback = func;
+ }
}