summaryrefslogtreecommitdiff
path: root/src/tracker-store/tracker-store.vala
blob: 03cc8078aca5ed3ab624390111552a8329e50295 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
/*
 * Copyright (C) 2009-2011, Nokia <ivan.frade@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
 * Boston, MA  02110-1301, USA.
 *
 * Author: Philip Van Hoof <philip@codeminded.be>
 */

/**
 * Static entry points used to run SPARQL queries, SPARQL updates and Turtle
 * imports against a {@link Tracker.Direct.Connection}, and to forward the
 * resulting change notifications to a single registered callback.
 *
 * All state is kept in static fields; the class is never instantiated in
 * this file.
 */
public class Tracker.Store {
	/* Not referenced anywhere inside this class. */
	const int MAX_CONCURRENT_QUERIES = 2;

	/* Default query time limit, in seconds (it is passed to Timeout.add_seconds). */
	const int MAX_TASK_TIME = 30;
	/* When Tracker.Events.get_total () exceeds this value after a commit,
	 * signals are emitted from an idle source instead of waiting for the
	 * periodic timeout. */
	const int GRAPH_UPDATED_IMMEDIATE_EMIT_AT = 50000;

	/* Effective query time limit in seconds; 0 disables the limit. */
	static int max_task_time;
	/* Update-type requests are rejected while this is false. */
	static bool active;

	static Tracker.Config config;
	/* Source id of the periodic signal emission timeout, or 0 when none is installed. */
	static uint signal_timeout;
	/* Number of update-type operations currently in flight. */
	static int n_updates;

	/* Maps a client id to the cancellable that cancels all of its tasks. */
	static HashTable<string, Cancellable> client_cancellables;

	/**
	 * Callback receiving the values of Tracker.Events.get_pending () and
	 * Tracker.Writeback.get_ready () each time signals are emitted.
	 */
	public delegate void SignalEmissionFunc (HashTable<Tracker.Class, Tracker.Events.Batch>? graph_updated, HashTable<int, GLib.Array<int>>? writeback);
	static unowned SignalEmissionFunc signal_callback;

	/**
	 * Function invoked from a cursor pool thread with the cursor of a
	 * finished query.
	 */
	public delegate void SparqlQueryInThread (Sparql.Cursor cursor) throws Error;

	/* One unit of work for the cursor thread pool. */
	class CursorTask {
		public Sparql.Cursor cursor;
		/* Resumes the suspended sparql_query () coroutine. */
		public unowned SourceFunc callback;
		public unowned SparqlQueryInThread thread_func;
		/* Error thrown by thread_func, if any. */
		public Error error;

		public CursorTask (Sparql.Cursor cursor) {
			this.cursor = cursor;
		}
	}

	static ThreadPool<CursorTask> cursor_pool;

	/* Thread pool worker: runs the task's thread function, records any error
	 * it throws, then schedules the task's callback through an idle source. */
	private static void cursor_dispatch_cb (owned CursorTask task) {
		try {
			task.thread_func (task.cursor);
		} catch (Error e) {
			task.error = e;
		}

		Idle.add (() => {
			task.callback ();
			return false;
		});
	}

	/**
	 * Initializes the static state of the store.
	 *
	 * The query time limit is read from the TRACKER_STORE_MAX_TASK_TIME
	 * environment variable when set, otherwise MAX_TASK_TIME is used.
	 *
	 * @param config_p configuration object; its graphupdated_delay is used
	 *                 as the interval of the signal emission timeout
	 */
	public static void init (Tracker.Config config_p) {
		string max_task_time_env = Environment.get_variable ("TRACKER_STORE_MAX_TASK_TIME");
		if (max_task_time_env != null) {
			max_task_time = int.parse (max_task_time_env);
		} else {
			max_task_time = MAX_TASK_TIME;
		}

		client_cancellables = new HashTable <string, Cancellable> (str_hash, str_equal);

		try {
			cursor_pool = new ThreadPool<CursorTask>.with_owned_data (cursor_dispatch_cb, 16, false);
		} catch (Error e) {
			// Ignore harmless error: GLib documents that creating a
			// non-exclusive pool can not fail.
		}

		/* as the following settings are global for unknown reasons,
		   let's use the same settings as gio, otherwise the used settings
		   are rather random */
		ThreadPool.set_max_idle_time (15 * 1000);
		ThreadPool.set_max_unused_threads (2);

		config = config_p;
	}

	/**
	 * Removes the periodic signal emission timeout if one is installed.
	 */
	public static void shutdown () {
		if (signal_timeout != 0) {
			Source.remove (signal_timeout);
			signal_timeout = 0;
		}
	}

