summaryrefslogtreecommitdiff
path: root/tools/libinput-replay.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/libinput-replay.py')
-rwxr-xr-xtools/libinput-replay.py371
1 files changed, 371 insertions, 0 deletions
diff --git a/tools/libinput-replay.py b/tools/libinput-replay.py
new file mode 100755
index 00000000..527b9d05
--- /dev/null
+++ b/tools/libinput-replay.py
@@ -0,0 +1,371 @@
+#!/usr/bin/env python3
+# vim: set expandtab shiftwidth=4:
+# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
+#
+# Copyright © 2018 Red Hat, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice (including the next
+# paragraph) shall be included in all copies or substantial portions of the
+# Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
+
+import os
+import sys
+import time
+import math
+import multiprocessing
+import argparse
+from pathlib import Path
+
+try:
+ import libevdev
+ import yaml
+ import pyudev
+except ModuleNotFoundError as e:
+ print("Error: {}".format(e), file=sys.stderr)
+ print(
+ "One or more python modules are missing. Please install those "
+ "modules and re-run this tool."
+ )
+ sys.exit(1)
+
+
+SUPPORTED_FILE_VERSION = 1
+
+
+def error(msg, **kwargs):
+ print(msg, **kwargs, file=sys.stderr)
+
+
+class YamlException(Exception):
+ pass
+
+
+def fetch(yaml, key):
+ """Helper function to avoid confusing a YAML error with a
+ normal KeyError bug"""
+ try:
+ return yaml[key]
+ except KeyError:
+ raise YamlException("Failed to get '{}' from recording.".format(key))
+
+
+def check_udev_properties(yaml_data, uinput):
+ """
+ Compare the properties our new uinput device has with the ones from the
+ recording and ring the alarm bell if one of them is off.
+ """
+ yaml_udev_section = fetch(yaml_data, "udev")
+ yaml_udev_props = fetch(yaml_udev_section, "properties")
+ yaml_props = {
+ k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props]
+ }
+ try:
+ # We don't assign this one to virtual devices
+ del yaml_props["LIBINPUT_DEVICE_GROUP"]
+ except KeyError:
+ pass
+
+ # give udev some time to catch up
+ time.sleep(0.2)
+ context = pyudev.Context()
+ udev_device = pyudev.Devices.from_device_file(context, uinput.devnode)
+ for name, value in udev_device.properties.items():
+ if name in yaml_props:
+ if yaml_props[name] != value:
+ error(
+ f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}"
+ )
+ del yaml_props[name]
+ else:
+ # The list of properties we add to the recording, see libinput-record.c
+ prefixes = (
+ "ID_INPUT",
+ "LIBINPUT",
+ "EVDEV_ABS",
+ "MOUSE_DPI",
+ "POINTINGSTICK_",
+ )
+ for prefix in prefixes:
+ if name.startswith(prefix):
+ error(f"Warning: unexpected property: {name}={value}")
+
+ # the ones we found above were removed from the dict
+ for name, value in yaml_props.items():
+ error(f"Warning: device is missing recorded udev property: {name}={value}")
+
+
+def create(device):
+ evdev = fetch(device, "evdev")
+
+ d = libevdev.Device()
+ d.name = fetch(evdev, "name")
+
+ ids = fetch(evdev, "id")
+ if len(ids) != 4:
+ raise YamlException("Invalid ID format: {}".format(ids))
+ d.id = dict(zip(["bustype", "vendor", "product", "version"], ids))
+
+ codes = fetch(evdev, "codes")
+ for evtype, evcodes in codes.items():
+ for code in evcodes:
+ data = None
+ if evtype == libevdev.EV_ABS.value:
+ values = fetch(evdev, "absinfo")[code]
+ absinfo = libevdev.InputAbsInfo(
+ minimum=values[0],
+ maximum=values[1],
+ fuzz=values[2],
+ flat=values[3],
+ resolution=values[4],
+ )
+ data = absinfo
+ elif evtype == libevdev.EV_REP.value:
+ if code == libevdev.EV_REP.REP_DELAY.value:
+ data = 500
+ elif code == libevdev.EV_REP.REP_PERIOD.value:
+ data = 20
+ d.enable(libevdev.evbit(evtype, code), data=data)
+
+ properties = fetch(evdev, "properties")
+ for prop in properties:
+ d.enable(libevdev.propbit(prop))
+
+ uinput = d.create_uinput_device()
+
+ check_udev_properties(device, uinput)
+
+ return uinput
+
+
+def print_events(devnode, indent, evs):
+ devnode = os.path.basename(devnode)
+ for e in evs:
+ print(
+ "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format(
+ devnode,
+ " " * (indent * 8),
+ e.sec,
+ e.usec,
+ e.type.name,
+ e.code.name,
+ e.value,
+ )
+ )
+
+
+def replay(device, verbose):
+ events = fetch(device, "events")
+ if events is None:
+ return
+ uinput = device["__uinput"]
+
+ # The first event may have a nonzero offset but we want to replay
+ # immediately regardless. When replaying multiple devices, the first
+ # offset is the offset from the first event on any device.
+ offset = time.time() - device["__first_event_offset"]
+
+ if offset < 0:
+ error("WARNING: event time offset is in the future, refusing to replay")
+ return
+
+ # each 'evdev' set contains one SYN_REPORT so we only need to check for
+ # the time offset once per event
+ for event in events:
+ try:
+ evdev = fetch(event, "evdev")
+ except YamlException:
+ continue
+
+ (sec, usec, evtype, evcode, value) = evdev[0]
+ evtime = sec + usec / 1e6 + offset
+ now = time.time()
+
+ if evtime - now > 150 / 1e6: # 150 µs error margin
+ time.sleep(evtime - now - 150 / 1e6)
+
+ evs = [
+ libevdev.InputEvent(
+ libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1]
+ )
+ for e in evdev
+ ]
+ uinput.send_events(evs)
+ if verbose:
+ print_events(uinput.devnode, device["__index"], evs)
+
+
+def first_timestamp(device):
+ events = fetch(device, "events")
+ for e in events or []:
+ try:
+ evdev = fetch(e, "evdev")
+ (sec, usec, *_) = evdev[0]
+ return sec + usec / 1.0e6
+ except YamlException:
+ pass
+
+ return None
+
+
+def wrap(func, *args):
+ try:
+ func(*args)
+ except KeyboardInterrupt:
+ pass
+
+
+def loop(args, recording):
+ devices = fetch(recording, "devices")
+
+ first_timestamps = tuple(
+ filter(lambda x: x is not None, [first_timestamp(d) for d in devices])
+ )
+ # All devices need to start replaying at the same time, so let's find
+ # the very first event and offset everything by that timestamp.
+ toffset = min(first_timestamps or [math.inf])
+
+ for idx, d in enumerate(devices):
+ uinput = create(d)
+ print("{}: {}".format(uinput.devnode, uinput.name))
+ d["__uinput"] = uinput # cheaper to hide it in the dict then work around it
+ d["__index"] = idx
+ d["__first_event_offset"] = toffset
+
+ if not first_timestamps:
+ input("No events in recording. Hit enter to quit")
+ return
+
+ while True:
+ input("Hit enter to start replaying")
+
+ processes = []
+ for d in devices:
+ p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose))
+ processes.append(p)
+
+ for p in processes:
+ p.start()
+
+ for p in processes:
+ p.join()
+
+ del processes
+
+
+def create_device_quirk(device):
+ try:
+ quirks = fetch(device, "quirks")
+ if not quirks:
+ return None
+ except YamlException:
+ return None
+ # Where the device has a quirk, we match on name, vendor and product.
+ # That's the best match we can assemble here from the info we have.
+ evdev = fetch(device, "evdev")
+ name = fetch(evdev, "name")
+ id = fetch(evdev, "id")
+ quirk = (
+ "[libinput-replay {name}]\n"
+ "MatchName={name}\n"
+ "MatchVendor=0x{id[1]:04X}\n"
+ "MatchProduct=0x{id[2]:04X}\n"
+ ).format(name=name, id=id)
+ quirk += "\n".join(quirks)
+ return quirk
+
+
+def setup_quirks(recording):
+ devices = fetch(recording, "devices")
+ overrides = None
+ quirks = []
+ for d in devices:
+ if "quirks" in d:
+ quirk = create_device_quirk(d)
+ if quirk:
+ quirks.append(quirk)
+ if not quirks:
+ return None
+
+ overrides = Path("/etc/libinput/local-overrides.quirks")
+ if overrides.exists():
+ print(
+ "{} exists, please move it out of the way first".format(overrides),
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ overrides.parent.mkdir(exist_ok=True)
+ with overrides.open("w+") as fd:
+ fd.write("# This file was generated by libinput replay\n")
+ fd.write("# Unless libinput replay is running right now, remove this file.\n")
+ fd.write("\n\n".join(quirks))
+
+ return overrides
+
+
+def check_file(recording):
+ version = fetch(recording, "version")
+ if version != SUPPORTED_FILE_VERSION:
+ raise YamlException(
+ "Invalid file format: {}, expected {}".format(
+ version, SUPPORTED_FILE_VERSION
+ )
+ )
+
+ ndevices = fetch(recording, "ndevices")
+ devices = fetch(recording, "devices")
+ if ndevices != len(devices):
+ error(
+ "WARNING: truncated file, expected {} devices, got {}".format(
+ ndevices, len(devices)
+ )
+ )
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Replay a device recording")
+ parser.add_argument(
+ "recording",
+ metavar="recorded-file.yaml",
+ type=str,
+ help="Path to device recording",
+ )
+ parser.add_argument("--verbose", action="store_true")
+ args = parser.parse_args()
+
+ quirks_file = None
+
+ try:
+ with open(args.recording) as f:
+ y = yaml.safe_load(f)
+ check_file(y)
+ quirks_file = setup_quirks(y)
+ loop(args, y)
+ except KeyboardInterrupt:
+ pass
+ except (PermissionError, OSError) as e:
+ error("Error: failed to open device: {}".format(e))
+ except YamlException as e:
+ error("Error: failed to parse recording: {}".format(e))
+ finally:
+ if quirks_file:
+ quirks_file.unlink()
+
+
+if __name__ == "__main__":
+ main()