#!/usr/bin/python3
"""Forward events between physical evdev devices and one uinput device.

A virtual uinput device is created from a JSON device description.  Events
read from the tracked physical devices are written to the virtual device;
force-feedback (FF) uploads, erases and events arriving on the virtual device
are forwarded to every tracked physical device.  The set of tracked devices
is managed at runtime through line-based commands (``add <path>``,
``remove <path>``, ``list``, ``clear``) read from a control FIFO.
"""

import argparse
import contextlib
import json
import logging
import os
import selectors
import stat
import sys

from evdev import ecodes, ff, uinput, InputDevice


class Controller:
    """Reads commands from the control FIFO and applies them to the devices."""

    _file = None
    _in_devices = None
    _logger = None

    def __init__(self, path, sel, devices, logger):
        """Open the control FIFO at ``path`` in non-blocking mode.

        ``devices`` is the container the commands operate on (it must provide
        ``add``, ``remove``, ``clear`` and iteration over device paths).
        ``sel`` is accepted for interface compatibility and is not used here.
        """
        self._logger = logger.getChild(str(self))
        self._in_devices = devices

        fd = os.open(path, os.O_RDWR | os.O_NONBLOCK)
        try:
            self._file = os.fdopen(fd)
        except BaseException:
            # fdopen() did not take ownership of the descriptor
            os.close(fd)
            raise

    def __str__(self):
        return 'Controller'

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the control FIFO."""
        self._file.close()

    def fileno(self):
        """Return the FIFO's file descriptor (used by the selector)."""
        return self._file.fileno()

    def process(self, sel_key):
        """Read and execute all commands currently available on the FIFO."""
        for line in self._file:
            cmd, _, rest = line.rstrip('\n').partition(' ')
            self._logger.debug('Processing command: %s %s', cmd, rest)
            # A failing command must not abort the loop: the lines that
            # follow it are already buffered in the file object, and the
            # selector will not signal again until new data is written.
            try:
                self._execute(cmd, rest, line)
            except Exception:
                self._logger.exception('Command failed: %s %s', cmd, rest)

    def _execute(self, cmd, rest, line):
        """Run a single parsed command; ``line`` is the raw input line."""
        if cmd == 'add':
            self._in_devices.add(rest)
        elif cmd == 'remove':
            self._in_devices.remove(rest)
        elif cmd == 'list':
            for d in self._in_devices:
                sys.stderr.write(d + '\n')
        elif cmd == 'clear':
            self._in_devices.clear()
        else:
            self._logger.error('Unknown command: %s', line)


class UInputDevice:
    """The virtual (uinput) device that the physical devices are merged into."""

    _dev = None
    _tgt_devices = None
    _logger = None

    def __init__(self, desc_path, devices, logger):
        """Create the uinput device described by the JSON file ``desc_path``.

        ``devices`` receives the FF requests arriving on the virtual device.

        Raises ValueError if the file lacks the ``linux-evdev-device-desc``
        key or declares a version greater than 0.
        """
        self._logger = logger.getChild(str(self))
        self._tgt_devices = devices

        with open(desc_path, 'rb') as f:
            desc = json.load(f)

        if 'linux-evdev-device-desc' not in desc:
            raise ValueError('device_desc is not a valid device description')
        if desc['linux-evdev-device-desc'] > 0:
            raise ValueError('Unsupported device description version: %d' %
                             desc['linux-evdev-device-desc'])

        # JSON object keys are strings; convert them back to integers
        events = {int(key): val for key, val in desc['capabilities'].items()}

        info = desc['info']
        self._dev = uinput.UInput(events=events, name=desc['name'],
                                  vendor=info['vendor'],
                                  product=info['product'],
                                  version=info['version'],
                                  bustype=info['bustype'])

        self._logger.info('Created UInput device "%s": %s',
                          self._dev.name, self._dev.device.path)

    def __str__(self):
        return 'UInputDevice'

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._dev.close()

    def fileno(self):
        """Return the uinput device's file descriptor (used by the selector)."""
        return self._dev.fileno()

    def write_event(self, ev):
        """Write the event ``ev`` to the virtual device."""
        self._dev.write_event(ev)

    def process(self, sel_key):
        """Handle the events pending on the virtual device.

        FF events are forwarded to the target devices; FF upload/erase
        requests are executed on the target devices and then completed on the
        uinput device with a return value of 0 (success) or -1 (failure, in
        which case the exception is re-raised).
        """
        for ev in self._dev.read():
            self._logger.debug('processing event: %s', ev)

            if ev.type == ecodes.EV_FF:
                # force feedback command, forward to devices
                self._logger.debug('Forwarding an FF event')
                self._tgt_devices.write_ff_event(ev)
            elif ev.type == ecodes.EV_UINPUT:
                if ev.code == ecodes.UI_FF_UPLOAD:
                    self._handle_upload(ev)
                elif ev.code == ecodes.UI_FF_ERASE:
                    self._handle_erase(ev)
            elif ev.type == ecodes.EV_MSC and ev.code == ecodes.MSC_SCAN:
                self._logger.debug('Ignoring MSC_SCAN: %d', ev.value)
            else:
                self._logger.warning('Unhandled event on the virtual device: %s', ev)

    def _handle_upload(self, ev):
        """Execute an FF effect upload request on the target devices."""
        upload = self._dev.begin_upload(ev.value)
        self._logger.info('Effect upload: id %d; type 0x%x direction 0x%x',
                          upload.effect.id, upload.effect.type,
                          upload.effect.direction)
        try:
            self._tgt_devices.upload_effect(upload.effect)
            upload.retval = 0
        except BaseException:
            upload.retval = -1
            raise
        finally:
            # the request must always be completed, even on failure
            self._dev.end_upload(upload)

    def _handle_erase(self, ev):
        """Execute an FF effect erase request on the target devices."""
        erase = self._dev.begin_erase(ev.value)
        self._logger.info('Effect erase: %d', erase.effect_id)
        try:
            self._tgt_devices.erase_effect(erase.effect_id)
            erase.retval = 0
        except BaseException:
            erase.retval = -1
            raise
        finally:
            # the request must always be completed, even on failure
            self._dev.end_erase(erase)


