summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/tracker-store/Makefile.am2
-rw-r--r--src/tracker-store/tracker-main.vala39
-rw-r--r--src/tracker-store/tracker-resources.vala26
-rw-r--r--src/tracker-store/tracker-steroids.vala28
-rw-r--r--src/tracker-store/tracker-store.vala446
5 files changed, 117 insertions, 424 deletions
diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am
index dc5df50c3..2463cb4ab 100644
--- a/src/tracker-store/Makefile.am
+++ b/src/tracker-store/Makefile.am
@@ -39,6 +39,7 @@ tracker_store_VALAFLAGS = \
$(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \
$(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \
$(top_srcdir)/src/libtracker-data/libtracker-data.vapi \
+ $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \
$(top_srcdir)/src/tracker-store/tracker-config.vapi \
$(top_srcdir)/src/tracker-store/tracker-events.vapi \
$(top_srcdir)/src/tracker-store/tracker-locale-change.vapi \
@@ -47,6 +48,7 @@ tracker_store_VALAFLAGS = \
tracker_store_LDADD = \
$(top_builddir)/src/libtracker-data/libtracker-data.la \
+ $(top_builddir)/src/libtracker-direct/libtracker-direct.la \
$(top_builddir)/src/libtracker-common/libtracker-common.la \
$(top_builddir)/src/libtracker-sparql-backend/libtracker-sparql-@TRACKER_API_VERSION@.la \
$(BUILD_LIBS) \
diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala
index 63df072ff..861aed2eb 100644
--- a/src/tracker-store/tracker-main.vala
+++ b/src/tracker-store/tracker-main.vala
@@ -39,6 +39,7 @@ License which can be viewed at:
static bool shutdown;
+ static Tracker.Direct.Connection connection;
static Tracker.Data.Manager data_manager;
/* Private command line parameters */
@@ -175,6 +176,10 @@ License which can be viewed at:
return data_manager;
}
+ public static unowned Tracker.Direct.Connection get_sparql_connection () {
+ return connection;
+ }
+
static int main (string[] args) {
Intl.setlocale (LocaleCategory.ALL, "");
@@ -256,6 +261,8 @@ License which can be viewed at:
flags |= DBManagerFlags.FORCE_REINDEX;
}
+ Tracker.Direct.Connection.set_default_flags (flags);
+
var notifier = Tracker.DBus.register_notifier ();
Tracker.Store.init (config);
@@ -281,38 +288,18 @@ License which can be viewed at:
Tracker.DBJournal.set_rotating (do_rotating, chunk_size, rotate_to);
- int select_cache_size, update_cache_size;
- string cache_size_s;
-
- cache_size_s = Environment.get_variable ("TRACKER_STORE_SELECT_CACHE_SIZE");
- if (cache_size_s != null && cache_size_s != "") {
- select_cache_size = int.parse (cache_size_s);
- } else {
- select_cache_size = SELECT_CACHE_SIZE;
- }
-
- cache_size_s = Environment.get_variable ("TRACKER_STORE_UPDATE_CACHE_SIZE");
- if (cache_size_s != null && cache_size_s != "") {
- update_cache_size = int.parse (cache_size_s);
- } else {
- update_cache_size = UPDATE_CACHE_SIZE;
- }
-
try {
- data_manager = new Tracker.Data.Manager (flags,
- cache_location,
- data_location,
- ontology_location,
- true,
- false,
- select_cache_size,
- update_cache_size);
- data_manager.init (null);
+ connection = new Tracker.Direct.Connection (Sparql.ConnectionFlags.NONE,
+ cache_location,
+ data_location,
+ ontology_location);
+ connection.init (null);
} catch (GLib.Error e) {
critical ("Cannot initialize database: %s", e.message);
return 1;
}
+ data_manager = connection.get_data_manager ();
db_config = null;
notifier = null;
diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala
index 7322636d1..5cfab8bb2 100644
--- a/src/tracker-store/tracker-resources.vala
+++ b/src/tracker-store/tracker-resources.vala
@@ -61,9 +61,9 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.Load (uri: '%s')", uri);
try {
var file = File.new_for_uri (uri);
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.queue_turtle_import (data_manager, file, sender);
+ yield Tracker.Store.queue_turtle_import (sparql_conn, file, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -84,9 +84,9 @@ public class Tracker.Resources : Object {
request.debug ("query: %s", query);
try {
var builder = new VariantBuilder ((VariantType) "aas");
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => {
+ yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => {
while (cursor.next ()) {
builder.open ((VariantType) "as");
@@ -126,8 +126,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.SparqlUpdate");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.HIGH, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ yield Tracker.Store.sparql_update (sparql_conn, update, Priority.HIGH, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
@@ -147,8 +147,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.SparqlUpdateBlank");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- var variant = yield Tracker.Store.sparql_update_blank (data_manager, update, Tracker.Store.Priority.HIGH, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, update, Priority.HIGH, sender);
request.end ();
@@ -169,10 +169,10 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.Sync");
var data_manager = Tracker.Main.get_data_manager ();
var data = data_manager.get_data ();
- var iface = data_manager.get_writable_db_interface ();
- // wal checkpoint implies sync
- Tracker.Store.wal_checkpoint (iface, true);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ sparql_conn.sync ();
+
// sync journal if available
data.sync ();
@@ -183,8 +183,8 @@ public class Tracker.Resources : Object {
var request = DBusRequest.begin (sender, "Resources.BatchSparqlUpdate");
request.debug ("query: %s", update);
try {
- var data_manager = Tracker.Main.get_data_manager ();
- yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.LOW, sender);
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ yield Tracker.Store.sparql_update (sparql_conn, update, Priority.LOW, sender);
request.end ();
} catch (DBInterfaceError.NO_SPACE ie) {
diff --git a/src/tracker-store/tracker-steroids.vala b/src/tracker-store/tracker-steroids.vala
index 40679bf14..1eb7bef05 100644
--- a/src/tracker-store/tracker-steroids.vala
+++ b/src/tracker-store/tracker-steroids.vala
@@ -29,9 +29,9 @@ public class Tracker.Steroids : Object {
request.debug ("query: %s", query);
try {
string[] variable_names = null;
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
- yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => {
+ yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => {
var data_output_stream = new DataOutputStream (new BufferedOutputStream.sized (output_stream, BUFFER_SIZE));
data_output_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN);
@@ -90,10 +90,10 @@ public class Tracker.Steroids : Object {
}
}
- async Variant? update_internal (BusName sender, Tracker.Store.Priority priority, bool blank, UnixInputStream input_stream) throws Error {
+ async Variant? update_internal (BusName sender, int priority, bool blank, UnixInputStream input_stream) throws Error {
var request = DBusRequest.begin (sender,
"Steroids.%sUpdate%s",
- priority != Tracker.Store.Priority.HIGH ? "Batch" : "",
+ priority != Priority.HIGH ? "Batch" : "",
blank ? "Blank" : "");
try {
size_t bytes_read;
@@ -112,16 +112,16 @@ public class Tracker.Steroids : Object {
data_input_stream = null;
request.debug ("query: %s", (string) query);
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
if (!blank) {
- yield Tracker.Store.sparql_update (data_manager, (string) query, priority, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, (string) query, priority, sender);
request.end ();
return null;
} else {
- var variant = yield Tracker.Store.sparql_update_blank (data_manager, (string) query, priority, sender);
+ var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, (string) query, priority, sender);
request.end ();
@@ -140,21 +140,21 @@ public class Tracker.Steroids : Object {
}
public async void update (BusName sender, UnixInputStream input_stream) throws Error {
- yield update_internal (sender, Tracker.Store.Priority.HIGH, false, input_stream);
+ yield update_internal (sender, Priority.HIGH, false, input_stream);
}
public async void batch_update (BusName sender, UnixInputStream input_stream) throws Error {
- yield update_internal (sender, Tracker.Store.Priority.LOW, false, input_stream);
+ yield update_internal (sender, Priority.LOW, false, input_stream);
}
[DBus (signature = "aaa{ss}")]
public async Variant update_blank (BusName sender, UnixInputStream input_stream) throws Error {
- return yield update_internal (sender, Tracker.Store.Priority.HIGH, true, input_stream);
+ return yield update_internal (sender, Priority.HIGH, true, input_stream);
}
[DBus (signature = "aaa{ss}")]
public async Variant batch_update_blank (BusName sender, UnixInputStream input_stream) throws Error {
- return yield update_internal (sender, Tracker.Store.Priority.LOW, true, input_stream);
+ return yield update_internal (sender, Priority.LOW, true, input_stream);
}
[DBus (signature = "as")]
@@ -188,11 +188,11 @@ public class Tracker.Steroids : Object {
data_input_stream = null;
var builder = new VariantBuilder ((VariantType) "as");
- var data_manager = Tracker.Main.get_data_manager ();
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
// first try combined query for best possible performance
try {
- yield Tracker.Store.sparql_update (data_manager, combined_query.str, Tracker.Store.Priority.LOW, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, combined_query.str, Priority.LOW, sender);
// combined query was successful
for (i = 0; i < query_count; i++) {
@@ -213,7 +213,7 @@ public class Tracker.Steroids : Object {
request.debug ("query: %s", query_array[i]);
try {
- yield Tracker.Store.sparql_update (data_manager, query_array[i], Tracker.Store.Priority.LOW, sender);
+ yield Tracker.Store.sparql_update (sparql_conn, query_array[i], Priority.LOW, sender);
builder.add ("s", "");
builder.add ("s", "");
} catch (Error e1) {
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index a373e6155..03cc8078a 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -25,244 +25,46 @@ public class Tracker.Store {
const int MAX_TASK_TIME = 30;
const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;
- static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
- static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */];
- static int n_queries_running;
- static bool update_running;
- static ThreadPool<Task> update_pool;
- static ThreadPool<Task> query_pool;
- static ThreadPool<DBInterface> checkpoint_pool;
- static GenericArray<Task> running_tasks;
static int max_task_time;
static bool active;
- static SourceFunc active_callback;
static Tracker.Config config;
static uint signal_timeout;
static int n_updates;
- public enum Priority {
- HIGH,
- LOW,
- TURTLE,
- N_PRIORITIES
- }
-
- enum TaskType {
- QUERY,
- UPDATE,
- UPDATE_BLANK,
- TURTLE,
- }
+ static HashTable<string, Cancellable> client_cancellables;
public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
static unowned SignalEmissionFunc signal_callback;
- public delegate void SparqlQueryInThread (DBCursor cursor) throws Error;
+ public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error;
- abstract class Task {
- public TaskType type;
- public string client_id;
+ class CursorTask {
+ public Sparql.Cursor cursor;
+ public unowned SourceFunc callback;
+ public unowned SparqlQueryInThread thread_func;
public Error error;
- public SourceFunc callback;
- public Tracker.Data.Manager data_manager;
- }
-
- class QueryTask : Task {
- public string query;
- public Cancellable cancellable;
- public uint watchdog_id;
- public unowned SparqlQueryInThread in_thread;
-
- ~QueryTask () {
- if (watchdog_id > 0) {
- Source.remove (watchdog_id);
- }
- }
- }
-
- class UpdateTask : Task {
- public string query;
- public Variant blank_nodes;
- public Priority priority;
- }
-
- class TurtleTask : Task {
- public string path;
- }
-
- static void sched () {
- Task task = null;
-
- if (!active) {
- return;
- }
- while (n_queries_running < MAX_CONCURRENT_QUERIES) {
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- task = query_queues[i].pop_head ();
- if (task != null) {
- break;
- }
- }
- if (task == null) {
- /* no pending query */
- break;
- }
- running_tasks.add (task);
-
- if (max_task_time != 0) {
- var query_task = (QueryTask) task;
- query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => {
- query_task.cancellable.cancel ();
- query_task.watchdog_id = 0;
- return false;
- });
- }
-
- n_queries_running++;
- try {
- query_pool.add (task);
- } catch (Error e) {
- // ignore harmless thread creation error
- }
- }
-
- if (!update_running) {
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- task = update_queues[i].pop_head ();
- if (task != null) {
- break;
- }
- }
- if (task != null) {
- update_running = true;
- try {
- update_pool.add (task);
- } catch (Error e) {
- // ignore harmless thread creation error
- }
- }
+ public CursorTask (Sparql.Cursor cursor) {
+ this.cursor = cursor;
}
}
- static bool task_finish_cb (Task task) {
- if (task.type == TaskType.QUERY) {
- var query_task = (QueryTask) task;
-
- if (task.error == null &&
- query_task.cancellable != null &&
- query_task.cancellable.is_cancelled ()) {
- task.error = new IOError.CANCELLED ("Operation was cancelled");
- }
-
- task.callback ();
- task.error = null;
-
- running_tasks.remove (task);
- n_queries_running--;
- } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) {
- task.callback ();
- task.error = null;
-
- update_running = false;
- } else if (task.type == TaskType.TURTLE) {
- task.callback ();
- task.error = null;
-
- update_running = false;
- }
-
- if (n_queries_running == 0 && !update_running && active_callback != null) {
- active_callback ();
- }
-
- sched ();
-
- return false;
- }
+ static ThreadPool<CursorTask> cursor_pool;
- static void pool_dispatch_cb (owned Task task) {
+ private static void cursor_dispatch_cb (owned CursorTask task) {
try {
- if (task.type == TaskType.QUERY) {
- var query_task = (QueryTask) task;
-
- var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query);
-
- query_task.in_thread (cursor);
- } else {
- var data = task.data_manager.get_data ();
- var iface = task.data_manager.get_writable_db_interface ();
- iface.sqlite_wal_hook (wal_hook);
-
- if (task.type == TaskType.UPDATE) {
- var update_task = (UpdateTask) task;
-
- data.update_sparql (update_task.query);
- } else if (task.type == TaskType.UPDATE_BLANK) {
- var update_task = (UpdateTask) task;
-
- update_task.blank_nodes = data.update_sparql_blank (update_task.query);
- } else if (task.type == TaskType.TURTLE) {
- var turtle_task = (TurtleTask) task;
-
- var file = File.new_for_path (turtle_task.path);
-
- data.load_turtle_file (file);
- }
- }
+ task.thread_func (task.cursor);
} catch (Error e) {
task.error = e;
}
Idle.add (() => {
- task_finish_cb (task);
+ task.callback ();
return false;
});
}
- public static void wal_checkpoint (DBInterface iface, bool blocking) {
- try {
- debug ("Checkpointing database...");
- iface.sqlite_wal_checkpoint (blocking);
- debug ("Checkpointing complete...");
- } catch (Error e) {
- warning (e.message);
- }
- }
-
- static int checkpointing;
-
- static void wal_hook (DBInterface iface, int n_pages) {
- // run in update thread
- var manager = (Data.Manager) iface.get_user_data ();
- var wal_iface = manager.get_wal_db_interface ();
-
- debug ("WAL: %d pages", n_pages);
-
- if (n_pages >= 10000) {
- // do immediate checkpointing (blocking updates)
- // to prevent excessive wal file growth
- wal_checkpoint (wal_iface, true);
- } else if (n_pages >= 1000 && checkpoint_pool != null) {
- if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) {
- // initiate asynchronous checkpointing (not blocking updates)
- try {
- checkpoint_pool.push (wal_iface);
- } catch (Error e) {
- warning (e.message);
- AtomicInt.set (ref checkpointing, 0);
- }
- }
- }
- }
-
- static void checkpoint_dispatch_cb (DBInterface iface) {
- // run in checkpoint thread
- wal_checkpoint (iface, false);
- AtomicInt.set (ref checkpointing, 0);
- }
-
public static void init (Tracker.Config config_p) {
string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
if (max_task_time_env != null) {
@@ -271,19 +73,12 @@ public class Tracker.Store {
max_task_time = MAX_TASK_TIME;
}
- running_tasks = new GenericArray<Task> ();
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- query_queues[i] = new Queue<Task> ();
- update_queues[i] = new Queue<Task> ();
- }
+ client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal);
try {
- update_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, 1, true);
- query_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true);
- checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true);
+ cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false);
} catch (Error e) {
- warning (e.message);
+ // Ignore harmless error
}
/* as the following settings are global for unknown reasons,
@@ -296,40 +91,26 @@ public class Tracker.Store {
}
public static void shutdown () {
- query_pool = null;
- update_pool = null;
- checkpoint_pool = null;
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- query_queues[i] = null;
- update_queues[i] = null;
- }
-
if (signal_timeout != 0) {
Source.remove (signal_timeout);
signal_timeout = 0;
}
}
- public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error {
- var task = new QueryTask ();
- task.type = TaskType.QUERY;
- task.query = sparql;
- task.cancellable = new Cancellable ();
- task.in_thread = in_thread;
- task.callback = sparql_query.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- query_queues[priority].push_tail (task);
+ private static Cancellable create_cancellable (string client_id) {
+ var client_cancellable = client_cancellables.lookup (client_id);
- sched ();
+ if (client_cancellable == null) {
+ client_cancellable = new Cancellable ();
+ client_cancellables.insert (client_id, client_cancellable);
+ }
- yield;
+ var task_cancellable = new Cancellable ();
+ client_cancellable.connect (() => {
+ task_cancellable.cancel ();
+ });
- if (task.error != null) {
- throw task.error;
- }
+ return task_cancellable;
}
private static void do_emit_signals () {
@@ -350,166 +131,89 @@ public class Tracker.Store {
}
}
- public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
- n_updates++;
- ensure_signal_timeout ();
+ public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error {
+ var cancellable = create_cancellable (client_id);
+ uint timeout_id = 0;
- var task = new UpdateTask ();
- task.type = TaskType.UPDATE;
- task.query = sparql;
- task.priority = priority;
- task.callback = sparql_update.callback;
- task.client_id = client_id;
- task.data_manager = manager;
+ if (max_task_time != 0) {
+ timeout_id = Timeout.add_seconds (max_task_time, () => {
+ cancellable.cancel ();
+ timeout_id = 0;
+ return false;
+ });
+ }
- update_queues[priority].push_tail (task);
+ var cursor = yield conn.query_async (sparql, cancellable);
- sched ();
+ if (timeout_id != 0)
+ GLib.Source.remove (timeout_id);
- yield;
+ var task = new CursorTask (cursor);
+ task.thread_func = in_thread;
+ task.callback = sparql_query.callback;
- n_updates--;
+ try {
+ cursor_pool.add (task);
+ } catch (Error e) {
+ // Ignore harmless error
+ }
+
+ yield;
- if (task.error != null) {
+ if (task.error != null)
throw task.error;
- }
}
- public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error {
+ public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
n_updates++;
ensure_signal_timeout ();
-
- var task = new UpdateTask ();
- task.type = TaskType.UPDATE_BLANK;
- task.query = sparql;
- task.priority = priority;
- task.callback = sparql_update_blank.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- update_queues[priority].push_tail (task);
-
- sched ();
-
- yield;
-
+ var cancellable = create_cancellable (client_id);
+ yield conn.update_async (sparql, priority, cancellable);
n_updates--;
-
- if (task.error != null) {
- throw task.error;
- }
-
- return task.blank_nodes;
}
- public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error {
+ public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
n_updates++;
ensure_signal_timeout ();
-
- var task = new TurtleTask ();
- task.type = TaskType.TURTLE;
- task.path = file.get_path ();
- task.callback = queue_turtle_import.callback;
- task.client_id = client_id;
- task.data_manager = manager;
-
- update_queues[Priority.TURTLE].push_tail (task);
-
- sched ();
-
- yield;
-
+ var cancellable = create_cancellable (client_id);
+ var nodes = yield conn.update_blank_async (sparql, priority, cancellable);
n_updates--;
- if (task.error != null) {
- throw task.error;
- }
+ return nodes;
}
- public uint get_queue_size () {
- uint result = 0;
-
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- result += query_queues[i].get_length ();
- result += update_queues[i].get_length ();
- }
- return result;
+ public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error {
+ if (!active)
+ throw new Sparql.Error.UNSUPPORTED ("Store is not active");
+ n_updates++;
+ ensure_signal_timeout ();
+ var cancellable = create_cancellable (client_id);
+ yield conn.load_async (file, cancellable);
+ n_updates--;
}
public static void unreg_batches (string client_id) {
- unowned List<Task> list, cur;
- unowned Queue<Task> queue;
-
- for (int i = 0; i < running_tasks.length; i++) {
- unowned QueryTask task = running_tasks[i] as QueryTask;
- if (task != null && task.client_id == client_id && task.cancellable != null) {
- task.cancellable.cancel ();
- }
- }
+ Cancellable cancellable = client_cancellables.lookup (client_id);
- for (int i = 0; i < Priority.N_PRIORITIES; i++) {
- queue = query_queues[i];
- list = queue.head;
- while (list != null) {
- cur = list;
- list = list.next;
- unowned Task task = cur.data;
-
- if (task != null && task.client_id == client_id) {
- queue.delete_link (cur);
-
- task.error = new DBusError.FAILED ("Client disappeared");
- task.callback ();
- }
- }
-
- queue = update_queues[i];
- list = queue.head;
- while (list != null) {
- cur = list;
- list = list.next;
- unowned Task task = cur.data;
-
- if (task != null && task.client_id == client_id) {
- queue.delete_link (cur);
-
- task.error = new DBusError.FAILED ("Client disappeared");
- task.callback ();
- }
- }
+ if (cancellable != null) {
+ cancellable.cancel ();
+ client_cancellables.remove (client_id);
}
-
- sched ();
}
public static async void pause () {
Tracker.Store.active = false;
- if (n_queries_running > 0 || update_running) {
- active_callback = pause.callback;
- yield;
- active_callback = null;
- }
-
- if (AtomicInt.get (ref checkpointing) != 0) {
- // this will wait for checkpointing to finish
- checkpoint_pool = null;
- try {
- checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true);
- } catch (Error e) {
- warning (e.message);
- }
- }
-
- if (active) {
- sched ();
- }
+ var sparql_conn = Tracker.Main.get_sparql_connection ();
+ sparql_conn.sync ();
}
public static void resume () {
Tracker.Store.active = true;
-
- sched ();
}
private static void on_statements_committed () {