	/* Returns a new cancellable for a single task of client_id. The task
	 * cancellable is cancelled when the per-client cancellable (created on
	 * first use) is cancelled.
	 *
	 * NOTE(review): the handler connected to the client cancellable is never
	 * disconnected, so handlers accumulate for as long as the client stays
	 * registered - consider disconnecting once the task has finished. */
	private static Cancellable create_cancellable (string client_id) {
		var client_cancellable = client_cancellables.lookup (client_id);

		if (client_cancellable == null) {
			client_cancellable = new Cancellable ();
			client_cancellables.insert (client_id, client_cancellable);
		}

		var task_cancellable = new Cancellable ();
		client_cancellable.connect (() => {
			task_cancellable.cancel ();
		});

		return task_cancellable;
	}

	/* Hands the pending events and ready writeback data to the registered
	 * callback. Does nothing when no callback is registered:
	 * set_signal_callback () accepts null, and a timeout or idle source
	 * installed earlier may still fire afterwards. */
	private static void do_emit_signals () {
		if (signal_callback == null) {
			return;
		}

		signal_callback (Tracker.Events.get_pending (), Tracker.Writeback.get_ready ());
	}

	/* Installs the periodic emission timeout unless it is already running.
	 * The timeout removes itself once no update is in flight any more. */
	private static void ensure_signal_timeout () {
		if (signal_timeout == 0) {
			signal_timeout = Timeout.add (config.graphupdated_delay, () => {
				do_emit_signals ();
				if (n_updates == 0) {
					signal_timeout = 0;
					return false;
				} else {
					return true;
				}
			});
		}
	}

	/**
	 * Runs a SPARQL query and passes the resulting cursor to //in_thread//,
	 * which is executed in a thread of the cursor pool.
	 *
	 * While the query is being prepared a timeout of max_task_time seconds
	 * cancels it (unless max_task_time is 0).
	 *
	 * @param conn connection to run the query on
	 * @param sparql query text
	 * @param priority not used by this method
	 * @param in_thread function that consumes the cursor in a pool thread
	 * @param client_id identifies the client, see {@link unreg_batches}
	 * @throws Error error of the query, or error thrown by //in_thread//
	 */
	public static async void sparql_query (Tracker.Direct.Connection conn, string sparql, int priority, SparqlQueryInThread in_thread, string client_id) throws Error {
		var cancellable = create_cancellable (client_id);
		uint timeout_id = 0;

		if (max_task_time != 0) {
			timeout_id = Timeout.add_seconds (max_task_time, () => {
				cancellable.cancel ();
				timeout_id = 0;
				return false;
			});
		}

		Sparql.Cursor? cursor = null;

		try {
			cursor = yield conn.query_async (sparql, cancellable);
		} finally {
			// Also drop the timeout when the query failed, so the source
			// does not linger and fire on an already finished task.
			if (timeout_id != 0) {
				GLib.Source.remove (timeout_id);
				timeout_id = 0;
			}
		}

		var task = new CursorTask (cursor);
		task.thread_func = in_thread;
		task.callback = sparql_query.callback;

		try {
			cursor_pool.add (task);
		} catch (Error e) {
			// Ignore harmless error: GLib documents that the task is still
			// appended to the pool's queue when no new thread could be
			// created.
		}

		// Resumed by cursor_dispatch_cb () once in_thread has returned.
		yield;

		if (task.error != null) {
			throw task.error;
		}
	}

	/**
	 * Runs a SPARQL update.
	 *
	 * @param conn connection to run the update on
	 * @param sparql update text
	 * @param priority priority passed on to the connection
	 * @param client_id identifies the client, see {@link unreg_batches}
	 * @throws Error Sparql.Error.UNSUPPORTED while the store is not active,
	 *               or the error of the update itself
	 */
	public static async void sparql_update (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
		if (!active) {
			throw new Sparql.Error.UNSUPPORTED ("Store is not active");
		}

		n_updates++;
		ensure_signal_timeout ();
		var cancellable = create_cancellable (client_id);

		try {
			yield conn.update_async (sparql, priority, cancellable);
		} finally {
			// Must also run on failure, otherwise the emission timeout
			// never sees n_updates drop back to 0 and keeps firing forever.
			n_updates--;
		}
	}

	/**
	 * Runs a SPARQL update and returns the value produced by the
	 * connection's update_blank_async ().
	 *
	 * @param conn connection to run the update on
	 * @param sparql update text
	 * @param priority priority passed on to the connection
	 * @param client_id identifies the client, see {@link unreg_batches}
	 * @return the result of update_blank_async ()
	 * @throws Error Sparql.Error.UNSUPPORTED while the store is not active,
	 *               or the error of the update itself
	 */
	public static async Variant sparql_update_blank (Tracker.Direct.Connection conn, string sparql, int priority, string client_id) throws Error {
		if (!active) {
			throw new Sparql.Error.UNSUPPORTED ("Store is not active");
		}

		n_updates++;
		ensure_signal_timeout ();
		var cancellable = create_cancellable (client_id);

		Variant? nodes = null;

		try {
			nodes = yield conn.update_blank_async (sparql, priority, cancellable);
		} finally {
			// Must also run on failure, otherwise the emission timeout
			// never sees n_updates drop back to 0 and keeps firing forever.
			n_updates--;
		}

		return nodes;
	}

