From 03c3d01c6626738f8c79f895c274edb128a8ae6d Mon Sep 17 00:00:00 2001 From: Simon Gardling Date: Wed, 6 May 2026 19:14:54 -0400 Subject: [PATCH] forza-trigger: things --- hosts/yarn/forza-trigger/default.nix | 59 +- hosts/yarn/forza-trigger/forza_trigger.py | 836 +++++++++++++----- .../yarn/forza-trigger/test_forza_trigger.py | 745 ++++++++++++++++ 3 files changed, 1415 insertions(+), 225 deletions(-) create mode 100644 hosts/yarn/forza-trigger/test_forza_trigger.py diff --git a/hosts/yarn/forza-trigger/default.nix b/hosts/yarn/forza-trigger/default.nix index 7aa1354..dc595e5 100644 --- a/hosts/yarn/forza-trigger/default.nix +++ b/hosts/yarn/forza-trigger/default.nix @@ -5,12 +5,20 @@ username, ... }: -# Forza Horizon 4 / 5 → DualSense adaptive trigger bridge. +# Forza Horizon 4 / 5 → DualSense adaptive trigger bridge (variant D, "Cosmii" +# port). The daemon listens for Forza's fixed-format UDP "Data Out" telemetry +# stream at 60 Hz, parses each packet via fdp (nettrom/forza_motorsport, MIT), +# and drives the PS5 DualSense's adaptive triggers and lightbar via +# dualsense-controller (PyPI, MIT) over hidraw. # -# Forza emits a fixed-format UDP telemetry stream ("Data Out") at 60 Hz on a -# user-configured port. We listen on that port, parse each packet via fdp -# (nettrom/forza_motorsport, MIT), and drive the PS5 DualSense's adaptive -# triggers via dualsense-controller (PyPI, MIT) which talks HID over hidraw. +# Reference design: cosmii02/ForzaDSXlegacy (variant D in our taxonomy): +# - Continuous baseline resistance on both triggers (always feels) +# - Vibration overlay on slip events (both triggers) +# - RPM-reactive lightbar in-race; car-class color in menus +# - EWMA smoothing per channel +# - No body LRA (avoids the "shakes my whole hand" complaint) +# See forza_trigger.py module docstring for the full reference table and +# divergences from upstream. # # Setup on the user side, once enabled here: # - plug the DualSense in over USB and disable Steam Input for the @@ -20,12 +28,20 @@ # - in Forza, HUD options → set Data Out: ON, Data Out IP: 127.0.0.1, # Data Out IP Port: 5300, and (FM7 only) Data Out Packet Format: CAR DASH. # +# Tuning env vars (read by the daemon at startup; values clamp to [0, 1]): +# FORZA_L2_INTENSITY=<0..1> global L2 (brake) feel scale. default 1.0 +# FORZA_R2_INTENSITY=<0..1> global R2 (throttle) feel scale. default 1.0 +# FORZA_LIGHTBAR=<0|1> enable lightbar feedback. default 1 +# Set them in `services.forzaTrigger.environment` (an Environment= block on +# the systemd unit) to override. let cfg = config.services.forzaTrigger; pythonPackages = import ./python-packages.nix { inherit lib pkgs; }; inherit (pythonPackages) dualsense-controller fdp; - forzaTrigger = pkgs.writers.writePython3Bin "forza-trigger" { + forzaTriggerSrc = ./forza_trigger.py; + + forzaTriggerBin = pkgs.writers.writePython3Bin "forza-trigger" { libraries = [ dualsense-controller fdp @@ -33,7 +49,36 @@ let # The wrapped binary doesn't need style enforcement — readability of # the source file is what matters, and that lives in forza_trigger.py. doCheck = false; - } (builtins.readFile ./forza_trigger.py); + } (builtins.readFile forzaTriggerSrc); + + # Build-time unit tests for the haptic computation. Failure here breaks the + # NixOS build, so deploys can't ship a daemon whose pure-function logic has + # regressed. Tests use stdlib unittest + a FakeController; no hardware needed. + forzaTriggerTests = + pkgs.runCommand "forza-trigger-tests" + { + nativeBuildInputs = [ + (pkgs.python3.withPackages (_: [ + dualsense-controller + fdp + ])) + ]; + } + '' + cp ${forzaTriggerSrc} forza_trigger.py + cp ${./test_forza_trigger.py} test_forza_trigger.py + python -m unittest discover -p 'test_*.py' -v + touch $out + ''; + + # The binary the system actually depends on. overrideAttrs adds the test + # derivation as a build dependency: if the tests fail, forzaTriggerTests + # fails to build, forzaTriggerBin's buildInputs can't be satisfied, and + # the system build fails. This is the standard nixpkgs idiom for build-time + # gating without ceremony around system.checks / passthru.tests. + forzaTrigger = forzaTriggerBin.overrideAttrs (old: { + buildInputs = (old.buildInputs or [ ]) ++ [ forzaTriggerTests ]; + }); in { diff --git a/hosts/yarn/forza-trigger/forza_trigger.py b/hosts/yarn/forza-trigger/forza_trigger.py index 349b3c2..080716b 100644 --- a/hosts/yarn/forza-trigger/forza_trigger.py +++ b/hosts/yarn/forza-trigger/forza_trigger.py @@ -1,75 +1,73 @@ """Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. -This is a one-to-one behavioural port of Race-Element's DualSense haptic -overlay (RiddleTime/Race-Element, GPL-3.0). Reference files: +Port of cosmii02/ForzaDSXlegacy ("ForzaDSX") — variant D in our reference +taxonomy. Replaces the previous Race-Element 1:1 port (variant A) which the +user found "silent unless slipping". - * Race Element.HUD.Common/Overlays/Pitwall/DualSenseInternal/ - TriggerHaptics.cs — slip detection + frequency mapping - DsiConfiguration.cs — tuning defaults - DsiJob.cs — per-frame dispatch +Design (variant D): + - Continuous baseline resistance on both triggers — the trigger always has + *some* feel under the finger, scaled by pedal input (L2) or computed + chassis acceleration (R2). This is the fix for the "no feedback at all" + complaint. + - Vibration overlay during wheel slip / wheelspin (both triggers). Frequency + scales with slip severity, amplitude scales with brake input (L2) or + avg accel (R2). + - RPM-reactive lightbar in-race; redline inverts green→red. In menus, the + lightbar carries the car's class color tinted by its performance index. + - EWMA smoothing on resistance and vibration frequency, separately tunable + per channel. Patmagauran's α_throttle=0.01 preserves the "creamy" throttle + feel; brake gets α=1.0 (instant) so ABS-like flutter isn't smoothed away. + - Per-trigger global intensity scale via FORZA_L2_INTENSITY / FORZA_R2_INTENSITY + env vars (∈ [0, 1]). Default 1.0. Lightbar gated by FORZA_LIGHTBAR (default on). + - IsRaceOn debounce: FH5 sometimes leaves is_race_on=1 after returning to + a menu. Cosmii's workaround is to count consecutive samples where RPM is + stable AND power ≤ 0; once the count exceeds RPM_ACCUMULATOR_TRIGGER, we + override is_race_on to false. + - No body LRA (the "shakes my whole hand" complaint of earlier iterations + came from L/R LRA buzz; this design never touches them). -## What this daemon does +Adaptations for the dualsense-controller library (yesbotics 0.3.1): + - Cosmii drives DSX via JSON-over-UDP at port 6969; we drive the controller + directly via hidraw and bypass DSX. The mode mapping is: + Cosmii Resistance(start, force ∈ 0-7) → effect.feedback(start, strength ∈ 0-8) + Cosmii VibrateResistance(start, freq, stiff) → effect.vibration(start, amp, freq) + — the position-dependent + stiffness gradient is lost + (per static-analysis + recommendation, option 1); + the vibration carries the + slip-event signal which is + the dominant feel. + - The library's `effect.vibration` is upstream-flagged "TODO: not working + properly" but our previous Race-Element 1:1 port used it successfully. + If the user reports the vibration mode feels broken, the fallback is raw + HID byte construction using DSX's TriggerEffectGenerator.cs as reference + (decompiled at /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/). - - Listens for Forza Horizon "Data Out" UDP telemetry. - - On each packet: dispatches `handle_braking` (L2) and `handle_acceleration` - (R2) — same two-function structure as Race-Element's `DsiJob.RunAction`. - - Each handler reads the relevant input pedal and the four tire slip - ratios; if both pedal and slip exceed their thresholds, computes a - frequency from slip severity and emits a Vibration trigger effect. - Otherwise resets the trigger to no-resistance. +Divergences from upstream Cosmii: + - Cosmii's brake-vibration entry condition is `slip < 0.5 AND brake < 10`, + inverted from Patmagauran's parent fork (`slip > 0.5 AND brake > 10`). + The inversion gates vibration to "very light brake at very low slip" + which produces a constant 35Hz buzz when nothing is happening — almost + certainly a copy-paste bug at the comparison operators when Cosmii forked + Patmagauran. We use Patmagauran's correct condition (slip > thr ∧ brake > thr). + - The vibration-frequency formula `freq = MAX - Map(slip, GRIP_LOSS, 1, 0, MAX)` + used by both Patmagauran and Cosmii is also non-intuitive: it maxes at the + threshold and decays to zero at full lockup. Race-Element's variant B uses + the conventional "more slip = higher freq" mapping. We follow Race-Element. + - We don't use a separate stiffness scale (1-200) for the vibration mode; + the library's vibration amplitude is 0-8 inclusive. -## What this daemon explicitly does NOT do +References (all decompiled C# on disk): + /tmp/impls/legacy_Program.cs cosmii02/ForzaDSXlegacy (primary) + /tmp/impls/fds_Program.cs + fds_Settings patmagauran/ForzaDualSense (fork parent, defaults) + /tmp/race-element/Race_Element.HUD.Common__Overlays__Driving__DSX__TriggerHaptics.cs + Race-Element variant B (slip→freq direction) + /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/TriggerEffectGenerator.cs + DSX byte-level packet generator + (reference for option-3 raw HID fallback) -Compared to the previous implementation, every haptic channel beyond the -two trigger Vibration effects is gone: - - - No body LRA rumble (left/right motors). User reported the previous - multi-channel body rumble as "shakes my whole hand"; Race-Element - deliberately keeps the controller body silent so the trigger fingers - carry all information. - - No lightbar effects. Race-Element's DSI overlay leaves the lightbar - untouched (it's at the controller's default). - - No EWMA smoothing. Effects track slip frame-to-frame. - - No event-impulse system (no gear-shift Bow, no collision burst). - - No Machine mode for ABS. Race-Element uses the same Vibration - encoder for everything. - - No SlopeFeedback / cornering Feedback strength. The trigger has - zero resistance when not slipping. - - No surface texture, engine RPM rumble, lateral-G bias, or kerb - floor amplitude. - -## Faithful reproduction of upstream - -Upstream pin: RiddleTime/Race-Element @ 5bc0eebba64f (2026-04-30). - -The slip-coefficient formulas in `_brake_frequency_pct` and -`_throttle_frequency_pct` are byte-faithful ports of Race-Element's -upstream code, including a copy-paste bug in their throttle and brake -paths where the "rear slip coefficient" multiplies `front_slip` instead -of `rear_slip`. The bug is preserved for behavioural parity; see the -inline comments tagged "RACE-ELEMENT BUG". - -## Tuning constants - -Race-Element exposes these as runtime config sliders. We bake them in as -module constants matching the upstream defaults from `DsiConfiguration`: - -| | Brake | Throttle | -|-----------------------|--------|----------| -| input deadzone | 3 % | 3 % | -| front slip threshold | 0.25 | 0.35 | -| rear slip threshold | 0.25 | 0.25 | -| amplitude | 8 | 7 | -| min frequency | 3 Hz | 6 Hz | -| max frequency | 85 Hz | 96 Hz | - -## Transport - -`dualsense-controller` (yesbotics/dualsense-controller-python) handles -the HID transport, BT/USB framing including BT CRC32, and on-demand -output writes (state-changed-since-last-input-tick gate). Hot-plug -recovery routes through the library's `on_error` callback, which sets a -flag the main loop polls. +Setup on the user side: see default.nix. """ import argparse @@ -80,6 +78,8 @@ import signal import socket import sys import time +from dataclasses import dataclass, field +from typing import Optional from dualsense_controller import DualSenseController from fdp import ForzaDataPacket @@ -88,205 +88,544 @@ from fdp import ForzaDataPacket LOG = logging.getLogger("forza-trigger") -# --- Tuning constants (Race-Element DsiConfiguration defaults) --------------- -# Pedal inputs come from Forza as 0..255 bytes; thresholds are in 0..1 scale. -BRAKE_INPUT_THRESHOLD = 0.03 -BRAKE_FRONT_SLIP_THRESHOLD = 0.25 -BRAKE_REAR_SLIP_THRESHOLD = 0.25 -BRAKE_AMPLITUDE = 8 -BRAKE_MIN_FREQUENCY = 3 -BRAKE_MAX_FREQUENCY = 85 +# --- Tuning constants (Cosmii / Patmagauran defaults) ------------------------ +# Source citations point to legacy_Program.cs (Cosmii) and fds_Settings.cs +# (Patmagauran defaults, inherited by Cosmii via INI file). +# Pedal input gates: avoid noise at zero pedal pressure. Not in upstream; +# carried over from the previous Race-Element port (BRAKE_INPUT_THRESHOLD = 3%). +BRAKE_INPUT_THRESHOLD = 0.03 # 0..1 fraction of pedal travel THROTTLE_INPUT_THRESHOLD = 0.03 -THROTTLE_FRONT_SLIP_THRESHOLD = 0.35 -THROTTLE_REAR_SLIP_THRESHOLD = 0.25 -THROTTLE_AMPLITUDE = 7 -THROTTLE_MIN_FREQUENCY = 6 -THROTTLE_MAX_FREQUENCY = 96 -# Slip-to-percentage divisors (Race-Element TriggerHaptics). Each equals -# (front_clip_ceiling + rear_clip_ceiling) for its handler, which makes -# `pct` scale to [0, 1] without an explicit cap downstream: -# brake: front_ceil(10) + rear_ceil(7.5) = 17.5 -# throttle: front_ceil(5) + rear_ceil(7.5) = 12.5 -BRAKE_PCT_DIVISOR = 17.5 -THROTTLE_PCT_DIVISOR = 12.5 +# --- L2 brake --------------------------------------------------------------- +# fds_Settings.cs (Cosmii inherits via appsettings.ini). +GRIP_LOSS_VAL = 0.5 # combined-slip threshold for vibration overlay +BRAKE_VIBRATION_MODE_START = 10 # 0-255 brake input scale; gate for slip mode +MIN_BRAKE_VIBRATION = 3 # Hz floor; below → revert to feedback only +MAX_BRAKE_VIBRATION = 35 # Hz ceiling at peak slip +MIN_BRAKE_RESISTANCE = 1 # baseline strength at zero brake (lib 0-8) +MAX_BRAKE_RESISTANCE = 6 # baseline strength at full brake (lib 0-8) +MIN_BRAKE_AMP = 1 # vibration-mode amplitude floor (lib 0-8) +MAX_BRAKE_AMP = 8 # vibration-mode amplitude ceiling +EWMA_ALPHA_BRAKE = 1.0 # 1.0 = instant (no smoothing). Patmagauran's choice. +EWMA_ALPHA_BRAKE_FREQ = 1.0 -# --- Forza UDP packet sizes -> fdp packet_format strings --------------------- +# --- R2 throttle ------------------------------------------------------------ +THROTTLE_GRIP_LOSS_VAL = 0.5 +THROTTLE_VIBRATION_MODE_START = 10 # 0-255 accelerator input scale +MIN_ACCEL_GRIPLOSS_VIBRATION = 3 +MAX_ACCEL_GRIPLOSS_VIBRATION = 35 +TURN_ACCEL_MOD = 0.5 # AccelX² weight (lateral) +FORWARD_ACCEL_MOD = 1.0 # AccelZ² weight (longitudinal) +ACCELERATION_LIMIT = 10.0 # m/s² clamp ceiling +MIN_THROTTLE_RESISTANCE = 1 +MAX_THROTTLE_RESISTANCE = 6 +MIN_ACCEL_GRIPLOSS_AMP = 1 +MAX_ACCEL_GRIPLOSS_AMP = 8 +EWMA_ALPHA_THROTTLE = 0.01 # very smooth (Patmagauran's "creamy" tuning) +EWMA_ALPHA_THROTTLE_FREQ = 0.5 + +# Hardcoded by Cosmii (legacy_Program.cs:224): rear-slip alone counts only +# when accelerator is firmly depressed. Avoids false-positive vibration when +# coasting on light throttle through slick patches. +ACCELERATOR_REAR_SLIP_GATE = 200 # 0-255 scale + +# --- Lightbar --------------------------------------------------------------- +# In-race: green channel ∝ RPM ratio with redline inversion to red. +# Out-of-race: car-class color tinted by car performance index. +RPM_REDLINE_RATIO = 0.85 # above this, green→red inversion +GREEN_FLOOR = 50 # min green channel value (lightbar visible at idle) +MAX_CPI = 255.0 # car performance index ceiling for tint + +# Cosmii car-class palettes (legacy_Program.cs:38-51). Class IDs are Forza's +# enum: D=0, C=1, B=2, A=3, S1=4, S2=5; X is "above the table". +CAR_CLASS_COLORS = { + 0: (107, 185, 236), # D — cyan + 1: (234, 202, 49), # C — gold + 2: (211, 90, 37), # B — orange + 3: (187, 59, 34), # A — red + 4: (128, 54, 243), # S1 — purple + 5: (75, 88, 229), # S2 — blue +} +COLOR_CLASS_X = (105, 182, 72) # green for above-S2 + +# --- IsRaceOn debounce ------------------------------------------------------ +# FH5 sometimes leaves is_race_on=1 after menu transitions. Count samples +# where RPM is stable AND power ≤ 0; once the count exceeds the threshold, +# override is_race_on to false. ~3.3s at 60Hz packet rate. (legacy_Program.cs:35) +RPM_ACCUMULATOR_TRIGGER_RACE_OFF = 200 + +# --- User-facing intensity scales (env vars) -------------------------------- +# Read once at module import. Process restart picks up new values. + +def _read_intensity(name: str, default: float = 1.0) -> float: + """Parse an intensity env var clamped to [0, 1]. NaN/inf/junk fall back to + `default` with a warning — silent fallthrough to 1.0 would hide + misconfigurations (e.g. `FORZA_R2_INTENSITY=nan` is almost certainly + a typo, not "use full intensity"). + """ + raw = os.environ.get(name) + if raw is None: + return default + try: + v = float(raw) + except ValueError: + LOG.warning("invalid %s=%r — using %.2f", name, raw, default) + return default + if not math.isfinite(v): + LOG.warning("non-finite %s=%r — using %.2f", name, raw, default) + return default + return max(0.0, min(1.0, v)) + + +def _read_bool(name: str, default: bool = True) -> bool: + """Parse a boolean env var. Accepts the usual disable-tokens + {0, false, no, off, ""} (case-insensitive) and treats anything else + as enabled. Mirrors the strict parsing of `_read_intensity` so that + typos don't silently flip behavior. + """ + raw = os.environ.get(name) + if raw is None: + return default + return raw.strip().lower() not in ("0", "false", "no", "off", "") + + +LEFT_TRIGGER_INTENSITY = _read_intensity("FORZA_L2_INTENSITY") +RIGHT_TRIGGER_INTENSITY = _read_intensity("FORZA_R2_INTENSITY") +LIGHTBAR_ENABLED = _read_bool("FORZA_LIGHTBAR", default=True) + +# --- Forza UDP packet sizes -> fdp packet_format strings -------------------- +# Note: the FM7 V1 232-byte sled format does NOT include `accel`, `brake`, +# `power`, `car_class`, or `car_performance_index` — fdp doesn't setattr +# those names from a sled-format unpack, and our handlers depend on them. +# Driving a sled stream would silently leave both triggers off. We don't +# advertise FM7 sled support; if the user ever points an FM7 sled feed at us, +# 232-byte packets are simply dropped at _parse_packet. PACKET_FORMATS = { - 232: "sled", - 311: "dash", - 324: "fh4", # FH4 and FH5 share the same layout + 311: "dash", # Forza Motorsport 7 V2 car-dash format + 324: "fh4", # Forza Horizon 4 / 5 (12-byte gap that fdp's fh4 mode patches around) } -# --- Daemon lifecycle constants ---------------------------------------------- +# --- Daemon lifecycle constants --------------------------------------------- IDLE_TIMEOUT_S = 3.0 RECONNECT_BACKOFF_S = 1.0 -# --- Module state (signal + hot-plug flags) ---------------------------------- +# --- Module state (signal + hot-plug flags) --------------------------------- _shutdown = False _disconnected = False def _on_termination(signum: int, frame: object) -> None: - """SIGTERM/SIGINT handler — sets the main-loop shutdown flag. - - systemd sends SIGTERM by default; without an explicit handler Python - ignores it and systemd waits TimeoutStopSec=90s before SIGKILL. - """ + """SIGTERM/SIGINT handler — sets the main-loop shutdown flag.""" global _shutdown + LOG.info("received signal %d — shutting down", signum) _shutdown = True -def _on_controller_error(prev: object, exc: Exception) -> None: - """dualsense-controller `on_error` callback for HID read-thread failures.""" +def _on_controller_error(exc: Exception) -> None: + """dualsense-controller `on_error` callback for HID read-thread failures. + + The library dispatches state-change callbacks by parameter count + (StateValueCallbackManager.py): a 1-arg callback is bound to + `_event_name_1_args` which emits `(new_value)`. A 2-arg callback would + receive `(new_value, timestamp_ns)` instead — meaning a `(prev, exc)` + handler would log a stray monotonic-clock integer instead of the + exception. 1-arg keeps observability tight. + """ global _disconnected - LOG.warning("dualsense exception: %s", exc) + LOG.warning("dualsense controller error: %s", exc) _disconnected = True -# --- Helpers ----------------------------------------------------------------- +# --- Helpers ---------------------------------------------------------------- + + +def _safe_field(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float: + """Read a packet field defensively. Returns default for missing fields, + NaN, or inf. Forza packets are fixed-format but mismatched format strings + can produce NaN; this filters them so haptic math never receives garbage. + """ + try: + v = float(getattr(pkt, name)) + except (AttributeError, TypeError, ValueError): + return default + if math.isnan(v) or math.isinf(v): + return default + return v def _safe_abs(pkt: ForzaDataPacket, name: str) -> float: - """Read a packet field defensively. Returns 0.0 for missing fields, - NaN, or +/-inf; otherwise returns the absolute value of the float.""" - try: - v = float(getattr(pkt, name, 0.0)) - except (TypeError, ValueError): - return 0.0 - if not math.isfinite(v): - return 0.0 - return abs(v) + return abs(_safe_field(pkt, name, 0.0)) -def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: - """Create or inherit the UDP listener socket. +def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float: + """Cosmii's Map function (legacy_Program.cs:478-484, also fds_Program.cs:276). - Under systemd socket activation (LISTEN_FDS=1, LISTEN_PID == ours) - fd 3 is the pre-bound socket. Otherwise bind normally. + Like Arduino's `map()`: clamp x to the input range, then linearly remap to + the output range. Used pervasively throughout the references. """ - listen_pid = os.environ.get("LISTEN_PID", "") - listen_fds = os.environ.get("LISTEN_FDS", "0") - if listen_pid and int(listen_pid) == os.getpid() and int(listen_fds) >= 1: - sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM) - sock.settimeout(timeout) - LOG.info("using systemd-pre-bound socket on %s:%d", host, port) - return sock - - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((host, port)) - sock.settimeout(timeout) - return sock + if in_max == in_min: + return out_min + x = max(in_min, min(in_max, x)) + return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min -def _parse_packet(data: bytes) -> ForzaDataPacket | None: - fmt = PACKET_FORMATS.get(len(data)) - if fmt is None: - LOG.debug("ignoring packet of unexpected length %d", len(data)) - return None - try: - return ForzaDataPacket(data, packet_format=fmt) - except Exception: - LOG.exception("failed to parse forza packet (len=%d)", len(data)) - return None +def _ewma(value: float, last: float, alpha: float) -> float: + """Exponential weighted moving average (legacy_Program.cs:747). + + α=1.0 → instant (output = value), α=0.01 → very smooth (~100-sample lag). + """ + return alpha * value + (1.0 - alpha) * last -def _axle_slip(pkt: ForzaDataPacket) -> tuple[float, float]: - """(front_slip, rear_slip) — dominant tire of each axle. +def _combined_slip(pkt: ForzaDataPacket) -> tuple[float, float, float]: + """(combined_all, combined_front, combined_rear) — Cosmii's slip aggregates. - Race-Element uses `Math.Max` over each axle's two tires to surface the - worst-slipping wheel, since slip on either side counts. fdp emits - `tire_combined_slip_*` as signed floats; `_safe_abs` filters NaN/inf - and takes the absolute value (Race-Element calls this NegateIfNegative). + Cosmii uses the arithmetic mean per axle (legacy_Program.cs:112-114), + distinct from Race-Element's max-of-axle. The mean is more conservative + and reduces single-tire-pulse chatter. """ fl = _safe_abs(pkt, "tire_combined_slip_FL") fr = _safe_abs(pkt, "tire_combined_slip_FR") rl = _safe_abs(pkt, "tire_combined_slip_RL") rr = _safe_abs(pkt, "tire_combined_slip_RR") - return max(fl, fr), max(rl, rr) + return (fl + fr + rl + rr) / 4.0, (fl + fr) / 2.0, (rl + rr) / 2.0 -# --- Trigger haptic handlers (1:1 port of TriggerHaptics.cs) ----------------- - - -def handle_braking(controller: DualSenseController, pkt: ForzaDataPacket) -> None: - """Brake (L2). Mirrors `TriggerHaptics.HandleBraking`. - - Note: Race-Element's brake path leaves the trigger in its prior effect - when brake is engaged but slip is below threshold — i.e. you can hold - the brake without slip after an ABS event and the trigger keeps - vibrating until you release the brake. This matches the upstream code - exactly (no else-branch around the slip check). +def _avg_accel(pkt: ForzaDataPacket) -> float: + """Cosmii's avgAccel (legacy_Program.cs:219). Lateral and longitudinal + chassis accel combined under per-axis weights. Used as the input to the + throttle-baseline resistance curve. """ - brake_input = _safe_abs(pkt, "brake") / 255.0 - if brake_input <= BRAKE_INPUT_THRESHOLD: + ax = _safe_field(pkt, "acceleration_x", 0.0) + az = _safe_field(pkt, "acceleration_z", 0.0) + return math.sqrt(TURN_ACCEL_MOD * ax * ax + FORWARD_ACCEL_MOD * az * az) + + +# --- Daemon state ----------------------------------------------------------- + + +@dataclass +class DaemonState: + """Per-session state: EWMA filter cells + IsRaceOn debounce + transition flags. + + EWMA cell semantics are unscaled — they store the raw smoothed input. + Per-trigger intensity (LEFT/RIGHT_TRIGGER_INTENSITY) is applied at output + time only. Storing the scaled value back would compound the scale every + tick (steady-state for α<1 collapses to ~0 with intensity<1). + + `was_throttle_slipping` / `was_brake_slipping` track the previous tick's + slip state so we can bypass EWMA on transition INTO a slip event. Without + this, α=0.01 throttle smoothing means the first ~100 packets of a slip + event (~1.7s at 60Hz) feel barely audible. + + Initial values from Cosmii's static initializers (legacy_Program.cs:23-26), + rescaled to the dualsense-controller library's 0-8 strength domain. + """ + last_throttle_resistance: float = 1.0 + last_throttle_freq: float = 0.0 + last_brake_resistance: float = 1.0 + last_brake_freq: float = 0.0 + was_throttle_slipping: bool = False + was_brake_slipping: bool = False + # Tracks the previous tick's in_race verdict; the run-loop uses this + # to detect in-race → menu transitions and partial-reset the EWMA cells + # + slip flags so the next race resumption gets a clean cold-start + # path (otherwise stale slip flags suppress the seeding fix in + # handle_throttle/handle_brake on the first packet of race 2). + last_in_race: bool = False + # IsRaceOn debounce. + last_rpm: float = 0.0 + rpm_accumulator: int = 0 + # Last-known valid car class / CPI. We always update on observation — + # Cosmii's `> 0` guard treats Class-D (enum value 0) as "no info" and + # leaves the lightbar showing the previous class color, which is wrong. + last_car_class: int = 0 + last_cpi: int = 0 + # Last lightbar color (skip identical writes). Reset to (0,0,0) on idle + # so that resuming with the same color re-pushes after the (0,0,0) reset + # we send to the controller; otherwise the bar stays black. + last_color: tuple[int, int, int] = (0, 0, 0) + + def reset(self) -> None: + """Reset filters, debounce, and lightbar dedup — call when telemetry + resumes after idle so the next packet's effects fire from a clean + baseline. + """ + self.last_throttle_resistance = 1.0 + self.last_throttle_freq = 0.0 + self.last_brake_resistance = 1.0 + self.last_brake_freq = 0.0 + self.was_throttle_slipping = False + self.was_brake_slipping = False + self.last_rpm = 0.0 + self.rpm_accumulator = 0 + self.last_color = (0, 0, 0) + + +# --- Race detection --------------------------------------------------------- + + +def is_race_on(pkt: ForzaDataPacket, state: DaemonState) -> bool: + """Cosmii's IsRaceOn debounce (legacy_Program.cs:89-109). + + Returns True iff the car is actively being driven. Combines the explicit + is_race_on flag with the RPM-stability + zero-power workaround for FH5's + unreliable flag. + """ + flag = bool(_safe_field(pkt, "is_race_on", 0.0)) + current_rpm = _safe_field(pkt, "current_engine_rpm", 0.0) + power = _safe_field(pkt, "power", 0.0) + + if abs(current_rpm - state.last_rpm) < 1e-3 and power <= 0: + state.rpm_accumulator += 1 + if state.rpm_accumulator > RPM_ACCUMULATOR_TRIGGER_RACE_OFF: + flag = False + else: + state.rpm_accumulator = 0 + + state.last_rpm = current_rpm + return flag + + +# --- Trigger handlers ------------------------------------------------------- + + +def handle_throttle(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None: + """R2 throttle. Port of legacy_Program.cs:216-263. + + Modes: + - In-race + slip: vibration overlay (freq from slip, amp from avgAccel). + If filtered freq <= MIN_ACCEL_GRIPLOSS_VIBRATION + OR accelerator <= THROTTLE_VIBRATION_MODE_START, + fall through to feedback-only (avoids audible + clicking at very low frequencies). + - In-race + no slip: continuous feedback resistance scaled by avgAccel. + - Pedal not pressed: trigger off. + """ + accel = _safe_field(pkt, "accel", 0.0) + if accel / 255.0 <= THROTTLE_INPUT_THRESHOLD: + controller.right_trigger.effect.off() + return + + avg_a = _avg_accel(pkt) + _, combined_front, combined_rear = _combined_slip(pkt) + + losing_grip = ( + combined_front > THROTTLE_GRIP_LOSS_VAL + or (combined_rear > THROTTLE_GRIP_LOSS_VAL and accel > ACCELERATOR_REAR_SLIP_GATE) + ) + + if losing_grip: + # Vibration mode. Frequency scales with slip severity (Race-Element's + # convention: more slip = higher freq, opposite of Patmagauran's + # decay-from-threshold formula). Amplitude scales with avgAccel so + # heavier acceleration during slip = stronger buzz. + # Use the slipping axle's slip for the frequency curve, not the + # all-4-tire average. Cosmii's `combinedTireSlip` (mean of all 4) + # collapses to ~slip/2 for one-axle slip events (RWD wheelspin = + # rear=0.8, front=0 → combined=0.4) which lands below threshold + # and silently falls through to feedback-only mode. RWD burnouts + # are the most common Forza wheelspin event; we want vibration there. + slip_for_freq = max(combined_front, combined_rear) + slip_above = max(0.0, slip_for_freq - THROTTLE_GRIP_LOSS_VAL) + slip_range = max(1e-6, 1.0 - THROTTLE_GRIP_LOSS_VAL) + freq_raw = MIN_ACCEL_GRIPLOSS_VIBRATION + ( + MAX_ACCEL_GRIPLOSS_VIBRATION - MIN_ACCEL_GRIPLOSS_VIBRATION + ) * min(1.0, slip_above / slip_range) + amp_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_ACCEL_GRIPLOSS_AMP, MAX_ACCEL_GRIPLOSS_AMP) + + # Bypass EWMA on the first packet of a slip event so the buzz is + # immediate. With α=0.01 throttle smoothing, the EWMA-warmed first + # packet would converge over ~1.7s — slip events are sub-second. + if not state.was_throttle_slipping: + state.last_throttle_freq = freq_raw + state.last_throttle_resistance = amp_raw + freq_unscaled = _ewma(freq_raw, state.last_throttle_freq, EWMA_ALPHA_THROTTLE_FREQ) + amp_unscaled = _ewma(amp_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE) + # Store unscaled — applying intensity here would compound the scale + # every tick (steady-state collapses to ~0 with intensity<1, α<1). + state.last_throttle_freq = freq_unscaled + state.last_throttle_resistance = amp_unscaled + freq_out = freq_unscaled * RIGHT_TRIGGER_INTENSITY + amp_out = amp_unscaled * RIGHT_TRIGGER_INTENSITY + state.was_throttle_slipping = True + + if freq_out <= MIN_ACCEL_GRIPLOSS_VIBRATION or accel <= THROTTLE_VIBRATION_MODE_START: + # Fallback to baseline-style feedback if the computed vibration + # is too quiet to be useful. + strength = max(MIN_THROTTLE_RESISTANCE, min(MAX_THROTTLE_RESISTANCE, int(round(amp_out)))) + controller.right_trigger.effect.feedback(start_position=0, strength=strength) + else: + f = max(1, min(255, int(round(freq_out)))) + a = max(MIN_ACCEL_GRIPLOSS_AMP, min(MAX_ACCEL_GRIPLOSS_AMP, int(round(amp_out)))) + controller.right_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f) + else: + # Baseline mode. Strength scales with avgAccel — at idle the trigger + # is barely resistant; under heavy accel it firms up. + resistance_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_THROTTLE_RESISTANCE, MAX_THROTTLE_RESISTANCE) + resistance_unscaled = _ewma(resistance_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE) + state.last_throttle_resistance = resistance_unscaled + state.was_throttle_slipping = False + strength = max( + MIN_THROTTLE_RESISTANCE, + min(MAX_THROTTLE_RESISTANCE, int(round(resistance_unscaled * RIGHT_TRIGGER_INTENSITY))), + ) + controller.right_trigger.effect.feedback(start_position=0, strength=strength) + + +def handle_brake(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None: + """L2 brake. Port of legacy_Program.cs:281-313 with the bug fix described + in the module docstring (Cosmii's inverted comparison operators replaced + with Patmagauran's correct logic: vibration when slip > thr AND brake > thr). + + Modes: + - In-race + slip: vibration overlay (freq from slip, amp from brake input). + Below MIN_BRAKE_VIBRATION freq → revert to feedback-only. + - In-race + no slip: continuous feedback resistance scaled by brake input. + - Pedal not pressed: trigger off. + """ + brake = _safe_field(pkt, "brake", 0.0) + if brake / 255.0 <= BRAKE_INPUT_THRESHOLD: controller.left_trigger.effect.off() return - front_slip, rear_slip = _axle_slip(pkt) - if ( - front_slip <= BRAKE_FRONT_SLIP_THRESHOLD - and rear_slip <= BRAKE_REAR_SLIP_THRESHOLD - ): - return # Race-Element falls through here: trigger keeps prior effect. + # Use the worst axle's slip for both detection and the freq curve. + # Cosmii's all-4 average misses single-axle ABS events (front locks + # first under weight transfer; combined-average can stay below 0.5 + # even with the front fully locked). Race-Element variant B uses + # max-of-axle for the same reason. + _, combined_front, combined_rear = _combined_slip(pkt) + slip_for_freq = max(combined_front, combined_rear) + slipping = slip_for_freq > GRIP_LOSS_VAL and brake > BRAKE_VIBRATION_MODE_START - front_coef = min(front_slip * 4, 10) - # RACE-ELEMENT BUG: upstream computes the rear coefficient from FRONT slip - # (TriggerHaptics.cs line 36, `slipRatioFront * 2f`). Faithful port keeps - # the bug; if you'd rather use rear_slip * 2, change one symbol. - rear_coef = min(front_slip * 2, 7.5) - pct = (front_coef + rear_coef) / BRAKE_PCT_DIVISOR - freq = max(BRAKE_MIN_FREQUENCY, int(BRAKE_MAX_FREQUENCY * pct)) + if slipping: + slip_above = max(0.0, slip_for_freq - GRIP_LOSS_VAL) + slip_range = max(1e-6, 1.0 - GRIP_LOSS_VAL) + freq_raw = MIN_BRAKE_VIBRATION + ( + MAX_BRAKE_VIBRATION - MIN_BRAKE_VIBRATION + ) * min(1.0, slip_above / slip_range) + amp_raw = _map(brake, 0, 255, MIN_BRAKE_AMP, MAX_BRAKE_AMP) - controller.left_trigger.effect.vibration( - start_position=0, amplitude=BRAKE_AMPLITUDE, frequency=freq + # Bypass EWMA on slip-event entry; same reasoning as throttle. + if not state.was_brake_slipping: + state.last_brake_freq = freq_raw + state.last_brake_resistance = amp_raw + freq_unscaled = _ewma(freq_raw, state.last_brake_freq, EWMA_ALPHA_BRAKE_FREQ) + amp_unscaled = _ewma(amp_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE) + state.last_brake_freq = freq_unscaled + state.last_brake_resistance = amp_unscaled + freq_out = freq_unscaled * LEFT_TRIGGER_INTENSITY + amp_out = amp_unscaled * LEFT_TRIGGER_INTENSITY + state.was_brake_slipping = True + + if freq_out <= MIN_BRAKE_VIBRATION: + strength = max(MIN_BRAKE_RESISTANCE, min(MAX_BRAKE_RESISTANCE, int(round(amp_out)))) + controller.left_trigger.effect.feedback(start_position=0, strength=strength) + else: + f = max(1, min(255, int(round(freq_out)))) + a = max(MIN_BRAKE_AMP, min(MAX_BRAKE_AMP, int(round(amp_out)))) + controller.left_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f) + else: + resistance_raw = _map(brake, 0, 255, MIN_BRAKE_RESISTANCE, MAX_BRAKE_RESISTANCE) + resistance_unscaled = _ewma(resistance_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE) + state.last_brake_resistance = resistance_unscaled + state.was_brake_slipping = False + strength = max( + MIN_BRAKE_RESISTANCE, + min(MAX_BRAKE_RESISTANCE, int(round(resistance_unscaled * LEFT_TRIGGER_INTENSITY))), + ) + controller.left_trigger.effect.feedback(start_position=0, strength=strength) + + +# --- Lightbar --------------------------------------------------------------- + + +def lightbar_color(pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> tuple[int, int, int]: + """Compute the (R, G, B) the lightbar should display this tick. + + In-race: green ∝ RPM ratio, with redline inversion (the green channel is + flipped to 255-G so the bar appears red as you approach the rev limit). + Out-of-race: car-class color tinted by car performance index. + + Pure function — caller is responsible for actually pushing the color to + the controller (skipping if unchanged from `state.last_color`). + """ + if in_race: + rpm = _safe_field(pkt, "current_engine_rpm", 0.0) + idle = _safe_field(pkt, "engine_idle_rpm", 0.0) + max_rpm = _safe_field(pkt, "engine_max_rpm", 0.0) + rpm_range = max(1.0, max_rpm - idle) + ratio = max(0.0, min(1.0, (rpm - idle) / rpm_range)) + green = max(GREEN_FLOOR, int(ratio * 255)) + red = int(ratio * 255) + if ratio >= RPM_REDLINE_RATIO: + green = 255 - green + return (red, green, 0) + + # Menu mode: car-class color tinted by performance index. We always update + # on observation — Cosmii's `> 0` guard treats Class-D (enum value 0) as + # "no info" and leaves the previous class color stuck on the lightbar + # when switching INTO a Class-D car. fdp always sets these fields from + # an fh4/dash unpack, so absence is impossible — only the value matters. + state.last_car_class = int(_safe_field(pkt, "car_class", 0.0)) + state.last_cpi = max(0, min(int(MAX_CPI), int(_safe_field(pkt, "car_performance_index", 0.0)))) + + base = CAR_CLASS_COLORS.get(state.last_car_class, COLOR_CLASS_X) + if state.last_car_class > 5: + # X-class: untinted constant green. + return COLOR_CLASS_X + cpi_ratio = state.last_cpi / MAX_CPI if state.last_cpi > 0 else 0.0 + return ( + int(cpi_ratio * base[0]), + int(cpi_ratio * base[1]), + int(cpi_ratio * base[2]), ) -def handle_acceleration(controller: DualSenseController, pkt: ForzaDataPacket) -> None: - """Throttle (R2). Mirrors `TriggerHaptics.HandleAcceleration`.""" - throttle_input = _safe_abs(pkt, "accel") / 255.0 - if throttle_input <= THROTTLE_INPUT_THRESHOLD: - controller.right_trigger.effect.off() +def apply_lightbar(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> None: + if not LIGHTBAR_ENABLED: return - - front_slip, rear_slip = _axle_slip(pkt) - if ( - front_slip <= THROTTLE_FRONT_SLIP_THRESHOLD - and rear_slip <= THROTTLE_REAR_SLIP_THRESHOLD - ): - # Throttle path resets to default explicitly when not slipping - # (HandleAcceleration line 112, `R2Effect = TriggerEffect.Default`). - controller.right_trigger.effect.off() + color = lightbar_color(pkt, state, in_race) + if color == state.last_color: return - - front_coef = min(front_slip * 3, 5) - # RACE-ELEMENT BUG: same as brake — rear coefficient uses FRONT slip - # (TriggerHaptics.cs line 92, `slipRatioFront * 5f`). - rear_coef = min(front_slip * 5, 7.5) - pct = (front_coef + rear_coef) / THROTTLE_PCT_DIVISOR - freq = max(THROTTLE_MIN_FREQUENCY, int(THROTTLE_MAX_FREQUENCY * pct)) - - controller.right_trigger.effect.vibration( - start_position=0, amplitude=THROTTLE_AMPLITUDE, frequency=freq - ) + state.last_color = color + controller.lightbar.set_color(*color) -def reset_triggers(controller: DualSenseController) -> None: - """Default both triggers — called on idle, reconnect, and shutdown.""" +# --- Reset ------------------------------------------------------------------ + + +def reset_triggers(controller: Optional[DualSenseController]) -> None: + """Default both triggers — called on idle, reconnect, and shutdown. + Tolerates a None controller so the shutdown-during-reconnect cleanup + path doesn't raise. + """ + if controller is None: + return controller.left_trigger.effect.off() controller.right_trigger.effect.off() -# --- Connection / hot-plug --------------------------------------------------- +def reset_all(controller: Optional[DualSenseController]) -> None: + """reset_triggers + lightbar to (0, 0, 0). Tolerates None controller.""" + if controller is None: + return + reset_triggers(controller) + if LIGHTBAR_ENABLED: + try: + controller.lightbar.set_color(0, 0, 0) + except Exception: + pass -def _connect_controller() -> DualSenseController | None: - """Block until a DualSense is reachable. Returns the activated - controller, or None if shutdown was requested before one appeared. +# --- Connection / hot-plug -------------------------------------------------- + + +def _connect_controller() -> Optional[DualSenseController]: + """Block until a DualSense is reachable. Returns the activated controller, + or None if shutdown was requested before one appeared. """ LOG.info("opening dualsense controller") first_failure_logged = False @@ -302,13 +641,21 @@ def _connect_controller() -> DualSenseController | None: if not devices: if not first_failure_logged: - LOG.warning( - "no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S - ) + LOG.warning("no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S) first_failure_logged = True time.sleep(RECONNECT_BACKOFF_S) continue + + global _disconnected + # Clear the disconnect flag BEFORE activate() so the HID worker + # thread (started by activate()) can't observe a stale-true flag if + # it raises immediately after spawn — a known failure mode right + # after USB enumeration when the kernel is still re-attaching the + # hidraw node. Without this, _on_controller_error fires inside + # activate(), sets _disconnected=True, then we'd unconditionally + # overwrite it back to False below. + _disconnected = False try: controller = DualSenseController(device_index_or_device_info=devices[0]) controller.on_error(_on_controller_error) @@ -323,16 +670,14 @@ def _connect_controller() -> DualSenseController | None: # Push a clean initial state so we don't inherit residual effects from # a previous Steam Input session, prior daemon instance, or stale # firmware state. - reset_triggers(controller) - global _disconnected - _disconnected = False + reset_all(controller) LOG.info("dualsense controller connected (%s)", controller.connection_type) return controller return None -def _close_controller(controller: DualSenseController | None) -> None: +def _close_controller(controller: Optional[DualSenseController]) -> None: if controller is None: return try: @@ -341,17 +686,56 @@ def _close_controller(controller: DualSenseController | None) -> None: pass -# --- Main loop --------------------------------------------------------------- +# --- UDP socket ------------------------------------------------------------- + + +def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: + """Create or inherit the UDP listener socket. Honors LISTEN_FDS for + systemd socket activation; falls back to opening a fresh socket bound + to (host, port). + """ + listen_pid = os.environ.get("LISTEN_PID") + listen_fds = os.environ.get("LISTEN_FDS") + if listen_pid and listen_fds and int(listen_pid) == os.getpid() and int(listen_fds) >= 1: + sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM) + LOG.debug("inherited UDP socket fd=3 from systemd") + else: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.settimeout(timeout) + return sock + + +_warned_packet_sizes: set[int] = set() + + +def _parse_packet(data: bytes) -> Optional[ForzaDataPacket]: + n = len(data) + fmt = PACKET_FORMATS.get(n) + if fmt is None: + if n not in _warned_packet_sizes: + _warned_packet_sizes.add(n) + LOG.warning( + "ignoring unrecognised %d-byte UDP packet " + "(expected 311 [FM7 dash] or 324 [FH4/5]); " + "if FM7, switch HUD Data Out to CAR DASH format", + n, + ) + return None + try: + return ForzaDataPacket(data, packet_format=fmt) + except Exception: + return None + + +# --- Main loop -------------------------------------------------------------- def run(host: str, port: int, exit_on_idle: bool = False) -> int: signal.signal(signal.SIGTERM, _on_termination) signal.signal(signal.SIGINT, _on_termination) - # Bind the UDP socket BEFORE opening the controller. If the bind fails - # (port in use, systemd-passed fd unusable, etc.) we exit cleanly without - # having activated the controller's HID stack (which would otherwise leak - # an active session). _get_socket raises on bind failure. LOG.info("listening for forza udp on %s:%d", host, port) sock = _get_socket(host, port) @@ -360,6 +744,7 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int: sock.close() return 0 + state = DaemonState() last_seen = 0.0 have_telemetry = False @@ -371,6 +756,7 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int: controller = _connect_controller() if controller is None: return 0 + state.reset() now = time.monotonic() try: @@ -382,7 +768,8 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int: break if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S: LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S) - reset_triggers(controller) + reset_all(controller) + state.reset() have_telemetry = False if exit_on_idle: LOG.info("exiting on idle") @@ -393,11 +780,24 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int: if pkt is None: continue - handle_acceleration(controller, pkt) - handle_braking(controller, pkt) + in_race = is_race_on(pkt, state) + if in_race: + handle_throttle(controller, pkt, state) + handle_brake(controller, pkt, state) + else: + # On the in-race → menu transition, partial-reset state so + # the next race resumption gets clean EWMA cells, fresh slip + # flags (so the cold-start fix at handle_throttle/brake + # re-fires), and a redrawn lightbar. Edge-only — repeated + # menu packets shouldn't keep clearing state. + if state.last_in_race: + state.reset() + reset_triggers(controller) + state.last_in_race = in_race + apply_lightbar(controller, pkt, state, in_race) finally: try: - reset_triggers(controller) + reset_all(controller) except Exception: pass _close_controller(controller) diff --git a/hosts/yarn/forza-trigger/test_forza_trigger.py b/hosts/yarn/forza-trigger/test_forza_trigger.py new file mode 100644 index 0000000..bace4b8 --- /dev/null +++ b/hosts/yarn/forza-trigger/test_forza_trigger.py @@ -0,0 +1,745 @@ +"""Behavioral tests for forza_trigger.py. + +Pure-function tests + integration tests using a FakeController that records +every effect call. No hardware required. + +Run: `python -m unittest discover -s hosts/yarn/forza-trigger -p 'test_*.py'` +or via the nix derivation in default.nix (runs at build time). +""" + +import math +import os +import sys +import unittest +from dataclasses import dataclass, field +from typing import Any, Tuple + +# Disable lightbar by default for triggers-only tests; specific lightbar tests +# re-enable by mutating module attributes. +os.environ.setdefault("FORZA_LIGHTBAR", "0") + +import forza_trigger as ft # noqa: E402 + + +# --- Fakes ------------------------------------------------------------------ + + +class FakeEffect: + def __init__(self, log: list, side: str): + self.log = log + self.side = side + + def off(self): + self.log.append((self.side, "off")) + + def feedback(self, start_position=0, strength=0): + self.log.append((self.side, "feedback", start_position, strength)) + + def vibration(self, start_position=0, amplitude=0, frequency=0): + self.log.append((self.side, "vibration", start_position, amplitude, frequency)) + + +class FakeTrigger: + def __init__(self, log: list, side: str): + self.effect = FakeEffect(log, side) + + +class FakeLightbar: + def __init__(self, log: list): + self.log = log + + def set_color(self, r, g, b): + self.log.append(("lightbar", "set_color", r, g, b)) + + +class FakeController: + def __init__(self): + self.calls: list = [] + self.left_trigger = FakeTrigger(self.calls, "L2") + self.right_trigger = FakeTrigger(self.calls, "R2") + self.lightbar = FakeLightbar(self.calls) + + +@dataclass +class FakePacket: + """Shape mirrors fdp.ForzaDataPacket. Only the fields handlers read are + populated; the rest stay 0. + """ + accel: float = 0.0 + brake: float = 0.0 + is_race_on: float = 1.0 + current_engine_rpm: float = 3000.0 + engine_idle_rpm: float = 800.0 + engine_max_rpm: float = 8000.0 + power: float = 100.0 + acceleration_x: float = 0.0 + acceleration_y: float = 0.0 + acceleration_z: float = 0.0 + tire_combined_slip_FL: float = 0.0 + tire_combined_slip_FR: float = 0.0 + tire_combined_slip_RL: float = 0.0 + tire_combined_slip_RR: float = 0.0 + car_class: float = 0.0 + car_performance_index: float = 0.0 + + +# --- Pure-function tests ---------------------------------------------------- + + +class TestMap(unittest.TestCase): + def test_endpoints(self): + self.assertEqual(ft._map(0, 0, 1, 0, 100), 0) + self.assertEqual(ft._map(1, 0, 1, 0, 100), 100) + + def test_midpoint(self): + self.assertEqual(ft._map(0.5, 0, 1, 0, 100), 50) + + def test_clamps_low(self): + self.assertEqual(ft._map(-5, 0, 10, 0, 100), 0) + + def test_clamps_high(self): + self.assertEqual(ft._map(50, 0, 10, 0, 100), 100) + + def test_zero_input_range(self): + # Degenerate input range — must not divide by zero. + self.assertEqual(ft._map(5, 5, 5, 0, 100), 0) + + +class TestEWMA(unittest.TestCase): + def test_alpha_one_is_instant(self): + # α=1.0 → output exactly equals the input (no smoothing). + self.assertEqual(ft._ewma(10, 5, 1.0), 10) + + def test_alpha_zero_is_frozen(self): + # α=0.0 → output exactly equals the previous value (no update). + self.assertEqual(ft._ewma(10, 5, 0.0), 5) + + def test_alpha_half(self): + self.assertEqual(ft._ewma(10, 6, 0.5), 8) + + +class TestSafeField(unittest.TestCase): + def test_missing_attribute_returns_default(self): + pkt = FakePacket() + self.assertEqual(ft._safe_field(pkt, "nonexistent_field", 42.0), 42.0) + + def test_nan_returns_default(self): + pkt = FakePacket(accel=math.nan) + self.assertEqual(ft._safe_field(pkt, "accel", 7.0), 7.0) + + def test_inf_returns_default(self): + pkt = FakePacket(accel=math.inf) + self.assertEqual(ft._safe_field(pkt, "accel", 0.0), 0.0) + + def test_normal_value(self): + pkt = FakePacket(accel=128.0) + self.assertEqual(ft._safe_field(pkt, "accel"), 128.0) + + +class TestCombinedSlip(unittest.TestCase): + def test_arithmetic_mean(self): + pkt = FakePacket( + tire_combined_slip_FL=0.4, + tire_combined_slip_FR=0.6, + tire_combined_slip_RL=0.2, + tire_combined_slip_RR=0.8, + ) + all_, front, rear = ft._combined_slip(pkt) + # All four = (0.4+0.6+0.2+0.8)/4 = 0.5 + self.assertAlmostEqual(all_, 0.5) + # Front = (0.4+0.6)/2 = 0.5 + self.assertAlmostEqual(front, 0.5) + # Rear = (0.2+0.8)/2 = 0.5 + self.assertAlmostEqual(rear, 0.5) + + def test_takes_absolute_values(self): + pkt = FakePacket( + tire_combined_slip_FL=-0.5, + tire_combined_slip_FR=0.5, + tire_combined_slip_RL=-0.3, + tire_combined_slip_RR=0.3, + ) + all_, front, rear = ft._combined_slip(pkt) + self.assertAlmostEqual(all_, 0.4) + self.assertAlmostEqual(front, 0.5) + self.assertAlmostEqual(rear, 0.3) + + +# --- Race-on debounce tests ------------------------------------------------- + + +class TestIsRaceOn(unittest.TestCase): + def test_simple_yes(self): + s = ft.DaemonState() + pkt = FakePacket(is_race_on=1.0, current_engine_rpm=4000, power=200) + self.assertTrue(ft.is_race_on(pkt, s)) + self.assertEqual(s.rpm_accumulator, 0) + + def test_explicit_no(self): + s = ft.DaemonState() + pkt = FakePacket(is_race_on=0.0) + self.assertFalse(ft.is_race_on(pkt, s)) + + def test_debounce_overrides_stuck_flag(self): + # Simulate FH5's stuck-flag bug: is_race_on=1 but RPM and power show + # the car is in a menu (power<=0, RPM unchanged). + s = ft.DaemonState() + pkt = FakePacket(is_race_on=1.0, current_engine_rpm=800, power=0) + s.last_rpm = 800 + # Pump packets until the accumulator trips. + for _ in range(ft.RPM_ACCUMULATOR_TRIGGER_RACE_OFF): + self.assertTrue(ft.is_race_on(pkt, s)) + # One more samples and we cross the threshold. + self.assertFalse(ft.is_race_on(pkt, s)) + + def test_rpm_change_resets_accumulator(self): + s = ft.DaemonState() + s.rpm_accumulator = 50 + s.last_rpm = 800 + pkt = FakePacket(is_race_on=1.0, current_engine_rpm=4000, power=100) + ft.is_race_on(pkt, s) + self.assertEqual(s.rpm_accumulator, 0) + + +# --- Trigger handler tests -------------------------------------------------- + + +class TestHandleThrottle(unittest.TestCase): + def test_zero_pedal_turns_off(self): + c = FakeController() + s = ft.DaemonState() + ft.handle_throttle(c, FakePacket(accel=0.0), s) + self.assertEqual(c.calls, [("R2", "off")]) + + def test_baseline_under_normal_acceleration(self): + c = FakeController() + s = ft.DaemonState() + # No slip, moderate accel, throttle pressed. + pkt = FakePacket(accel=128, acceleration_x=0.5, acceleration_z=2.0) + ft.handle_throttle(c, pkt, s) + # Baseline mode → feedback call (not vibration, not off). + self.assertEqual(len(c.calls), 1) + self.assertEqual(c.calls[0][1], "feedback") + side, mode, start, strength = c.calls[0] + self.assertEqual(side, "R2") + self.assertEqual(start, 0) + # Strength is in valid library range. + self.assertGreaterEqual(strength, ft.MIN_THROTTLE_RESISTANCE) + self.assertLessEqual(strength, ft.MAX_THROTTLE_RESISTANCE) + + def test_front_slip_triggers_vibration(self): + c = FakeController() + s = ft.DaemonState() + # Front-slip > 0.5 with throttle pressed → vibration mode. + # Use heavy accel so amp_raw is large enough that the EWMA-filtered + # value crosses the suppression threshold and we see vibration not + # feedback fallback. + pkt = FakePacket( + accel=255, + acceleration_z=10.0, + tire_combined_slip_FL=0.8, + tire_combined_slip_FR=0.8, + ) + # Pre-warm EWMA so first call doesn't get heavily damped. + s.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION + s.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP + ft.handle_throttle(c, pkt, s) + self.assertEqual(len(c.calls), 1) + self.assertEqual(c.calls[0][1], "vibration") + + def test_rear_slip_below_accelerator_gate_no_vibration(self): + # Rear-only slip but accelerator below the gate should NOT trigger + # vibration mode (Cosmii's rule, legacy_Program.cs:224). + c = FakeController() + s = ft.DaemonState() + pkt = FakePacket( + accel=100, # below ACCELERATOR_REAR_SLIP_GATE=200 + tire_combined_slip_RL=0.8, + tire_combined_slip_RR=0.8, + ) + ft.handle_throttle(c, pkt, s) + self.assertEqual(c.calls[0][1], "feedback") # baseline mode + + def test_rear_slip_above_accelerator_gate_triggers_vibration(self): + c = FakeController() + s = ft.DaemonState() + s.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION + s.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP + pkt = FakePacket( + accel=255, # above ACCELERATOR_REAR_SLIP_GATE=200 + acceleration_z=10.0, + tire_combined_slip_RL=0.8, + tire_combined_slip_RR=0.8, + ) + ft.handle_throttle(c, pkt, s) + self.assertEqual(c.calls[0][1], "vibration") + + +class TestHandleBrake(unittest.TestCase): + def test_zero_pedal_turns_off(self): + c = FakeController() + s = ft.DaemonState() + ft.handle_brake(c, FakePacket(brake=0.0), s) + self.assertEqual(c.calls, [("L2", "off")]) + + def test_baseline_under_normal_braking(self): + c = FakeController() + s = ft.DaemonState() + pkt = FakePacket(brake=128) + ft.handle_brake(c, pkt, s) + self.assertEqual(len(c.calls), 1) + self.assertEqual(c.calls[0][1], "feedback") + # Strength scales with brake input. brake=128 → midpoint of 1..6 = 4ish. + _, _, _, strength = c.calls[0] + self.assertGreaterEqual(strength, ft.MIN_BRAKE_RESISTANCE) + self.assertLessEqual(strength, ft.MAX_BRAKE_RESISTANCE) + + def test_slip_with_heavy_brake_triggers_vibration(self): + # Patmagauran/our condition: slip > thr AND brake > thr. + c = FakeController() + s = ft.DaemonState() + s.last_brake_freq = ft.MAX_BRAKE_VIBRATION + s.last_brake_resistance = ft.MAX_BRAKE_AMP + pkt = FakePacket( + brake=200, # well above BRAKE_VIBRATION_MODE_START=10 + tire_combined_slip_FL=0.8, + tire_combined_slip_FR=0.8, + tire_combined_slip_RL=0.8, + tire_combined_slip_RR=0.8, + ) + ft.handle_brake(c, pkt, s) + self.assertEqual(c.calls[0][1], "vibration") + + def test_slip_with_light_brake_no_vibration(self): + # Slip but brake below threshold → baseline mode (Patmagauran's logic, + # NOT Cosmii's inverted version). + c = FakeController() + s = ft.DaemonState() + pkt = FakePacket( + brake=8, # below BRAKE_VIBRATION_MODE_START=10 + tire_combined_slip_FL=0.8, + tire_combined_slip_FR=0.8, + ) + ft.handle_brake(c, pkt, s) + self.assertEqual(c.calls[0][1], "feedback") # baseline mode + + +# --- Lightbar tests --------------------------------------------------------- + + +class TestLightbarColor(unittest.TestCase): + def test_in_race_low_rpm_green_floor(self): + # At idle RPM the green channel should hit GREEN_FLOOR (50), not 0. + s = ft.DaemonState() + pkt = FakePacket( + is_race_on=1.0, + current_engine_rpm=800, # = idle + engine_idle_rpm=800, + engine_max_rpm=8000, + ) + r, g, b = ft.lightbar_color(pkt, s, in_race=True) + self.assertEqual(r, 0) + self.assertEqual(g, ft.GREEN_FLOOR) + self.assertEqual(b, 0) + + def test_in_race_below_redline_green(self): + s = ft.DaemonState() + # 50% RPM ratio, below redline. + pkt = FakePacket( + is_race_on=1.0, + current_engine_rpm=4400, + engine_idle_rpm=800, + engine_max_rpm=8000, + ) + r, g, b = ft.lightbar_color(pkt, s, in_race=True) + # Both red and green should rise; green not inverted yet. + self.assertGreater(g, 0) + self.assertLess(g, 255) + + def test_in_race_above_redline_inverts_green(self): + s = ft.DaemonState() + # 95% RPM ratio, well past 85% redline. + pkt = FakePacket( + is_race_on=1.0, + current_engine_rpm=8000, + engine_idle_rpm=800, + engine_max_rpm=8000, + ) + r, g, b = ft.lightbar_color(pkt, s, in_race=True) + # At 100% ratio, green=255 then inverted to 0; red=255. + self.assertEqual(r, 255) + self.assertEqual(g, 0) + + def test_menu_class_d_color(self): + s = ft.DaemonState() + pkt = FakePacket(car_class=0, car_performance_index=255) + rgb = ft.lightbar_color(pkt, s, in_race=False) + # Class D at full CPI = exact palette color. + self.assertEqual(rgb, ft.CAR_CLASS_COLORS[0]) + + def test_menu_x_class_constant(self): + s = ft.DaemonState() + pkt = FakePacket(car_class=7, car_performance_index=999) + rgb = ft.lightbar_color(pkt, s, in_race=False) + self.assertEqual(rgb, ft.COLOR_CLASS_X) + + def test_menu_class_d_packet_unsticks_previous_class(self): + # Switching from S2 → D MUST update the lightbar to D's palette, + # not keep showing S2 just because car_class=0 was previously + # treated as "no info" by Cosmii's broken `> 0` guard. + s = ft.DaemonState() + s.last_car_class = 5 # S2 + s.last_cpi = 200 + d_pkt = FakePacket(car_class=0, car_performance_index=180) + rgb = ft.lightbar_color(d_pkt, s, in_race=False) + # Expect tinted D-class color, NOT S2. + ratio = 180 / 255 + expected = ( + int(ratio * ft.CAR_CLASS_COLORS[0][0]), + int(ratio * ft.CAR_CLASS_COLORS[0][1]), + int(ratio * ft.CAR_CLASS_COLORS[0][2]), + ) + self.assertEqual(rgb, expected) + + +# --- DaemonState lifecycle -------------------------------------------------- + + +class TestDaemonStateReset(unittest.TestCase): + def test_reset_clears_filters_and_accumulator(self): + s = ft.DaemonState() + s.last_throttle_resistance = 99.0 + s.last_brake_resistance = 99.0 + s.last_throttle_freq = 99.0 + s.last_brake_freq = 99.0 + s.rpm_accumulator = 50 + s.last_rpm = 4000 + + s.reset() + + self.assertEqual(s.last_throttle_resistance, 1.0) + self.assertEqual(s.last_brake_resistance, 1.0) + self.assertEqual(s.last_throttle_freq, 0.0) + self.assertEqual(s.last_brake_freq, 0.0) + self.assertEqual(s.rpm_accumulator, 0) + self.assertEqual(s.last_rpm, 0.0) + + def test_reset_clears_lightbar_dedup(self): + # last_color MUST be cleared on idle reset. The `apply_lightbar` + # dedup short-circuits when color == state.last_color; if reset + # left the cache holding the in-race color, resuming with the + # same color would skip the controller write — leaving the bar + # black after the (0,0,0) write reset_all sent at idle. + s = ft.DaemonState() + s.last_color = (1, 2, 3) + s.reset() + self.assertEqual(s.last_color, (0, 0, 0)) + +# --- Intensity attenuation regression --------------------------------------- +# Catches the bug where the EWMA cell stored the intensity-scaled value, so +# any FORZA_*_INTENSITY < 1 geometrically attenuated the output (steady-state +# at intensity=0.5, α=0.01 was ~50× too low). The fix decouples cell storage +# from output scaling. This test would have caught it in CI. + + +class TestIntensityScaling(unittest.TestCase): + def _run_steady(self, intensity_attr: str, side: str, pkt_kwargs: dict, n_iter: int = 300) -> int: + """Drive a handler n_iter times with constant input and the named + intensity attribute monkey-patched, then return the final integer + strength sent on `side` ("L2" or "R2"). + """ + c = FakeController() + s = ft.DaemonState() + for _ in range(n_iter): + if side == "R2": + ft.handle_throttle(c, FakePacket(**pkt_kwargs), s) + else: + ft.handle_brake(c, FakePacket(**pkt_kwargs), s) + # Last call's strength. + last = c.calls[-1] + # ("R2"/"L2", "feedback", start, strength) OR ("R2"/"L2", "vibration", start, amp, freq) + return last[3] + + def test_throttle_baseline_intensity_proportional(self): + # With intensity=0.5, the output at α=0.01 (very smooth) must + # converge to ~0.5× the intensity=1.0 output, NOT to ~1% of it. + # We can't monkey-patch module-level constants from a method + # cleanly, so we re-import after env tweaks isn't an option in + # one process. Instead, pin RIGHT_TRIGGER_INTENSITY directly. + original = ft.RIGHT_TRIGGER_INTENSITY + try: + ft.RIGHT_TRIGGER_INTENSITY = 1.0 + full = self._run_steady("RIGHT_TRIGGER_INTENSITY", "R2", + dict(accel=128, acceleration_z=10.0)) + ft.RIGHT_TRIGGER_INTENSITY = 0.5 + half = self._run_steady("RIGHT_TRIGGER_INTENSITY", "R2", + dict(accel=128, acceleration_z=10.0)) + finally: + ft.RIGHT_TRIGGER_INTENSITY = original + # full strength at this input is MAX=6 (avgAccel=10 → top of range). + # half intensity should clamp around 3 (or MIN=1 floor; not tuple[set[str], int]: + """Walk the daemon AST for every _safe_field/_safe_abs call. + + Returns (literal_names, non_literal_arg_count). Calls inside the + wrapper functions `_safe_abs` and `_safe_field` themselves are + excluded — those forward `name` from a parameter to the inner + call and do NOT touch the fdp surface directly. Non-zero + non-literal count from any OTHER caller indicates the contract + isn't fully enforceable. + """ + import ast + with open(src_path) as f: + tree = ast.parse(f.read()) + + class V(ast.NodeVisitor): + def __init__(self) -> None: + self.names: set[str] = set() + self.non_literal: int = 0 + self._fn_stack: list[str] = [] + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + self._fn_stack.append(node.name) + self.generic_visit(node) + self._fn_stack.pop() + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: + self._fn_stack.append(node.name) + self.generic_visit(node) + self._fn_stack.pop() + + def visit_Call(self, node: ast.Call) -> None: + self.generic_visit(node) # nested calls + fn = node.func + if not isinstance(fn, ast.Name): + return + if fn.id not in ("_safe_field", "_safe_abs"): + return + # Skip the forwarding wrappers themselves: those call + # _safe_field with a `name` parameter, not a literal. + if self._fn_stack and self._fn_stack[-1] in ("_safe_abs", "_safe_field"): + return + if len(node.args) < 2: + self.non_literal += 1 + return + arg = node.args[1] + if isinstance(arg, ast.Constant) and isinstance(arg.value, str): + self.names.add(arg.value) + else: + self.non_literal += 1 + + v = V() + v.visit(tree) + return v.names, v.non_literal + + def test_every_daemon_field_resolves_on_real_fdp_packet(self): + import fdp + # 324-byte zeroed buffer — fh4 unpack will succeed (just yields + # zeros for everything) but every documented field will be + # setattr'd on the resulting object. + pkt = fdp.ForzaDataPacket(b"\x00" * 324, packet_format="fh4") + used_names, non_literal = self._collect_field_names(ft.__file__) + self.assertGreater(len(used_names), 5, "AST walker found too few names") + self.assertEqual( + non_literal, 0, + "Every _safe_field/_safe_abs call MUST pass a string literal " + "field name so this contract test can verify it. Variable / " + "kwarg / computed names defeat the AST walker.", + ) + missing = sorted(n for n in used_names if not hasattr(pkt, n)) + self.assertEqual( + missing, [], + f"daemon reads fdp fields that don't exist: {missing}. " + "Check fdp.ForzaDataPacket.sled_props + dash_props for canonical names.", + ) + + + +if __name__ == "__main__": + unittest.main(verbosity=2)