diff options
author | Sam Thursfield <sam@afuera.me.uk> | 2019-08-21 22:30:55 +0100 |
---|---|---|
committer | Sam Thursfield <sam@afuera.me.uk> | 2019-08-23 12:39:19 +0300 |
commit | 68f73d4302cf48c37d11bbd466befff2f431c228 (patch) | |
tree | 21ad5f98ba84dfb3e75a0f5943d4a29a6bb5dff6 | |
parent | 5e2c5e45d2d3d610a9cc5451d17ddb97f01ff0a4 (diff) | |
download | tracker-68f73d4302cf48c37d11bbd466befff2f431c228.tar.gz |
functional-tests: Merge changes to 'helpers' module from tracker-miners.git
There are various features which were implemented in tracker-miners.git
since it was split from tracker.git which never made their way into
tracker.git.
-rw-r--r-- | utils/trackertestutils/helpers.py | 297 | ||||
-rw-r--r-- | utils/trackertestutils/mainloop.py | 58 |
2 files changed, 332 insertions, 23 deletions
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py index 71685674a..2b218e5d0 100644 --- a/utils/trackertestutils/helpers.py +++ b/utils/trackertestutils/helpers.py @@ -21,20 +21,39 @@ from gi.repository import Gio from gi.repository import GLib +import atexit import logging import os -import re -import sys import subprocess -import time + +from . import mainloop + +log = logging.getLogger(__name__) + + +class GraphUpdateTimeoutException(RuntimeError): + pass class NoMetadataException (Exception): pass + REASONABLE_TIMEOUT = 30 +_process_list = [] + + +def _cleanup_processes(): + for process in _process_list: + log.debug("helpers._cleanup_processes: stopping %s", process) + process.stop() + + +atexit.register(_cleanup_processes) + + class Helper: """ Abstract helper for Tracker processes. Launches the process @@ -62,24 +81,14 @@ class Helper: self.process = None self.available = False - self.loop = GLib.MainLoop() - self.install_glib_excepthook(self.loop) + self.loop = mainloop.MainLoop() self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None) - def install_glib_excepthook(self, loop): - """ - Handler to abort test if an exception occurs inside the GLib main loop. - """ - old_hook = sys.excepthook - - def new_hook(etype, evalue, etb): - old_hook(etype, evalue, etb) - GLib.MainLoop.quit(loop) - sys.exit(1) - sys.excepthook = new_hook - def _start_process(self, command_args=None, extra_env=None): + global _process_list + _process_list.append(self) + command = [self.process_path] + (command_args or []) self.log.debug("Starting %s.", ' '.join(command)) @@ -91,7 +100,7 @@ class Helper: try: return subprocess.Popen(command, env=env) except OSError as e: - raise RuntimeError("Error starting %s: %s" % (path, e)) + raise RuntimeError("Error starting %s: %s" % (self.process_path, e)) def _bus_name_appeared(self, connection, name, owner): self.log.debug("%s appeared on the message bus, owned by %s", name, owner) @@ -138,7 +147,7 @@ class Helper: # We expect the _bus_name_vanished callback to be called here, # causing the loop to exit again. - self.loop.run() + self.loop.run_checked() if self.available: # It's running, but we didn't start it... @@ -155,16 +164,17 @@ class Helper: self.abort_if_process_exits_with_status_0 = True # Run the loop until the bus name appears, or the process dies. - self.loop.run() + self.loop.run_checked() self.abort_if_process_exits_with_status_0 = False def stop(self): + global _process_list + if self.process is None: # Seems that it didn't even start... return - start = time.time() if self.process.poll() == None: GLib.source_remove(self.process_startup_timeout) self.process_startup_timeout = 0 @@ -181,12 +191,15 @@ class Helper: self.log.debug("Process stopped.") # Run the loop to handle the expected name_vanished signal. - self.loop.run() + self.loop.run_checked() Gio.bus_unwatch_name(self._bus_name_watch_id) self.process = None + _process_list.remove(self) def kill(self): + global _process_list + if self.process_watch_timeout != 0: GLib.source_remove(self.process_watch_timeout) self.process_watch_timeout = 0 @@ -194,10 +207,11 @@ class Helper: self.process.kill() # Name owner changed callback should take us out from this loop - self.loop.run() + self.loop.run_checked() Gio.bus_unwatch_name(self._bus_name_watch_id) self.process = None + _process_list.remove(self) self.log.debug("Process killed.") @@ -246,9 +260,246 @@ class StoreHelper (Helper): self.status_iface.Wait() self.log.debug("Ready") + self.reset_graph_updates_tracking() + + def signal_handler(proxy, sender_name, signal_name, parameters): + if signal_name == 'GraphUpdated': + self._graph_updated_cb(*parameters.unpack()) + + self.graph_updated_handler_id = self.resources.connect( + 'g-signal', signal_handler) + def stop(self): Helper.stop(self) + if self.graph_updated_handler_id != 0: + self.resources.disconnect(self.graph_updated_handler_id) + + # A system to follow GraphUpdated and make sure all changes are tracked. + # This code saves every change notification received, and exposes methods + # to await insertion or deletion of a certain resource which first check + # the list of events already received and wait for more if the event has + # not yet happened. + + def reset_graph_updates_tracking(self): + self.class_to_track = None + self.inserts_list = [] + self.deletes_list = [] + self.inserts_match_function = None + self.deletes_match_function = None + + def _graph_updated_timeout_cb(self): + raise GraphUpdateTimeoutException() + + def _graph_updated_cb(self, class_name, deletes_list, inserts_list): + """ + Process notifications from tracker-store on resource changes. + """ + exit_loop = False + + if class_name == self.class_to_track: + self.log.debug("GraphUpdated for %s: %i deletes, %i inserts", class_name, len(deletes_list), len(inserts_list)) + + if inserts_list is not None: + if self.inserts_match_function is not None: + # The match function will remove matched entries from the list + (exit_loop, inserts_list) = self.inserts_match_function(inserts_list) + self.inserts_list += inserts_list + + if not exit_loop and deletes_list is not None: + if self.deletes_match_function is not None: + (exit_loop, deletes_list) = self.deletes_match_function(deletes_list) + self.deletes_list += deletes_list + + if exit_loop: + GLib.source_remove(self.graph_updated_timeout_id) + self.graph_updated_timeout_id = 0 + self.loop.quit() + else: + self.log.debug("Ignoring GraphUpdated for class %s, currently tracking %s", class_name, self.class_to_track) + + def _enable_await_timeout(self): + self.graph_updated_timeout_id = GLib.timeout_add_seconds(REASONABLE_TIMEOUT, + self._graph_updated_timeout_cb) + + def await_resource_inserted(self, rdf_class, url=None, title=None, required_property=None): + """ + Block until a resource matching the parameters becomes available + """ + assert (self.inserts_match_function == None) + assert (self.class_to_track == None), "Already waiting for resource of type %s" % self.class_to_track + + self.class_to_track = rdf_class + + self.matched_resource_urn = None + self.matched_resource_id = None + + self.log.debug("Await new %s (%i existing inserts)", rdf_class, len(self.inserts_list)) + + if required_property is not None: + required_property_id = self.get_resource_id_by_uri(required_property) + self.log.debug("Required property %s id %i", required_property, required_property_id) + + def find_resource_insertion(inserts_list): + matched_creation = (self.matched_resource_id is not None) + matched_required_property = False + remaining_events = [] + + # FIXME: this could be done in an easier way: build one query that filters + # based on every subject id in inserts_list, and returns the id of the one + # that matched :) + for insert in inserts_list: + id = insert[1] + + if not matched_creation: + where = " ?urn a <%s> " % rdf_class + + if url is not None: + where += "; nie:url \"%s\"" % url + + if title is not None: + where += "; nie:title \"%s\"" % title + + query = "SELECT ?urn WHERE { %s FILTER (tracker:id(?urn) = %s)}" % (where, insert[1]) + result_set = self.query(query) + + if len(result_set) > 0: + matched_creation = True + self.matched_resource_urn = result_set[0][0] + self.matched_resource_id = insert[1] + self.log.debug("Matched creation of resource %s (%i)", + self.matched_resource_urn, + self.matched_resource_id) + if required_property is not None: + self.log.debug("Waiting for property %s (%i) to be set", + required_property, required_property_id) + + if required_property is not None and matched_creation and not matched_required_property: + if id == self.matched_resource_id and insert[2] == required_property_id: + matched_required_property = True + self.log.debug("Matched %s %s", self.matched_resource_urn, required_property) + + if not matched_creation or id != self.matched_resource_id: + remaining_events += [insert] + + matched = matched_creation if required_property is None else matched_required_property + return matched, remaining_events + + def match_cb(inserts_list): + matched, remaining_events = find_resource_insertion(inserts_list) + exit_loop = matched + return exit_loop, remaining_events + + # Check the list of previously received events for matches + (existing_match, self.inserts_list) = find_resource_insertion(self.inserts_list) + + if not existing_match: + self._enable_await_timeout() + self.inserts_match_function = match_cb + # Run the event loop until the correct notification arrives + try: + self.loop.run_checked() + except GraphUpdateTimeoutException: + raise GraphUpdateTimeoutException("Timeout waiting for resource: class %s, URL %s, title %s" % (rdf_class, url, title)) from None + self.inserts_match_function = None + + self.class_to_track = None + return (self.matched_resource_id, self.matched_resource_urn) + + def await_resource_deleted(self, rdf_class, id): + """ + Block until we are notified of a resources deletion + """ + assert (self.deletes_match_function == None) + assert (self.class_to_track == None) + + def find_resource_deletion(deletes_list): + self.log.debug("find_resource_deletion: looking for %i in %s", id, deletes_list) + + matched = False + remaining_events = [] + + for delete in deletes_list: + if delete[1] == id: + matched = True + else: + remaining_events += [delete] + + return matched, remaining_events + + def match_cb(deletes_list): + matched, remaining_events = find_resource_deletion(deletes_list) + exit_loop = matched + return exit_loop, remaining_events + + self.log.debug("Await deletion of %i (%i existing)", id, len(self.deletes_list)) + + (existing_match, self.deletes_list) = find_resource_deletion(self.deletes_list) + + if not existing_match: + self._enable_await_timeout() + self.class_to_track = rdf_class + self.deletes_match_function = match_cb + # Run the event loop until the correct notification arrives + try: + self.loop.run_checked() + except GraphUpdateTimeoutException: + raise GraphUpdateTimeoutException("Resource %i has not been deleted." % id) + self.deletes_match_function = None + self.class_to_track = None + + return + + def await_property_changed(self, rdf_class, subject_id, property_uri): + """ + Block until a property of a resource is updated or inserted. + """ + assert (self.inserts_match_function == None) + assert (self.deletes_match_function == None) + assert (self.class_to_track == None) + + self.log.debug("Await change to %i %s (%i, %i existing)", subject_id, property_uri, len(self.inserts_list), len(self.deletes_list)) + + self.class_to_track = rdf_class + + property_id = self.get_resource_id_by_uri(property_uri) + + def find_property_change(event_list): + matched = False + remaining_events = [] + + for event in event_list: + if event[1] == subject_id and event[2] == property_id: + self.log.debug("Matched property change: %s", str(event)) + matched = True + else: + remaining_events += [event] + + return matched, remaining_events + + def match_cb(event_list): + matched, remaining_events = find_property_change(event_list) + exit_loop = matched + return exit_loop, remaining_events + + # Check the list of previously received events for matches + (existing_match, self.inserts_list) = find_property_change(self.inserts_list) + (existing_match, self.deletes_list) = find_property_change(self.deletes_list) + + if not existing_match: + self._enable_await_timeout() + self.inserts_match_function = match_cb + self.deletes_match_function = match_cb + # Run the event loop until the correct notification arrives + try: + self.loop.run_checked() + except GraphUpdateTimeoutException: + raise GraphUpdateTimeoutException( + "Timeout waiting for property change, subject %i property %s (%i)" % (subject_id, property_uri, property_id)) + self.inserts_match_function = None + self.deletes_match_function = None + self.class_to_track = None + # Note: The methods below call the tracker-store D-Bus API directly. This # is useful for testing this API surface, but we recommand that all regular # applications use libtracker-sparql library to talk to the database. diff --git a/utils/trackertestutils/mainloop.py b/utils/trackertestutils/mainloop.py new file mode 100644 index 000000000..1e7a46c87 --- /dev/null +++ b/utils/trackertestutils/mainloop.py @@ -0,0 +1,58 @@ +# Copyright (C) 2018, Sam Thursfield <sam@afuera.me.uk> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +from gi.repository import GLib + +import sys + + +class MainLoop(): + '''Wrapper for GLib.MainLoop that propagates any unhandled exceptions. + + PyGObject doesn't seem to provide any help with propagating exceptions from + the GLib main loop to the main Python execution context. The default + behaviour is to print a message and continue, which is useless for tests as + it means tests appear to pass when in fact they are broken. + + ''' + + def __init__(self): + self._loop = GLib.MainLoop.new(None, 0) + + def quit(self): + self._loop.quit() + + def run_checked(self): + '''Run the loop until quit(), then raise any unhandled exception.''' + self._exception = None + + old_hook = sys.excepthook + + def new_hook(etype, evalue, etb): + self._loop.quit() + self._exception = evalue + old_hook(etype, evalue, etb) + + try: + sys.excepthook = new_hook + self._loop.run() + finally: + sys.excepthook = old_hook + + if self._exception: + raise self._exception |