diff --git a/.gitignore b/.gitignore index 7c5f3f9..d8b20f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /result /result-* +__pycache__ diff --git a/hosts/yarn/default.nix b/hosts/yarn/default.nix index 93fee76..5b4e419 100644 --- a/hosts/yarn/default.nix +++ b/hosts/yarn/default.nix @@ -15,6 +15,7 @@ ./impermanence.nix ./lact.nix ./vr.nix + ./forza-trigger inputs.impermanence.nixosModules.impermanence ]; @@ -77,4 +78,7 @@ # yarn is not a Steam Deck jovian.devices.steamdeck.enable = false; + + # PS5 DualSense adaptive triggers in Forza Horizon 4 / 5. + services.forzaTrigger.enable = true; } diff --git a/hosts/yarn/forza-trigger/default.nix b/hosts/yarn/forza-trigger/default.nix new file mode 100644 index 0000000..6ccf163 --- /dev/null +++ b/hosts/yarn/forza-trigger/default.nix @@ -0,0 +1,196 @@ +{ + config, + lib, + pkgs, + username, + ... +}: +# Forza Horizon 4 / 5 → DualSense adaptive trigger bridge. +# +# Forza emits a fixed-format UDP telemetry stream ("Data Out") at 60 Hz on a +# user-configured port. We listen on that port, parse each packet via fdp +# (nettrom/forza_motorsport, MIT), and drive the PS5 DualSense's adaptive +# triggers via pydualsense (PyPI, MIT) which talks HID over hidraw. +# +# Setup on the user side, once enabled here: +# - plug the DualSense in over USB and disable Steam Input for the +# controller (Settings → Controller → "PlayStation Configuration Support": +# OFF). Bluetooth works too but the udev/hidraw path is more reliable +# over USB. +# - in Forza, HUD options \u2192 set Data Out: ON, Data Out IP: 127.0.0.1, +# Data Out IP Port: 5300, and (FM7 only) Data Out Packet Format: CAR DASH. +# +# System-interaction notes: +# - With multiple DualSense controllers connected, pydualsense picks one +# non-deterministically (`# TODO: implement multiple controllers working` +# in pydualsense's source). Forza Horizon is single-player so this is +# usually fine. If you need to pin a specific controller, the cleanest +# route is monkey-patching `pydualsense.__find_device`. +# - The included `dualsensectl` will be overwritten by our BG thread within +# ~4 ms; use `systemctl --user stop forza-trigger` first when debugging. +# - Hot-plug recovery happens in-process: the daemon polls pydualsense's BG +# thread liveness and re-runs `pydualsense.init()` on disconnect. systemd's +# `Restart=on-failure` exists only as a crash-recovery safety net. +let + cfg = config.services.forzaTrigger; + python = pkgs.python3; + + # CFFI bindings to libhidapi. Upstream is flok/hidapi-cffi published on + # PyPI under the name `hidapi-usb`. The shipped hidapi.py picks a libhidapi + # soname via ffi.dlopen() — we substitute absolute store paths so the + # interpreter inside our wrapped python env can find it without + # LD_LIBRARY_PATH gymnastics. + hidapi-usb = python.pkgs.buildPythonPackage rec { + pname = "hidapi-usb"; + version = "0.3.2"; + format = "setuptools"; + + # PyPI's project URL slug uses a hyphen (`hidapi-usb`) but the sdist file + # itself is PEP-625-normalized to an underscore (`hidapi_usb-…`). Stock + # fetchPypi assumes they match — they don't here, so fetch by direct URL. + src = pkgs.fetchurl { + url = "https://files.pythonhosted.org/packages/55/80/960ae94b615e26a7d1aeebe8e9fefda2f25608bf1016f9aec268b328c35e/hidapi_usb-${version}.tar.gz"; + hash = "sha256-oxp+2i+qqYd1uwiS2Dh8/PzO62iYQQXpR936MnDIFk0="; + }; + + propagatedBuildInputs = [ python.pkgs.cffi ]; + + postPatch = '' + substituteInPlace hidapi.py \ + --replace-fail "'libhidapi-hidraw.so'," "'${pkgs.hidapi}/lib/libhidapi-hidraw.so'," \ + --replace-fail "'libhidapi-hidraw.so.0'," "'${pkgs.hidapi}/lib/libhidapi-hidraw.so.0'," + ''; + + pythonImportsCheck = [ "hidapi" ]; + + meta = { + description = "CFFI wrapper for hidapi (used by pydualsense)"; + homepage = "https://github.com/flok/hidapi-cffi"; + license = lib.licenses.bsd3; + }; + }; + + pydualsense = python.pkgs.buildPythonPackage rec { + pname = "pydualsense"; + version = "0.7.5"; + format = "pyproject"; + + src = python.pkgs.fetchPypi { + pname = "pydualsense"; + inherit version; + hash = "sha256-YgX8AJE4f8p7geKT3xlCD0Mlh1GcyHpBz4rEIqdwKgs="; + }; + + nativeBuildInputs = [ python.pkgs.poetry-core ]; + propagatedBuildInputs = [ hidapi-usb ]; + + pythonImportsCheck = [ "pydualsense" ]; + + meta = { + description = "Control your PS5 DualSense controller from Python"; + homepage = "https://github.com/flok/pydualsense"; + license = lib.licenses.mit; + }; + }; + + # Single-file Forza UDP packet parser. Pinned to a known-good commit; the + # repo is dormant (last commit 2021) but the FH4 packet layout is frozen + # and FH5 reuses it byte-for-byte. + fdp = python.pkgs.buildPythonPackage { + pname = "fdp"; + version = "0-unstable-2021-05-28"; + format = "other"; + + src = pkgs.fetchurl { + url = "https://raw.githubusercontent.com/nettrom/forza_motorsport/61845cb7ff4082211292a51ce3c49edbfd2d6503/fdp.py"; + hash = "sha256-osFaVF9VaEzU4dp3x6KN6OF7SXsd9ZBwvilU+xTT7mM="; + }; + + dontUnpack = true; + + installPhase = '' + runHook preInstall + install -Dm644 $src $out/${python.sitePackages}/fdp.py + runHook postInstall + ''; + + pythonImportsCheck = [ "fdp" ]; + + meta = { + description = "ForzaDataPacket — Forza Motorsport / Horizon UDP packet parser"; + homepage = "https://github.com/nettrom/forza_motorsport"; + license = lib.licenses.mit; + }; + }; + + forzaTrigger = pkgs.writers.writePython3Bin "forza-trigger" { + libraries = [ + pydualsense + fdp + ]; + # The wrapped binary doesn't need style enforcement — readability of + # the source file is what matters, and that lives in forza_trigger.py. + doCheck = false; + } (builtins.readFile ./forza_trigger.py); + +in +{ + options.services.forzaTrigger = { + enable = lib.mkEnableOption "Forza Horizon → DualSense adaptive trigger bridge"; + + user = lib.mkOption { + type = lib.types.str; + default = username; + description = '' + User the trigger daemon runs as. Must be the user playing Forza so + the DualSense's hidraw uaccess ACL applies. + ''; + }; + + port = lib.mkOption { + type = lib.types.port; + default = 5300; + description = '' + UDP port the daemon listens on for Forza Data Out packets. Must + match the value configured in Forza's HUD options. + ''; + }; + }; + + config = lib.mkIf cfg.enable { + # uaccess hands /dev/hidraw* of the connected PS5 DualSense to the + # active-seat user via ACL. Steam ships near-identical rules; declaring + # them here keeps the module self-contained (and works even if Steam + # isn't running). + services.udev.extraRules = '' + # PS5 DualSense (USB) + KERNEL=="hidraw*", ATTRS{idVendor}=="054c", ATTRS{idProduct}=="0ce6", MODE="0660", TAG+="uaccess" + # PS5 DualSense Edge (USB) + KERNEL=="hidraw*", ATTRS{idVendor}=="054c", ATTRS{idProduct}=="0df2", MODE="0660", TAG+="uaccess" + # PS5 DualSense (Bluetooth) + KERNEL=="hidraw*", KERNELS=="*054C:0CE6*", MODE="0660", TAG+="uaccess" + # PS5 DualSense Edge (Bluetooth) + KERNEL=="hidraw*", KERNELS=="*054C:0DF2*", MODE="0660", TAG+="uaccess" + ''; + + environment.systemPackages = [ + forzaTrigger + # CLI companion for sanity-checking the controller (battery, lightbar, + # raw trigger modes, monitor add/remove events). + pkgs.dualsensectl + ]; + + # User-level service so it inherits the seat-bound uaccess ACL on + # /dev/hidraw* and dies cleanly when the user logs out. + systemd.user.services.forza-trigger = { + description = "Forza Horizon → DualSense adaptive trigger bridge"; + wantedBy = [ "default.target" ]; + after = [ "graphical-session.target" ]; + serviceConfig = { + ExecStart = "${forzaTrigger}/bin/forza-trigger --host 127.0.0.1 --port ${toString cfg.port}"; + Restart = "on-failure"; + RestartSec = 3; + }; + }; + }; +} diff --git a/hosts/yarn/forza-trigger/forza_trigger.py b/hosts/yarn/forza-trigger/forza_trigger.py new file mode 100644 index 0000000..b08d78f --- /dev/null +++ b/hosts/yarn/forza-trigger/forza_trigger.py @@ -0,0 +1,800 @@ +"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. + +This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline. +Every numeric value, every threshold, every map() / EWMA() coefficient, +and every output byte sequence has been verified against published +sources or against decompiled DSX 1.4.9 itself. + +Sources, in priority order: + + 1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31) + decompiled with ILSpy. The decompilation revealed: + a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev6, MIT) as + its trigger-effect encoder. + b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's + high-level CustomTriggerValueMode names to mode bytes: + VibrateResistance -> 6 (Simple_Vibration / 0x06) + VibrateResistanceA / AB -> 38 (Vibration / 0x26) + VibrateResistanceB -> 6 + When the dispatcher hits the `else` branch in + DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than + 9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into + the trigger param region — no bit-packing, no scale conversion. + This is why RacingDSX's 0-255-scale stiffness values ARE the + actual amplitude bytes that reach the controller's firmware. + + 2. Nielk1's reverse-engineering gist Rev 6 (MIT, + https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db). + Source for the canonical Sony bit-packed Feedback (0x21) encoder used + for the non-slip path. Identical to the implementation shipped inside + DSX 1.4.9. + + 3. RacingDSX (cosmii02/RacingDSX, GPLv3) — community-tuned defaults for + Forza Horizon 4 / 5 since 2022. Specifically: + Config/ThrottleSettings.cs, Config/BrakeSettings.cs, + GameParsers/Parser.cs. + +The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is +provided by pydualsense (PyPI, MIT). We drive its low-level +`triggerL/R.mode` and `triggerL/R.forces[i]` fields directly because +pydualsense's high-level setMode/setForce API does not understand any +specific mode's parameter encoding — it just shovels bytes into the +output report at fixed offsets. That is exactly what we want. + +## Documented intentional divergences from RacingDSX + +1. Motion gate (`_is_in_motion()`): slip detection is gated on speed or wheel + rotation. RacingDSX has no gate, so locked stationary wheels (e.g. after a + hard stop with brake held) keep the slip path active forever and the trigger + stuck in vibration mode. The gate fixes the user-reported bug. + +2. Clamp-to-8 in `_apply_feedback()`: DSX's `TriggerEffectGenerator.Resistance` + silently skips when force > 8, leaving the trigger stuck in whatever mode it + was in (Simple_Vibration during slip\u2192non-slip transitions). We clamp to 8 + instead so the transition produces a smooth Feedback ramp. + +3. Car performance index width: fdp parses `car_performance_index` as Int32 (the + field's actual width per Forza's spec), while RacingDSX's `FMData.cs` reads + only `GetUInt8(bytes, 220)` \u2014 the low byte. For any car with CPI > 255 (B/A/ + S1/S2/X) the two implementations disagree on the pre-race lightbar's CPI + tint. We use the correct value; RacingDSX's lightbar is dimmer/inconsistent + on those cars. Mask `cpi & 0xFF` in `apply_lightbar_pre_race` to match + RacingDSX byte-for-byte if you want bug-faithful Windows-equivalent dimming. + +## Threading note + +pydualsense's `sendReport` background thread reads `triggerR/L.mode` and +`forces[0..6]` independently \u2014 there's no atomic publish primitive. Our +`_apply_*` helpers write `forces[]` first and `mode` last; the BG thread reads +`mode` first, so this ordering keeps the worst-case torn frame to one ~4 ms +HID write at slip\u2194non-slip mode transitions. Audible as a brief click on +transitions, not stuck state. Without lock/atomic primitives in pydualsense's +API this is the cleanest mitigation available. + +## System interaction notes + +**Single-controller assumption.** pydualsense's `__find_device` enumerates all +DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps the +last one matched (no break in the loop), then opens via `hidapi_open(vid, pid)` +without serial/path \u2014 `hid_open` returns the first match, which is not +necessarily the one selected. With multiple DualSense controllers the picked +controller is non-deterministic. pydualsense's source explicitly notes +`# TODO: implement multiple controllers working`. RacingDSX/DSX are also +single-controller (DSX's `connectedController` is a singleton). Forza Horizon +is single-player so this is fine in practice; if multi-controller selection +matters, monkey-patch `__find_device` to filter by `serial_number`. + +**Steam Input.** When Steam Input's PlayStation Configuration Support is +enabled for the game, Steam intercepts hidraw input AND writes its own HID +output reports (rumble, lightbar, sometimes triggers). Our daemon writes +competing output reports at ~1 kHz; the controller observes whichever wrote +last. Effect: trigger oscillates and feels broken. The Nix module's README +in `default.nix` instructs users to disable PlayStation Configuration Support +for Forza in Steam (Settings \u2192 Controller). + +**dualsensectl.** Installed in the Nix module for ad-hoc debugging. Single- +shot writes from `dualsensectl trigger left feedback ...` get overwritten by +our BG thread's next iteration ~4 ms later. Use it only when the daemon is +stopped (`systemctl --user stop forza-trigger`). + +**Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on +hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls +`ds.report_thread.is_alive()` and reconnects in-process via +`_connect_controller()`, which retries `pydualsense.init()` every +`RECONNECT_BACKOFF_S` until the controller comes back. The daemon does not +depend on systemd or any other supervisor for plug-event recovery; running it +directly from a shell handles unplug/replug exactly the same way. +""" + +from __future__ import annotations + +import argparse +import logging +import math +import os +import socket +import sys +import time + +from fdp import ForzaDataPacket +from pydualsense import TriggerModes, pydualsense + +LOG = logging.getLogger("forza-trigger") + +# --- Mode bytes --------------------------------------------------------------- +# pydualsense's IntFlag aliases happen to cover the modes we need: +# TriggerModes.Off = 0x00 (between-race idle; pydualsense's name) +# TriggerModes(0x05) = 0x05 (canonical Sony Off / Reset; mid-race zero-force) +# TriggerModes.Pulse_B = 0x06 (Simple_Vibration / Simple_AutomaticGun) +# TriggerModes.Rigid_A = 0x21 (Feedback, canonical) +DS_MODE_NORMAL = TriggerModes.Off +DS_MODE_OFF = TriggerModes(0x05) +DS_MODE_SIMPLE_VIBRATION = TriggerModes.Pulse_B +DS_MODE_FEEDBACK = TriggerModes.Rigid_A + +# --- RacingDSX defaults (Config/ThrottleSettings.cs) -------------------------- +THROTTLE_GRIP_LOSS = 0.6 +THROTTLE_REAR_SLIP_ACCEL_MIN = 200 +THROTTLE_VIB_POSITION = 5 # VibrationModeStart +THROTTLE_MIN_VIBRATION = 5 # below this freq, fall back to Resistance +THROTTLE_MAX_VIBRATION = 55 # peak frequency at slip == 5 +THROTTLE_MIN_STIFFNESS = 255 # slip-mode amplitude at avgAccel == 0 +THROTTLE_MAX_STIFFNESS = 175 # slip-mode amplitude at avgAccel == AccelerationLimit +THROTTLE_MIN_RESISTANCE = 0 # non-slip canonical strength at avgAccel == 0 +THROTTLE_MAX_RESISTANCE = 3 # non-slip canonical strength at avgAccel == AccelerationLimit +THROTTLE_ACCELERATION_LIMIT = 10 +THROTTLE_TURN_ACCEL_SCALE = 0.25 +THROTTLE_FORWARD_ACCEL_SCALE = 1.0 +THROTTLE_RESISTANCE_SMOOTHING = 0.9 +THROTTLE_VIBRATION_SMOOTHING = 1.0 +THROTTLE_EFFECT_INTENSITY = 1.0 +THROTTLE_LAST_RESISTANCE_INIT = 1 # Parser.lastThrottleResistance + +# --- RacingDSX defaults (Config/BrakeSettings.cs) ----------------------------- +BRAKE_GRIP_LOSS = 0.05 +BRAKE_DEADZONE = 100 # Parser literal: data.Brake > 100 +BRAKE_VIB_POSITION = 0 # VibrationStart +BRAKE_MIN_VIBRATION = 15 +BRAKE_MAX_VIBRATION = 20 +BRAKE_MIN_STIFFNESS = 150 +BRAKE_MAX_STIFFNESS = 5 +BRAKE_MIN_RESISTANCE = 0 +BRAKE_MAX_RESISTANCE = 7 +BRAKE_RESISTANCE_SMOOTHING = 0.4 +BRAKE_VIBRATION_SMOOTHING = 1.0 +BRAKE_EFFECT_INTENSITY = 1.0 +BRAKE_LAST_RESISTANCE_INIT = 200 # Parser.lastBrakeResistance + +# --- Forza UDP packet sizes -> fdp packet_format strings ---------------------- +PACKET_FORMATS = { + 232: "sled", + 311: "dash", + 324: "fh4", # FH4 and FH5 share the same layout +} + +# --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) -------- +# CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM). +# Parser.cs uses an `<=` cascade, so any value > 5 is treated as X. +CAR_CLASS_COLORS = [ + (107, 185, 236), # ColorClassD + (234, 202, 49), # ColorClassC + (211, 90, 37), # ColorClassB + (187, 59, 34), # ColorClassA + (128, 54, 243), # ColorClassS1 + (75, 88, 229), # ColorClassS2 + (105, 182, 72), # ColorClassX (no CPI tint) +] +MAX_CPI = 255 # ForzaParser.MaxCPI +RPM_REDLINE_RATIO = 0.9 # Profile.RPMRedlineRatio +GREEN_FLOOR = 50 # Math.Max(..., 50) on green channel in non-redline path +RACE_OFF_RPM_FRAMES = 200 # ForzaParser.RPMAccumulatorTriggerRaceOff + +# --- Reset on idle (UDP timeout) --------------------------------------------- +# Not present in RacingDSX; an additional safety so the controller doesn't get +# stuck if Forza is killed mid-race or the network drops. +IDLE_TIMEOUT_S = 3.0 + +# --- Hot-plug reconnect backoff ---------------------------------------------- +# pydualsense's BG sendReport thread terminates silently on hidraw IOError +# (controller unplugged, BT disconnect, USB resuspend). The main loop polls +# the thread's liveness and reconnects in-process \u2014 the script is agnostic +# of supervisors like systemd. The same backoff governs the initial-connect +# wait when the daemon starts before any controller is plugged in. +RECONNECT_BACKOFF_S = 1.0 + +# --- Stationary motion gate -------------------------------------------------- +# Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked +# wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for +# this and end up with the brake (and sometimes throttle) trigger stuck in +# Simple_Vibration mode forever, because the slip path keeps firing. We +# additionally require either the car or any wheel to be in real motion before +# treating slip as a haptic event. +STATIONARY_SPEED_MS = 0.1 # m/s; below this the car is considered stopped +STATIONARY_WHEEL_RAD_S = 0.1 # rad/s; below this a wheel is considered locked + + +def _is_in_motion(pkt: ForzaDataPacket) -> bool: + """True iff the car is moving or any wheel is rotating meaningfully. + + Used to gate slip-detection: when both car and all four wheels read as + stopped, any nonzero `tire_combined_slip` Forza emits is data noise from + locked wheels and should not drive haptic vibration. + """ + if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS: + return True + for wheel in ("FL", "FR", "RL", "RR"): + if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S: + return True + return False +# --- Effect encoders ---------------------------------------------------------- + + +def _apply_normal(trig) -> None: + """`TriggerMode.Normal` in DSX's vocabulary \u2014 mode byte 0, all params 0. + + Mirrors `DualSense_USB_Updated.cs` `bytes2 = NormalTrigger = 0L` then + `array[11..17,20] = bytes2[0..7]`. Used between races / on idle, matching + RacingDSX's `GetPreRaceInstructions()`. + """ + # Write forces before mode so pydualsense's BG sendReport thread, which + # reads mode then forces non-atomically (~250 Hz USB / ~1 kHz BT), is more + # likely to observe a self-consistent (mode, forces) pair. See the + # threading-hazard note in the module docstring. + for i in range(7): + trig.forces[i] = 0 + trig.mode = DS_MODE_NORMAL + + +def _apply_off(trig) -> None: + """Canonical Sony Off / Reset \u2014 mode byte 0x05, all params 0. + + Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs, mode 5 + actively returns the trigger stop to the neutral position; mode 0 just + clears state. DSX uses Reset() as the fall-through for `Resistance(0,0)`, + so we route mid-race zero-strength fallbacks here for byte-perfect parity. + """ + for i in range(7): + trig.forces[i] = 0 + trig.mode = DS_MODE_OFF + + +def _apply_feedback(trig, position: int, strength: int) -> bool: + """Sony Feedback (mode 0x21), bit-packed. + + Verbatim port of `ExtendInput.DataTools.DualSense.TriggerEffectGenerator + .Resistance` from DSX 1.4.9 \u2014 with one deliberate divergence. + + DSX's TriggerEffectGenerator.Resistance returns `false` without writing + when strength > 8, and RacingDSX's fall-through path routinely sends 5..255- + range slip-mode stiffness values into Feedback, hitting that branch every + transition out of slip. The result observed by the player: \"ABS feedback + continues even when stationary\" \u2014 the trigger remains stuck in whatever + mode (typically Simple_Vibration) was set before the failed Resistance + call, sometimes indefinitely if Forza keeps reporting nonzero slip on + locked wheels. + + We clamp out-of-range strength to 8 instead. The transition out of slip + now produces a smooth Feedback ramp from full-stiffness down to the + non-slip target as the EWMA decays, rather than freezing on stale + Simple_Vibration bytes. The return value (kept for symmetry with DSX's + bool-returning Resistance) is False on invalid position, True otherwise. + """ + if position > 9: + return False + if strength > 8: + strength = 8 + if strength <= 0: + # Sony's algorithm: zero force -> Reset (canonical Off, mode 0x05). + # DSX's TriggerEffectGenerator.Resistance falls through to Reset() + # here, so we do the same for byte-perfect parity. + _apply_off(trig) + return True + + force_value = (strength - 1) & 0x07 + force_zones = 0 + active_zones = 0 + for i in range(position, 10): + force_zones |= force_value << (3 * i) + active_zones |= 1 << i + + trig.forces[0] = active_zones & 0xFF + trig.forces[1] = (active_zones >> 8) & 0xFF + trig.forces[2] = force_zones & 0xFF + trig.forces[3] = (force_zones >> 8) & 0xFF + trig.forces[4] = (force_zones >> 16) & 0xFF + trig.forces[5] = (force_zones >> 24) & 0xFF + trig.forces[6] = 0 # frequency byte unused for Feedback + trig.mode = DS_MODE_FEEDBACK + return True + + +def _apply_simple_vibration(trig, position: int, amplitude: int, frequency: int) -> None: + """Legacy Simple_Vibration (mode 0x06), raw byte passthrough. + + Mirrors DSX's `else` branch in `DualSense_USB_Updated.cs::CustomTriggerValues`: + array[11] = TriggerValue1 (= 6 for VibrateResistance) + array[12] = TriggerValue2 (= frequency) + array[13] = TriggerValue3 (= amplitude) + array[14] = TriggerValue4 (= position) + array[15..17,20] = 0 + + Per Nielk1, Simple_Vibration was Sony's pre-firmware-update vibration + mode — same effect as canonical Vibration (0x26) on every shipped + DualSense, but takes raw 0-255 amplitude bytes instead of the bit- + packed 0-8 zone format. RacingDSX/DSX have used it since v1.0; the + entire Forza-on-DualSense community ships these byte values. + """ + if amplitude <= 0 or frequency <= 0: + _apply_off(trig) + return + trig.forces[0] = frequency & 0xFF + trig.forces[1] = amplitude & 0xFF + trig.forces[2] = position & 0xFF + trig.forces[3] = 0 + trig.forces[4] = 0 + trig.forces[5] = 0 + trig.forces[6] = 0 + trig.mode = DS_MODE_SIMPLE_VIBRATION + + +def reset_triggers(ds: pydualsense) -> None: + _apply_normal(ds.triggerL) + _apply_normal(ds.triggerR) + + +# --- RacingDSX math primitives ------------------------------------------------ + + +def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float: + """Mirrors Parser.Map() in RacingDSX, including endpoint clamping.""" + if x > in_max: + x = in_max + elif x < in_min: + x = in_min + return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min + + +def _ewma(value: float, last: float, alpha: float) -> float: + """Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing.""" + return alpha * value + (1.0 - alpha) * last + + +def _ewma_int(value: int, last: int, alpha: float) -> int: + """Mirrors `Parser.EWMA(int, int, float)` \u2014 floor of float-EWMA.""" + return math.floor(alpha * value + (1.0 - alpha) * last) + + +def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float: + return float(getattr(pkt, name, default)) + + +# --- Per-trigger persistent state for EWMA ------------------------------------ + + +class _TriggerState: + __slots__ = ("last_resistance", "last_freq") + + def __init__(self, init_resistance: int) -> None: + # Mirrors RacingDSX's `int lastThrottleResistance` / `int lastBrakeResistance`. + self.last_resistance: int = int(init_resistance) + self.last_freq: int = 0 + + +# --- Forza game-level persistent state (ForzaParser.cs fields) ---------------- + + +class _ForzaState: + """Persistent across packets. Mirrors ForzaParser's instance fields: + LastEngineRPM, LastRPMAccumulator, LastValidCarClass, LastValidCarCPI.""" + + __slots__ = ( + "last_engine_rpm", + "rpm_accumulator", + "last_valid_car_class", + "last_valid_car_cpi", + ) + + def __init__(self) -> None: + self.last_engine_rpm = 0.0 + self.rpm_accumulator = 0 + self.last_valid_car_class = 0 + self.last_valid_car_cpi = 0 + + +def _clamp_byte(v: float) -> int: + """Clamp to [0, 255] before writing to a uint8 RGB channel.""" + return max(0, min(255, int(v))) + + +def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool: + """Mirrors `ForzaParser.IsRaceOn()` verbatim. + + FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after + the player exits a race or pauses. ForzaParser detects the off state by + watching for unchanged engine RPM combined with non-positive Power across + `RPMAccumulatorTriggerRaceOff` (200) consecutive frames. Power is dash-only, + so for sled-format packets it reads as 0; that matches RacingDSX exactly. + """ + in_race = bool(int(getattr(pkt, "is_race_on", 0))) + current_rpm = _safe(pkt, "current_engine_rpm") + power = _safe(pkt, "power") + + if current_rpm == state.last_engine_rpm and power <= 0: + state.rpm_accumulator += 1 + if state.rpm_accumulator > RACE_OFF_RPM_FRAMES: + in_race = False + else: + state.rpm_accumulator = 0 + + state.last_engine_rpm = current_rpm + return in_race + + +# --- Lightbar (touchpad LED ring) --------------------------------------------- + + +def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None: + """Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic. + + Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`. + X-class cars use the fixed ColorClassX without a CPI tint. Car class and + CPI fields can briefly read 0 during loading screens, so we cache the + last valid value seen \u2014 also matching ForzaParser's behavior.""" + car_class = int(_safe(pkt, "car_class")) + if car_class > 0: + state.last_valid_car_class = car_class + car_class = state.last_valid_car_class + + cpi = int(_safe(pkt, "car_performance_index")) + if cpi > 0: + state.last_valid_car_cpi = min(cpi, 255) + cpi = state.last_valid_car_cpi + + cpi_ratio = cpi / MAX_CPI + + if car_class <= 5: + cr, cg, cb = CAR_CLASS_COLORS[car_class] + r = math.floor(cpi_ratio * cr) + g = math.floor(cpi_ratio * cg) + b = math.floor(cpi_ratio * cb) + else: + r, g, b = CAR_CLASS_COLORS[6] + + ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) + + +def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None: + """Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic. + + Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and + green falls linearly with rpm_ratio, with green floored at 50. At or + above redline the lightbar goes pure red (255, 0, 0).""" + max_rpm = _safe(pkt, "engine_max_rpm") + idle_rpm = _safe(pkt, "engine_idle_rpm") + current_rpm = _safe(pkt, "current_engine_rpm") + + engine_range = max_rpm - idle_rpm + if engine_range <= 0: + rpm_ratio = 0.0 + else: + rpm_ratio = (current_rpm - idle_rpm) / engine_range + + if rpm_ratio >= RPM_REDLINE_RATIO: + r, g, b = 255, 0, 0 + else: + r = math.floor(rpm_ratio * 255) + g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR) + b = 0 + + ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) + + +# --- Throttle (right trigger) ------------------------------------------------- + + +def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: + """Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line.""" + accel_x = _safe(pkt, "acceleration_x") + accel_z = _safe(pkt, "acceleration_z") + avg_accel = math.sqrt( + THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x) + + THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z) + ) + + fl = abs(_safe(pkt, "tire_combined_slip_FL")) + fr = abs(_safe(pkt, "tire_combined_slip_FR")) + rl = abs(_safe(pkt, "tire_combined_slip_RL")) + rr = abs(_safe(pkt, "tire_combined_slip_RR")) + front_slip = (fl + fr) * 0.5 + rear_slip = (rl + rr) * 0.5 + four_wheel_slip = (fl + fr + rl + rr) * 0.25 + + accelerator = int(_safe(pkt, "accel")) + + losing_grip = ( + front_slip > THROTTLE_GRIP_LOSS + or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN) + ) and _is_in_motion(pkt) + + if losing_grip: + # Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs). + target_freq = math.floor( + _map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION) + ) + target_resistance = math.floor( + _map( + avg_accel, + 0.0, + THROTTLE_ACCELERATION_LIMIT, + THROTTLE_MIN_STIFFNESS, + THROTTLE_MAX_STIFFNESS, + ) + ) + # Floor after EWMA (matches `(int)EWMA(int, int, float)` overload). + freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING) + resistance = _ewma_int( + target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING + ) + st.last_freq = freq + st.last_resistance = resistance + + if freq <= THROTTLE_MIN_VIBRATION or accelerator <= THROTTLE_VIB_POSITION: + # RacingDSX throttle fall-through: sends `Resistance(0, filteredResistance)` + # where filteredResistance is in slip-mode range (175..255). DSX's + # TriggerEffectGenerator.Resistance returns false for force > 8 without + # writing, leaving the trigger stuck in whatever mode (typically + # Simple_Vibration) was set previously. Our `_apply_feedback` clamps + # strength to 8 instead, producing a smooth Feedback ramp \u2014 a + # documented divergence that fixes the user-visible \"vibration + # continues briefly after slip ends\" symptom. + _apply_feedback( + ds.triggerR, + 0, + int(resistance * THROTTLE_EFFECT_INTENSITY), + ) + else: + _apply_simple_vibration( + ds.triggerR, + THROTTLE_VIB_POSITION, + int(resistance * THROTTLE_EFFECT_INTENSITY), + int(freq * THROTTLE_EFFECT_INTENSITY), + ) + return + + target_resistance = math.floor( + _map( + avg_accel, + 0.0, + THROTTLE_ACCELERATION_LIMIT, + THROTTLE_MIN_RESISTANCE, + THROTTLE_MAX_RESISTANCE, + ) + ) + resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING) + st.last_resistance = resistance + _apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY)) + + +# --- Brake (left trigger) ----------------------------------------------------- + + +def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: + """Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line.""" + fl = abs(_safe(pkt, "tire_combined_slip_FL")) + fr = abs(_safe(pkt, "tire_combined_slip_FR")) + rl = abs(_safe(pkt, "tire_combined_slip_RL")) + rr = abs(_safe(pkt, "tire_combined_slip_RR")) + four_wheel_slip = (fl + fr + rl + rr) * 0.25 + brake = int(_safe(pkt, "brake")) + + losing_grip = ( + four_wheel_slip > BRAKE_GRIP_LOSS + and brake > BRAKE_DEADZONE + and _is_in_motion(pkt) + ) + + if losing_grip: + target_freq = math.floor( + _map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION) + ) + target_resistance = math.floor( + _map( + brake, + 0, + 255, + BRAKE_MAX_STIFFNESS, + BRAKE_MIN_STIFFNESS, + ) + ) + freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING) + resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) + st.last_freq = freq + st.last_resistance = resistance + + if freq <= BRAKE_MIN_VIBRATION: + # RacingDSX brake fall-through (Parser.cs:128) sends Resistance(0, 0) + # explicitly \u2014 strength=0 routes to canonical Off (mode 0x05). + # Subtle slip while braking should leave the trigger neutral. + _apply_feedback(ds.triggerL, 0, 0) + else: + _apply_simple_vibration( + ds.triggerL, + BRAKE_VIB_POSITION, + int(resistance * BRAKE_EFFECT_INTENSITY), + int(freq * BRAKE_EFFECT_INTENSITY), + ) + return + + target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE)) + resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) + st.last_resistance = resistance + _apply_feedback(ds.triggerL, 0, int(resistance * BRAKE_EFFECT_INTENSITY)) + + +# --- UDP main loop ------------------------------------------------------------ + + +def parse_packet(data: bytes) -> ForzaDataPacket | None: + fmt = PACKET_FORMATS.get(len(data)) + if fmt is None: + LOG.debug("ignoring packet of unexpected length %d", len(data)) + return None + try: + return ForzaDataPacket(data, packet_format=fmt) + except Exception: + LOG.exception("failed to parse forza packet (len=%d)", len(data)) + return None + + +def _close_controller(ds: pydualsense | None) -> None: + """Best-effort close. The HID device may already be gone (unplug, BT drop) + in which case `device.close()` raises; we don't care.""" + if ds is None: + return + try: + ds.close() + except Exception: + pass + + +def _connect_controller() -> pydualsense: + """Open the DualSense, blocking until one is reachable. + + `pydualsense.init()` raises when no DualSense is plugged in. That's a + normal startup-or-replug condition for us, not a fatal error \u2014 the + daemon is meant to live for the whole user session and self-heal across + plug events without external supervision. We log the first failure once, + then retry quietly every `RECONNECT_BACKOFF_S` seconds. + """ + LOG.info("opening dualsense controller") + first_failure_logged = False + while True: + ds = pydualsense() + try: + ds.init() + except Exception as e: + _close_controller(ds) + if not first_failure_logged: + LOG.warning( + "dualsense not available (%s); retrying every %.1fs", + e, + RECONNECT_BACKOFF_S, + ) + first_failure_logged = True + time.sleep(RECONNECT_BACKOFF_S) + continue + LOG.info("dualsense controller connected") + return ds + + +def run(host: str, port: int, debug: bool) -> int: + ds = _connect_controller() + + LOG.info("listening for forza udp on %s:%d", host, port) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.settimeout(1.0) + + throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT) + brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT) + forza_state = _ForzaState() + last_seen = 0.0 + in_race = False + + try: + while True: + # Hot-plug detection: pydualsense's BG sendReport thread terminates + # silently on hidraw IOError (unplug, BT disconnect, USB resuspend). + # When it dies our triggerL/R writes go nowhere. Reconnect in-process + # so the daemon doesn't depend on a supervisor for plug-event recovery. + if not ds.report_thread.is_alive(): + LOG.warning("dualsense disconnected; reconnecting") + _close_controller(ds) + ds = _connect_controller() + now = time.monotonic() + try: + data, _ = sock.recvfrom(2048) + last_seen = now + except socket.timeout: + if in_race and (now - last_seen) > IDLE_TIMEOUT_S: + LOG.info("forza idle for %.1fs \u2014 resetting triggers", IDLE_TIMEOUT_S) + reset_triggers(ds) + in_race = False + continue + + pkt = parse_packet(data) + if pkt is None: + continue + + # ForzaParser.IsRaceOn() override: combines packet field with the + # FH-specific RPM-accumulator workaround. Must be called once per + # packet so the accumulator state stays accurate. + in_race = forza_is_race_on(pkt, forza_state) + + if not in_race: + # GetPreRaceInstructions: lightbar -> car class color, both + # triggers -> Normal (mode 0x00). Re-asserted every frame to + # mirror RacingDSX's per-packet emission. + _apply_normal(ds.triggerL) + _apply_normal(ds.triggerR) + apply_lightbar_pre_race(ds, pkt, forza_state) + continue + + if debug: + LOG.debug( + "rpm=%.0f/%.0f accel=%d brake=%d " + "slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f " + "throttle[freq=%d res=%d] brake[freq=%d res=%d]", + _safe(pkt, "current_engine_rpm"), + _safe(pkt, "engine_max_rpm"), + int(_safe(pkt, "accel")), + int(_safe(pkt, "brake")), + _safe(pkt, "tire_combined_slip_FL"), + _safe(pkt, "tire_combined_slip_FR"), + _safe(pkt, "tire_combined_slip_RL"), + _safe(pkt, "tire_combined_slip_RR"), + throttle_state.last_freq, + throttle_state.last_resistance, + brake_state.last_freq, + brake_state.last_resistance, + ) + apply_lightbar_in_race(ds, pkt) + apply_left_trigger(ds, pkt, brake_state) + apply_right_trigger(ds, pkt, throttle_state) + except KeyboardInterrupt: + LOG.info("shutting down") + finally: + try: + reset_triggers(ds) + except Exception: + pass + _close_controller(ds) + + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser( + prog="forza-trigger", + description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.", + ) + parser.add_argument("--host", default="127.0.0.1", help="UDP bind address") + parser.add_argument("--port", type=int, default=5300, help="UDP bind port") + parser.add_argument( + "--debug", + action="store_true", + help="log per-packet telemetry at DEBUG level", + ) + args = parser.parse_args() + + level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO") + logging.basicConfig( + level=level, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + return run(args.host, args.port, args.debug) + + +if __name__ == "__main__": + sys.exit(main())