diff options
-rw-r--r-- | src/tracker-store/Makefile.am | 2 | ||||
-rw-r--r-- | src/tracker-store/tracker-main.vala | 39 | ||||
-rw-r--r-- | src/tracker-store/tracker-resources.vala | 26 | ||||
-rw-r--r-- | src/tracker-store/tracker-steroids.vala | 28 | ||||
-rw-r--r-- | src/tracker-store/tracker-store.vala | 446 |
5 files changed, 117 insertions, 424 deletions
diff --git a/src/tracker-store/Makefile.am b/src/tracker-store/Makefile.am index dc5df50c3..2463cb4ab 100644 --- a/src/tracker-store/Makefile.am +++ b/src/tracker-store/Makefile.am @@ -39,6 +39,7 @@ tracker_store_VALAFLAGS = \ $(top_srcdir)/src/libtracker-sparql/tracker-sparql-$(TRACKER_API_VERSION).vapi \ $(top_srcdir)/src/libtracker-data/tracker-sparql-query.vapi \ $(top_srcdir)/src/libtracker-data/libtracker-data.vapi \ + $(top_srcdir)/src/libtracker-direct/tracker-direct.vapi \ $(top_srcdir)/src/tracker-store/tracker-config.vapi \ $(top_srcdir)/src/tracker-store/tracker-events.vapi \ $(top_srcdir)/src/tracker-store/tracker-locale-change.vapi \ @@ -47,6 +48,7 @@ tracker_store_VALAFLAGS = \ tracker_store_LDADD = \ $(top_builddir)/src/libtracker-data/libtracker-data.la \ + $(top_builddir)/src/libtracker-direct/libtracker-direct.la \ $(top_builddir)/src/libtracker-common/libtracker-common.la \ $(top_builddir)/src/libtracker-sparql-backend/libtracker-sparql-@TRACKER_API_VERSION@.la \ $(BUILD_LIBS) \ diff --git a/src/tracker-store/tracker-main.vala b/src/tracker-store/tracker-main.vala index 63df072ff..861aed2eb 100644 --- a/src/tracker-store/tracker-main.vala +++ b/src/tracker-store/tracker-main.vala @@ -39,6 +39,7 @@ License which can be viewed at: static bool shutdown; + static Tracker.Direct.Connection connection; static Tracker.Data.Manager data_manager; /* Private command line parameters */ @@ -175,6 +176,10 @@ License which can be viewed at: return data_manager; } + public static unowned Tracker.Direct.Connection get_sparql_connection () { + return connection; + } + static int main (string[] args) { Intl.setlocale (LocaleCategory.ALL, ""); @@ -256,6 +261,8 @@ License which can be viewed at: flags |= DBManagerFlags.FORCE_REINDEX; } + Tracker.Direct.Connection.set_default_flags (flags); + var notifier = Tracker.DBus.register_notifier (); Tracker.Store.init (config); @@ -281,38 +288,18 @@ License which can be viewed at: Tracker.DBJournal.set_rotating (do_rotating, chunk_size, rotate_to); - int select_cache_size, update_cache_size; - string cache_size_s; - - cache_size_s = Environment.get_variable ("TRACKER_STORE_SELECT_CACHE_SIZE"); - if (cache_size_s != null && cache_size_s != "") { - select_cache_size = int.parse (cache_size_s); - } else { - select_cache_size = SELECT_CACHE_SIZE; - } - - cache_size_s = Environment.get_variable ("TRACKER_STORE_UPDATE_CACHE_SIZE"); - if (cache_size_s != null && cache_size_s != "") { - update_cache_size = int.parse (cache_size_s); - } else { - update_cache_size = UPDATE_CACHE_SIZE; - } - try { - data_manager = new Tracker.Data.Manager (flags, - cache_location, - data_location, - ontology_location, - true, - false, - select_cache_size, - update_cache_size); - data_manager.init (null); + connection = new Tracker.Direct.Connection (Sparql.ConnectionFlags.NONE, + cache_location, + data_location, + ontology_location); + connection.init (null); } catch (GLib.Error e) { critical ("Cannot initialize database: %s", e.message); return 1; } + data_manager = connection.get_data_manager (); db_config = null; notifier = null; diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala index 7322636d1..5cfab8bb2 100644 --- a/src/tracker-store/tracker-resources.vala +++ b/src/tracker-store/tracker-resources.vala @@ -61,9 +61,9 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.Load (uri: '%s')", uri); try { var file = File.new_for_uri (uri); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.queue_turtle_import (data_manager, file, sender); + yield Tracker.Store.queue_turtle_import (sparql_conn, file, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -84,9 +84,9 @@ public class Tracker.Resources : Object { request.debug ("query: %s", query); try { var builder = new VariantBuilder ((VariantType) "aas"); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => { + yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => { while (cursor.next ()) { builder.open ((VariantType) "as"); @@ -126,8 +126,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.SparqlUpdate"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.HIGH, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + yield Tracker.Store.sparql_update (sparql_conn, update, Priority.HIGH, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { @@ -147,8 +147,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.SparqlUpdateBlank"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - var variant = yield Tracker.Store.sparql_update_blank (data_manager, update, Tracker.Store.Priority.HIGH, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, update, Priority.HIGH, sender); request.end (); @@ -169,10 +169,10 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.Sync"); var data_manager = Tracker.Main.get_data_manager (); var data = data_manager.get_data (); - var iface = data_manager.get_writable_db_interface (); - // wal checkpoint implies sync - Tracker.Store.wal_checkpoint (iface, true); + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); + // sync journal if available data.sync (); @@ -183,8 +183,8 @@ public class Tracker.Resources : Object { var request = DBusRequest.begin (sender, "Resources.BatchSparqlUpdate"); request.debug ("query: %s", update); try { - var data_manager = Tracker.Main.get_data_manager (); - yield Tracker.Store.sparql_update (data_manager, update, Tracker.Store.Priority.LOW, sender); + var sparql_conn = Tracker.Main.get_sparql_connection (); + yield Tracker.Store.sparql_update (sparql_conn, update, Priority.LOW, sender); request.end (); } catch (DBInterfaceError.NO_SPACE ie) { diff --git a/src/tracker-store/tracker-steroids.vala b/src/tracker-store/tracker-steroids.vala index 40679bf14..1eb7bef05 100644 --- a/src/tracker-store/tracker-steroids.vala +++ b/src/tracker-store/tracker-steroids.vala @@ -29,9 +29,9 @@ public class Tracker.Steroids : Object { request.debug ("query: %s", query); try { string[] variable_names = null; - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); - yield Tracker.Store.sparql_query (data_manager, query, Tracker.Store.Priority.HIGH, cursor => { + yield Tracker.Store.sparql_query (sparql_conn, query, Priority.HIGH, cursor => { var data_output_stream = new DataOutputStream (new BufferedOutputStream.sized (output_stream, BUFFER_SIZE)); data_output_stream.set_byte_order (DataStreamByteOrder.HOST_ENDIAN); @@ -90,10 +90,10 @@ public class Tracker.Steroids : Object { } } - async Variant? update_internal (BusName sender, Tracker.Store.Priority priority, bool blank, UnixInputStream input_stream) throws Error { + async Variant? update_internal (BusName sender, int priority, bool blank, UnixInputStream input_stream) throws Error { var request = DBusRequest.begin (sender, "Steroids.%sUpdate%s", - priority != Tracker.Store.Priority.HIGH ? "Batch" : "", + priority != Priority.HIGH ? "Batch" : "", blank ? "Blank" : ""); try { size_t bytes_read; @@ -112,16 +112,16 @@ public class Tracker.Steroids : Object { data_input_stream = null; request.debug ("query: %s", (string) query); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); if (!blank) { - yield Tracker.Store.sparql_update (data_manager, (string) query, priority, sender); + yield Tracker.Store.sparql_update (sparql_conn, (string) query, priority, sender); request.end (); return null; } else { - var variant = yield Tracker.Store.sparql_update_blank (data_manager, (string) query, priority, sender); + var variant = yield Tracker.Store.sparql_update_blank (sparql_conn, (string) query, priority, sender); request.end (); @@ -140,21 +140,21 @@ public class Tracker.Steroids : Object { } public async void update (BusName sender, UnixInputStream input_stream) throws Error { - yield update_internal (sender, Tracker.Store.Priority.HIGH, false, input_stream); + yield update_internal (sender, Priority.HIGH, false, input_stream); } public async void batch_update (BusName sender, UnixInputStream input_stream) throws Error { - yield update_internal (sender, Tracker.Store.Priority.LOW, false, input_stream); + yield update_internal (sender, Priority.LOW, false, input_stream); } [DBus (signature = "aaa{ss}")] public async Variant update_blank (BusName sender, UnixInputStream input_stream) throws Error { - return yield update_internal (sender, Tracker.Store.Priority.HIGH, true, input_stream); + return yield update_internal (sender, Priority.HIGH, true, input_stream); } [DBus (signature = "aaa{ss}")] public async Variant batch_update_blank (BusName sender, UnixInputStream input_stream) throws Error { - return yield update_internal (sender, Tracker.Store.Priority.LOW, true, input_stream); + return yield update_internal (sender, Priority.LOW, true, input_stream); } [DBus (signature = "as")] @@ -188,11 +188,11 @@ public class Tracker.Steroids : Object { data_input_stream = null; var builder = new VariantBuilder ((VariantType) "as"); - var data_manager = Tracker.Main.get_data_manager (); + var sparql_conn = Tracker.Main.get_sparql_connection (); // first try combined query for best possible performance try { - yield Tracker.Store.sparql_update (data_manager, combined_query.str, Tracker.Store.Priority.LOW, sender); + yield Tracker.Store.sparql_update (sparql_conn, combined_query.str, Priority.LOW, sender); // combined query was successful for (i = 0; i < query_count; i++) { @@ -213,7 +213,7 @@ public class Tracker.Steroids : Object { request.debug ("query: %s", query_array[i]); try { - yield Tracker.Store.sparql_update (data_manager, query_array[i], Tracker.Store.Priority.LOW, sender); + yield Tracker.Store.sparql_update (sparql_conn, query_array[i], Priority.LOW, sender); builder.add ("s", ""); builder.add ("s", ""); } catch (Error e1) { diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index a373e6155..03cc8078a 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -25,244 +25,46 @@ public class Tracker.Store { const int MAX_TASK_TIME = 30; const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static int n_queries_running; - static bool update_running; - static ThreadPool<Task> update_pool; - static ThreadPool<Task> query_pool; - static ThreadPool<DBInterface> checkpoint_pool; - static GenericArray<Task> running_tasks; static int max_task_time; static bool active; - static SourceFunc active_callback; static Tracker.Config config; static uint signal_timeout; static int n_updates; - public enum Priority { - HIGH, - LOW, - TURTLE, - N_PRIORITIES - } - - enum TaskType { - QUERY, - UPDATE, - UPDATE_BLANK, - TURTLE, - } + static HashTable<string, Cancellable> client_cancellables; public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback); static unowned SignalEmissionFunc signal_callback; - public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; + public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error; - abstract class Task { - public TaskType type; - public string client_id; + class CursorTask { + public Sparql.Cursor cursor; + public unowned SourceFunc callback; + public unowned SparqlQueryInThread thread_func; public Error error; - public SourceFunc callback; - public Tracker.Data.Manager data_manager; - } - - class QueryTask : Task { - public string query; - public Cancellable cancellable; - public uint watchdog_id; - public unowned SparqlQueryInThread in_thread; - - ~QueryTask () { - if (watchdog_id > 0) { - Source.remove (watchdog_id); - } - } - } - - class UpdateTask : Task { - public string query; - public Variant blank_nodes; - public Priority priority; - } - - class TurtleTask : Task { - public string path; - } - - static void sched () { - Task task = null; - - if (!active) { - return; - } - while (n_queries_running < MAX_CONCURRENT_QUERIES) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = query_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task == null) { - /* no pending query */ - break; - } - running_tasks.add (task); - - if (max_task_time != 0) { - var query_task = (QueryTask) task; - query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => { - query_task.cancellable.cancel (); - query_task.watchdog_id = 0; - return false; - }); - } - - n_queries_running++; - try { - query_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } - - if (!update_running) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = update_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task != null) { - update_running = true; - try { - update_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } + public CursorTask (Sparql.Cursor cursor) { + this.cursor = cursor; } } - static bool task_finish_cb (Task task) { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - if (task.error == null && - query_task.cancellable != null && - query_task.cancellable.is_cancelled ()) { - task.error = new IOError.CANCELLED ("Operation was cancelled"); - } - - task.callback (); - task.error = null; - - running_tasks.remove (task); - n_queries_running--; - } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) { - task.callback (); - task.error = null; - - update_running = false; - } else if (task.type == TaskType.TURTLE) { - task.callback (); - task.error = null; - - update_running = false; - } - - if (n_queries_running == 0 && !update_running && active_callback != null) { - active_callback (); - } - - sched (); - - return false; - } + static ThreadPool<CursorTask> cursor_pool; - static void pool_dispatch_cb (owned Task task) { + private static void cursor_dispatch_cb (owned CursorTask task) { try { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query); - - query_task.in_thread (cursor); - } else { - var data = task.data_manager.get_data (); - var iface = task.data_manager.get_writable_db_interface (); - iface.sqlite_wal_hook (wal_hook); - - if (task.type == TaskType.UPDATE) { - var update_task = (UpdateTask) task; - - data.update_sparql (update_task.query); - } else if (task.type == TaskType.UPDATE_BLANK) { - var update_task = (UpdateTask) task; - - update_task.blank_nodes = data.update_sparql_blank (update_task.query); - } else if (task.type == TaskType.TURTLE) { - var turtle_task = (TurtleTask) task; - - var file = File.new_for_path (turtle_task.path); - - data.load_turtle_file (file); - } - } + task.thread_func (task.cursor); } catch (Error e) { task.error = e; } Idle.add (() => { - task_finish_cb (task); + task.callback (); return false; }); } - public static void wal_checkpoint (DBInterface iface, bool blocking) { - try { - debug ("Checkpointing database..."); - iface.sqlite_wal_checkpoint (blocking); - debug ("Checkpointing complete..."); - } catch (Error e) { - warning (e.message); - } - } - - static int checkpointing; - - static void wal_hook (DBInterface iface, int n_pages) { - // run in update thread - var manager = (Data.Manager) iface.get_user_data (); - var wal_iface = manager.get_wal_db_interface (); - - debug ("WAL: %d pages", n_pages); - - if (n_pages >= 10000) { - // do immediate checkpointing (blocking updates) - // to prevent excessive wal file growth - wal_checkpoint (wal_iface, true); - } else if (n_pages >= 1000 && checkpoint_pool != null) { - if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) { - // initiate asynchronous checkpointing (not blocking updates) - try { - checkpoint_pool.push (wal_iface); - } catch (Error e) { - warning (e.message); - AtomicInt.set (ref checkpointing, 0); - } - } - } - } - - static void checkpoint_dispatch_cb (DBInterface iface) { - // run in checkpoint thread - wal_checkpoint (iface, false); - AtomicInt.set (ref checkpointing, 0); - } - public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { @@ -271,19 +73,12 @@ public class Tracker.Store { max_task_time = MAX_TASK_TIME; } - running_tasks = new GenericArray<Task> (); - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = new Queue<Task> (); - update_queues[i] = new Queue<Task> (); - } + client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal); try { - update_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, 1, true); - query_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true); - checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true); + cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false); } catch (Error e) { - warning (e.message); + // Ignore harmless error } /* as the following settings are global for unknown reasons, @@ -296,40 +91,26 @@ public class Tracker.Store { } public static void shutdown () { - query_pool = null; - update_pool = null; - checkpoint_pool = null; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = null; - update_queues[i] = null; - } - if (signal_timeout != 0) { Source.remove (signal_timeout); signal_timeout = 0; } } - public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { - var task = new QueryTask (); - task.type = TaskType.QUERY; - task.query = sparql; - task.cancellable = new Cancellable (); - task.in_thread = in_thread; - task.callback = sparql_query.callback; - task.client_id = client_id; - task.data_manager = manager; - - query_queues[priority].push_tail (task); + private static Cancellable create_cancellable (string client_id) { + var client_cancellable = client_cancellables.lookup (client_id); - sched (); + if (client_cancellable == null) { + client_cancellable = new Cancellable (); + client_cancellables.insert (client_id, client_cancellable); + } - yield; + var task_cancellable = new Cancellable (); + client_cancellable.connect (() => { + task_cancellable.cancel (); + }); - if (task.error != null) { - throw task.error; - } + return task_cancellable; } private static void do_emit_signals () { @@ -350,166 +131,89 @@ public class Tracker.Store { } } - public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { - n_updates++; - ensure_signal_timeout (); + public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error { + var cancellable = create_cancellable (client_id); + uint timeout_id = 0; - var task = new UpdateTask (); - task.type = TaskType.UPDATE; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update.callback; - task.client_id = client_id; - task.data_manager = manager; + if (max_task_time != 0) { + timeout_id = Timeout.add_seconds (max_task_time, () => { + cancellable.cancel (); + timeout_id = 0; + return false; + }); + } - update_queues[priority].push_tail (task); + var cursor = yield conn.query_async (sparql, cancellable); - sched (); + if (timeout_id != 0) + GLib.Source.remove (timeout_id); - yield; + var task = new CursorTask (cursor); + task.thread_func = in_thread; + task.callback = sparql_query.callback; - n_updates--; + try { + cursor_pool.add (task); + } catch (Error e) { + // Ignore harmless error + } + + yield; - if (task.error != null) { + if (task.error != null) throw task.error; - } } - public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); n_updates++; ensure_signal_timeout (); - - var task = new UpdateTask (); - task.type = TaskType.UPDATE_BLANK; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update_blank.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[priority].push_tail (task); - - sched (); - - yield; - + var cancellable = create_cancellable (client_id); + yield conn.update_async (sparql, priority, cancellable); n_updates--; - - if (task.error != null) { - throw task.error; - } - - return task.blank_nodes; } - public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { + public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); n_updates++; ensure_signal_timeout (); - - var task = new TurtleTask (); - task.type = TaskType.TURTLE; - task.path = file.get_path (); - task.callback = queue_turtle_import.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[Priority.TURTLE].push_tail (task); - - sched (); - - yield; - + var cancellable = create_cancellable (client_id); + var nodes = yield conn.update_blank_async (sparql, priority, cancellable); n_updates--; - if (task.error != null) { - throw task.error; - } + return nodes; } - public uint get_queue_size () { - uint result = 0; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - result += query_queues[i].get_length (); - result += update_queues[i].get_length (); - } - return result; + public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); + n_updates++; + ensure_signal_timeout (); + var cancellable = create_cancellable (client_id); + yield conn.load_async (file, cancellable); + n_updates--; } public static void unreg_batches (string client_id) { - unowned List<Task> list, cur; - unowned Queue<Task> queue; - - for (int i = 0; i < running_tasks.length; i++) { - unowned QueryTask task = running_tasks[i] as QueryTask; - if (task != null && task.client_id == client_id && task.cancellable != null) { - task.cancellable.cancel (); - } - } + Cancellable cancellable = client_cancellables.lookup (client_id); - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - queue = query_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; - - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); - - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } - - queue = update_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; - - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); - - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } + if (cancellable != null) { + cancellable.cancel (); + client_cancellables.remove (client_id); } - - sched (); } public static async void pause () { Tracker.Store.active = false; - if (n_queries_running > 0 || update_running) { - active_callback = pause.callback; - yield; - active_callback = null; - } - - if (AtomicInt.get (ref checkpointing) != 0) { - // this will wait for checkpointing to finish - checkpoint_pool = null; - try { - checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true); - } catch (Error e) { - warning (e.message); - } - } - - if (active) { - sched (); - } + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); } public static void resume () { Tracker.Store.active = true; - - sched (); } private static void on_statements_committed () { |