diff options
Diffstat (limited to 'tools/libinput-replay.py')
-rwxr-xr-x | tools/libinput-replay.py | 371 |
1 files changed, 371 insertions, 0 deletions
diff --git a/tools/libinput-replay.py b/tools/libinput-replay.py new file mode 100755 index 00000000..527b9d05 --- /dev/null +++ b/tools/libinput-replay.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python3 +# vim: set expandtab shiftwidth=4: +# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */ +# +# Copyright © 2018 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice (including the next +# paragraph) shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +import os +import sys +import time +import math +import multiprocessing +import argparse +from pathlib import Path + +try: + import libevdev + import yaml + import pyudev +except ModuleNotFoundError as e: + print("Error: {}".format(e), file=sys.stderr) + print( + "One or more python modules are missing. Please install those " + "modules and re-run this tool." + ) + sys.exit(1) + + +SUPPORTED_FILE_VERSION = 1 + + +def error(msg, **kwargs): + print(msg, **kwargs, file=sys.stderr) + + +class YamlException(Exception): + pass + + +def fetch(yaml, key): + """Helper function to avoid confusing a YAML error with a + normal KeyError bug""" + try: + return yaml[key] + except KeyError: + raise YamlException("Failed to get '{}' from recording.".format(key)) + + +def check_udev_properties(yaml_data, uinput): + """ + Compare the properties our new uinput device has with the ones from the + recording and ring the alarm bell if one of them is off. + """ + yaml_udev_section = fetch(yaml_data, "udev") + yaml_udev_props = fetch(yaml_udev_section, "properties") + yaml_props = { + k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props] + } + try: + # We don't assign this one to virtual devices + del yaml_props["LIBINPUT_DEVICE_GROUP"] + except KeyError: + pass + + # give udev some time to catch up + time.sleep(0.2) + context = pyudev.Context() + udev_device = pyudev.Devices.from_device_file(context, uinput.devnode) + for name, value in udev_device.properties.items(): + if name in yaml_props: + if yaml_props[name] != value: + error( + f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}" + ) + del yaml_props[name] + else: + # The list of properties we add to the recording, see libinput-record.c + prefixes = ( + "ID_INPUT", + "LIBINPUT", + "EVDEV_ABS", + "MOUSE_DPI", + "POINTINGSTICK_", + ) + for prefix in prefixes: + if name.startswith(prefix): + error(f"Warning: unexpected property: {name}={value}") + + # the ones we found above were removed from the dict + for name, value in yaml_props.items(): + error(f"Warning: device is missing recorded udev property: {name}={value}") + + +def create(device): + evdev = fetch(device, "evdev") + + d = libevdev.Device() + d.name = fetch(evdev, "name") + + ids = fetch(evdev, "id") + if len(ids) != 4: + raise YamlException("Invalid ID format: {}".format(ids)) + d.id = dict(zip(["bustype", "vendor", "product", "version"], ids)) + + codes = fetch(evdev, "codes") + for evtype, evcodes in codes.items(): + for code in evcodes: + data = None + if evtype == libevdev.EV_ABS.value: + values = fetch(evdev, "absinfo")[code] + absinfo = libevdev.InputAbsInfo( + minimum=values[0], + maximum=values[1], + fuzz=values[2], + flat=values[3], + resolution=values[4], + ) + data = absinfo + elif evtype == libevdev.EV_REP.value: + if code == libevdev.EV_REP.REP_DELAY.value: + data = 500 + elif code == libevdev.EV_REP.REP_PERIOD.value: + data = 20 + d.enable(libevdev.evbit(evtype, code), data=data) + + properties = fetch(evdev, "properties") + for prop in properties: + d.enable(libevdev.propbit(prop)) + + uinput = d.create_uinput_device() + + check_udev_properties(device, uinput) + + return uinput + + +def print_events(devnode, indent, evs): + devnode = os.path.basename(devnode) + for e in evs: + print( + "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format( + devnode, + " " * (indent * 8), + e.sec, + e.usec, + e.type.name, + e.code.name, + e.value, + ) + ) + + +def replay(device, verbose): + events = fetch(device, "events") + if events is None: + return + uinput = device["__uinput"] + + # The first event may have a nonzero offset but we want to replay + # immediately regardless. When replaying multiple devices, the first + # offset is the offset from the first event on any device. + offset = time.time() - device["__first_event_offset"] + + if offset < 0: + error("WARNING: event time offset is in the future, refusing to replay") + return + + # each 'evdev' set contains one SYN_REPORT so we only need to check for + # the time offset once per event + for event in events: + try: + evdev = fetch(event, "evdev") + except YamlException: + continue + + (sec, usec, evtype, evcode, value) = evdev[0] + evtime = sec + usec / 1e6 + offset + now = time.time() + + if evtime - now > 150 / 1e6: # 150 µs error margin + time.sleep(evtime - now - 150 / 1e6) + + evs = [ + libevdev.InputEvent( + libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1] + ) + for e in evdev + ] + uinput.send_events(evs) + if verbose: + print_events(uinput.devnode, device["__index"], evs) + + +def first_timestamp(device): + events = fetch(device, "events") + for e in events or []: + try: + evdev = fetch(e, "evdev") + (sec, usec, *_) = evdev[0] + return sec + usec / 1.0e6 + except YamlException: + pass + + return None + + +def wrap(func, *args): + try: + func(*args) + except KeyboardInterrupt: + pass + + +def loop(args, recording): + devices = fetch(recording, "devices") + + first_timestamps = tuple( + filter(lambda x: x is not None, [first_timestamp(d) for d in devices]) + ) + # All devices need to start replaying at the same time, so let's find + # the very first event and offset everything by that timestamp. + toffset = min(first_timestamps or [math.inf]) + + for idx, d in enumerate(devices): + uinput = create(d) + print("{}: {}".format(uinput.devnode, uinput.name)) + d["__uinput"] = uinput # cheaper to hide it in the dict then work around it + d["__index"] = idx + d["__first_event_offset"] = toffset + + if not first_timestamps: + input("No events in recording. Hit enter to quit") + return + + while True: + input("Hit enter to start replaying") + + processes = [] + for d in devices: + p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose)) + processes.append(p) + + for p in processes: + p.start() + + for p in processes: + p.join() + + del processes + + +def create_device_quirk(device): + try: + quirks = fetch(device, "quirks") + if not quirks: + return None + except YamlException: + return None + # Where the device has a quirk, we match on name, vendor and product. + # That's the best match we can assemble here from the info we have. + evdev = fetch(device, "evdev") + name = fetch(evdev, "name") + id = fetch(evdev, "id") + quirk = ( + "[libinput-replay {name}]\n" + "MatchName={name}\n" + "MatchVendor=0x{id[1]:04X}\n" + "MatchProduct=0x{id[2]:04X}\n" + ).format(name=name, id=id) + quirk += "\n".join(quirks) + return quirk + + +def setup_quirks(recording): + devices = fetch(recording, "devices") + overrides = None + quirks = [] + for d in devices: + if "quirks" in d: + quirk = create_device_quirk(d) + if quirk: + quirks.append(quirk) + if not quirks: + return None + + overrides = Path("/etc/libinput/local-overrides.quirks") + if overrides.exists(): + print( + "{} exists, please move it out of the way first".format(overrides), + file=sys.stderr, + ) + sys.exit(1) + + overrides.parent.mkdir(exist_ok=True) + with overrides.open("w+") as fd: + fd.write("# This file was generated by libinput replay\n") + fd.write("# Unless libinput replay is running right now, remove this file.\n") + fd.write("\n\n".join(quirks)) + + return overrides + + +def check_file(recording): + version = fetch(recording, "version") + if version != SUPPORTED_FILE_VERSION: + raise YamlException( + "Invalid file format: {}, expected {}".format( + version, SUPPORTED_FILE_VERSION + ) + ) + + ndevices = fetch(recording, "ndevices") + devices = fetch(recording, "devices") + if ndevices != len(devices): + error( + "WARNING: truncated file, expected {} devices, got {}".format( + ndevices, len(devices) + ) + ) + + +def main(): + parser = argparse.ArgumentParser(description="Replay a device recording") + parser.add_argument( + "recording", + metavar="recorded-file.yaml", + type=str, + help="Path to device recording", + ) + parser.add_argument("--verbose", action="store_true") + args = parser.parse_args() + + quirks_file = None + + try: + with open(args.recording) as f: + y = yaml.safe_load(f) + check_file(y) + quirks_file = setup_quirks(y) + loop(args, y) + except KeyboardInterrupt: + pass + except (PermissionError, OSError) as e: + error("Error: failed to open device: {}".format(e)) + except YamlException as e: + error("Error: failed to parse recording: {}".format(e)) + finally: + if quirks_file: + quirks_file.unlink() + + +if __name__ == "__main__": + main() |