From 1fc2f995c7de44a7e9a2a2c524f543785586f206 Mon Sep 17 00:00:00 2001 From: Simon Gardling Date: Fri, 1 May 2026 14:26:32 -0400 Subject: [PATCH] yarn: forza dualsense adaptive trigger bridge --- .gitignore | 1 + hosts/yarn/default.nix | 18 +- hosts/yarn/forza-trigger/default.nix | 54 +- hosts/yarn/forza-trigger/forza_trigger.py | 1516 +++++++++++++++++++++ modules/desktop-steam.nix | 5 +- 5 files changed, 1563 insertions(+), 31 deletions(-) create mode 100644 hosts/yarn/forza-trigger/forza_trigger.py diff --git a/.gitignore b/.gitignore index 7c5f3f9..d8b20f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /result /result-* +__pycache__ diff --git a/hosts/yarn/default.nix b/hosts/yarn/default.nix index 0863d75..e0a167d 100644 --- a/hosts/yarn/default.nix +++ b/hosts/yarn/default.nix @@ -15,6 +15,7 @@ ./impermanence.nix ./lact.nix ./vr.nix + ./forza-trigger inputs.impermanence.nixosModules.impermanence ]; @@ -95,14 +96,9 @@ # live in ./optiscaler-fh5-rdna3.ini; keys not listed there fall through # to OptiScaler's "auto" defaults. # - # Required one-time per-game setup the user has to do in Steam (no API): - # - Properties > Compatibility: pick the GE-Proton tool by hand. The - # `compatTool` option is intentionally unset \u2014 nixpkgs registers - # proton-ge-bin under its versioned id (e.g. GE-Proton10-34), and - # writing the generic "GE-Proton" string silently falls back to - # bundled Proton. - # - In-game: switch the Upscaling option from FSR 2.2 to DLSS or XeSS - # (FSR 2 inputs aren't intercepted). Press Insert to open the Opti + # Required one-time in-game setup the user has to do in FH5 (no API): + # - Switch the Upscaling option from FSR 2.2 to DLSS or XeSS (FSR 2 + # inputs aren't intercepted). Press Insert to open the OptiScaler # overlay and set the FFX upscaler to FSR 4. # # OptiScaler.ini is dropped with mode = "init" so in-game overlay edits @@ -125,8 +121,10 @@ { enable = true; closeSteam = true; + defaultCompatTool = "proton_10"; apps."fh5" = { id = 1551360; + compatTool = "proton_10"; launchOptions.env = { # OptiScaler FSR 4 INT8 path on this RDNA 3 (Navi 32) box. # PROTON_FSR4_UPGRADE opts FH5 into Proton's FSR 4 DLL upgrade; @@ -172,4 +170,8 @@ ] fromOpti; }; }; + + + # PS5 DualSense adaptive triggers in Forza Horizon 4 / 5. + services.forzaTrigger.enable = true; } diff --git a/hosts/yarn/forza-trigger/default.nix b/hosts/yarn/forza-trigger/default.nix index 770f2b2..51c4568 100644 --- a/hosts/yarn/forza-trigger/default.nix +++ b/hosts/yarn/forza-trigger/default.nix @@ -20,19 +20,6 @@ # - in Forza, HUD options → set Data Out: ON, Data Out IP: 127.0.0.1, # Data Out IP Port: 5300, and (FM7 only) Data Out Packet Format: CAR DASH. # -# System-interaction notes: -# - With multiple DualSense controllers connected, pydualsense picks one -# non-deterministically (`# TODO: implement multiple controllers working` -# in pydualsense's source). Forza Horizon is single-player so this is -# usually fine. If you need to pin a specific controller, the cleanest -# route is monkey-patching `pydualsense.__find_device`. -# - `pkgs.dualsensectl` is intentionally NOT installed by default -# (single-shot writes from it get overwritten by our BG thread within -# ~4 ms). Bring it in ad-hoc with `nix-shell -p dualsensectl` and stop -# this service first via `systemctl --user stop forza-trigger`. -# - Hot-plug recovery happens in-process: the daemon polls pydualsense's BG -# thread liveness and re-runs `pydualsense.init()` on disconnect. systemd's -# `Restart=on-failure` exists only as a crash-recovery safety net. let cfg = config.services.forzaTrigger; pythonPackages = import ./python-packages.nix { inherit lib pkgs; }; @@ -90,16 +77,45 @@ in environment.systemPackages = [ forzaTrigger ]; - # User-level service so it inherits the seat-bound uaccess ACL on - # /dev/hidraw* and dies cleanly when the user logs out. - systemd.user.services.forza-trigger = { + # Socket-activated by Forza's Data Out UDP stream on port 5300. + # The service starts when Forza Horizon 4/5 sends its first telemetry + # packet, runs for the entire session, and exits on 3 s of silence + # (--exit-on-idle). systemd restarts automatically on the next launch. + # + # Runs as a system service (not user service) so there is exactly one + # instance regardless of how many users have active sessions. The service + # runs as `cfg.user` (primary) for hidraw uaccess compatibility with the + # udev TAG+="uaccess" rules. + # + # Crash-recovery: Restart=on-failure restarts the daemon mid-session if + # it dies abnormally (non-zero exit). The clean exit-on-idle path returns + # 0, which systemd leaves alone — the socket unit stays listening. + systemd.services.forza-trigger = { description = "Forza Horizon → DualSense adaptive trigger bridge"; - wantedBy = [ "default.target" ]; - after = [ "graphical-session.target" ]; + # No wantedBy — socket activation pulls it in when Forza sends UDP. serviceConfig = { - ExecStart = "${forzaTrigger}/bin/forza-trigger --host 127.0.0.1 --port ${toString cfg.port}"; + ExecStart = "${forzaTrigger}/bin/forza-trigger --host 127.0.0.1 --port ${toString cfg.port} --exit-on-idle"; Restart = "on-failure"; RestartSec = 3; + User = cfg.user; + # KillMode=control-group is the default, but be explicit. + # systemd sends SIGTERM by default; Python doesn't route that to + # KeyboardInterrupt, so we tell systemd to send SIGINT instead. + # The daemon registers handlers for both for robustness. + KillMode = "control-group"; + KillSignal = "SIGINT"; + TimeoutStopSec = 5; + }; + }; + + systemd.sockets.forza-trigger = { + wantedBy = [ "sockets.target" ]; + socketConfig = { + ListenDatagram = "127.0.0.1:${toString cfg.port}"; + # Guard against rapid re-triggering. One activation per Forza session + # (minutes+) is normal; > 3 within 10 s is pathological (bug). + TriggerLimitIntervalSec = 10; + TriggerLimitBurst = 3; }; }; }; diff --git a/hosts/yarn/forza-trigger/forza_trigger.py b/hosts/yarn/forza-trigger/forza_trigger.py new file mode 100644 index 0000000..9a085c9 --- /dev/null +++ b/hosts/yarn/forza-trigger/forza_trigger.py @@ -0,0 +1,1516 @@ +"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. + +This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline. +Every numeric value, every threshold, every map() / EWMA() coefficient, +and every output byte sequence has been verified against published +sources or against decompiled DSX 1.4.9 itself. + +Sources, in priority order: + + 1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31) + decompiled with ILSpy. The decompilation revealed: + a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev6, MIT) as + its trigger-effect encoder. + b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's + high-level CustomTriggerValueMode names to mode bytes: + VibrateResistance -> 6 (Simple_Vibration / 0x06) + VibrateResistanceA / AB -> 38 (Vibration / 0x26) + VibrateResistanceB -> 6 + When the dispatcher hits the `else` branch in + DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than + 9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into + the trigger param region — no bit-packing, no scale conversion. + This is why RacingDSX's 0-255-scale stiffness values ARE the + actual amplitude bytes that reach the controller's firmware. + + 2. Nielk1's reverse-engineering gist Rev 6 (MIT, + https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db). + Source for the canonical Sony bit-packed Feedback (0x21) encoder used + for the non-slip path. Identical to the implementation shipped inside + DSX 1.4.9. + + 3. RacingDSX (cosmii02/RacingDSX, GPLv3) — community-tuned defaults for + Forza Horizon 4 / 5 since 2022. Specifically: + Config/ThrottleSettings.cs, Config/BrakeSettings.cs, + GameParsers/Parser.cs. + +The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is +provided by pydualsense (PyPI, MIT). We drive its low-level +`triggerL/R.mode` and `triggerL/R.forces[i]` fields directly because +pydualsense's high-level setMode/setForce API does not understand any +specific mode's parameter encoding — it just shovels bytes into the +output report at fixed offsets. That is exactly what we want. + +## Documented intentional divergences from RacingDSX + +Reviewed and revised after a full audit against RacingDSX upstream +(`Parser.cs`, `ForzaParser.cs`, `Config/*Settings.cs`) and DSX 1.4.9 +decompilation (`Main.cs`, `DualSense_USB_Updated.cs`, +`TriggerEffectGenerator.cs`). Several earlier divergences were folded back +into upstream behavior; remaining items below are real and source-justified. + +1. **Motion gate** (`_is_in_motion()`): slip detection is gated on speed or + wheel rotation. RacingDSX `Parser.cs:108-114, 182-186` has no such gate, + so locked stationary wheels keep the slip path active forever and the + trigger stuck in vibration mode (the original user-reported bug). We gate + on `speed > 0.1 m/s` OR any wheel rotation > 0.1 rad/s. + +2. **DSX-faithful Feedback** (`_apply_feedback`): when `strength > 8` we + return False without writing, identical to DSX's `TriggerEffectGenerator + .Resistance` (`TriggerEffectGenerator.cs:193-218`). The trigger holds its + last-set physical state until the EWMA-smoothed strength decays into the + 1..8 canonical range. Note: an earlier revision clamped to 8, which + produced a maximum-stiffness 'brick wall' for ~6-9 frames as the EWMA + decayed from slip-stiffness range (175..255 throttle, 5..150 brake) down + to Feedback range (0..3 throttle, 0..7 brake) \u2014 that brick wall is what + the user reported as 'fights my finger' on burnouts and ABS braking. + +3. **Clutch gate** (opt-in, default off): RacingDSX is clutch-blind by + design \u2014 `ForzaParser.cs:147-185` `ParsePacket` doesn't read the Clutch + field, and `Parser.cs:170-225` doesn't reference it either. When + `FORZA_TRIGGER_CLUTCH_GATE=1` is set in the environment we bypass the + throttle path when `clutch > 128`. On automatic transmission (default in + FH4/FH5) the game blips the clutch byte to ~255 for ~100 ms during every + gear change, so this gate produces a felt trigger relaxation on every + shift. Manual-transmission users may find it physically accurate; + auto-transmission users almost certainly don't. + +4. **AutomaticGun (mode 0x26) for slip vibration** instead of RacingDSX's + Simple_Vibration (mode 0x06). RacingDSX sends `CustomTriggerValueMode + .VibrateResistance` which DSX dispatches to the raw-passthrough branch in + `DualSense_USB_Updated.cs::CustomTriggerValues` (mode byte 6, 0..255 raw + amplitude). Mode 0x06 is a raw PWM buzzer \u2014 audibly clunky, no resistance + characteristic. Mode 0x26 (`TriggerEffectGenerator.AutomaticGun`, + `TriggerEffectGenerator.cs:292-326`) uses Sony's bit-packed force layout + plus a frequency byte: applies controlled resistance at zones [position..9] + while pulsing the motor at `frequency` Hz. The result is 'pulsed + resistance' rather than raw buzz \u2014 textural, controlled, GT7-style. + +5. **No-op write suppression** on `_TriggerState._last_sent`: each + `_write_trigger` compares the new (mode, forces) tuple against the cache. + Identical writes are skipped, and `_trigger_dirty` stays False so the + next `_flush()` masks the trigger update flag bits in `outReport[1]`. + This makes pre-race steady-state, the clutch gate, and the EWMA-stable + non-slip path effectively zero-cost at the HID layer. + + + +6. **Disabled pydualsense BG thread + on-demand HID push**: pydualsense's + `sendReport` thread writes a complete output report at the USB poll rate + (~250 Hz) with the trigger update flags asserted. DSX writes only on UDP + packet arrival from RacingDSX (~60 Hz; `Main.cs:7692-7710`, + `DualSense_USB_Updated.cs:21-79`). Empirically, ~250 Hz of trigger writes + produces buzz/oscillation under finger pressure on Linux that ~60 Hz + does not (the user-reported burnout/ABS feel issue). [Inference] the + firmware likely re-initializes its PID controller per write at the higher + rate; this is not source-confirmed but is the simplest model that fits + both observations. We disable the BG thread (`ds.ds_thread = False` after + `init`) and push reports ourselves via `_flush()` only on actual state + change, masking the trigger flag bits when only the lightbar (RPM + gradient) updated. Hot-plug detection moves from `report_thread.is_alive()` + to write-failure handling in `_flush()`. + + + +## Threading note + +pydualsense's `sendReport` background thread is disabled (divergence #6). +We call `prepareReport()` + `writeReport()` ourselves on the same main thread +that reads UDP and computes effects, so trigger / lightbar / motor field +writes are atomic by construction \u2014 no torn-frame mitigation needed. + +## System interaction notes + +**Single-controller assumption.** pydualsense's `__find_device` enumerates all +DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps the +last one matched (no break in the loop), then opens via `hidapi_open(vid, pid)` +without serial/path \u2014 `hid_open` returns the first match, which is not +necessarily the one selected. With multiple DualSense controllers the picked +controller is non-deterministic. pydualsense's source explicitly notes +`# TODO: implement multiple controllers working`. RacingDSX/DSX are also +single-controller (DSX's `connectedController` is a singleton). Forza Horizon +is single-player so this is fine in practice; if multi-controller selection +matters, monkey-patch `__find_device` to filter by `serial_number`. + +**Steam Input.** When Steam Input's PlayStation Configuration Support is +enabled for the game, Steam intercepts hidraw input AND writes its own HID +output reports (rumble, lightbar, sometimes triggers). Our daemon writes +competing output reports at ~1 kHz; the controller observes whichever wrote +last. Effect: trigger oscillates and feels broken. The Nix module's README +in `default.nix` instructs users to disable PlayStation Configuration Support +for Forza in Steam (Settings \u2192 Controller). + +**dualsensectl.** Installed in the Nix module for ad-hoc debugging. Single- +shot writes from `dualsensectl trigger left feedback ...` get overwritten by +our BG thread's next iteration ~4 ms later. Use it only when the daemon is +stopped (`systemctl --user stop forza-trigger`). + +**Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on +hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls +`ds.report_thread.is_alive()` and reconnects in-process via +`_connect_controller()`, which retries `pydualsense.init()` every +`RECONNECT_BACKOFF_S` until the controller comes back. The daemon does not +depend on systemd or any other supervisor for plug-event recovery; running it +directly from a shell handles unplug/replug exactly the same way. +""" + +import argparse +import collections +import logging +import math +import os +import signal +import socket +import sys +import time + +from fdp import ForzaDataPacket +from pydualsense import TriggerModes, pydualsense + + +def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: + """Create or inherit the UDP listener socket. + + Under systemd socket activation (LISTEN_FDS=1 with LISTEN_PID matching our + pid) the socket is already bound by the service manager and passed as fd 3. + Otherwise bind normally — this keeps the daemon runnable outside of systemd. + """ + listen_pid_str = os.environ.get("LISTEN_PID", "") + listen_fds_str = os.environ.get("LISTEN_FDS", "0") + if ( + listen_pid_str + and int(listen_pid_str) == os.getpid() + and int(listen_fds_str) >= 1 + ): + sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(timeout) + LOG.info("using systemd-pre-bound socket on %s:%d", host, port) + return sock + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.settimeout(timeout) + return sock + +LOG = logging.getLogger("forza-trigger") + +# --- Mode bytes --------------------------------------------------------------- +# pydualsense's `TriggerModes` IntFlag covers every mode byte we use: +# TriggerModes.Off = 0x00 (no-op) +# TriggerModes.Rigid_B = 0x05 (canonical Sony Off / Reset) +# TriggerModes.Pulse_B = 0x06 (Simple_Vibration / Simple_AutomaticGun) +# TriggerModes.Rigid_A = 0x21 (Feedback, canonical) +# TriggerModes.Pulse_AB = 0x26 (AutomaticGun / pulsed resistance) +DS_MODE_OFF = TriggerModes.Rigid_B +DS_MODE_AUTOMATIC_GUN = TriggerModes.Pulse_AB # bit-packed forces + frequency +DS_MODE_FEEDBACK = TriggerModes.Rigid_A +DS_MODE_MACHINE = TriggerModes(0x27) # two-amplitude alternating pulse, GT7 ABS +DS_MODE_BOW = TriggerModes.Pulse_A # 0x22, resist-then-snap impulse for gear-shift events + +# --- RacingDSX defaults (Config/ThrottleSettings.cs) -------------------------- +THROTTLE_GRIP_LOSS = 0.6 +THROTTLE_REAR_SLIP_ACCEL_MIN = 200 +THROTTLE_VIB_POSITION = 5 # VibrationModeStart +THROTTLE_MIN_VIBRATION = 5 # below this freq, fall back to Resistance + +THROTTLE_MAX_VIBRATION = 55 # peak frequency at slip == 5 +THROTTLE_MIN_STIFFNESS = 255 # slip-mode amplitude at avgAccel == 0 +THROTTLE_MAX_STIFFNESS = 175 # slip-mode amplitude at avgAccel == AccelerationLimit +THROTTLE_MIN_RESISTANCE = 0 # non-slip canonical strength at avgAccel == 0 +THROTTLE_MAX_RESISTANCE = 3 # non-slip canonical strength at avgAccel == AccelerationLimit +THROTTLE_ACCELERATION_LIMIT = 10 +THROTTLE_TURN_ACCEL_SCALE = 0.25 +THROTTLE_FORWARD_ACCEL_SCALE = 1.0 +THROTTLE_RESISTANCE_SMOOTHING = 0.5 # was RacingDSX 0.9 (over-smoothed by ~10 frames; per GT7-research \u00a74.1) +THROTTLE_VIBRATION_SMOOTHING = 0.7 # was 1.0 (raw); reduces slip-freq jitter without killing responsiveness +THROTTLE_EFFECT_INTENSITY = 1.0 +THROTTLE_LAST_RESISTANCE_INIT = 1 # Parser.lastThrottleResistance + +# --- RacingDSX defaults (Config/BrakeSettings.cs) ----------------------------- +BRAKE_GRIP_LOSS = 0.05 +BRAKE_DEADZONE = 100 # Parser literal: data.Brake > 100 +BRAKE_VIB_POSITION = 0 # VibrationStart - position for AutomaticGun lockup (whole-range) +# Machine 0x27 sets ONLY two zones (start, end), not a range. For ABS bite-point +# feel we anchor the alternating pulse in the middle of trigger travel. +BRAKE_ABS_MACHINE_START = 2 +BRAKE_ABS_MACHINE_END = 8 +BRAKE_MIN_VIBRATION = 15 + +BRAKE_MAX_VIBRATION = 20 +BRAKE_MIN_STIFFNESS = 150 # RacingDSX default; the slip-vibration amplitude floor is set by the physical-limit scaler at endpoint +BRAKE_MAX_STIFFNESS = 5 +BRAKE_MIN_RESISTANCE = 0 +BRAKE_MAX_RESISTANCE = 7 +BRAKE_RESISTANCE_SMOOTHING = 0.4 +BRAKE_VIBRATION_SMOOTHING = 0.7 # was 1.0 (raw); reduces ABS-band jitter (per GT7-research \u00a74.1) +BRAKE_EFFECT_INTENSITY = 1.0 +BRAKE_LAST_RESISTANCE_INIT = 200 # Parser.lastBrakeResistance + +# --- Forza UDP packet sizes -> fdp packet_format strings ---------------------- +PACKET_FORMATS = { + 232: "sled", + 311: "dash", + 324: "fh4", # FH4 and FH5 share the same layout +} + +# --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) -------- +# CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM). +# Parser.cs uses an `<=` cascade, so any value > 5 is treated as X. +CAR_CLASS_COLORS = [ + (107, 185, 236), # ColorClassD + (234, 202, 49), # ColorClassC + (211, 90, 37), # ColorClassB + (187, 59, 34), # ColorClassA + (128, 54, 243), # ColorClassS1 + (75, 88, 229), # ColorClassS2 + (105, 182, 72), # ColorClassX (no CPI tint) +] +MAX_CPI = 255 # ForzaParser.MaxCPI +RPM_REDLINE_RATIO = 0.9 # Profile.RPMRedlineRatio +GREEN_FLOOR = 50 # Math.Max(..., 50) on green channel in non-redline path +RACE_OFF_RPM_FRAMES = 200 # ForzaParser.RPMAccumulatorTriggerRaceOff + +# --- Clutch gate (throttle only, opt-in) ------------------------------------- +# Forza emits `clutch` 0..255 (0 = engaged, 255 = disengaged). When the clutch +# is disengaged the engine is mechanically disconnected from the wheels and +# the throttle pedal can't transmit power, so a 'physically accurate' throttle +# trigger has no business resisting. RacingDSX is clutch-blind by design +# (`ForzaParser.cs` ParsePacket comments out the Clutch field). On automatic +# transmission \u2014 the FH4/FH5 default \u2014 the game blips the clutch byte to ~255 +# for ~100 ms during every gear change, so enabling this gate produces a felt +# trigger relaxation on every shift. Disabled by default; set the env var +# FORZA_TRIGGER_CLUTCH_GATE=1 to enable. +CLUTCH_DISENGAGE_THRESHOLD = 128 +CLUTCH_GATE_ENABLED = os.environ.get("FORZA_TRIGGER_CLUTCH_GATE", "0") == "1" + + +# --- Reset on idle (UDP timeout) --------------------------------------------- +# Not present in RacingDSX; an additional safety so the controller doesn't get +# stuck if Forza is killed mid-race or the network drops. +IDLE_TIMEOUT_S = 3.0 + +# --- Hot-plug reconnect backoff ---------------------------------------------- +# pydualsense's BG sendReport thread terminates silently on hidraw IOError +# (controller unplugged, BT disconnect, USB resuspend). The main loop polls +# the thread's liveness and reconnects in-process \u2014 the script is agnostic +# of supervisors like systemd. The same backoff governs the initial-connect +# wait when the daemon starts before any controller is plugged in. +RECONNECT_BACKOFF_S = 1.0 + +# --- Stationary motion gate -------------------------------------------------- +# Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked +# wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for +# this and end up with the brake (and sometimes throttle) trigger stuck in +# Simple_Vibration mode forever, because the slip path keeps firing. We +# additionally require either the car or any wheel to be in real motion before +# treating slip as a haptic event. +STATIONARY_SPEED_MS = 0.1 # m/s; below this the car is considered stopped +STATIONARY_WHEEL_RAD_S = 0.1 # rad/s; below this a wheel is considered locked + + +def _is_in_motion(pkt: ForzaDataPacket) -> bool: + """True iff the car is moving or any wheel is rotating meaningfully. + + Used to gate slip-detection: when both car and all four wheels read as + stopped, any nonzero `tire_combined_slip` Forza emits is data noise from + locked wheels and should not drive haptic vibration. + + fdp's `sled` and `dash`/`fh4` formats both carry `wheel_rotation_speed_*`; + only `dash`/`fh4` adds `speed`. The wheel-rotation check covers both. + """ + if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS: + return True + for wheel in ("FL", "FR", "RL", "RR"): + if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S: + return True + return False +# --- HID output: dirty-tracking + on-demand flush ---------------------------- +# pydualsense's BG `sendReport` thread writes a complete output report at the +# USB poll rate (~250 Hz) with the 'set right/left trigger motor' flag bits +# (0x04 / 0x08) asserted in `outReport[1] = 0xFF`. The DualSense firmware reads +# every such report as a fresh trigger command and re-initializes its internal +# PID controller \u2014 manifesting under finger pressure as the buzz/oscillation +# the user reported during burnouts and ABS braking. +# +# We disable the BG thread entirely (see _connect_controller) and call +# pydualsense's `prepareReport()` + `writeReport()` ourselves, exactly once per +# state change. The Python-level no-op cache on `_TriggerState._last_sent` +# (divergence #5) becomes the real gate: when neither trigger nor lightbar +# changed since the last flush, no HID write happens and the firmware sees a +# truly continuous effect. +# +# Divergence #9 in the module docstring. +_trigger_dirty: bool = False # set when _write_trigger updates trigger state +_lightbar_dirty: bool = False # set when _set_lightbar updates RGB +_last_lightbar: tuple[int, int, int] | None = None +_motors_dirty: bool = False # set when _set_motors updates LRA amplitude +_last_motors: tuple[int, int] | None = None # (left, right) 0..255 +_lra_smoothed: tuple[float, float] = (0.0, 0.0) # (left, right) EWMA-smoothed amplitude pre-clamp +LRA_SMOOTHING_ALPHA = 0.6 # alpha for surface-rumble texture; suppresses 60 Hz machine-gun artifact + +# pydualsense's `prepareReport()` returns a USB report (outReport[0]==0x02) +# with `outReport[1] = 0xFF`. The flag bits we care about: +# outReport[1]: bit 0x04 = update right trigger, bit 0x08 = update left +# outReport[2]: bit 0x04 = update lightbar (LED strips) +# We clear the bits the firmware doesn't need to process so the trigger PID +# isn't reset when only the lightbar (RPM gradient) changed. +# Bluetooth reports (outReport[0]==0x31) carry a CRC32 at bytes 74-77 that +# pydualsense computes inside `prepareReport()`; we don't apply the flag-mask +# optimization there because mutating after that point invalidates the CRC. +_USB_TRIGGER_FLAGS_BYTE = 1 +_USB_LIGHTBAR_FLAGS_BYTE = 2 +_TRIGGER_FLAG_BITS = 0x04 | 0x08 # right + left trigger update bits +_LIGHTBAR_FLAG_BIT = 0x04 # LED strips update bit + + +def _set_lightbar(ds, r: int, g: int, b: int) -> None: + """Set the touchpad lightbar RGB. Marks the lightbar dirty if the color + actually changed; otherwise no-op so steady-state colors don't trigger + redundant HID writes.""" + global _lightbar_dirty, _last_lightbar + new = (int(r), int(g), int(b)) + if _last_lightbar == new: + return + ds.light.setColorI(new[0], new[1], new[2]) + _last_lightbar = new + _lightbar_dirty = True + + +def _set_motors(ds, left: int, right: int) -> None: + """Set the body LRA motor amplitudes (0..255 each). Marks motors dirty if + the (left, right) tuple actually changed. + + pydualsense writes these at `outReport[3]` (right) / `outReport[4]` (left) + via its existing `prepareReport()`; the rumble flag bits in `outReport[1]` + (0x01, 0x02) are already asserted by pydualsense's `0xFF` default. We only + need to update `ds.leftMotor` / `ds.rightMotor` and dirty-track here.""" + global _motors_dirty, _last_motors + new = (max(0, min(255, int(left))), max(0, min(255, int(right)))) + if _last_motors == new: + return + ds.leftMotor = new[0] + ds.rightMotor = new[1] + _last_motors = new + _motors_dirty = True + + + +def _flush(ds) -> bool: + """Push the current trigger / lightbar state to the controller via one HID + write. Idempotent \u2014 no-op if nothing changed since the last flush. The + update flags for unchanged subsystems are masked out so the firmware doesn't + re-process them. Returns False on write failure (controller unplugged) so + the caller can reconnect.""" + global _trigger_dirty, _lightbar_dirty, _motors_dirty + if not (_trigger_dirty or _lightbar_dirty or _motors_dirty): + return True + try: + out = ds.prepareReport() + # Bluetooth reports (outReport[0] == 0x31) carry a CRC32 at bytes + # 74-77 that pydualsense computes inside `prepareReport()`. If we + # mutate flag bytes after that point the firmware drops every packet + # for CRC mismatch \u2014 a previous revision did exactly that and any BT + # user got zero trigger updates. The cost of leaving the BT update + # flags asserted is one cheap firmware re-read of identical trigger + # bytes per frame; the firmware idempotency we rely on for USB + # ('don't restart the PID on identical input') applies here too. + # USB has no CRC field so we keep the flag-mask optimization there. + if out and out[0] == 0x02: + if not _trigger_dirty: + out[_USB_TRIGGER_FLAGS_BYTE] &= ~_TRIGGER_FLAG_BITS + if not _lightbar_dirty: + out[_USB_LIGHTBAR_FLAGS_BYTE] &= ~_LIGHTBAR_FLAG_BIT + ds.writeReport(out) + except (IOError, OSError) as e: + LOG.warning("dualsense write failed: %s", e) + return False + _trigger_dirty = False + _lightbar_dirty = False + _motors_dirty = False + return True + + +def _reset_caches(*states) -> None: + """Resync `_TriggerState._last_sent` to the controller's freshly-reset state. + + After a reconnect or idle-timeout reset, the controller is at mode 0x05 + + zero forces. Without resyncing the caches, the next `apply_*` whose target + happens to match the pre-reset cached value would skip the write and we'd + silently leave the trigger in mode 0x05 instead of the intended state.""" + for st in states: + st.reset() + global _trigger_dirty, _lightbar_dirty, _motors_dirty, _last_lightbar, _last_motors + _trigger_dirty = False + _lightbar_dirty = False + _motors_dirty = False + _last_lightbar = (0, 0, 0) + _last_motors = (0, 0) + global _lra_smoothed + _lra_smoothed = (0.0, 0.0) + + + +# --- Effect encoders ---------------------------------------------------------- + + +def _write_trigger(trig, mode: TriggerModes, forces: list[int], st: object | None) -> None: + """Write mode + forces to a pydualsense trigger, suppressing no-op writes. + + When `st` is a `_TriggerState`, the (mode, forces) tuple is compared against + `st._last_sent`. If identical, the write is skipped and the trigger-dirty + was \u2014 the next `_flush()` will not push anything for this trigger. + + When the tuple differs, the trigger fields are updated and `_trigger_dirty` + The next `_flush()` pushes one HID report carrying the new state. + + Callers that do NOT pass `st` (reset_triggers, shutdown reset) are + unconditional \u2014 they always write and always mark dirty. + """ + global _trigger_dirty + if st is not None: + new = (int(mode), tuple(forces)) + if st._last_sent == new: + return + for i in range(7): + trig.forces[i] = forces[i] + trig.mode = mode + if st is not None: + st._last_sent = (int(mode), tuple(forces)) + _trigger_dirty = True + + +def _apply_off(trig, st: object | None = None) -> None: + """Canonical Sony Off / Reset — mode byte 0x05, all params 0. + + Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs (Nielk1 + Rev 6), mode 0x05 *actively* returns the trigger stop to the neutral + position; mode 0x00 only clears state without retracting the motor. + """ + _write_trigger(trig, DS_MODE_OFF, [0, 0, 0, 0, 0, 0, 0], st) + + +def _apply_feedback(trig, position: int, strength: int, st: object | None = None) -> bool: + """Sony Feedback (mode 0x21), bit-packed. + + Byte-faithful port of `ExtendInput.DataTools.DualSense.TriggerEffectGenerator + .Resistance` from DSX 1.4.9 (cs lines 193-218). Returns False without + writing when `strength > 8`, matching DSX exactly: the trigger holds its + last-set physical state (Simple_Vibration if mid-slip, Reset if pre-race) + until the EWMA-smoothed strength decays into the 1..8 canonical range. + + The previous implementation clamped strength to 8 and produced a + maximum-stiffness 'brick wall' for ~6-9 frames as the EWMA decayed from + its slip-stiffness range (175..255 throttle, 5..150 brake) down to the + Feedback range (0..3 throttle, 0..7 brake). That brick wall is what the + user reported as 'fights my finger' on burnouts and ABS braking.""" + if position > 9: + return False + if strength > 8: + # DSX-faithful: silently no-op. The trigger fields and the no-op + # cache (`st._last_sent`) keep their previous values; the next + # `_flush()` masks the trigger flag bits via `_trigger_dirty=False`. + return False + if strength <= 0: + _apply_off(trig, st) + return True + + force_value = (strength - 1) & 0x07 + force_zones = 0 + active_zones = 0 + for i in range(position, 10): + force_zones |= force_value << (3 * i) + active_zones |= 1 << i + + _write_trigger( + trig, + DS_MODE_FEEDBACK, + [ + active_zones & 0xFF, + (active_zones >> 8) & 0xFF, + force_zones & 0xFF, + (force_zones >> 8) & 0xFF, + (force_zones >> 16) & 0xFF, + (force_zones >> 24) & 0xFF, + 0, + ], + st, + ) + return True + + + +def _apply_automatic_gun( + trig, position: int, strength: int, frequency: int, st: object | None = None +) -> bool: + """Sony AutomaticGun (mode 0x26), bit-packed forces with frequency byte. + + Verbatim port of `TriggerEffectGenerator.AutomaticGun` (DSX 1.4.9 + `TriggerEffectGenerator.cs:292-326`). Same bit-packed force layout as + Resistance/Feedback (mode 0x21), plus a frequency byte at offset +9 + (forces[6] in pydualsense's mapping). + + Produces a 'pulsed resistance' effect: the firmware applies bit-packed + force at zones [position..9] and pulses the motor at `frequency` Hz. This + feels textural / controlled rather than the raw PWM buzz of mode 0x06, + and matches what GT7 and other PS5-native racing games use for wheelspin. + """ + if position > 9: + return False + if strength > 8: + return False # DSX-faithful silent no-op (matches Resistance behavior) + if strength <= 0 or frequency <= 0: + _apply_off(trig, st) + return True + + force_value = (strength - 1) & 0x07 + force_zones = 0 + active_zones = 0 + for i in range(position, 10): + force_zones |= force_value << (3 * i) + active_zones |= 1 << i + + _write_trigger( + trig, + DS_MODE_AUTOMATIC_GUN, + [ + active_zones & 0xFF, + (active_zones >> 8) & 0xFF, + force_zones & 0xFF, + (force_zones >> 8) & 0xFF, + (force_zones >> 16) & 0xFF, + (force_zones >> 24) & 0xFF, + frequency & 0xFF, + ], + st, + ) + return True + + + +def _apply_machine( + trig, + start: int, + end: int, + strength_a: int, + strength_b: int, + frequency: int, + period: int, + st: object | None = None, +) -> bool: + """Sony Machine (mode 0x27), two-amplitude alternating pulse with period. + + Verbatim port of `TriggerEffectGenerator.Machine` (DSX 1.4.9 + `TriggerEffectGenerator.cs:328-368`). Applies bit-packed force at zones + {start, end} alternating between strength_a and strength_b at `frequency` + Hz on a `period` (in 100ms units) cycle. The 'rhythmic catch-and-release' + character is what GT7 uses for ABS \u2014 cannot be produced by mode 0x26 + which has only one amplitude. Fits cleanly into pydualsense's forces[] + layout: forces[0..4] cover destinationArray[+1..+5] (active zones, + strength pair, frequency, period); forces[5..6] are required-zero by + Sony's spec. + """ + if start > 8 or end > 9 or end <= start: + return False + if strength_a > 7 or strength_b > 7: + return False + if frequency <= 0: + _apply_off(trig, st) + return True + active_zones = (1 << start) | (1 << end) + strength_pair = (strength_a & 0x07) | ((strength_b & 0x07) << 3) + _write_trigger( + trig, + DS_MODE_MACHINE, + [ + active_zones & 0xFF, + (active_zones >> 8) & 0xFF, + strength_pair & 0xFF, + frequency & 0xFF, + period & 0xFF, + 0, + 0, + ], + st, + ) + return True + + + +def _apply_bow( + trig, + start: int, + end: int, + force: int, + snap_force: int, + st: object | None = None, +) -> bool: + """Sony Bow (mode 0x22). Resists between zones {start, end} then snaps back. + + Verbatim port of `TriggerEffectGenerator.Bow` (DSX 1.4.9 + `TriggerEffectGenerator.cs:167-207`). Used as a 1-frame impulse on + gear-change edges \u2014 NFS-Unbound-style shift detent. Activates exactly two + zones (start, end) bit-packed; force/snap_force are 3-bit each, encoded + as `(force-1) | ((snap_force-1) << 3)`. + """ + if start > 8 or end > 8 or start >= end: + return False + if force > 8 or snap_force > 8: + return False + if end <= 0 or force <= 0 or snap_force <= 0: + _apply_off(trig, st) + return True + active_zones = (1 << start) | (1 << end) + pair = ((force - 1) & 0x07) | (((snap_force - 1) & 0x07) << 3) + _write_trigger( + trig, + DS_MODE_BOW, + [ + active_zones & 0xFF, + (active_zones >> 8) & 0xFF, + pair & 0xFF, + (pair >> 8) & 0xFF, # always 0 for force, snap_force \u2264 8 + 0, + 0, + 0, + ], + st, + ) + return True + + + +def _apply_slope_feedback( + trig, + start: int, + end: int, + start_strength: int, + end_strength: int, + st: object | None = None, +) -> bool: + """Sony SlopeFeedback (mode 0x21). Linear strength ramp from `start_strength` + at zone `start` to `end_strength` at zone `end`. + + Built on Nielk1 Rev6 gist's `MultiplePositionFeedback` factory: same byte + layout as `Resistance` (mode 0x21) with per-zone strengths instead of + uniform. Outside [start, end] zones are inactive (no force). Used for the + progressive brake bite-point feel \u2014 light pressure at the top of throw, + firmer at the bottom \u2014 mimicking a real hydraulic brake pedal. + + Stricter validation than `Resistance`: `start_strength` and `end_strength` + must each be in [1, 8]. Caller is responsible for short-circuiting to + `_apply_off` when the target is 0. + """ + if start > 8 or end > 9 or end <= start: + return False + if start_strength < 1 or start_strength > 8: + return False + if end_strength < 1 or end_strength > 8: + return False + span = end - start + force_zones = 0 + active_zones = 0 + for i in range(10): + if i < start: + continue + elif i <= end: + s = round(start_strength + (end_strength - start_strength) * (i - start) / span) + else: + s = end_strength + s = max(1, min(8, int(s))) + force_zones |= ((s - 1) & 0x07) << (3 * i) + active_zones |= 1 << i + _write_trigger( + trig, + DS_MODE_FEEDBACK, + [ + active_zones & 0xFF, + (active_zones >> 8) & 0xFF, + force_zones & 0xFF, + (force_zones >> 8) & 0xFF, + (force_zones >> 16) & 0xFF, + (force_zones >> 24) & 0xFF, + 0, + ], + st, + ) + return True + + + + +def reset_triggers(ds: pydualsense) -> None: + """Both triggers to canonical Off (mode 0x05). Actively retracts the motor.""" + _apply_off(ds.triggerL) + _apply_off(ds.triggerR) + + +def reset_lightbar(ds: pydualsense) -> None: + """Lightbar to off (RGB 0,0,0). + + Used when telemetry has been idle long enough that we should stop asserting + a race color \u2014 e.g. Forza exited or hasn't started a session yet. Without + this, pydualsense's BG sendReport thread keeps re-publishing whatever + `TouchpadColor` we last set, so the controller stays lit indefinitely. + """ + _set_lightbar(ds, 0, 0, 0) + + +# --- RacingDSX math primitives ------------------------------------------------ + + +def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float: + """Mirrors Parser.Map() in RacingDSX, including endpoint clamping.""" + if x > in_max: + x = in_max + elif x < in_min: + x = in_min + return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min + + +def _ewma(value: float, last: float, alpha: float) -> float: + """Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing.""" + return alpha * value + (1.0 - alpha) * last + + +def _ewma_int(value: int, last: int, alpha: float) -> int: + """Mirrors `Parser.EWMA(int, int, float)` \u2014 floor of float-EWMA.""" + return math.floor(alpha * value + (1.0 - alpha) * last) + + +def _slip_to_strength(slip_ratio: float, lo: float, hi: float) -> int: + """Map a slip ratio to AutomaticGun / Vibration strength (0..8) on a log curve. + + Replaces the previous `amplitude >> 5` mapping which collapsed Sony's 8 + strength steps to 3 ({5,6,7}) over RacingDSX's 175..255 amplitude range \u2014 + burnout-onset and full-burnout felt identical. + + The log curve matches human trigger-force perception (roughly logarithmic) + and uses the full 1..8 range. Below `lo`, returns 0 (caller should route + to Off / steady-state). Above `hi`, saturates at 8. + """ + if slip_ratio < lo: + return 0 + if slip_ratio >= hi: + return 8 + t = (math.log(slip_ratio) - math.log(lo)) / (math.log(hi) - math.log(lo)) + return max(1, min(8, math.ceil(t * 8))) + + +def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float: + """Read a numeric field defensively. Returns default for missing fields, + NaN, or +/-inf \u2014 protects every caller from crashing on corrupt UDP + packets, fdp parser overflow, or future-game-version field changes.""" + v = float(getattr(pkt, name, default)) + if not math.isfinite(v): + return float(default) + return v + + +# --- Per-trigger persistent state for EWMA ------------------------------------ + + +class _TriggerState: + __slots__ = ( + "last_resistance", + "last_freq", + "_init_resistance", + "_last_sent", + "event_remaining_frames", + "event_callable", + ) + + def __init__(self, init_resistance: int) -> None: + self._init_resistance: int = int(init_resistance) + self.last_resistance: int = int(init_resistance) + self.last_freq: int = 0 + # (mode_int, forces_tuple) of last byte sequence actually written to + # the pydualsense trigger fields. None = nothing written yet. Used by + # _write_trigger to suppress redundant HID writes \u2014 divergence #5. + self._last_sent: tuple[int, tuple[int, ...]] | None = None + # Event-impulse override: when event_remaining_frames > 0, apply_*_trigger + # calls event_callable(trig, st) instead of computing per-frame state. + # Used for gear-shift Bow (Phase 5) and any future short overrides. + self.event_remaining_frames: int = 0 + self.event_callable = None + + def reset(self) -> None: + """Resync to the controller's freshly-reset state. Called from + `_reset_caches` after reconnect / idle reset; also clears the EWMA + smoothing state so the first frame of a new race doesn't carry stale + slip-amplitude values across the transition.""" + self.last_resistance = self._init_resistance + self.last_freq = 0 + self._last_sent = (int(DS_MODE_OFF), (0, 0, 0, 0, 0, 0, 0)) + self.event_remaining_frames = 0 + self.event_callable = None + + +# --- Forza game-level persistent state (ForzaParser.cs fields) ---------------- + + +class _ForzaState: + """Persistent across packets. Mirrors ForzaParser's instance fields: + LastEngineRPM, LastRPMAccumulator, LastValidCarClass, LastValidCarCPI.""" + + __slots__ = ( + "last_engine_rpm", + "rpm_accumulator", + "last_valid_car_class", + "last_valid_car_cpi", + "last_gear", + ) + + def __init__(self) -> None: + self.last_engine_rpm = 0.0 + self.rpm_accumulator = 0 + self.last_valid_car_class = 0 + self.last_valid_car_cpi = 0 + self.last_gear = -1 # sentinel; first packet's gear edge is suppressed + + +def _clamp_byte(v: float) -> int: + """Clamp to [0, 255] before writing to a uint8 RGB channel.""" + return max(0, min(255, int(v))) + + +def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool: + """Mirrors `ForzaParser.IsRaceOn()` verbatim. + + FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after + the player exits a race or pauses. ForzaParser detects the off state by + watching for unchanged engine RPM combined with non-positive Power across + `RPMAccumulatorTriggerRaceOff` (200) consecutive frames. Power is dash-only, + so for sled-format packets it reads as 0; that matches RacingDSX exactly. + """ + in_race = bool(int(getattr(pkt, "is_race_on", 0))) + current_rpm = _safe(pkt, "current_engine_rpm") + power = _safe(pkt, "power") + + if current_rpm == state.last_engine_rpm and power <= 0: + state.rpm_accumulator += 1 + if state.rpm_accumulator > RACE_OFF_RPM_FRAMES: + in_race = False + else: + state.rpm_accumulator = 0 + + state.last_engine_rpm = current_rpm + return in_race + + +# --- Lightbar (touchpad LED ring) --------------------------------------------- + + +def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None: + """Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic. + + Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`. + X-class cars use the fixed ColorClassX without a CPI tint. Car class and + CPI fields can briefly read 0 during loading screens, so we cache the + last valid value seen \u2014 also matching ForzaParser's behavior.""" + car_class = int(_safe(pkt, "car_class")) + if car_class > 0: + state.last_valid_car_class = car_class + car_class = state.last_valid_car_class + + cpi = int(_safe(pkt, "car_performance_index")) + if cpi > 0: + state.last_valid_car_cpi = min(cpi, 255) + cpi = state.last_valid_car_cpi + + cpi_ratio = cpi / MAX_CPI + + if car_class <= 5: + cr, cg, cb = CAR_CLASS_COLORS[car_class] + r = math.floor(cpi_ratio * cr) + g = math.floor(cpi_ratio * cg) + b = math.floor(cpi_ratio * cb) + else: + r, g, b = CAR_CLASS_COLORS[6] + + _set_lightbar(ds, _clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) + + +def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None: + """Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic. + + Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and + green falls linearly with rpm_ratio, with green floored at 50. At or + above redline the lightbar goes pure red (255, 0, 0).""" + max_rpm = _safe(pkt, "engine_max_rpm") + idle_rpm = _safe(pkt, "engine_idle_rpm") + current_rpm = _safe(pkt, "current_engine_rpm") + + engine_range = max_rpm - idle_rpm + if engine_range <= 0: + rpm_ratio = 0.0 + else: + rpm_ratio = (current_rpm - idle_rpm) / engine_range + + if rpm_ratio >= RPM_REDLINE_RATIO: + r, g, b = 255, 0, 0 + else: + r = math.floor(rpm_ratio * 255) + g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR) + b = 0 + + _set_lightbar(ds, _clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) + + +# --- Body LRA motors --------------------------------------------------------- +# Adaptive triggers are only half of the GT7 racing-haptic vocabulary; the +# other half is the body LRAs (left/right linear resonant actuators) feeding +# 'feel which side hit the kerb' through outReport[3]/[4]. RacingDSX/DSX +# don't drive these for racing telemetry; we extend the daemon to use them. +# Mix surface_rumble (road texture, 0..1 per wheel) and wheel_on_rumble_strip +# (boolean kerb contact) per side: front-left/rear-left -> left motor, +# front-right/rear-right -> right motor. +LRA_KERB_FLOOR_AMP = 80 # minimum amplitude when wheel is on a kerb strip + + +def apply_lra(ds: pydualsense, pkt: ForzaDataPacket) -> None: + """Drive the body LRA motors from Forza's surface-rumble + kerb fields. + + Surface-rumble values jitter frame-to-frame on textured surfaces (gravel, + kerbs, wet asphalt). Without smoothing the LRAs sound like a machine-gun + at 60 Hz. EWMA at alpha=0.6 keeps response responsive (~5 frames to reach + >=240 from a step input) while killing single-frame artifacts.""" + global _lra_smoothed + surface_fl = abs(_safe(pkt, "surface_rumble_FL")) + surface_fr = abs(_safe(pkt, "surface_rumble_FR")) + surface_rl = abs(_safe(pkt, "surface_rumble_RL")) + surface_rr = abs(_safe(pkt, "surface_rumble_RR")) + kerb_left = bool(_safe(pkt, "wheel_on_rumble_strip_FL")) or bool( + _safe(pkt, "wheel_on_rumble_strip_RL") + ) + kerb_right = bool(_safe(pkt, "wheel_on_rumble_strip_FR")) or bool( + _safe(pkt, "wheel_on_rumble_strip_RR") + ) + + target_left = max(surface_fl, surface_rl) * 255.0 + target_right = max(surface_fr, surface_rr) * 255.0 + smoothed_left = LRA_SMOOTHING_ALPHA * target_left + (1.0 - LRA_SMOOTHING_ALPHA) * _lra_smoothed[0] + smoothed_right = LRA_SMOOTHING_ALPHA * target_right + (1.0 - LRA_SMOOTHING_ALPHA) * _lra_smoothed[1] + _lra_smoothed = (smoothed_left, smoothed_right) + + left_amp = int(smoothed_left) + right_amp = int(smoothed_right) + if kerb_left: + left_amp = max(left_amp, LRA_KERB_FLOOR_AMP) + if kerb_right: + right_amp = max(right_amp, LRA_KERB_FLOOR_AMP) + + _set_motors(ds, left_amp, right_amp) + + + +# --- Throttle (right trigger) ------------------------------------------------- + + +def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: + """Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line, with + one optional divergence: the throttle is released when the clutch is + disengaged. See divergence #3 in the module docstring.""" + # Event impulse override (e.g. gear-shift Bow): runs the scheduled encoder + # for `event_remaining_frames` packets, then resumes steady-state. + if st.event_remaining_frames > 0 and st.event_callable is not None: + st.event_remaining_frames -= 1 + st.event_callable(ds.triggerR, st) + return + if CLUTCH_GATE_ENABLED and int(_safe(pkt, "clutch", 0.0)) > CLUTCH_DISENGAGE_THRESHOLD: + _apply_off(ds.triggerR, st) + return + + accel_x = _safe(pkt, "acceleration_x") + accel_z = _safe(pkt, "acceleration_z") + avg_accel = math.sqrt( + THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x) + + THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z) + ) + + fl = abs(_safe(pkt, "tire_combined_slip_FL")) + fr = abs(_safe(pkt, "tire_combined_slip_FR")) + rl = abs(_safe(pkt, "tire_combined_slip_RL")) + rr = abs(_safe(pkt, "tire_combined_slip_RR")) + front_slip = (fl + fr) * 0.5 + rear_slip = (rl + rr) * 0.5 + four_wheel_slip = (fl + fr + rl + rr) * 0.25 + + accelerator = int(_safe(pkt, "accel")) + + losing_grip = ( + front_slip > THROTTLE_GRIP_LOSS + or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN) + ) and _is_in_motion(pkt) + + if losing_grip: + # Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs). + target_freq = math.floor( + _map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION) + ) + target_resistance = math.floor( + _map( + avg_accel, + 0.0, + THROTTLE_ACCELERATION_LIMIT, + THROTTLE_MIN_STIFFNESS, + THROTTLE_MAX_STIFFNESS, + ) + ) + # Floor after EWMA (matches `(int)EWMA(int, int, float)` overload). + freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING) + resistance = _ewma_int( + target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING + ) + st.last_freq = freq + st.last_resistance = resistance + + # Vibration entry condition matches RacingDSX `Parser.cs:189-195`: + # `freq > MinVibration AND accelerator > VibrationModeStart`. Below it, + # fall through to Feedback (which DSX-faithfully no-ops on force > 8). + if freq > THROTTLE_MIN_VIBRATION and accelerator > THROTTLE_VIB_POSITION: + # AutomaticGun (mode 0x26) gives a controlled pulse rather than the + # raw-PWM buzz of Simple_Vibration (mode 0x06). Strength is 1-8; + # we map RacingDSX's slip-stiffness range (175..255) into it via + # >>5 (175\u21925, 200\u21926, 255\u21927 clamped 8). Frequency byte is the + # filtered slip-vibration freq from RacingDSX's mapping (0..55). + # Strength from slip ratio via log curve (audit Phase 2). Throttle + # slip strength uses the dominant axle (max of front/rear) since RWD/AWD + # burnouts concentrate slip on the drive wheels; averaging across all 4 + # collapses the dominant signal. lo=0.4 means slip-onset at the entry + # threshold registers as strength 2-3 rather than 1. + slip_metric = max(front_slip, rear_slip) + strength = _slip_to_strength(slip_metric, lo=0.4, hi=2.0) + if strength == 0: + strength = 1 # in slip path => entry threshold passed; floor at 1 + _apply_automatic_gun( + ds.triggerR, + THROTTLE_VIB_POSITION, + strength, + int(freq * THROTTLE_EFFECT_INTENSITY), + st, + ) + else: + # Below threshold: RacingDSX sends `Resistance(0, filteredResistance)` + # with slip-stiffness force values (175..255). DSX silently no-ops + # via `TriggerEffectGenerator.Resistance` returning False on force > 8; + # the trigger holds whatever Vibration state was last set. Our + # `_apply_feedback` does the same. + _apply_feedback( + ds.triggerR, + 0, + int(resistance * THROTTLE_EFFECT_INTENSITY), + st, + ) + return + + # Out of slip path entirely. + + + target_resistance = math.floor( + _map( + avg_accel, + 0.0, + THROTTLE_ACCELERATION_LIMIT, + THROTTLE_MIN_RESISTANCE, + THROTTLE_MAX_RESISTANCE, + ) + ) + resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING) + st.last_resistance = resistance + _apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY), st) + + +# --- Brake (left trigger) ----------------------------------------------------- + + +def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: + """Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line.""" + fl = abs(_safe(pkt, "tire_combined_slip_FL")) + fr = abs(_safe(pkt, "tire_combined_slip_FR")) + rl = abs(_safe(pkt, "tire_combined_slip_RL")) + rr = abs(_safe(pkt, "tire_combined_slip_RR")) + four_wheel_slip = (fl + fr + rl + rr) * 0.25 + brake = int(_safe(pkt, "brake")) + + losing_grip = ( + four_wheel_slip > BRAKE_GRIP_LOSS + and brake > BRAKE_DEADZONE + and _is_in_motion(pkt) + ) + + if losing_grip: + # Compute the freq for the AutomaticGun (lock-up) path. Machine uses + # its own per-band fixed frequencies per the audit's recommended map. + target_freq = math.floor( + _map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION) + ) + target_resistance = math.floor( + _map( + brake, + 0, + 255, + BRAKE_MAX_STIFFNESS, + BRAKE_MIN_STIFFNESS, + ) + ) + freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING) + resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) + st.last_freq = freq + st.last_resistance = resistance + + # GT7-style: ABS regime uses Machine (mode 0x27, two-amplitude + # alternating pulse for the 'rhythmic catch-and-release' character). + # Severe lock-up (slip > 0.25) crosses to AutomaticGun at max strength + # \u2014 there's no useful catch-release feel beyond ABS, just sustained + # heavy buzz. Gate on slip directly rather than the EWMA-smoothed freq + # because RacingDSX's freq map only reaches MIN_VIBRATION at slip > 3.7 + # which is well past lock-up; gating on freq would make Machine + # unreachable in the actual ABS regime (0.05..0.25). + if four_wheel_slip > 0.25: + _apply_automatic_gun( + ds.triggerL, + BRAKE_VIB_POSITION, + 8, + max(20, int(freq * BRAKE_EFFECT_INTENSITY)), + st, + ) + else: + if four_wheel_slip < 0.10: + amp_a, amp_b, machine_freq, period = 1, 3, 20, 2 + elif four_wheel_slip < 0.15: + amp_a, amp_b, machine_freq, period = 2, 5, 22, 1 + elif four_wheel_slip < 0.20: + amp_a, amp_b, machine_freq, period = 3, 6, 25, 1 + else: + amp_a, amp_b, machine_freq, period = 3, 7, 28, 1 + _apply_machine( + ds.triggerL, + BRAKE_ABS_MACHINE_START, + BRAKE_ABS_MACHINE_END, + amp_a, + amp_b, + machine_freq, + period, + st, + ) + return + + # Out of slip path entirely. + + + target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE)) + # Slip\u2192non-slip recovery: when last_resistance carries a slip-mode value + # (5..150 from the slip-stiffness range), normal EWMA decay holds the trigger + # in its prior Vibration/Machine state for ~8 frames (~133 ms) because + # `_apply_feedback` silently no-ops on force > 8. Halve the residue each + # frame until it's in Feedback range so the user feels brake-release + # within ~3 frames of slip exit. + if st.last_resistance > 12: + st.last_resistance = max(target_resistance, st.last_resistance // 2) + resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) + st.last_resistance = resistance + + + + # SlopeFeedback gives a progressive bite-point: light at the top of throw, + # firmer at the bottom \u2014 a real hydraulic brake pedal feel. start=2, end=8 + # anchors the ramp in the bite-point band; start_strength=2 keeps the top + # easy to engage while end_strength scales with brake input. + end_strength = int(resistance * BRAKE_EFFECT_INTENSITY) + if end_strength < 1: + # No (or trivial) brake: trigger neutral. Same as DSX-faithful Resistance(0,0). + _apply_off(ds.triggerL, st) + elif end_strength > 8: + # Slip-stiffness EWMA decay residue: silently no-op, keep prior state. + # Matches the divergence #2 brick-wall-avoidance design (and audit \u00a71.5 + # decay accelerator above). + return + else: + _apply_slope_feedback( + ds.triggerL, start=2, end=8, start_strength=2, end_strength=end_strength, st=st, + ) + + +# --- UDP main loop ------------------------------------------------------------ + + +def parse_packet(data: bytes) -> ForzaDataPacket | None: + fmt = PACKET_FORMATS.get(len(data)) + if fmt is None: + LOG.debug("ignoring packet of unexpected length %d", len(data)) + return None + try: + return ForzaDataPacket(data, packet_format=fmt) + except Exception: + LOG.exception("failed to parse forza packet (len=%d)", len(data)) + return None + + +def _close_controller(ds: pydualsense | None) -> None: + """Best-effort close. The HID device may already be gone (unplug, BT drop) + in which case `device.close()` raises; we don't care.""" + if ds is None: + return + try: + ds.close() + except Exception: + pass + + +def _connect_controller() -> pydualsense: + """Open the DualSense, blocking until one is reachable. + + `pydualsense.init()` raises when no DualSense is plugged in. That's a + normal startup-or-replug condition for us, not a fatal error \u2014 the + daemon is meant to live for the whole user session and self-heal across + plug events without external supervision. We log the first failure once, + then retry quietly every `RECONNECT_BACKOFF_S` seconds. + """ + LOG.info("opening dualsense controller") + first_failure_logged = False + while True: + ds = pydualsense() + try: + ds.init() + except Exception as e: + _close_controller(ds) + if not first_failure_logged: + LOG.warning( + "dualsense not available (%s); retrying every %.1fs", + e, + RECONNECT_BACKOFF_S, + ) + first_failure_logged = True + time.sleep(RECONNECT_BACKOFF_S) + if _shutdown_requested: + raise KeyboardInterrupt + continue + # Stop pydualsense's BG sendReport thread. See divergence #6 in the + # rationale near `_set_lightbar` / `_flush`. Without this the firmware + # gets a stream of 'set trigger' commands at ~250 Hz, which it treats + # as fresh effect commands \u2014 the trigger PID restarts every ~4 ms and + # oscillates against the user's finger. + ds.ds_thread = False + if ds.report_thread.is_alive(): + ds.report_thread.join(timeout=2.0) + # Push a clean initial state so we don't inherit residual trigger + # effects from a previous Steam Input session, prior daemon instance, + # or stale firmware state. + global _trigger_dirty, _lightbar_dirty, _motors_dirty, _last_lightbar, _last_motors + _trigger_dirty = False + _lightbar_dirty = False + _motors_dirty = False + _last_lightbar = None + _last_motors = None + _apply_off(ds.triggerL) + _apply_off(ds.triggerR) + _set_lightbar(ds, 0, 0, 0) + _set_motors(ds, 0, 0) + try: + ds.writeReport(ds.prepareReport()) + _trigger_dirty = False + _lightbar_dirty = False + _motors_dirty = False + except (IOError, OSError) as e: + LOG.warning("initial write failed: %s; retrying connect", e) + _close_controller(ds) + time.sleep(RECONNECT_BACKOFF_S) + if _shutdown_requested: + raise KeyboardInterrupt + continue + LOG.info("dualsense controller connected") + return ds + + +# --- Signal handling -------------------------------------------------------- +# systemd sends SIGTERM (not SIGINT) by default. Python does not route +# SIGTERM to KeyboardInterrupt, so without an explicit handler the process +# ignores SIGTERM and systemd waits TimeoutStopSec=90s before SIGKILL. +# Meanwhile socket activation can fire a second instance — two daemons +# fighting over the same controller. We set a module-level flag that the +# main loop polls once per timeout (≤1s) so shutdown is prompt. +_shutdown_requested = False + + +def _handle_termination(signum: int, frame: object) -> None: + """Signal handler for SIGTERM / SIGINT — unblock the main loop.""" + global _shutdown_requested + _shutdown_requested = True + + +def run(host: str, port: int, debug: bool, exit_on_idle: bool = False) -> int: + # Register signal handlers FIRST so a SIGTERM during the (potentially + # blocking) `_connect_controller` retry loop sets the shutdown flag + # instead of killing the process with default-handler. The connect loop + # checks the flag between retries so the daemon exits cleanly even with + # no controller plugged in. + signal.signal(signal.SIGTERM, _handle_termination) + signal.signal(signal.SIGINT, _handle_termination) + ds = _connect_controller() + + + LOG.info("listening for forza udp on %s:%d", host, port) + sock = _get_socket(host, port) + + throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT) + brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT) + forza_state = _ForzaState() + last_seen = 0.0 + in_race = False + + have_telemetry = False # True between the first packet and the IDLE_TIMEOUT_S reset + + try: + while True: + if _shutdown_requested: + raise KeyboardInterrupt + + now = time.monotonic() + try: + data, _ = sock.recvfrom(2048) + last_seen = now + have_telemetry = True + except socket.timeout: + if _shutdown_requested: + raise KeyboardInterrupt + + # Reset on telemetry-idle regardless of in_race state. After Forza + # exits with the user in its main menu (is_race_on=0 packets just + # before exit, so in_race was already False), the old check would + # leave the last lightbar/trigger state asserted forever. + if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S: + LOG.info( + "forza idle for %.1fs — resetting controller", + IDLE_TIMEOUT_S, + ) + # One-shot 0x05 to actively retract the trigger motor; the BG + # thread will publish it ~12 times in the next 50 ms before + # main thread loops back here. + reset_triggers(ds) + reset_lightbar(ds) + _set_motors(ds, 0, 0) + if not _flush(ds): + LOG.warning("dualsense disconnected; reconnecting") + _close_controller(ds) + ds = _connect_controller() + _reset_caches(throttle_state, brake_state) + have_telemetry = False + in_race = False + + if exit_on_idle: + LOG.info("exiting on idle") + return 0 + continue + + pkt = parse_packet(data) + if pkt is None: + continue + + # ForzaParser.IsRaceOn() override: combines packet field with the + # FH-specific RPM-accumulator workaround. Must be called once per + # packet so the accumulator state stays accurate. + in_race = forza_is_race_on(pkt, forza_state) + + if not in_race: + # Pre-race: release both triggers via mode 0x05. The no-op + # cache (divergence #5) means subsequent identical frames + # don't produce HID writes \u2014 the trigger holds its released + # state with zero ongoing motor work. + _apply_off(ds.triggerL, brake_state) + _apply_off(ds.triggerR, throttle_state) + apply_lightbar_pre_race(ds, pkt, forza_state) + _set_motors(ds, 0, 0) + + else: + if debug: + LOG.debug( + "rpm=%.0f/%.0f accel=%d brake=%d " + "slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f " + "throttle[freq=%d res=%d] brake[freq=%d res=%d]", + _safe(pkt, "current_engine_rpm"), + _safe(pkt, "engine_max_rpm"), + int(_safe(pkt, "accel")), + int(_safe(pkt, "brake")), + _safe(pkt, "tire_combined_slip_FL"), + _safe(pkt, "tire_combined_slip_FR"), + _safe(pkt, "tire_combined_slip_RL"), + _safe(pkt, "tire_combined_slip_RR"), + throttle_state.last_freq, + throttle_state.last_resistance, + brake_state.last_freq, + brake_state.last_resistance, + ) + apply_lightbar_in_race(ds, pkt) + apply_lra(ds, pkt) + # Gear-shift event: when the gear field changes between packets, + # schedule a 1-frame Bow on R2 \u2014 NFS-Unbound-style shift detent. + # Bow zones (4..6) anchor the snap mid-throw; force=8/snap=8. + gear = int(_safe(pkt, "gear", -1)) + if ( + gear != forza_state.last_gear + and gear >= 0 + and forza_state.last_gear >= 0 + ): + throttle_state.event_remaining_frames = 1 + throttle_state.event_callable = lambda trig, st: _apply_bow( + trig, 4, 6, 8, 8, st + ) + forza_state.last_gear = gear + apply_left_trigger(ds, pkt, brake_state) + apply_right_trigger(ds, pkt, throttle_state) + + + # Push one HID report per Forza packet (60 Hz max). Idempotent \u2014 + # _flush() returns immediately if nothing actually changed since the + # last push (the no-op cache on `_TriggerState._last_sent` and on + # `_last_lightbar`). On write failure (controller unplugged), reset + # the daemon's cached state so the post-reconnect frame writes anew. + if not _flush(ds): + LOG.warning("dualsense disconnected; reconnecting") + _close_controller(ds) + ds = _connect_controller() + _reset_caches(throttle_state, brake_state) + except KeyboardInterrupt: + LOG.info("shutting down") + finally: + try: + reset_triggers(ds) + _flush(ds) + except Exception: + pass + _close_controller(ds) + + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser( + prog="forza-trigger", + description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.", + ) + parser.add_argument("--host", default="127.0.0.1", help="UDP bind address") + parser.add_argument("--port", type=int, default=5300, help="UDP bind port") + parser.add_argument( + "--debug", + action="store_true", + help="log per-packet telemetry at DEBUG level", + ) + parser.add_argument( + "--exit-on-idle", + action="store_true", + help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)", + ) + args = parser.parse_args() + + level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO") + logging.basicConfig( + level=level, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + return run(args.host, args.port, args.debug, args.exit_on_idle) + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/modules/desktop-steam.nix b/modules/desktop-steam.nix index 2a893c9..454a7cf 100644 --- a/modules/desktop-steam.nix +++ b/modules/desktop-steam.nix @@ -14,10 +14,7 @@ "steam-run" ]; - programs.steam = { - enable = true; - extraCompatPackages = with pkgs; [ proton-ge-bin ]; - }; + programs.steam.enable = true; environment.systemPackages = with pkgs; [ steamtinkerlaunch