author     Carlos Garnacho <carlosg@gnome.org>    2017-11-25 15:21:42 +0100
committer  Carlos Garnacho <carlosg@gnome.org>    2018-07-20 18:27:32 +0200
commit     693f89de16b44efd3b8663c9c493762f51de3a3b (patch)
tree       ded56a036b19124f91269b1d6c21c1c792f63558
parent     09fae200db0e0cba4fc91c7d50e443e1da08e328 (diff)
download   tracker-693f89de16b44efd3b8663c9c493762f51de3a3b.tar.gz
tracker-store: Use TrackerDirectConnection underneath
Instead of the lower-level TrackerDataManager object directly. The only additional thing tracker-store does on top is signal emission for Writeback and GraphUpdated; the internal TrackerDataManager object is still accessed to implement those features. This makes libtracker-direct the only place where queries and updates are queued, performed and dispatched.

There is another, indirect benefit: update queue handling no longer needs to hit the main thread in order to schedule the next update. Besides avoiding the very unlikely thread-contention situations described in previous commits, this should maximize the throughput of the update queue.
-rw-r--r--  src/tracker-store/Makefile.am              |   2
-rw-r--r--  src/tracker-store/tracker-main.vala        |  39
-rw-r--r--  src/tracker-store/tracker-resources.vala   |  26
-rw-r--r--  src/tracker-store/tracker-steroids.vala    |  28
-rw-r--r--  src/tracker-store/tracker-store.vala       | 446
5 files changed, 117 insertions, 424 deletions
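
With queueing and dispatch now owned by libtracker-direct, most of what remains on the store side is per-client bookkeeping: a table of Cancellables so that every in-flight request from a D-Bus client can be cancelled when that client disappears. The standalone sketch below illustrates that pattern with plain GLib types; create_cancellable() and unreg_client() mirror the create_cancellable()/unreg_batches() pair in the tracker-store.vala hunk further down, while the class name, unreg_client() and the main() driver are hypothetical additions for demonstration only. It should compile with `valac --pkg gio-2.0 cancellable-demo.vala`.

// Illustrative only: per-client cancellation scheme used by the new
// Tracker.Store code; not part of this commit.
class CancellationDemo {
    static HashTable<string, Cancellable> client_cancellables;

    // One Cancellable per D-Bus client; each task gets its own Cancellable
    // chained to the client's, so cancelling the client fans out to all of
    // that client's in-flight tasks.
    static Cancellable create_cancellable (string client_id) {
        var client_cancellable = client_cancellables.lookup (client_id);
        if (client_cancellable == null) {
            client_cancellable = new Cancellable ();
            client_cancellables.insert (client_id, client_cancellable);
        }

        var task_cancellable = new Cancellable ();
        client_cancellable.connect (() => {
            task_cancellable.cancel ();
        });

        return task_cancellable;
    }

    // Called when a client vanishes from the bus: cancel everything it queued.
    static void unreg_client (string client_id) {
        var cancellable = client_cancellables.lookup (client_id);
        if (cancellable != null) {
            cancellable.cancel ();
            client_cancellables.remove (client_id);
        }
    }

    public static int main () {
        client_cancellables = new HashTable<string, Cancellable> (str_hash, str_equal);

        var task1 = create_cancellable (":1.42");
        var task2 = create_cancellable (":1.42");

        unreg_client (":1.42");   // e.g. the D-Bus name owner disappeared

        print ("task1 cancelled: %s\n", task1.is_cancelled ().to_string ());
        print ("task2 cancelled: %s\n", task2.is_cancelled ().to_string ());
        return 0;
    }
}

In the patch, the task-level Cancellable is what gets passed to query_async()/update_async() on the TrackerDirectConnection; this replaces the manual walking of the per-priority task queues that unreg_batches() previously had to do.
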
diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am
index dc5df50c3..2463cb4ab 100644
--- a/src/tracker-store/Makefile.am
+++ b/src/tracker-store/Makefile.am
@@ -39,6 +39,7 @@ tracker_store_VALAFLAGS = \
$(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \
$(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \
$(top_srcdir)/src/libtracker-data/libtracker-data.vapi \
+ $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \
$(top_srcdir)/src/tracker-store/tracker-config.vapi \
$(top_srcdir)/src/tracker-store/tracker-events.vapi \
$(top_srcdir)/src/tracker-store/tracker-locale-change.vapi \
@@ -47,6 +48,7 @@ tracker_store_VALAFLAGS = \
tracker_store_LDADD = \
$(top_builddir)/src/libtracker-data/libtracker-data.la \
+ $(top_builddir)/src/libtracker-direct/libtracker-direct.la \
$(top_builddir)/src/libtracker-common/libtracker-common.la \
$(top_builddir)/src/libtracker-sparql-backend/libtracker-sparql-@TRACKER_API_VERSION@.la \
$(BUILD_LIBS) \
diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala
index 63df072ff..861aed2eb 100644
--- a/src/tracker-store/tracker-main.vala
+++ b/src/tracker-store/tracker-main.vala
@@ -39,6 +39,7 @@ License which can be viewed at:
static bool shutdown;
+ static Tracker.Direct.Connection connection;
static Tracker.Data.Manager data_manager;
/* Private command line parameters */
@@ -175,6 +176,10 @@ License which can be viewed at:
return data_manager;
}
+ public static unowned Tracker.Direct.Connection get_sparql_connection () {
+ return connection;
+ }
+
static int main (string[] args) {
Intl.setlocale (LocaleCategory.ALL, "");
@@ -256,6 +261,8 @@ License which can be viewed at:
flags |= DBManagerFlags.FORCE_REINDEX;
}
+ Tracker.Direct.Connection.set_default_flags (flags);
+
var notifier = Tracker.DBus.register_notifier ();
Tracker.Store.init (config);
@@ -281,38 +288,18 @@ License which can be viewed at:
Tracker.DBJournal.set_rotating (do_rotating, chunk_size, rotate_to);
- int select_cache_size, update_cache_size;
- string cache_size_s;
-
- cache_size_s = Environment.get_variable ("TRACKER_STORE_SELECT_CACHE_SIZE");
- if (cache_size_s != null && cache_size_s != "") {
- select_cache_size = int.parse (cache_size_s);
- } else {
- select_cache_size = SELECT_CACHE_SIZE;
- }
-
- cache_size_s = Environment.get_variable ("TRACKER_STORE_UPDATE_CACHE_SIZE");
- if (cache_size_s != null && cache_size_s != "") {
- update_cache_size = int.parse (cache_size_s);
- } else {
- update_cache_size = UPDATE_CACHE_SIZE;
- }
-
try {
- data_manager = new Tracker.Data.Manager (flags,
- cache_location,
- data_location,
- ontology_location,
- true,
- false,
- select_cache_size,
- update_cache_size);
- data_manager.init (null);
+ connection = new Tracker.Direct.Connection (Sparql.ConnectionFlags.NONE,
+ cache_location,
+ data_location,
+ ontology_location);
+ connection.init (null);
} catch (GLib.Error e) {
critical ("Cannot initialize database: %s", e.message);
return 1;
}
+ data_manager = connection.get_data_manager ();
db_config = null;
notifier = null;
diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala
index 7322636d1..5cfab8bb2 100644
--- a/src/tracker-store/tracker-resources.vala
+++ b/src/tracker-store/tracker-resources.vala
@@ -61,9 +61,9 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.Load (uri: '%s')", uri);
try {
var file = File.new_for_uri (uri);
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.queue_turtle_import (data_manager, file, sender);
+ yield Tracker.Store.queue_turtle_import (sparql_conn, file, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -84,9 +84,9 @@ public class Tracker.Resources : Object {
request.debug ("query: %s", query);
try {
var builder = new VariantBuilder ((VariantType) "aas");
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => {
+ yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => {
while (cursor.next ()) {
builder.open ((VariantType) "as");
@@ -126,8 +126,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.SparqlUpdate");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.HIGH, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ yield Tracker.Store.sparql_update (sparql_conn, update, Priority.HIGH, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -147,8 +147,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.SparqlUpdateBlank");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- var variant = yield Tracker.Store.sparql_update_blank (data_manager, update, Tracker.Store.Priority.HIGH, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, update, Priority.HIGH, sender);
request.end ();
@@ -169,10 +169,10 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.Sync");
var data_manager = Tracker.Main.get_data_manager ();
var data = data_manager.get_data ();
- var iface = data_manager.get_writable_db_interface ();
- // wal checkpoint implies sync
- Tracker.Store.wal_checkpoint (iface, true);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ sparql_conn.sync ();
+
// sync journal if available
data.sync ();
@@ -183,8 +183,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.BatchSparqlUpdate");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.LOW, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ yield Tracker.Store.sparql_update (sparql_conn, update, Priority.LOW, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
diff --git a/src/tracker-store/tracker-steroids.vala b/src/tracker-store/tracker-steroids.vala
index 40679bf14..1eb7bef05 100644
--- a/src/tracker-store/tracker-steroids.vala
+++ b/src/tracker-store/tracker-steroids.vala
@@ -29,9 +29,9 @@ public class Tracker.Steroids : Object {
request.debug ("query: %s", query);
try {
string[] variable_names = null;
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => {
+ yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => {
var data_output_stream = new DataOutputStream (new BufferedOutputStream.sized (output_stream, BUFFER_SIZE));
data_output_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN);
@@ -90,10 +90,10 @@ public class Tracker.Steroids : Object {
}
}
- async Variant? update_internal (BusName sender, Tracker.Store.Priority priority, bool blank, UnixInputStream input_stream) throws Error {
+ async Variant? update_internal (BusName sender, int priority, bool blank, UnixInputStream input_stream) throws Error {
var request = DBusRequest.begin (sender,
"Steroids.%sUpdate%s",
- priority != Tracker.Store.Priority.HIGH ? "Batch" : "",
+ priority != Priority.HIGH ? "Batch" : "",
blank ? "Blank" : "");
try {
size_t bytes_read;
@@ -112,16 +112,16 @@ public class Tracker.Steroids : Object {
data_input_stream = null;
request.debug ("query: %s", (string) query);
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
if (!blank) {
- yield Tracker.Store.sparql_update (data_manager, (string) query, priority, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, (string) query, priority, sender);
request.end ();
return null;
} else {
- var variant = yield Tracker.Store.sparql_update_blank (data_manager, (string) query, priority, sender);
+ var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, (string) query, priority, sender);
request.end ();
@@ -140,21 +140,21 @@ public class Tracker.Steroids : Object {
}
public async void update (BusName sender, UnixInputStream input_stream) throws Error {
- yield update_internal (sender, Tracker.Store.Priority.HIGH, false, input_stream);
+ yield update_internal (sender, Priority.HIGH, false, input_stream);
}
public async void batch_update (BusName sender, UnixInputStream input_stream) throws Error {
- yield update_internal (sender, Tracker.Store.Priority.LOW, false, input_stream);
+ yield update_internal (sender, Priority.LOW, false, input_stream);
}
[DBus (signature = "aaa{ss}")]
public async Variant update_blank (BusName sender, UnixInputStream input_stream) throws Error {
- return yield update_internal (sender, Tracker.Store.Priority.HIGH, true, input_stream);
+ return yield update_internal (sender, Priority.HIGH, true, input_stream);
}
[DBus (signature = "aaa{ss}")]
public async Variant batch_update_blank (BusName sender, UnixInputStream input_stream) throws Error {
- return yield update_internal (sender, Tracker.Store.Priority.LOW, true, input_stream);
+ return yield update_internal (sender, Priority.LOW, true, input_stream);
}
[DBus (signature = "as")]
@@ -188,11 +188,11 @@ public class Tracker.Steroids : Object {
data_input_stream = null;
var builder = new VariantBuilder ((VariantType) "as");
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
// first try combined query for best possible performance
try {
- yield Tracker.Store.sparql_update (data_manager, combined_query.str, Tracker.Store.Priority.LOW, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, combined_query.str, Priority.LOW, sender);
// combined query was successful
for (i = 0; i < query_count; i++) {
@@ -213,7 +213,7 @@ public class Tracker.Steroids : Object {
request.debug ("query: %s", query_array[i]);
try {
- yield Tracker.Store.sparql_update (data_manager, query_array[i], Tracker.Store.Priority.LOW, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, query_array[i], Priority.LOW, sender);
builder.add ("s", "");
builder.add ("s", "");
} catch (Error e1) {
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index a373e6155..03cc8078a 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -25,244 +25,46 @@ public class Tracker.Store {
const int MAX_TASK_TIME = 30;
const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
- static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
- static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
- static int n_queries_running;
- static bool update_running;
- static ThreadPool<Task> update_pool;
- static ThreadPool<Task> query_pool;
- static ThreadPool<DBInterface> checkpoint_pool;
- static GenericArray<Task> running_tasks;
static int max_task_time;
static bool active;
- static SourceFunc active_callback;
static Tracker.Config config;
static uint signal_timeout;
static int n_updates;
- public enum Priority {
- HIGH,
- LOW,
- TURTLE,
- N_PRIORITIES
- }
-
- enum TaskType {
- QUERY,
- UPDATE,
- UPDATE_BLANK,
- TURTLE,
- }
+ static HashTable<string, Cancellable> client_cancellables;
public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
static unowned SignalEmissionFunc signal_callback;
- public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
+ public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error;
- abstract class Task {
- public TaskType type;
- public string client_id;
+ class CursorTask {
+ public Sparql.Cursor cursor;
+ public unowned SourceFunc callback;
+ public unowned SparqlQueryInThread thread_func;
public Error error;
- public SourceFunc callback;
- public Tracker.Data.Manager data_manager;
- }
-
- class QueryTask : Task {
- public string query;
- public Cancellable cancellable;
- public uint watchdog_id;
- public unowned SparqlQueryInThread in_thread;
-
- ~QueryTask () {
- if (watchdog_id > 0) {
- Source.remove (watchdog_id);
- }
- }
- }
-
- class UpdateTask : Task {
- public string query;
- public Variant blank_nodes;
- public Priority priority;
- }
-
- class TurtleTask : Task {
- public string path;
- }
-
- static void sched () {
- Task task = null;
-
- if (!active) {
- return;
- }
- while (n_queries_running < MAX_CONCURRENT_QUERIES) {
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- task = query_queues[i].pop_head ();
- if (task != null) {
- break;
- }
- }
- if (task == null) {
- /* no pending query */
- break;
- }
- running_tasks.add (task);
-
- if (max_task_time != 0) {
- var query_task = (QueryTask) task;
- query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => {
- query_task.cancellable.cancel ();
- query_task.watchdog_id = 0;
- return false;
- });
- }
-
- n_queries_running++;
- try {
- query_pool.add (task);
- } catch (Error e) {
- // ignore harmless thread creation error
- }
- }
-
- if (!update_running) {
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- task = update_queues[i].pop_head ();
- if (task != null) {
- break;
- }
- }
- if (task != null) {
- update_running = true;
- try {
- update_pool.add (task);
- } catch (Error e) {
- // ignore harmless thread creation error
- }
- }
+ public CursorTask (Sparql.Cursor cursor) {
+ this.cursor = cursor;
}
}
- static bool task_finish_cb (Task task) {
- if (task.type == TaskType.QUERY) {
- var query_task = (QueryTask) task;
-
- if (task.error == null &&
- query_task.cancellable != null &&
- query_task.cancellable.is_cancelled ()) {
- task.error = new IOError.CANCELLED ("Operation was cancelled");
- }
-
- task.callback ();
- task.error = null;
-
- running_tasks.remove (task);
- n_queries_running--;
- } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) {
- task.callback ();
- task.error = null;
-
- update_running = false;
- } else if (task.type == TaskType.TURTLE) {
- task.callback ();
- task.error = null;
-
- update_running = false;
- }
-
- if (n_queries_running == 0 && !update_running && active_callback != null) {
- active_callback ();
- }
-
- sched ();
-
- return false;
- }
+ static ThreadPool<CursorTask> cursor_pool;
- static void pool_dispatch_cb (owned Task task) {
+ private static void cursor_dispatch_cb (owned CursorTask task) {
try {
- if (task.type == TaskType.QUERY) {
- var query_task = (QueryTask) task;
-
- var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query);
-
- query_task.in_thread (cursor);
- } else {
- var data = task.data_manager.get_data ();
- var iface = task.data_manager.get_writable_db_interface ();
- iface.sqlite_wal_hook (wal_hook);
-
- if (task.type == TaskType.UPDATE) {
- var update_task = (UpdateTask) task;
-
- data.update_sparql (update_task.query);
- } else if (task.type == TaskType.UPDATE_BLANK) {
- var update_task = (UpdateTask) task;
-
- update_task.blank_nodes = data.update_sparql_blank (update_task.query);
- } else if (task.type == TaskType.TURTLE) {
- var turtle_task = (TurtleTask) task;
-
- var file = File.new_for_path (turtle_task.path);
-
- data.load_turtle_file (file);
- }
- }
+ task.thread_func (task.cursor);
} catch (Error e) {
task.error = e;
}
Idle.add (() => {
- task_finish_cb (task);
+ task.callback ();
return false;
});
}
- public static void wal_checkpoint (DBInterface iface, bool blocking) {
- try {
- debug ("Checkpointing database...");
- iface.sqlite_wal_checkpoint (blocking);
- debug ("Checkpointing complete...");
- } catch (Error e) {
- warning (e.message);
- }
- }
-
- static int checkpointing;
-
- static void wal_hook (DBInterface iface, int n_pages) {
- // run in update thread
- var manager = (Data.Manager) iface.get_user_data ();
- var wal_iface = manager.get_wal_db_interface ();
-
- debug ("WAL: %d pages", n_pages);
-
- if (n_pages >= 10000) {
- // do immediate checkpointing (blocking updates)
- // to prevent excessive wal file growth
- wal_checkpoint (wal_iface, true);
- } else if (n_pages >= 1000 && checkpoint_pool != null) {
- if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) {
- // initiate asynchronous checkpointing (not blocking updates)
- try {
- checkpoint_pool.push (wal_iface);
- } catch (Error e) {
- warning (e.message);
- AtomicInt.set (ref checkpointing, 0);
- }
- }
- }
- }
-
- static void checkpoint_dispatch_cb (DBInterface iface) {
- // run in checkpoint thread
- wal_checkpoint (iface, false);
- AtomicInt.set (ref checkpointing, 0);
- }
-
public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
@@ -271,19 +73,12 @@ public class Tracker.Store {
max_task_time = MAX_TASK_TIME;
}
- running_tasks = new GenericArray<Task> ();
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- query_queues[i] = new Queue<Task> ();
- update_queues[i] = new Queue<Task> ();
- }
+ client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal);
try {
- update_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, 1, true);
- query_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true);
- checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true);
+ cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false);
} catch (Error e) {
- warning (e.message);
+ // Ignore harmless error
}
/* as the following settings are global for unknown reasons,
@@ -296,40 +91,26 @@ public class Tracker.Store {
}
public static void shutdown () {
- query_pool = null;
- update_pool = null;
- checkpoint_pool = null;
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- query_queues[i] = null;
- update_queues[i] = null;
- }
-
if (signal_timeout != 0) {
Source.remove (signal_timeout);
signal_timeout = 0;
}
}
- public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
- var task = new QueryTask ();
- task.type = TaskType.QUERY;
- task.query = sparql;
- task.cancellable = new Cancellable ();
- task.in_thread = in_thread;
- task.callback = sparql_query.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- query_queues[priority].push_tail (task);
+ private static Cancellable create_cancellable (string client_id) {
+ var client_cancellable = client_cancellables.lookup (client_id);
- sched ();
+ if (client_cancellable == null) {
+ client_cancellable = new Cancellable ();
+ client_cancellables.insert (client_id, client_cancellable);
+ }
- yield;
+ var task_cancellable = new Cancellable ();
+ client_cancellable.connect (() => {
+ task_cancellable.cancel ();
+ });
- if (task.error != null) {
- throw task.error;
- }
+ return task_cancellable;
}
private static void do_emit_signals () {
@@ -350,166 +131,89 @@ public class Tracker.Store {
}
}
- public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
- n_updates++;
- ensure_signal_timeout ();
+ public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error {
+ var cancellable = create_cancellable (client_id);
+ uint timeout_id = 0;
- var task = new UpdateTask ();
- task.type = TaskType.UPDATE;
- task.query = sparql;
- task.priority = priority;
- task.callback = sparql_update.callback;
- task.client_id = client_id;
- task.data_manager = manager;
+ if (max_task_time != 0) {
+ timeout_id = Timeout.add_seconds (max_task_time, () => {
+ cancellable.cancel ();
+ timeout_id = 0;
+ return false;
+ });
+ }
- update_queues[priority].push_tail (task);
+ var cursor = yield conn.query_async (sparql, cancellable);
- sched ();
+ if (timeout_id != 0)
+ GLib.Source.remove (timeout_id);
- yield;
+ var task = new CursorTask (cursor);
+ task.thread_func = in_thread;
+ task.callback = sparql_query.callback;
- n_updates--;
+ try {
+ cursor_pool.add (task);
+ } catch (Error e) {
+ // Ignore harmless error
+ }
+
+ yield;
- if (task.error != null) {
+ if (task.error != null)
throw task.error;
- }
}
- public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
n_updates++;
ensure_signal_timeout ();
-
- var task = new UpdateTask ();
- task.type = TaskType.UPDATE_BLANK;
- task.query = sparql;
- task.priority = priority;
- task.callback = sparql_update_blank.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- update_queues[priority].push_tail (task);
-
- sched ();
-
- yield;
-
+ var cancellable = create_cancellable (client_id);
+ yield conn.update_async (sparql, priority, cancellable);
n_updates--;
-
- if (task.error != null) {
- throw task.error;
- }
-
- return task.blank_nodes;
}
- public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
+ public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
n_updates++;
ensure_signal_timeout ();
-
- var task = new TurtleTask ();
- task.type = TaskType.TURTLE;
- task.path = file.get_path ();
- task.callback = queue_turtle_import.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- update_queues[Priority.TURTLE].push_tail (task);
-
- sched ();
-
- yield;
-
+ var cancellable = create_cancellable (client_id);
+ var nodes = yield conn.update_blank_async (sparql, priority, cancellable);
n_updates--;
- if (task.error != null) {
- throw task.error;
- }
+ return nodes;
}
- public uint get_queue_size () {
- uint result = 0;
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- result += query_queues[i].get_length ();
- result += update_queues[i].get_length ();
- }
- return result;
+ public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
+ n_updates++;
+ ensure_signal_timeout ();
+ var cancellable = create_cancellable (client_id);
+ yield conn.load_async (file, cancellable);
+ n_updates--;
}
public static void unreg_batches (string client_id) {
- unowned List<Task> list, cur;
- unowned Queue<Task> queue;
-
- for (int i = 0; i < running_tasks.length; i++) {
- unowned QueryTask task = running_tasks[i] as QueryTask;
- if (task != null && task.client_id == client_id && task.cancellable != null) {
- task.cancellable.cancel ();
- }
- }
+ Cancellable cancellable = client_cancellables.lookup (client_id);
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- queue = query_queues[i];
- list = queue.head;
- while (list != null) {
- cur = list;
- list = list.next;
- unowned Task task = cur.data;
-
- if (task != null && task.client_id == client_id) {
- queue.delete_link (cur);
-
- task.error = new DBusError.FAILED ("Client disappeared");
- task.callback ();
- }
- }
-
- queue = update_queues[i];
- list = queue.head;
- while (list != null) {
- cur = list;
- list = list.next;
- unowned Task task = cur.data;
-
- if (task != null && task.client_id == client_id) {
- queue.delete_link (cur);
-
- task.error = new DBusError.FAILED ("Client disappeared");
- task.callback ();
- }
- }
+ if (cancellable != null) {
+ cancellable.cancel ();
+ client_cancellables.remove (client_id);
}
-
- sched ();
}
public static async void pause () {
Tracker.Store.active = false;
- if (n_queries_running > 0 || update_running) {
- active_callback = pause.callback;
- yield;
- active_callback = null;
- }
-
- if (AtomicInt.get (ref checkpointing) != 0) {
- // this will wait for checkpointing to finish
- checkpoint_pool = null;
- try {
- checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true);
- } catch (Error e) {
- warning (e.message);
- }
- }
-
- if (active) {
- sched ();
- }
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ sparql_conn.sync ();
}
public static void resume () {
Tracker.Store.active = true;
-
- sched ();
}
private static void on_statements_committed () {