diff options
authorSam Thursfield <>2019-08-21 22:30:55 +0100
committerSam Thursfield <>2019-08-23 12:39:19 +0300
commit68f73d4302cf48c37d11bbd466befff2f431c228 (patch)
parent5e2c5e45d2d3d610a9cc5451d17ddb97f01ff0a4 (diff)
functional-tests: Merge changes to 'helpers' module from tracker-miners.git
There are various features which were implemented in tracker-miners.git since it was split from tracker.git which never made their way into tracker.git.
2 files changed, 332 insertions, 23 deletions
diff --git a/utils/trackertestutils/ b/utils/trackertestutils/
index 71685674a..2b218e5d0 100644
--- a/utils/trackertestutils/
+++ b/utils/trackertestutils/
@@ -21,20 +21,39 @@
from gi.repository import Gio
from gi.repository import GLib
+import atexit
import logging
import os
-import re
-import sys
import subprocess
-import time
+from . import mainloop
+log = logging.getLogger(__name__)
+class GraphUpdateTimeoutException(RuntimeError):
+ pass
class NoMetadataException (Exception):
+_process_list = []
+def _cleanup_processes():
+ for process in _process_list:
+ log.debug("helpers._cleanup_processes: stopping %s", process)
+ process.stop()
class Helper:
Abstract helper for Tracker processes. Launches the process
@@ -62,24 +81,14 @@ class Helper:
self.process = None
self.available = False
- self.loop = GLib.MainLoop()
- self.install_glib_excepthook(self.loop)
+ self.loop = mainloop.MainLoop()
self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
- def install_glib_excepthook(self, loop):
- """
- Handler to abort test if an exception occurs inside the GLib main loop.
- """
- old_hook = sys.excepthook
- def new_hook(etype, evalue, etb):
- old_hook(etype, evalue, etb)
- GLib.MainLoop.quit(loop)
- sys.exit(1)
- sys.excepthook = new_hook
def _start_process(self, command_args=None, extra_env=None):
+ global _process_list
+ _process_list.append(self)
command = [self.process_path] + (command_args or [])
self.log.debug("Starting %s.", ' '.join(command))
@@ -91,7 +100,7 @@ class Helper:
return subprocess.Popen(command, env=env)
except OSError as e:
- raise RuntimeError("Error starting %s: %s" % (path, e))
+ raise RuntimeError("Error starting %s: %s" % (self.process_path, e))
def _bus_name_appeared(self, connection, name, owner):
self.log.debug("%s appeared on the message bus, owned by %s", name, owner)
@@ -138,7 +147,7 @@ class Helper:
# We expect the _bus_name_vanished callback to be called here,
# causing the loop to exit again.
+ self.loop.run_checked()
if self.available:
# It's running, but we didn't start it...
@@ -155,16 +164,17 @@ class Helper:
self.abort_if_process_exits_with_status_0 = True
# Run the loop until the bus name appears, or the process dies.
+ self.loop.run_checked()
self.abort_if_process_exits_with_status_0 = False
def stop(self):
+ global _process_list
if self.process is None:
# Seems that it didn't even start...
- start = time.time()
if self.process.poll() == None:
self.process_startup_timeout = 0
@@ -181,12 +191,15 @@ class Helper:
self.log.debug("Process stopped.")
# Run the loop to handle the expected name_vanished signal.
+ self.loop.run_checked()
self.process = None
+ _process_list.remove(self)
def kill(self):
+ global _process_list
if self.process_watch_timeout != 0:
self.process_watch_timeout = 0
@@ -194,10 +207,11 @@ class Helper:
# Name owner changed callback should take us out from this loop
+ self.loop.run_checked()
self.process = None
+ _process_list.remove(self)
self.log.debug("Process killed.")
@@ -246,9 +260,246 @@ class StoreHelper (Helper):
+ self.reset_graph_updates_tracking()
+ def signal_handler(proxy, sender_name, signal_name, parameters):
+ if signal_name == 'GraphUpdated':
+ self._graph_updated_cb(*parameters.unpack())
+ self.graph_updated_handler_id = self.resources.connect(
+ 'g-signal', signal_handler)
def stop(self):
+ if self.graph_updated_handler_id != 0:
+ self.resources.disconnect(self.graph_updated_handler_id)
+ # A system to follow GraphUpdated and make sure all changes are tracked.
+ # This code saves every change notification received, and exposes methods
+ # to await insertion or deletion of a certain resource which first check
+ # the list of events already received and wait for more if the event has
+ # not yet happened.
+ def reset_graph_updates_tracking(self):
+ self.class_to_track = None
+ self.inserts_list = []
+ self.deletes_list = []
+ self.inserts_match_function = None
+ self.deletes_match_function = None
+ def _graph_updated_timeout_cb(self):
+ raise GraphUpdateTimeoutException()
+ def _graph_updated_cb(self, class_name, deletes_list, inserts_list):
+ """
+ Process notifications from tracker-store on resource changes.
+ """
+ exit_loop = False
+ if class_name == self.class_to_track:
+ self.log.debug("GraphUpdated for %s: %i deletes, %i inserts", class_name, len(deletes_list), len(inserts_list))
+ if inserts_list is not None:
+ if self.inserts_match_function is not None:
+ # The match function will remove matched entries from the list
+ (exit_loop, inserts_list) = self.inserts_match_function(inserts_list)
+ self.inserts_list += inserts_list
+ if not exit_loop and deletes_list is not None:
+ if self.deletes_match_function is not None:
+ (exit_loop, deletes_list) = self.deletes_match_function(deletes_list)
+ self.deletes_list += deletes_list
+ if exit_loop:
+ GLib.source_remove(self.graph_updated_timeout_id)
+ self.graph_updated_timeout_id = 0
+ self.loop.quit()
+ else:
+ self.log.debug("Ignoring GraphUpdated for class %s, currently tracking %s", class_name, self.class_to_track)
+ def _enable_await_timeout(self):
+ self.graph_updated_timeout_id = GLib.timeout_add_seconds(REASONABLE_TIMEOUT,
+ self._graph_updated_timeout_cb)
+ def await_resource_inserted(self, rdf_class, url=None, title=None, required_property=None):
+ """
+ Block until a resource matching the parameters becomes available
+ """
+ assert (self.inserts_match_function == None)
+ assert (self.class_to_track == None), "Already waiting for resource of type %s" % self.class_to_track
+ self.class_to_track = rdf_class
+ self.matched_resource_urn = None
+ self.matched_resource_id = None
+ self.log.debug("Await new %s (%i existing inserts)", rdf_class, len(self.inserts_list))
+ if required_property is not None:
+ required_property_id = self.get_resource_id_by_uri(required_property)
+ self.log.debug("Required property %s id %i", required_property, required_property_id)
+ def find_resource_insertion(inserts_list):
+ matched_creation = (self.matched_resource_id is not None)
+ matched_required_property = False
+ remaining_events = []
+ # FIXME: this could be done in an easier way: build one query that filters
+ # based on every subject id in inserts_list, and returns the id of the one
+ # that matched :)
+ for insert in inserts_list:
+ id = insert[1]
+ if not matched_creation:
+ where = " ?urn a <%s> " % rdf_class
+ if url is not None:
+ where += "; nie:url \"%s\"" % url
+ if title is not None:
+ where += "; nie:title \"%s\"" % title
+ query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1])
+ result_set = self.query(query)
+ if len(result_set) > 0:
+ matched_creation = True
+ self.matched_resource_urn = result_set[0][0]
+ self.matched_resource_id = insert[1]
+ self.log.debug("Matched creation of resource %s (%i)",
+ self.matched_resource_urn,
+ self.matched_resource_id)
+ if required_property is not None:
+ self.log.debug("Waiting for property %s (%i) to be set",
+ required_property, required_property_id)
+ if required_property is not None and matched_creation and not matched_required_property:
+ if id == self.matched_resource_id and insert[2] == required_property_id:
+ matched_required_property = True
+ self.log.debug("Matched %s %s", self.matched_resource_urn, required_property)
+ if not matched_creation or id != self.matched_resource_id:
+ remaining_events += [insert]
+ matched = matched_creation if required_property is None else matched_required_property
+ return matched, remaining_events
+ def match_cb(inserts_list):
+ matched, remaining_events = find_resource_insertion(inserts_list)
+ exit_loop = matched
+ return exit_loop, remaining_events
+ # Check the list of previously received events for matches
+ (existing_match, self.inserts_list) = find_resource_insertion(self.inserts_list)
+ if not existing_match:
+ self._enable_await_timeout()
+ self.inserts_match_function = match_cb
+ # Run the event loop until the correct notification arrives
+ try:
+ self.loop.run_checked()
+ except GraphUpdateTimeoutException:
+ raise GraphUpdateTimeoutException("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title)) from None
+ self.inserts_match_function = None
+ self.class_to_track = None
+ return (self.matched_resource_id, self.matched_resource_urn)
+ def await_resource_deleted(self, rdf_class, id):
+ """
+ Block until we are notified of a resources deletion
+ """
+ assert (self.deletes_match_function == None)
+ assert (self.class_to_track == None)
+ def find_resource_deletion(deletes_list):
+ self.log.debug("find_resource_deletion: looking for %i in %s", id, deletes_list)
+ matched = False
+ remaining_events = []
+ for delete in deletes_list:
+ if delete[1] == id:
+ matched = True
+ else:
+ remaining_events += [delete]
+ return matched, remaining_events
+ def match_cb(deletes_list):
+ matched, remaining_events = find_resource_deletion(deletes_list)
+ exit_loop = matched
+ return exit_loop, remaining_events
+ self.log.debug("Await deletion of %i (%i existing)", id, len(self.deletes_list))
+ (existing_match, self.deletes_list) = find_resource_deletion(self.deletes_list)
+ if not existing_match:
+ self._enable_await_timeout()
+ self.class_to_track = rdf_class
+ self.deletes_match_function = match_cb
+ # Run the event loop until the correct notification arrives
+ try:
+ self.loop.run_checked()
+ except GraphUpdateTimeoutException:
+ raise GraphUpdateTimeoutException("Resource %i has not been deleted." % id)
+ self.deletes_match_function = None
+ self.class_to_track = None
+ return
+ def await_property_changed(self, rdf_class, subject_id, property_uri):
+ """
+ Block until a property of a resource is updated or inserted.
+ """
+ assert (self.inserts_match_function == None)
+ assert (self.deletes_match_function == None)
+ assert (self.class_to_track == None)
+ self.log.debug("Await change to %i %s (%i, %i existing)", subject_id, property_uri, len(self.inserts_list), len(self.deletes_list))
+ self.class_to_track = rdf_class
+ property_id = self.get_resource_id_by_uri(property_uri)
+ def find_property_change(event_list):
+ matched = False
+ remaining_events = []
+ for event in event_list:
+ if event[1] == subject_id and event[2] == property_id:
+ self.log.debug("Matched property change: %s", str(event))
+ matched = True
+ else:
+ remaining_events += [event]
+ return matched, remaining_events
+ def match_cb(event_list):
+ matched, remaining_events = find_property_change(event_list)
+ exit_loop = matched
+ return exit_loop, remaining_events
+ # Check the list of previously received events for matches
+ (existing_match, self.inserts_list) = find_property_change(self.inserts_list)
+ (existing_match, self.deletes_list) = find_property_change(self.deletes_list)
+ if not existing_match:
+ self._enable_await_timeout()
+ self.inserts_match_function = match_cb
+ self.deletes_match_function = match_cb
+ # Run the event loop until the correct notification arrives
+ try:
+ self.loop.run_checked()
+ except GraphUpdateTimeoutException:
+ raise GraphUpdateTimeoutException(
+ "Timeout waiting for property change, subject %i property %s (%i)" % (subject_id, property_uri, property_id))
+ self.inserts_match_function = None
+ self.deletes_match_function = None
+ self.class_to_track = None
# Note: The methods below call the tracker-store D-Bus API directly. This
# is useful for testing this API surface, but we recommand that all regular
# applications use libtracker-sparql library to talk to the database.
diff --git a/utils/trackertestutils/ b/utils/trackertestutils/
new file mode 100644
index 000000000..1e7a46c87
--- /dev/null
+++ b/utils/trackertestutils/
@@ -0,0 +1,58 @@
+# Copyright (C) 2018, Sam Thursfield <>
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+from gi.repository import GLib
+import sys
+class MainLoop():
+ '''Wrapper for GLib.MainLoop that propagates any unhandled exceptions.
+ PyGObject doesn't seem to provide any help with propagating exceptions from
+ the GLib main loop to the main Python execution context. The default
+ behaviour is to print a message and continue, which is useless for tests as
+ it means tests appear to pass when in fact they are broken.
+ '''
+ def __init__(self):
+ self._loop =, 0)
+ def quit(self):
+ self._loop.quit()
+ def run_checked(self):
+ '''Run the loop until quit(), then raise any unhandled exception.'''
+ self._exception = None
+ old_hook = sys.excepthook
+ def new_hook(etype, evalue, etb):
+ self._loop.quit()
+ self._exception = evalue
+ old_hook(etype, evalue, etb)
+ try:
+ sys.excepthook = new_hook
+ finally:
+ sys.excepthook = old_hook
+ if self._exception:
+ raise self._exception