class PhysDevices(dict):
    """The tracked physical devices, as a dict of device path -> InputDevice."""

    _sel = None
    _logger = None

    # dict of FF effects currently uploaded to the uinput device
    # indexed by their id on the uinput device
    _ff_effects = None

    # FF effect ids are assigned by the driver, so we need to translate
    # between effect ids on the uinput device and the individual physical
    # devices
    # this dict maps each device path (same keys as self) to a dict that
    # maps the caller's FF effect ids to that device's ids
    _ff_effect_idmap = None

    # the object events read from the physical devices are written to; it is
    # assigned by the owner after construction
    uinput_dev = None

    def __init__(self, sel, logger):
        """Create an empty device set that registers its devices in ``sel``."""
        super().__init__()
        self._sel = sel
        self._logger = logger.getChild(str(self))
        self._ff_effects = {}
        self._ff_effect_idmap = {}

    def __str__(self):
        return 'PhysDevices(%d)' % len(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @staticmethod
    def _copy_effect(effect, effect_id=-1):
        """Return a copy of ``effect`` carrying the id ``effect_id``.

        With the default id of -1 the driver treats the copy as a new effect.
        """
        return ff.Effect(type=effect.type, direction=effect.direction,
                         ff_trigger=effect.ff_trigger,
                         ff_replay=effect.ff_replay,
                         u=effect.u, id=effect_id)

    def clear(self):
        """Remove (and close) every tracked device."""
        # iterate over a copy, remove() mutates the dict
        for path in list(self.keys()):
            self.remove(path)

    def close(self):
        """Release all tracked devices."""
        self.clear()

    def add(self, path):
        """Open the device at ``path`` and start tracking it.

        All effects currently known on the virtual device are uploaded to the
        new device.  Raises KeyError if ``path`` is already tracked; any error
        from opening, uploading or registering is propagated after the device
        has been closed again.
        """
        if path in self:
            raise KeyError("Device already added: %s" % path)

        d = InputDevice(path)
        try:
            # upload FF effects that already exist on the virtual device;
            # the copies carry id -1 to let the driver know we are uploading
            # new effects
            idmap = {}
            for eid, effect in self._ff_effects.items():
                idmap[eid] = d.upload_effect(self._copy_effect(effect))

            # register last, so that a failed upload cannot leave a closed
            # device behind in the selector
            self._sel.register(d, selectors.EVENT_READ, self)
        except BaseException:
            d.close()
            raise

        self._ff_effect_idmap[path] = idmap
        self[path] = d

        self._logger.info('Added: %s', str(d))

    def remove(self, path):
        """Stop tracking the device at ``path`` and close it.

        Raises KeyError if ``path`` is not tracked.
        """
        d = self.pop(path)
        self._logger.info('Removing: %s', str(d))

        idmap = self._ff_effect_idmap.pop(path, {})
        try:
            # erase any uploaded FF effects; this is best-effort, since the
            # device may already be gone (remove() is also called after a
            # read error) and that must not prevent the cleanup below
            try:
                for did in idmap.values():
                    d.erase_effect(did)
            except OSError as e:
                self._logger.warning('Could not erase FF effects on "%s": %s',
                                     str(d), e)
        finally:
            try:
                self._sel.unregister(d)
            finally:
                d.close()

    def upload_effect(self, effect):
        """Upload ``effect`` to all tracked devices and remember it.

        ``effect.id`` is the id on the virtual device.  If a device already
        holds an effect for that id, the existing device effect is updated
        instead of allocating a new one.
        """
        # upload the effect to all currently tracked devices
        for name, d in self.items():
            idmap = self._ff_effect_idmap[name]
            effect_d = self._copy_effect(effect, idmap.get(effect.id, -1))
            idmap[effect.id] = d.upload_effect(effect_d)

        # store the effect for upload to any newly added devices
        self._ff_effects[effect.id] = effect

    def erase_effect(self, effect_id):
        """Erase the effect with virtual-device id ``effect_id`` everywhere.

        Raises KeyError if no such effect has been uploaded.
        """
        if effect_id not in self._ff_effects:
            raise KeyError('No such effect uploaded: %d' % effect_id)

        for name, d in self.items():
            did = self._ff_effect_idmap[name].pop(effect_id)
            d.erase_effect(did)

        del self._ff_effects[effect_id]

    def write_ff_event(self, ev):
        """Write the FF event ``ev`` to every tracked device.

        If the event code is the id of an uploaded effect, it is translated
        to each device's own effect id; other codes are passed on unchanged.
        """
        is_effect = ev.code in self._ff_effects
        for name, d in self.items():
            code = self._ff_effect_idmap[name][ev.code] if is_effect else ev.code
            d.write(ecodes.EV_FF, code, ev.value)

    def process(self, sel_key):
        """Forward pending events from one physical device to the uinput device.

        FF events coming from the physical device are dropped.  On a read
        error the device is removed from the tracked set.
        """
        try:
            for ev in sel_key.fileobj.read():
                self._logger.debug('phys->uinput: %s', ev)
                if ev.type == ecodes.EV_FF:
                    self._logger.debug('disregarding FF event')
                    continue
                self.uinput_dev.write_event(ev)
        except OSError as e:
            self._logger.error('Error reading from device "%s": %s',
                               str(sel_key.fileobj), os.strerror(e.errno))
            self.remove(sel_key.fileobj.path)


parser = argparse.ArgumentParser()

parser.add_argument('-v', '--verbose', action='count', default=0)
parser.add_argument('-q', '--quiet', action='count', default=0)
parser.add_argument('-c', '--control')
parser.add_argument('device_desc')

args = parser.parse_args(sys.argv[1:])

# setup logging
# default to 20 (INFO), every -q goes a level up, every -v a level down
log_level = max(10 * (2 + args.quiet - args.verbose), 0)
# NOTE(review): this looks like a leftover debugging aid; it is kept so the
# script's output stays unchanged -- confirm and drop it
print(log_level)
log_format = '%(asctime)s:%(name)s:%(levelname)s: %(message)s'
logging.basicConfig(format=log_format, datefmt='%F %T', level=log_level)
logger = logging.getLogger(os.path.basename(sys.argv[0]))

# the device description argument is either a full path or the root name of
# a file in ~/.local/var/inputfwd/
desc_path = os.path.expanduser(args.device_desc)
if not os.path.isabs(desc_path):
    desc_path = os.path.join(os.path.expanduser('~/.local/var/inputfwd/'),
                             args.device_desc + '.desc')
if not os.path.exists(desc_path):
    logger.error('Device description "%s"->"%s" does not exist',
                 args.device_desc, desc_path)
    sys.exit(1)

# by default the control FIFO is in ~/.cache/inputfwd/
control_path = args.control
if not control_path:
    control_path = os.path.join(os.path.expanduser('~/.cache/inputfwd'),
                                os.path.splitext(os.path.basename(desc_path))[0])

# create the FIFO if it does not exist yet
if not os.path.exists(control_path):
    logger.info('Creating control FIFO: %s', control_path)
    control_dir = os.path.dirname(control_path)
    # a bare file name has no directory part, and makedirs('') would fail
    if control_dir:
        os.makedirs(control_dir, exist_ok=True)
    os.mkfifo(control_path, mode=0o600)
else:
    st = os.stat(control_path)
    if not stat.S_ISFIFO(st.st_mode):
        logger.error('Control path "%s" is not a FIFO', control_path)
        sys.exit(1)

with contextlib.ExitStack() as stack:
    sel = stack.enter_context(selectors.DefaultSelector())
    devices = stack.enter_context(PhysDevices(sel, logger))
    # not named "uinput", which would rebind the module imported from evdev
    uinput_dev = stack.enter_context(UInputDevice(desc_path, devices, logger))
    controller = stack.enter_context(Controller(control_path, sel, devices, logger))

    devices.uinput_dev = uinput_dev

    sel.register(controller, selectors.EVENT_READ)
    sel.register(uinput_dev, selectors.EVENT_READ)

    while True:
        logger.debug('polling')
        events = sel.select()
        logger.debug('got events:')
        for key, mask in events:
            # physical devices are registered with the PhysDevices container
            # as their data; it is a dict and therefore falsy while empty, so
            # compare against None explicitly
            tgt = key.data if key.data is not None else key.fileobj
            try:
                tgt.process(key)
            except Exception:
                logger.exception('Error processing event on: %s', str(tgt))