diff options
Diffstat (limited to 'src/tracker-store/tracker-store.vala')
-rw-r--r-- | src/tracker-store/tracker-store.vala | 446 |
1 files changed, 75 insertions, 371 deletions
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala index a373e6155..03cc8078a 100644 --- a/src/tracker-store/tracker-store.vala +++ b/src/tracker-store/tracker-store.vala @@ -25,244 +25,46 @@ public class Tracker.Store { const int MAX_TASK_TIME = 30; const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000; - static Queue<Task> query_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static Queue<Task> update_queues[3 /* TRACKER_STORE_N_PRIORITIES */]; - static int n_queries_running; - static bool update_running; - static ThreadPool<Task> update_pool; - static ThreadPool<Task> query_pool; - static ThreadPool<DBInterface> checkpoint_pool; - static GenericArray<Task> running_tasks; static int max_task_time; static bool active; - static SourceFunc active_callback; static Tracker.Config config; static uint signal_timeout; static int n_updates; - public enum Priority { - HIGH, - LOW, - TURTLE, - N_PRIORITIES - } - - enum TaskType { - QUERY, - UPDATE, - UPDATE_BLANK, - TURTLE, - } + static HashTable<string, Cancellable> client_cancellables; public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback); static unowned SignalEmissionFunc signal_callback; - public delegate void SparqlQueryInThread (DBCursor cursor) throws Error; + public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error; - abstract class Task { - public TaskType type; - public string client_id; + class CursorTask { + public Sparql.Cursor cursor; + public unowned SourceFunc callback; + public unowned SparqlQueryInThread thread_func; public Error error; - public SourceFunc callback; - public Tracker.Data.Manager data_manager; - } - - class QueryTask : Task { - public string query; - public Cancellable cancellable; - public uint watchdog_id; - public unowned SparqlQueryInThread in_thread; - - ~QueryTask () { - if (watchdog_id > 0) { - Source.remove (watchdog_id); - } - } - } - - class UpdateTask : Task { - public string query; - public Variant blank_nodes; - public Priority priority; - } - - class TurtleTask : Task { - public string path; - } - - static void sched () { - Task task = null; - - if (!active) { - return; - } - while (n_queries_running < MAX_CONCURRENT_QUERIES) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = query_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task == null) { - /* no pending query */ - break; - } - running_tasks.add (task); - - if (max_task_time != 0) { - var query_task = (QueryTask) task; - query_task.watchdog_id = Timeout.add_seconds (max_task_time, () => { - query_task.cancellable.cancel (); - query_task.watchdog_id = 0; - return false; - }); - } - - n_queries_running++; - try { - query_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } - - if (!update_running) { - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - task = update_queues[i].pop_head (); - if (task != null) { - break; - } - } - if (task != null) { - update_running = true; - try { - update_pool.add (task); - } catch (Error e) { - // ignore harmless thread creation error - } - } + public CursorTask (Sparql.Cursor cursor) { + this.cursor = cursor; } } - static bool task_finish_cb (Task task) { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - if (task.error == null && - query_task.cancellable != null && - query_task.cancellable.is_cancelled ()) { - task.error = new IOError.CANCELLED ("Operation was cancelled"); - } - - task.callback (); - task.error = null; - - running_tasks.remove (task); - n_queries_running--; - } else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) { - task.callback (); - task.error = null; - - update_running = false; - } else if (task.type == TaskType.TURTLE) { - task.callback (); - task.error = null; - - update_running = false; - } - - if (n_queries_running == 0 && !update_running && active_callback != null) { - active_callback (); - } - - sched (); - - return false; - } + static ThreadPool<CursorTask> cursor_pool; - static void pool_dispatch_cb (owned Task task) { + private static void cursor_dispatch_cb (owned CursorTask task) { try { - if (task.type == TaskType.QUERY) { - var query_task = (QueryTask) task; - - var cursor = Tracker.Data.query_sparql_cursor (task.data_manager, query_task.query); - - query_task.in_thread (cursor); - } else { - var data = task.data_manager.get_data (); - var iface = task.data_manager.get_writable_db_interface (); - iface.sqlite_wal_hook (wal_hook); - - if (task.type == TaskType.UPDATE) { - var update_task = (UpdateTask) task; - - data.update_sparql (update_task.query); - } else if (task.type == TaskType.UPDATE_BLANK) { - var update_task = (UpdateTask) task; - - update_task.blank_nodes = data.update_sparql_blank (update_task.query); - } else if (task.type == TaskType.TURTLE) { - var turtle_task = (TurtleTask) task; - - var file = File.new_for_path (turtle_task.path); - - data.load_turtle_file (file); - } - } + task.thread_func (task.cursor); } catch (Error e) { task.error = e; } Idle.add (() => { - task_finish_cb (task); + task.callback (); return false; }); } - public static void wal_checkpoint (DBInterface iface, bool blocking) { - try { - debug ("Checkpointing database..."); - iface.sqlite_wal_checkpoint (blocking); - debug ("Checkpointing complete..."); - } catch (Error e) { - warning (e.message); - } - } - - static int checkpointing; - - static void wal_hook (DBInterface iface, int n_pages) { - // run in update thread - var manager = (Data.Manager) iface.get_user_data (); - var wal_iface = manager.get_wal_db_interface (); - - debug ("WAL: %d pages", n_pages); - - if (n_pages >= 10000) { - // do immediate checkpointing (blocking updates) - // to prevent excessive wal file growth - wal_checkpoint (wal_iface, true); - } else if (n_pages >= 1000 && checkpoint_pool != null) { - if (AtomicInt.compare_and_exchange (ref checkpointing, 0, 1)) { - // initiate asynchronous checkpointing (not blocking updates) - try { - checkpoint_pool.push (wal_iface); - } catch (Error e) { - warning (e.message); - AtomicInt.set (ref checkpointing, 0); - } - } - } - } - - static void checkpoint_dispatch_cb (DBInterface iface) { - // run in checkpoint thread - wal_checkpoint (iface, false); - AtomicInt.set (ref checkpointing, 0); - } - public static void init (Tracker.Config config_p) { string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME"); if (max_task_time_env != null) { @@ -271,19 +73,12 @@ public class Tracker.Store { max_task_time = MAX_TASK_TIME; } - running_tasks = new GenericArray<Task> (); - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = new Queue<Task> (); - update_queues[i] = new Queue<Task> (); - } + client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal); try { - update_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, 1, true); - query_pool = new ThreadPool<Task>.with_owned_data (pool_dispatch_cb, MAX_CONCURRENT_QUERIES, true); - checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true); + cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false); } catch (Error e) { - warning (e.message); + // Ignore harmless error } /* as the following settings are global for unknown reasons, @@ -296,40 +91,26 @@ public class Tracker.Store { } public static void shutdown () { - query_pool = null; - update_pool = null; - checkpoint_pool = null; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - query_queues[i] = null; - update_queues[i] = null; - } - if (signal_timeout != 0) { Source.remove (signal_timeout); signal_timeout = 0; } } - public static async void sparql_query (Tracker.Data.Manager manager, string sparql, Priority priority, SparqlQueryInThread in_thread, string client_id) throws Error { - var task = new QueryTask (); - task.type = TaskType.QUERY; - task.query = sparql; - task.cancellable = new Cancellable (); - task.in_thread = in_thread; - task.callback = sparql_query.callback; - task.client_id = client_id; - task.data_manager = manager; - - query_queues[priority].push_tail (task); + private static Cancellable create_cancellable (string client_id) { + var client_cancellable = client_cancellables.lookup (client_id); - sched (); + if (client_cancellable == null) { + client_cancellable = new Cancellable (); + client_cancellables.insert (client_id, client_cancellable); + } - yield; + var task_cancellable = new Cancellable (); + client_cancellable.connect (() => { + task_cancellable.cancel (); + }); - if (task.error != null) { - throw task.error; - } + return task_cancellable; } private static void do_emit_signals () { @@ -350,166 +131,89 @@ public class Tracker.Store { } } - public static async void sparql_update (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { - n_updates++; - ensure_signal_timeout (); + public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error { + var cancellable = create_cancellable (client_id); + uint timeout_id = 0; - var task = new UpdateTask (); - task.type = TaskType.UPDATE; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update.callback; - task.client_id = client_id; - task.data_manager = manager; + if (max_task_time != 0) { + timeout_id = Timeout.add_seconds (max_task_time, () => { + cancellable.cancel (); + timeout_id = 0; + return false; + }); + } - update_queues[priority].push_tail (task); + var cursor = yield conn.query_async (sparql, cancellable); - sched (); + if (timeout_id != 0) + GLib.Source.remove (timeout_id); - yield; + var task = new CursorTask (cursor); + task.thread_func = in_thread; + task.callback = sparql_query.callback; - n_updates--; + try { + cursor_pool.add (task); + } catch (Error e) { + // Ignore harmless error + } + + yield; - if (task.error != null) { + if (task.error != null) throw task.error; - } } - public static async Variant sparql_update_blank (Tracker.Data.Manager manager, string sparql, Priority priority, string client_id) throws Error { + public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); n_updates++; ensure_signal_timeout (); - - var task = new UpdateTask (); - task.type = TaskType.UPDATE_BLANK; - task.query = sparql; - task.priority = priority; - task.callback = sparql_update_blank.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[priority].push_tail (task); - - sched (); - - yield; - + var cancellable = create_cancellable (client_id); + yield conn.update_async (sparql, priority, cancellable); n_updates--; - - if (task.error != null) { - throw task.error; - } - - return task.blank_nodes; } - public static async void queue_turtle_import (Tracker.Data.Manager manager, File file, string client_id) throws Error { + public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); n_updates++; ensure_signal_timeout (); - - var task = new TurtleTask (); - task.type = TaskType.TURTLE; - task.path = file.get_path (); - task.callback = queue_turtle_import.callback; - task.client_id = client_id; - task.data_manager = manager; - - update_queues[Priority.TURTLE].push_tail (task); - - sched (); - - yield; - + var cancellable = create_cancellable (client_id); + var nodes = yield conn.update_blank_async (sparql, priority, cancellable); n_updates--; - if (task.error != null) { - throw task.error; - } + return nodes; } - public uint get_queue_size () { - uint result = 0; - - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - result += query_queues[i].get_length (); - result += update_queues[i].get_length (); - } - return result; + public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error { + if (!active) + throw new Sparql.Error.UNSUPPORTED ("Store is not active"); + n_updates++; + ensure_signal_timeout (); + var cancellable = create_cancellable (client_id); + yield conn.load_async (file, cancellable); + n_updates--; } public static void unreg_batches (string client_id) { - unowned List<Task> list, cur; - unowned Queue<Task> queue; - - for (int i = 0; i < running_tasks.length; i++) { - unowned QueryTask task = running_tasks[i] as QueryTask; - if (task != null && task.client_id == client_id && task.cancellable != null) { - task.cancellable.cancel (); - } - } + Cancellable cancellable = client_cancellables.lookup (client_id); - for (int i = 0; i < Priority.N_PRIORITIES; i++) { - queue = query_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; - - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); - - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } - - queue = update_queues[i]; - list = queue.head; - while (list != null) { - cur = list; - list = list.next; - unowned Task task = cur.data; - - if (task != null && task.client_id == client_id) { - queue.delete_link (cur); - - task.error = new DBusError.FAILED ("Client disappeared"); - task.callback (); - } - } + if (cancellable != null) { + cancellable.cancel (); + client_cancellables.remove (client_id); } - - sched (); } public static async void pause () { Tracker.Store.active = false; - if (n_queries_running > 0 || update_running) { - active_callback = pause.callback; - yield; - active_callback = null; - } - - if (AtomicInt.get (ref checkpointing) != 0) { - // this will wait for checkpointing to finish - checkpoint_pool = null; - try { - checkpoint_pool = new ThreadPool<DBInterface> (checkpoint_dispatch_cb, 1, true); - } catch (Error e) { - warning (e.message); - } - } - - if (active) { - sched (); - } + var sparql_conn = Tracker.Main.get_sparql_connection (); + sparql_conn.sync (); } public static void resume () { Tracker.Store.active = true; - - sched (); } private static void on_statements_committed () { |