	/**
	 * Loads //file// through the connection's load_async ().
	 *
	 * @param conn connection to import into
	 * @param file file to import
	 * @param client_id identifies the client, see {@link unreg_batches}
	 * @throws Error Sparql.Error.UNSUPPORTED while the store is not active,
	 *               or the error of the import itself
	 */
	public static async void queue_turtle_import (Tracker.Direct.Connection conn, File file, string client_id) throws Error {
		if (!active) {
			throw new Sparql.Error.UNSUPPORTED ("Store is not active");
		}

		n_updates++;
		ensure_signal_timeout ();
		var cancellable = create_cancellable (client_id);

		try {
			yield conn.load_async (file, cancellable);
		} finally {
			// Must also run on failure, otherwise the emission timeout
			// never sees n_updates drop back to 0 and keeps firing forever.
			n_updates--;
		}
	}

	/**
	 * Cancels every task created for //client_id// and forgets the client.
	 * Does nothing when the client is unknown.
	 *
	 * @param client_id identifier previously passed to one of the request
	 *                  methods
	 */
	public static void unreg_batches (string client_id) {
		Cancellable cancellable = client_cancellables.lookup (client_id);

		if (cancellable != null) {
			cancellable.cancel ();
			client_cancellables.remove (client_id);
		}
	}

	/**
	 * Marks the store inactive, so update-type requests are rejected, and
	 * calls sync () on the connection returned by
	 * Tracker.Main.get_sparql_connection ().
	 */
	public static async void pause () {
		Tracker.Store.active = false;

		var sparql_conn = Tracker.Main.get_sparql_connection ();
		sparql_conn.sync ();
	}

	/**
	 * Marks the store active again, so update-type requests are accepted.
	 */
	public static void resume () {
		Tracker.Store.active = true;
	}

	/* Commit callback: transacts events and writeback data, then checks
	 * whether an immediate emission is due. */
	private static void on_statements_committed () {
		Tracker.Events.transact ();
		Tracker.Writeback.transact ();
		check_graph_updated_signal ();
	}

	/* Rollback callback: resets the pending events and writeback data. */
	private static void on_statements_rolled_back () {
		Tracker.Events.reset_pending ();
		Tracker.Writeback.reset_pending ();
	}

	private static void check_graph_updated_signal () {
		/* Check for whether we need an immediate emit */
		if (Tracker.Events.get_total () > GRAPH_UPDATED_IMMEDIATE_EMIT_AT) {
			// immediately emit signals for already committed transaction
			Idle.add (() => {
				do_emit_signals ();
				return false;
			});
		}
	}

	/* Insert callback: records the insert event and lets the writeback
	 * machinery check the statement. */
	private static void on_statement_inserted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
		Tracker.Events.add_insert (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
		Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
	}

	/* Delete callback: records the delete event and lets the writeback
	 * machinery check the statement. */
	private static void on_statement_deleted (int graph_id, string? graph, int subject_id, string subject, int pred_id, int object_id, string? object, PtrArray rdf_types) {
		Tracker.Events.add_delete (graph_id, subject_id, subject, pred_id, object_id, object, rdf_types);
		Tracker.Writeback.check (graph_id, graph, subject_id, subject, pred_id, object_id, object, rdf_types);
	}

	/**
	 * Registers the insert, delete, commit and rollback callbacks of this
	 * class on the data object of Tracker.Main's data manager.
	 */
	public static void enable_signals () {
		var data_manager = Tracker.Main.get_data_manager ();
		var data = data_manager.get_data ();
		data.add_insert_statement_callback (on_statement_inserted);
		data.add_delete_statement_callback (on_statement_deleted);
		data.add_commit_statement_callback (on_statements_committed);
		data.add_rollback_statement_callback (on_statements_rolled_back);
	}

	/**
	 * Unregisters the callbacks registered by {@link enable_signals}.
	 */
	public static void disable_signals () {
		var data_manager = Tracker.Main.get_data_manager ();
		var data = data_manager.get_data ();
		data.remove_insert_statement_callback (on_statement_inserted);
		data.remove_delete_statement_callback (on_statement_deleted);
		data.remove_commit_statement_callback (on_statements_committed);
		data.remove_rollback_statement_callback (on_statements_rolled_back);
	}

	/**
	 * Sets the callback that receives emitted signals.
	 *
	 * @param func the callback, or null to stop forwarding signals; the
	 *             reference is stored unowned, so the caller must keep the
	 *             callback valid while it is registered
	 */
	public static void set_signal_callback (SignalEmissionFunc? func) {
		signal_callback = func;
	}
}