forza-trigger: things
This commit is contained in:
@@ -5,12 +5,20 @@
|
|||||||
username,
|
username,
|
||||||
...
|
...
|
||||||
}:
|
}:
|
||||||
# Forza Horizon 4 / 5 → DualSense adaptive trigger bridge.
|
# Forza Horizon 4 / 5 → DualSense adaptive trigger bridge (variant D, "Cosmii"
|
||||||
|
# port). The daemon listens for Forza's fixed-format UDP "Data Out" telemetry
|
||||||
|
# stream at 60 Hz, parses each packet via fdp (nettrom/forza_motorsport, MIT),
|
||||||
|
# and drives the PS5 DualSense's adaptive triggers and lightbar via
|
||||||
|
# dualsense-controller (PyPI, MIT) over hidraw.
|
||||||
#
|
#
|
||||||
# Forza emits a fixed-format UDP telemetry stream ("Data Out") at 60 Hz on a
|
# Reference design: cosmii02/ForzaDSXlegacy (variant D in our taxonomy):
|
||||||
# user-configured port. We listen on that port, parse each packet via fdp
|
# - Continuous baseline resistance on both triggers (always feels)
|
||||||
# (nettrom/forza_motorsport, MIT), and drive the PS5 DualSense's adaptive
|
# - Vibration overlay on slip events (both triggers)
|
||||||
# triggers via dualsense-controller (PyPI, MIT) which talks HID over hidraw.
|
# - RPM-reactive lightbar in-race; car-class color in menus
|
||||||
|
# - EWMA smoothing per channel
|
||||||
|
# - No body LRA (avoids the "shakes my whole hand" complaint)
|
||||||
|
# See forza_trigger.py module docstring for the full reference table and
|
||||||
|
# divergences from upstream.
|
||||||
#
|
#
|
||||||
# Setup on the user side, once enabled here:
|
# Setup on the user side, once enabled here:
|
||||||
# - plug the DualSense in over USB and disable Steam Input for the
|
# - plug the DualSense in over USB and disable Steam Input for the
|
||||||
@@ -20,12 +28,20 @@
|
|||||||
# - in Forza, HUD options → set Data Out: ON, Data Out IP: 127.0.0.1,
|
# - in Forza, HUD options → set Data Out: ON, Data Out IP: 127.0.0.1,
|
||||||
# Data Out IP Port: 5300, and (FM7 only) Data Out Packet Format: CAR DASH.
|
# Data Out IP Port: 5300, and (FM7 only) Data Out Packet Format: CAR DASH.
|
||||||
#
|
#
|
||||||
|
# Tuning env vars (read by the daemon at startup; values clamp to [0, 1]):
|
||||||
|
# FORZA_L2_INTENSITY=<0..1> global L2 (brake) feel scale. default 1.0
|
||||||
|
# FORZA_R2_INTENSITY=<0..1> global R2 (throttle) feel scale. default 1.0
|
||||||
|
# FORZA_LIGHTBAR=<0|1> enable lightbar feedback. default 1
|
||||||
|
# Set them in `services.forzaTrigger.environment` (an Environment= block on
|
||||||
|
# the systemd unit) to override.
|
||||||
let
|
let
|
||||||
cfg = config.services.forzaTrigger;
|
cfg = config.services.forzaTrigger;
|
||||||
pythonPackages = import ./python-packages.nix { inherit lib pkgs; };
|
pythonPackages = import ./python-packages.nix { inherit lib pkgs; };
|
||||||
inherit (pythonPackages) dualsense-controller fdp;
|
inherit (pythonPackages) dualsense-controller fdp;
|
||||||
|
|
||||||
forzaTrigger = pkgs.writers.writePython3Bin "forza-trigger" {
|
forzaTriggerSrc = ./forza_trigger.py;
|
||||||
|
|
||||||
|
forzaTriggerBin = pkgs.writers.writePython3Bin "forza-trigger" {
|
||||||
libraries = [
|
libraries = [
|
||||||
dualsense-controller
|
dualsense-controller
|
||||||
fdp
|
fdp
|
||||||
@@ -33,7 +49,36 @@ let
|
|||||||
# The wrapped binary doesn't need style enforcement — readability of
|
# The wrapped binary doesn't need style enforcement — readability of
|
||||||
# the source file is what matters, and that lives in forza_trigger.py.
|
# the source file is what matters, and that lives in forza_trigger.py.
|
||||||
doCheck = false;
|
doCheck = false;
|
||||||
} (builtins.readFile ./forza_trigger.py);
|
} (builtins.readFile forzaTriggerSrc);
|
||||||
|
|
||||||
|
# Build-time unit tests for the haptic computation. Failure here breaks the
|
||||||
|
# NixOS build, so deploys can't ship a daemon whose pure-function logic has
|
||||||
|
# regressed. Tests use stdlib unittest + a FakeController; no hardware needed.
|
||||||
|
forzaTriggerTests =
|
||||||
|
pkgs.runCommand "forza-trigger-tests"
|
||||||
|
{
|
||||||
|
nativeBuildInputs = [
|
||||||
|
(pkgs.python3.withPackages (_: [
|
||||||
|
dualsense-controller
|
||||||
|
fdp
|
||||||
|
]))
|
||||||
|
];
|
||||||
|
}
|
||||||
|
''
|
||||||
|
cp ${forzaTriggerSrc} forza_trigger.py
|
||||||
|
cp ${./test_forza_trigger.py} test_forza_trigger.py
|
||||||
|
python -m unittest discover -p 'test_*.py' -v
|
||||||
|
touch $out
|
||||||
|
'';
|
||||||
|
|
||||||
|
# The binary the system actually depends on. overrideAttrs adds the test
|
||||||
|
# derivation as a build dependency: if the tests fail, forzaTriggerTests
|
||||||
|
# fails to build, forzaTriggerBin's buildInputs can't be satisfied, and
|
||||||
|
# the system build fails. This is the standard nixpkgs idiom for build-time
|
||||||
|
# gating without ceremony around system.checks / passthru.tests.
|
||||||
|
forzaTrigger = forzaTriggerBin.overrideAttrs (old: {
|
||||||
|
buildInputs = (old.buildInputs or [ ]) ++ [ forzaTriggerTests ];
|
||||||
|
});
|
||||||
|
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,75 +1,73 @@
|
|||||||
"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.
|
"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.
|
||||||
|
|
||||||
This is a one-to-one behavioural port of Race-Element's DualSense haptic
|
Port of cosmii02/ForzaDSXlegacy ("ForzaDSX") — variant D in our reference
|
||||||
overlay (RiddleTime/Race-Element, GPL-3.0). Reference files:
|
taxonomy. Replaces the previous Race-Element 1:1 port (variant A) which the
|
||||||
|
user found "silent unless slipping".
|
||||||
|
|
||||||
* Race Element.HUD.Common/Overlays/Pitwall/DualSenseInternal/
|
Design (variant D):
|
||||||
TriggerHaptics.cs — slip detection + frequency mapping
|
- Continuous baseline resistance on both triggers — the trigger always has
|
||||||
DsiConfiguration.cs — tuning defaults
|
*some* feel under the finger, scaled by pedal input (L2) or computed
|
||||||
DsiJob.cs — per-frame dispatch
|
chassis acceleration (R2). This is the fix for the "no feedback at all"
|
||||||
|
complaint.
|
||||||
|
- Vibration overlay during wheel slip / wheelspin (both triggers). Frequency
|
||||||
|
scales with slip severity, amplitude scales with brake input (L2) or
|
||||||
|
avg accel (R2).
|
||||||
|
- RPM-reactive lightbar in-race; redline inverts green→red. In menus, the
|
||||||
|
lightbar carries the car's class color tinted by its performance index.
|
||||||
|
- EWMA smoothing on resistance and vibration frequency, separately tunable
|
||||||
|
per channel. Patmagauran's α_throttle=0.01 preserves the "creamy" throttle
|
||||||
|
feel; brake gets α=1.0 (instant) so ABS-like flutter isn't smoothed away.
|
||||||
|
- Per-trigger global intensity scale via FORZA_L2_INTENSITY / FORZA_R2_INTENSITY
|
||||||
|
env vars (∈ [0, 1]). Default 1.0. Lightbar gated by FORZA_LIGHTBAR (default on).
|
||||||
|
- IsRaceOn debounce: FH5 sometimes leaves is_race_on=1 after returning to
|
||||||
|
a menu. Cosmii's workaround is to count consecutive samples where RPM is
|
||||||
|
stable AND power ≤ 0; once the count exceeds RPM_ACCUMULATOR_TRIGGER, we
|
||||||
|
override is_race_on to false.
|
||||||
|
- No body LRA (the "shakes my whole hand" complaint of earlier iterations
|
||||||
|
came from L/R LRA buzz; this design never touches them).
|
||||||
|
|
||||||
## What this daemon does
|
Adaptations for the dualsense-controller library (yesbotics 0.3.1):
|
||||||
|
- Cosmii drives DSX via JSON-over-UDP at port 6969; we drive the controller
|
||||||
|
directly via hidraw and bypass DSX. The mode mapping is:
|
||||||
|
Cosmii Resistance(start, force ∈ 0-7) → effect.feedback(start, strength ∈ 0-8)
|
||||||
|
Cosmii VibrateResistance(start, freq, stiff) → effect.vibration(start, amp, freq)
|
||||||
|
— the position-dependent
|
||||||
|
stiffness gradient is lost
|
||||||
|
(per static-analysis
|
||||||
|
recommendation, option 1);
|
||||||
|
the vibration carries the
|
||||||
|
slip-event signal which is
|
||||||
|
the dominant feel.
|
||||||
|
- The library's `effect.vibration` is upstream-flagged "TODO: not working
|
||||||
|
properly" but our previous Race-Element 1:1 port used it successfully.
|
||||||
|
If the user reports the vibration mode feels broken, the fallback is raw
|
||||||
|
HID byte construction using DSX's TriggerEffectGenerator.cs as reference
|
||||||
|
(decompiled at /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/).
|
||||||
|
|
||||||
- Listens for Forza Horizon "Data Out" UDP telemetry.
|
Divergences from upstream Cosmii:
|
||||||
- On each packet: dispatches `handle_braking` (L2) and `handle_acceleration`
|
- Cosmii's brake-vibration entry condition is `slip < 0.5 AND brake < 10`,
|
||||||
(R2) — same two-function structure as Race-Element's `DsiJob.RunAction`.
|
inverted from Patmagauran's parent fork (`slip > 0.5 AND brake > 10`).
|
||||||
- Each handler reads the relevant input pedal and the four tire slip
|
The inversion gates vibration to "very light brake at very low slip"
|
||||||
ratios; if both pedal and slip exceed their thresholds, computes a
|
which produces a constant 35Hz buzz when nothing is happening — almost
|
||||||
frequency from slip severity and emits a Vibration trigger effect.
|
certainly a copy-paste bug at the comparison operators when Cosmii forked
|
||||||
Otherwise resets the trigger to no-resistance.
|
Patmagauran. We use Patmagauran's correct condition (slip > thr ∧ brake > thr).
|
||||||
|
- The vibration-frequency formula `freq = MAX - Map(slip, GRIP_LOSS, 1, 0, MAX)`
|
||||||
|
used by both Patmagauran and Cosmii is also non-intuitive: it maxes at the
|
||||||
|
threshold and decays to zero at full lockup. Race-Element's variant B uses
|
||||||
|
the conventional "more slip = higher freq" mapping. We follow Race-Element.
|
||||||
|
- We don't use a separate stiffness scale (1-200) for the vibration mode;
|
||||||
|
the library's vibration amplitude is 0-8 inclusive.
|
||||||
|
|
||||||
## What this daemon explicitly does NOT do
|
References (all decompiled C# on disk):
|
||||||
|
/tmp/impls/legacy_Program.cs cosmii02/ForzaDSXlegacy (primary)
|
||||||
|
/tmp/impls/fds_Program.cs + fds_Settings patmagauran/ForzaDualSense (fork parent, defaults)
|
||||||
|
/tmp/race-element/Race_Element.HUD.Common__Overlays__Driving__DSX__TriggerHaptics.cs
|
||||||
|
Race-Element variant B (slip→freq direction)
|
||||||
|
/tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/TriggerEffectGenerator.cs
|
||||||
|
DSX byte-level packet generator
|
||||||
|
(reference for option-3 raw HID fallback)
|
||||||
|
|
||||||
Compared to the previous implementation, every haptic channel beyond the
|
Setup on the user side: see default.nix.
|
||||||
two trigger Vibration effects is gone:
|
|
||||||
|
|
||||||
- No body LRA rumble (left/right motors). User reported the previous
|
|
||||||
multi-channel body rumble as "shakes my whole hand"; Race-Element
|
|
||||||
deliberately keeps the controller body silent so the trigger fingers
|
|
||||||
carry all information.
|
|
||||||
- No lightbar effects. Race-Element's DSI overlay leaves the lightbar
|
|
||||||
untouched (it's at the controller's default).
|
|
||||||
- No EWMA smoothing. Effects track slip frame-to-frame.
|
|
||||||
- No event-impulse system (no gear-shift Bow, no collision burst).
|
|
||||||
- No Machine mode for ABS. Race-Element uses the same Vibration
|
|
||||||
encoder for everything.
|
|
||||||
- No SlopeFeedback / cornering Feedback strength. The trigger has
|
|
||||||
zero resistance when not slipping.
|
|
||||||
- No surface texture, engine RPM rumble, lateral-G bias, or kerb
|
|
||||||
floor amplitude.
|
|
||||||
|
|
||||||
## Faithful reproduction of upstream
|
|
||||||
|
|
||||||
Upstream pin: RiddleTime/Race-Element @ 5bc0eebba64f (2026-04-30).
|
|
||||||
|
|
||||||
The slip-coefficient formulas in `_brake_frequency_pct` and
|
|
||||||
`_throttle_frequency_pct` are byte-faithful ports of Race-Element's
|
|
||||||
upstream code, including a copy-paste bug in their throttle and brake
|
|
||||||
paths where the "rear slip coefficient" multiplies `front_slip` instead
|
|
||||||
of `rear_slip`. The bug is preserved for behavioural parity; see the
|
|
||||||
inline comments tagged "RACE-ELEMENT BUG".
|
|
||||||
|
|
||||||
## Tuning constants
|
|
||||||
|
|
||||||
Race-Element exposes these as runtime config sliders. We bake them in as
|
|
||||||
module constants matching the upstream defaults from `DsiConfiguration`:
|
|
||||||
|
|
||||||
| | Brake | Throttle |
|
|
||||||
|-----------------------|--------|----------|
|
|
||||||
| input deadzone | 3 % | 3 % |
|
|
||||||
| front slip threshold | 0.25 | 0.35 |
|
|
||||||
| rear slip threshold | 0.25 | 0.25 |
|
|
||||||
| amplitude | 8 | 7 |
|
|
||||||
| min frequency | 3 Hz | 6 Hz |
|
|
||||||
| max frequency | 85 Hz | 96 Hz |
|
|
||||||
|
|
||||||
## Transport
|
|
||||||
|
|
||||||
`dualsense-controller` (yesbotics/dualsense-controller-python) handles
|
|
||||||
the HID transport, BT/USB framing including BT CRC32, and on-demand
|
|
||||||
output writes (state-changed-since-last-input-tick gate). Hot-plug
|
|
||||||
recovery routes through the library's `on_error` callback, which sets a
|
|
||||||
flag the main loop polls.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
@@ -80,6 +78,8 @@ import signal
|
|||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from dualsense_controller import DualSenseController
|
from dualsense_controller import DualSenseController
|
||||||
from fdp import ForzaDataPacket
|
from fdp import ForzaDataPacket
|
||||||
@@ -88,205 +88,544 @@ from fdp import ForzaDataPacket
|
|||||||
LOG = logging.getLogger("forza-trigger")
|
LOG = logging.getLogger("forza-trigger")
|
||||||
|
|
||||||
|
|
||||||
# --- Tuning constants (Race-Element DsiConfiguration defaults) ---------------
|
# --- Tuning constants (Cosmii / Patmagauran defaults) ------------------------
|
||||||
# Pedal inputs come from Forza as 0..255 bytes; thresholds are in 0..1 scale.
|
# Source citations point to legacy_Program.cs (Cosmii) and fds_Settings.cs
|
||||||
BRAKE_INPUT_THRESHOLD = 0.03
|
# (Patmagauran defaults, inherited by Cosmii via INI file).
|
||||||
BRAKE_FRONT_SLIP_THRESHOLD = 0.25
|
|
||||||
BRAKE_REAR_SLIP_THRESHOLD = 0.25
|
|
||||||
BRAKE_AMPLITUDE = 8
|
|
||||||
BRAKE_MIN_FREQUENCY = 3
|
|
||||||
BRAKE_MAX_FREQUENCY = 85
|
|
||||||
|
|
||||||
|
# Pedal input gates: avoid noise at zero pedal pressure. Not in upstream;
|
||||||
|
# carried over from the previous Race-Element port (BRAKE_INPUT_THRESHOLD = 3%).
|
||||||
|
BRAKE_INPUT_THRESHOLD = 0.03 # 0..1 fraction of pedal travel
|
||||||
THROTTLE_INPUT_THRESHOLD = 0.03
|
THROTTLE_INPUT_THRESHOLD = 0.03
|
||||||
THROTTLE_FRONT_SLIP_THRESHOLD = 0.35
|
|
||||||
THROTTLE_REAR_SLIP_THRESHOLD = 0.25
|
|
||||||
THROTTLE_AMPLITUDE = 7
|
|
||||||
THROTTLE_MIN_FREQUENCY = 6
|
|
||||||
THROTTLE_MAX_FREQUENCY = 96
|
|
||||||
|
|
||||||
# Slip-to-percentage divisors (Race-Element TriggerHaptics). Each equals
|
# --- L2 brake ---------------------------------------------------------------
|
||||||
# (front_clip_ceiling + rear_clip_ceiling) for its handler, which makes
|
# fds_Settings.cs (Cosmii inherits via appsettings.ini).
|
||||||
# `pct` scale to [0, 1] without an explicit cap downstream:
|
GRIP_LOSS_VAL = 0.5 # combined-slip threshold for vibration overlay
|
||||||
# brake: front_ceil(10) + rear_ceil(7.5) = 17.5
|
BRAKE_VIBRATION_MODE_START = 10 # 0-255 brake input scale; gate for slip mode
|
||||||
# throttle: front_ceil(5) + rear_ceil(7.5) = 12.5
|
MIN_BRAKE_VIBRATION = 3 # Hz floor; below → revert to feedback only
|
||||||
BRAKE_PCT_DIVISOR = 17.5
|
MAX_BRAKE_VIBRATION = 35 # Hz ceiling at peak slip
|
||||||
THROTTLE_PCT_DIVISOR = 12.5
|
MIN_BRAKE_RESISTANCE = 1 # baseline strength at zero brake (lib 0-8)
|
||||||
|
MAX_BRAKE_RESISTANCE = 6 # baseline strength at full brake (lib 0-8)
|
||||||
|
MIN_BRAKE_AMP = 1 # vibration-mode amplitude floor (lib 0-8)
|
||||||
|
MAX_BRAKE_AMP = 8 # vibration-mode amplitude ceiling
|
||||||
|
EWMA_ALPHA_BRAKE = 1.0 # 1.0 = instant (no smoothing). Patmagauran's choice.
|
||||||
|
EWMA_ALPHA_BRAKE_FREQ = 1.0
|
||||||
|
|
||||||
# --- Forza UDP packet sizes -> fdp packet_format strings ---------------------
|
# --- R2 throttle ------------------------------------------------------------
|
||||||
|
THROTTLE_GRIP_LOSS_VAL = 0.5
|
||||||
|
THROTTLE_VIBRATION_MODE_START = 10 # 0-255 accelerator input scale
|
||||||
|
MIN_ACCEL_GRIPLOSS_VIBRATION = 3
|
||||||
|
MAX_ACCEL_GRIPLOSS_VIBRATION = 35
|
||||||
|
TURN_ACCEL_MOD = 0.5 # AccelX² weight (lateral)
|
||||||
|
FORWARD_ACCEL_MOD = 1.0 # AccelZ² weight (longitudinal)
|
||||||
|
ACCELERATION_LIMIT = 10.0 # m/s² clamp ceiling
|
||||||
|
MIN_THROTTLE_RESISTANCE = 1
|
||||||
|
MAX_THROTTLE_RESISTANCE = 6
|
||||||
|
MIN_ACCEL_GRIPLOSS_AMP = 1
|
||||||
|
MAX_ACCEL_GRIPLOSS_AMP = 8
|
||||||
|
EWMA_ALPHA_THROTTLE = 0.01 # very smooth (Patmagauran's "creamy" tuning)
|
||||||
|
EWMA_ALPHA_THROTTLE_FREQ = 0.5
|
||||||
|
|
||||||
|
# Hardcoded by Cosmii (legacy_Program.cs:224): rear-slip alone counts only
|
||||||
|
# when accelerator is firmly depressed. Avoids false-positive vibration when
|
||||||
|
# coasting on light throttle through slick patches.
|
||||||
|
ACCELERATOR_REAR_SLIP_GATE = 200 # 0-255 scale
|
||||||
|
|
||||||
|
# --- Lightbar ---------------------------------------------------------------
|
||||||
|
# In-race: green channel ∝ RPM ratio with redline inversion to red.
|
||||||
|
# Out-of-race: car-class color tinted by car performance index.
|
||||||
|
RPM_REDLINE_RATIO = 0.85 # above this, green→red inversion
|
||||||
|
GREEN_FLOOR = 50 # min green channel value (lightbar visible at idle)
|
||||||
|
MAX_CPI = 255.0 # car performance index ceiling for tint
|
||||||
|
|
||||||
|
# Cosmii car-class palettes (legacy_Program.cs:38-51). Class IDs are Forza's
|
||||||
|
# enum: D=0, C=1, B=2, A=3, S1=4, S2=5; X is "above the table".
|
||||||
|
CAR_CLASS_COLORS = {
|
||||||
|
0: (107, 185, 236), # D — cyan
|
||||||
|
1: (234, 202, 49), # C — gold
|
||||||
|
2: (211, 90, 37), # B — orange
|
||||||
|
3: (187, 59, 34), # A — red
|
||||||
|
4: (128, 54, 243), # S1 — purple
|
||||||
|
5: (75, 88, 229), # S2 — blue
|
||||||
|
}
|
||||||
|
COLOR_CLASS_X = (105, 182, 72) # green for above-S2
|
||||||
|
|
||||||
|
# --- IsRaceOn debounce ------------------------------------------------------
|
||||||
|
# FH5 sometimes leaves is_race_on=1 after menu transitions. Count samples
|
||||||
|
# where RPM is stable AND power ≤ 0; once the count exceeds the threshold,
|
||||||
|
# override is_race_on to false. ~3.3s at 60Hz packet rate. (legacy_Program.cs:35)
|
||||||
|
RPM_ACCUMULATOR_TRIGGER_RACE_OFF = 200
|
||||||
|
|
||||||
|
# --- User-facing intensity scales (env vars) --------------------------------
|
||||||
|
# Read once at module import. Process restart picks up new values.
|
||||||
|
|
||||||
|
def _read_intensity(name: str, default: float = 1.0) -> float:
|
||||||
|
"""Parse an intensity env var clamped to [0, 1]. NaN/inf/junk fall back to
|
||||||
|
`default` with a warning — silent fallthrough to 1.0 would hide
|
||||||
|
misconfigurations (e.g. `FORZA_R2_INTENSITY=nan` is almost certainly
|
||||||
|
a typo, not "use full intensity").
|
||||||
|
"""
|
||||||
|
raw = os.environ.get(name)
|
||||||
|
if raw is None:
|
||||||
|
return default
|
||||||
|
try:
|
||||||
|
v = float(raw)
|
||||||
|
except ValueError:
|
||||||
|
LOG.warning("invalid %s=%r — using %.2f", name, raw, default)
|
||||||
|
return default
|
||||||
|
if not math.isfinite(v):
|
||||||
|
LOG.warning("non-finite %s=%r — using %.2f", name, raw, default)
|
||||||
|
return default
|
||||||
|
return max(0.0, min(1.0, v))
|
||||||
|
|
||||||
|
|
||||||
|
def _read_bool(name: str, default: bool = True) -> bool:
|
||||||
|
"""Parse a boolean env var. Accepts the usual disable-tokens
|
||||||
|
{0, false, no, off, ""} (case-insensitive) and treats anything else
|
||||||
|
as enabled. Mirrors the strict parsing of `_read_intensity` so that
|
||||||
|
typos don't silently flip behavior.
|
||||||
|
"""
|
||||||
|
raw = os.environ.get(name)
|
||||||
|
if raw is None:
|
||||||
|
return default
|
||||||
|
return raw.strip().lower() not in ("0", "false", "no", "off", "")
|
||||||
|
|
||||||
|
|
||||||
|
LEFT_TRIGGER_INTENSITY = _read_intensity("FORZA_L2_INTENSITY")
|
||||||
|
RIGHT_TRIGGER_INTENSITY = _read_intensity("FORZA_R2_INTENSITY")
|
||||||
|
LIGHTBAR_ENABLED = _read_bool("FORZA_LIGHTBAR", default=True)
|
||||||
|
|
||||||
|
# --- Forza UDP packet sizes -> fdp packet_format strings --------------------
|
||||||
|
# Note: the FM7 V1 232-byte sled format does NOT include `accel`, `brake`,
|
||||||
|
# `power`, `car_class`, or `car_performance_index` — fdp doesn't setattr
|
||||||
|
# those names from a sled-format unpack, and our handlers depend on them.
|
||||||
|
# Driving a sled stream would silently leave both triggers off. We don't
|
||||||
|
# advertise FM7 sled support; if the user ever points an FM7 sled feed at us,
|
||||||
|
# 232-byte packets are simply dropped at _parse_packet.
|
||||||
PACKET_FORMATS = {
|
PACKET_FORMATS = {
|
||||||
232: "sled",
|
311: "dash", # Forza Motorsport 7 V2 car-dash format
|
||||||
311: "dash",
|
324: "fh4", # Forza Horizon 4 / 5 (12-byte gap that fdp's fh4 mode patches around)
|
||||||
324: "fh4", # FH4 and FH5 share the same layout
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# --- Daemon lifecycle constants ----------------------------------------------
|
# --- Daemon lifecycle constants ---------------------------------------------
|
||||||
IDLE_TIMEOUT_S = 3.0
|
IDLE_TIMEOUT_S = 3.0
|
||||||
RECONNECT_BACKOFF_S = 1.0
|
RECONNECT_BACKOFF_S = 1.0
|
||||||
|
|
||||||
|
|
||||||
# --- Module state (signal + hot-plug flags) ----------------------------------
|
# --- Module state (signal + hot-plug flags) ---------------------------------
|
||||||
_shutdown = False
|
_shutdown = False
|
||||||
_disconnected = False
|
_disconnected = False
|
||||||
|
|
||||||
|
|
||||||
def _on_termination(signum: int, frame: object) -> None:
|
def _on_termination(signum: int, frame: object) -> None:
|
||||||
"""SIGTERM/SIGINT handler — sets the main-loop shutdown flag.
|
"""SIGTERM/SIGINT handler — sets the main-loop shutdown flag."""
|
||||||
|
|
||||||
systemd sends SIGTERM by default; without an explicit handler Python
|
|
||||||
ignores it and systemd waits TimeoutStopSec=90s before SIGKILL.
|
|
||||||
"""
|
|
||||||
global _shutdown
|
global _shutdown
|
||||||
|
LOG.info("received signal %d — shutting down", signum)
|
||||||
_shutdown = True
|
_shutdown = True
|
||||||
|
|
||||||
|
|
||||||
def _on_controller_error(prev: object, exc: Exception) -> None:
|
def _on_controller_error(exc: Exception) -> None:
|
||||||
"""dualsense-controller `on_error` callback for HID read-thread failures."""
|
"""dualsense-controller `on_error` callback for HID read-thread failures.
|
||||||
|
|
||||||
|
The library dispatches state-change callbacks by parameter count
|
||||||
|
(StateValueCallbackManager.py): a 1-arg callback is bound to
|
||||||
|
`_event_name_1_args` which emits `(new_value)`. A 2-arg callback would
|
||||||
|
receive `(new_value, timestamp_ns)` instead — meaning a `(prev, exc)`
|
||||||
|
handler would log a stray monotonic-clock integer instead of the
|
||||||
|
exception. 1-arg keeps observability tight.
|
||||||
|
"""
|
||||||
global _disconnected
|
global _disconnected
|
||||||
LOG.warning("dualsense exception: %s", exc)
|
LOG.warning("dualsense controller error: %s", exc)
|
||||||
_disconnected = True
|
_disconnected = True
|
||||||
|
|
||||||
|
|
||||||
# --- Helpers -----------------------------------------------------------------
|
# --- Helpers ----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_field(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float:
|
||||||
|
"""Read a packet field defensively. Returns default for missing fields,
|
||||||
|
NaN, or inf. Forza packets are fixed-format but mismatched format strings
|
||||||
|
can produce NaN; this filters them so haptic math never receives garbage.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
v = float(getattr(pkt, name))
|
||||||
|
except (AttributeError, TypeError, ValueError):
|
||||||
|
return default
|
||||||
|
if math.isnan(v) or math.isinf(v):
|
||||||
|
return default
|
||||||
|
return v
|
||||||
|
|
||||||
|
|
||||||
def _safe_abs(pkt: ForzaDataPacket, name: str) -> float:
|
def _safe_abs(pkt: ForzaDataPacket, name: str) -> float:
|
||||||
"""Read a packet field defensively. Returns 0.0 for missing fields,
|
return abs(_safe_field(pkt, name, 0.0))
|
||||||
NaN, or +/-inf; otherwise returns the absolute value of the float."""
|
|
||||||
try:
|
|
||||||
v = float(getattr(pkt, name, 0.0))
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
return 0.0
|
|
||||||
if not math.isfinite(v):
|
|
||||||
return 0.0
|
|
||||||
return abs(v)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket:
|
def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float:
|
||||||
"""Create or inherit the UDP listener socket.
|
"""Cosmii's Map function (legacy_Program.cs:478-484, also fds_Program.cs:276).
|
||||||
|
|
||||||
Under systemd socket activation (LISTEN_FDS=1, LISTEN_PID == ours)
|
Like Arduino's `map()`: clamp x to the input range, then linearly remap to
|
||||||
fd 3 is the pre-bound socket. Otherwise bind normally.
|
the output range. Used pervasively throughout the references.
|
||||||
"""
|
"""
|
||||||
listen_pid = os.environ.get("LISTEN_PID", "")
|
if in_max == in_min:
|
||||||
listen_fds = os.environ.get("LISTEN_FDS", "0")
|
return out_min
|
||||||
if listen_pid and int(listen_pid) == os.getpid() and int(listen_fds) >= 1:
|
x = max(in_min, min(in_max, x))
|
||||||
sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM)
|
return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
|
||||||
sock.settimeout(timeout)
|
|
||||||
LOG.info("using systemd-pre-bound socket on %s:%d", host, port)
|
|
||||||
return sock
|
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
sock.bind((host, port))
|
|
||||||
sock.settimeout(timeout)
|
|
||||||
return sock
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_packet(data: bytes) -> ForzaDataPacket | None:
|
def _ewma(value: float, last: float, alpha: float) -> float:
|
||||||
fmt = PACKET_FORMATS.get(len(data))
|
"""Exponential weighted moving average (legacy_Program.cs:747).
|
||||||
if fmt is None:
|
|
||||||
LOG.debug("ignoring packet of unexpected length %d", len(data))
|
α=1.0 → instant (output = value), α=0.01 → very smooth (~100-sample lag).
|
||||||
return None
|
"""
|
||||||
try:
|
return alpha * value + (1.0 - alpha) * last
|
||||||
return ForzaDataPacket(data, packet_format=fmt)
|
|
||||||
except Exception:
|
|
||||||
LOG.exception("failed to parse forza packet (len=%d)", len(data))
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _axle_slip(pkt: ForzaDataPacket) -> tuple[float, float]:
|
def _combined_slip(pkt: ForzaDataPacket) -> tuple[float, float, float]:
|
||||||
"""(front_slip, rear_slip) — dominant tire of each axle.
|
"""(combined_all, combined_front, combined_rear) — Cosmii's slip aggregates.
|
||||||
|
|
||||||
Race-Element uses `Math.Max` over each axle's two tires to surface the
|
Cosmii uses the arithmetic mean per axle (legacy_Program.cs:112-114),
|
||||||
worst-slipping wheel, since slip on either side counts. fdp emits
|
distinct from Race-Element's max-of-axle. The mean is more conservative
|
||||||
`tire_combined_slip_*` as signed floats; `_safe_abs` filters NaN/inf
|
and reduces single-tire-pulse chatter.
|
||||||
and takes the absolute value (Race-Element calls this NegateIfNegative).
|
|
||||||
"""
|
"""
|
||||||
fl = _safe_abs(pkt, "tire_combined_slip_FL")
|
fl = _safe_abs(pkt, "tire_combined_slip_FL")
|
||||||
fr = _safe_abs(pkt, "tire_combined_slip_FR")
|
fr = _safe_abs(pkt, "tire_combined_slip_FR")
|
||||||
rl = _safe_abs(pkt, "tire_combined_slip_RL")
|
rl = _safe_abs(pkt, "tire_combined_slip_RL")
|
||||||
rr = _safe_abs(pkt, "tire_combined_slip_RR")
|
rr = _safe_abs(pkt, "tire_combined_slip_RR")
|
||||||
return max(fl, fr), max(rl, rr)
|
return (fl + fr + rl + rr) / 4.0, (fl + fr) / 2.0, (rl + rr) / 2.0
|
||||||
|
|
||||||
|
|
||||||
# --- Trigger haptic handlers (1:1 port of TriggerHaptics.cs) -----------------
|
def _avg_accel(pkt: ForzaDataPacket) -> float:
|
||||||
|
"""Cosmii's avgAccel (legacy_Program.cs:219). Lateral and longitudinal
|
||||||
|
chassis accel combined under per-axis weights. Used as the input to the
|
||||||
def handle_braking(controller: DualSenseController, pkt: ForzaDataPacket) -> None:
|
throttle-baseline resistance curve.
|
||||||
"""Brake (L2). Mirrors `TriggerHaptics.HandleBraking`.
|
|
||||||
|
|
||||||
Note: Race-Element's brake path leaves the trigger in its prior effect
|
|
||||||
when brake is engaged but slip is below threshold — i.e. you can hold
|
|
||||||
the brake without slip after an ABS event and the trigger keeps
|
|
||||||
vibrating until you release the brake. This matches the upstream code
|
|
||||||
exactly (no else-branch around the slip check).
|
|
||||||
"""
|
"""
|
||||||
brake_input = _safe_abs(pkt, "brake") / 255.0
|
ax = _safe_field(pkt, "acceleration_x", 0.0)
|
||||||
if brake_input <= BRAKE_INPUT_THRESHOLD:
|
az = _safe_field(pkt, "acceleration_z", 0.0)
|
||||||
|
return math.sqrt(TURN_ACCEL_MOD * ax * ax + FORWARD_ACCEL_MOD * az * az)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Daemon state -----------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DaemonState:
|
||||||
|
"""Per-session state: EWMA filter cells + IsRaceOn debounce + transition flags.
|
||||||
|
|
||||||
|
EWMA cell semantics are unscaled — they store the raw smoothed input.
|
||||||
|
Per-trigger intensity (LEFT/RIGHT_TRIGGER_INTENSITY) is applied at output
|
||||||
|
time only. Storing the scaled value back would compound the scale every
|
||||||
|
tick (steady-state for α<1 collapses to ~0 with intensity<1).
|
||||||
|
|
||||||
|
`was_throttle_slipping` / `was_brake_slipping` track the previous tick's
|
||||||
|
slip state so we can bypass EWMA on transition INTO a slip event. Without
|
||||||
|
this, α=0.01 throttle smoothing means the first ~100 packets of a slip
|
||||||
|
event (~1.7s at 60Hz) feel barely audible.
|
||||||
|
|
||||||
|
Initial values from Cosmii's static initializers (legacy_Program.cs:23-26),
|
||||||
|
rescaled to the dualsense-controller library's 0-8 strength domain.
|
||||||
|
"""
|
||||||
|
last_throttle_resistance: float = 1.0
|
||||||
|
last_throttle_freq: float = 0.0
|
||||||
|
last_brake_resistance: float = 1.0
|
||||||
|
last_brake_freq: float = 0.0
|
||||||
|
was_throttle_slipping: bool = False
|
||||||
|
was_brake_slipping: bool = False
|
||||||
|
# Tracks the previous tick's in_race verdict; the run-loop uses this
|
||||||
|
# to detect in-race → menu transitions and partial-reset the EWMA cells
|
||||||
|
# + slip flags so the next race resumption gets a clean cold-start
|
||||||
|
# path (otherwise stale slip flags suppress the seeding fix in
|
||||||
|
# handle_throttle/handle_brake on the first packet of race 2).
|
||||||
|
last_in_race: bool = False
|
||||||
|
# IsRaceOn debounce.
|
||||||
|
last_rpm: float = 0.0
|
||||||
|
rpm_accumulator: int = 0
|
||||||
|
# Last-known valid car class / CPI. We always update on observation —
|
||||||
|
# Cosmii's `> 0` guard treats Class-D (enum value 0) as "no info" and
|
||||||
|
# leaves the lightbar showing the previous class color, which is wrong.
|
||||||
|
last_car_class: int = 0
|
||||||
|
last_cpi: int = 0
|
||||||
|
# Last lightbar color (skip identical writes). Reset to (0,0,0) on idle
|
||||||
|
# so that resuming with the same color re-pushes after the (0,0,0) reset
|
||||||
|
# we send to the controller; otherwise the bar stays black.
|
||||||
|
last_color: tuple[int, int, int] = (0, 0, 0)
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Reset filters, debounce, and lightbar dedup — call when telemetry
|
||||||
|
resumes after idle so the next packet's effects fire from a clean
|
||||||
|
baseline.
|
||||||
|
"""
|
||||||
|
self.last_throttle_resistance = 1.0
|
||||||
|
self.last_throttle_freq = 0.0
|
||||||
|
self.last_brake_resistance = 1.0
|
||||||
|
self.last_brake_freq = 0.0
|
||||||
|
self.was_throttle_slipping = False
|
||||||
|
self.was_brake_slipping = False
|
||||||
|
self.last_rpm = 0.0
|
||||||
|
self.rpm_accumulator = 0
|
||||||
|
self.last_color = (0, 0, 0)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Race detection ---------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def is_race_on(pkt: ForzaDataPacket, state: DaemonState) -> bool:
|
||||||
|
"""Cosmii's IsRaceOn debounce (legacy_Program.cs:89-109).
|
||||||
|
|
||||||
|
Returns True iff the car is actively being driven. Combines the explicit
|
||||||
|
is_race_on flag with the RPM-stability + zero-power workaround for FH5's
|
||||||
|
unreliable flag.
|
||||||
|
"""
|
||||||
|
flag = bool(_safe_field(pkt, "is_race_on", 0.0))
|
||||||
|
current_rpm = _safe_field(pkt, "current_engine_rpm", 0.0)
|
||||||
|
power = _safe_field(pkt, "power", 0.0)
|
||||||
|
|
||||||
|
if abs(current_rpm - state.last_rpm) < 1e-3 and power <= 0:
|
||||||
|
state.rpm_accumulator += 1
|
||||||
|
if state.rpm_accumulator > RPM_ACCUMULATOR_TRIGGER_RACE_OFF:
|
||||||
|
flag = False
|
||||||
|
else:
|
||||||
|
state.rpm_accumulator = 0
|
||||||
|
|
||||||
|
state.last_rpm = current_rpm
|
||||||
|
return flag
|
||||||
|
|
||||||
|
|
||||||
|
# --- Trigger handlers -------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def handle_throttle(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None:
|
||||||
|
"""R2 throttle. Port of legacy_Program.cs:216-263.
|
||||||
|
|
||||||
|
Modes:
|
||||||
|
- In-race + slip: vibration overlay (freq from slip, amp from avgAccel).
|
||||||
|
If filtered freq <= MIN_ACCEL_GRIPLOSS_VIBRATION
|
||||||
|
OR accelerator <= THROTTLE_VIBRATION_MODE_START,
|
||||||
|
fall through to feedback-only (avoids audible
|
||||||
|
clicking at very low frequencies).
|
||||||
|
- In-race + no slip: continuous feedback resistance scaled by avgAccel.
|
||||||
|
- Pedal not pressed: trigger off.
|
||||||
|
"""
|
||||||
|
accel = _safe_field(pkt, "accel", 0.0)
|
||||||
|
if accel / 255.0 <= THROTTLE_INPUT_THRESHOLD:
|
||||||
|
controller.right_trigger.effect.off()
|
||||||
|
return
|
||||||
|
|
||||||
|
avg_a = _avg_accel(pkt)
|
||||||
|
_, combined_front, combined_rear = _combined_slip(pkt)
|
||||||
|
|
||||||
|
losing_grip = (
|
||||||
|
combined_front > THROTTLE_GRIP_LOSS_VAL
|
||||||
|
or (combined_rear > THROTTLE_GRIP_LOSS_VAL and accel > ACCELERATOR_REAR_SLIP_GATE)
|
||||||
|
)
|
||||||
|
|
||||||
|
if losing_grip:
|
||||||
|
# Vibration mode. Frequency scales with slip severity (Race-Element's
|
||||||
|
# convention: more slip = higher freq, opposite of Patmagauran's
|
||||||
|
# decay-from-threshold formula). Amplitude scales with avgAccel so
|
||||||
|
# heavier acceleration during slip = stronger buzz.
|
||||||
|
# Use the slipping axle's slip for the frequency curve, not the
|
||||||
|
# all-4-tire average. Cosmii's `combinedTireSlip` (mean of all 4)
|
||||||
|
# collapses to ~slip/2 for one-axle slip events (RWD wheelspin =
|
||||||
|
# rear=0.8, front=0 → combined=0.4) which lands below threshold
|
||||||
|
# and silently falls through to feedback-only mode. RWD burnouts
|
||||||
|
# are the most common Forza wheelspin event; we want vibration there.
|
||||||
|
slip_for_freq = max(combined_front, combined_rear)
|
||||||
|
slip_above = max(0.0, slip_for_freq - THROTTLE_GRIP_LOSS_VAL)
|
||||||
|
slip_range = max(1e-6, 1.0 - THROTTLE_GRIP_LOSS_VAL)
|
||||||
|
freq_raw = MIN_ACCEL_GRIPLOSS_VIBRATION + (
|
||||||
|
MAX_ACCEL_GRIPLOSS_VIBRATION - MIN_ACCEL_GRIPLOSS_VIBRATION
|
||||||
|
) * min(1.0, slip_above / slip_range)
|
||||||
|
amp_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_ACCEL_GRIPLOSS_AMP, MAX_ACCEL_GRIPLOSS_AMP)
|
||||||
|
|
||||||
|
# Bypass EWMA on the first packet of a slip event so the buzz is
|
||||||
|
# immediate. With α=0.01 throttle smoothing, the EWMA-warmed first
|
||||||
|
# packet would converge over ~1.7s — slip events are sub-second.
|
||||||
|
if not state.was_throttle_slipping:
|
||||||
|
state.last_throttle_freq = freq_raw
|
||||||
|
state.last_throttle_resistance = amp_raw
|
||||||
|
freq_unscaled = _ewma(freq_raw, state.last_throttle_freq, EWMA_ALPHA_THROTTLE_FREQ)
|
||||||
|
amp_unscaled = _ewma(amp_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE)
|
||||||
|
# Store unscaled — applying intensity here would compound the scale
|
||||||
|
# every tick (steady-state collapses to ~0 with intensity<1, α<1).
|
||||||
|
state.last_throttle_freq = freq_unscaled
|
||||||
|
state.last_throttle_resistance = amp_unscaled
|
||||||
|
freq_out = freq_unscaled * RIGHT_TRIGGER_INTENSITY
|
||||||
|
amp_out = amp_unscaled * RIGHT_TRIGGER_INTENSITY
|
||||||
|
state.was_throttle_slipping = True
|
||||||
|
|
||||||
|
if freq_out <= MIN_ACCEL_GRIPLOSS_VIBRATION or accel <= THROTTLE_VIBRATION_MODE_START:
|
||||||
|
# Fallback to baseline-style feedback if the computed vibration
|
||||||
|
# is too quiet to be useful.
|
||||||
|
strength = max(MIN_THROTTLE_RESISTANCE, min(MAX_THROTTLE_RESISTANCE, int(round(amp_out))))
|
||||||
|
controller.right_trigger.effect.feedback(start_position=0, strength=strength)
|
||||||
|
else:
|
||||||
|
f = max(1, min(255, int(round(freq_out))))
|
||||||
|
a = max(MIN_ACCEL_GRIPLOSS_AMP, min(MAX_ACCEL_GRIPLOSS_AMP, int(round(amp_out))))
|
||||||
|
controller.right_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f)
|
||||||
|
else:
|
||||||
|
# Baseline mode. Strength scales with avgAccel — at idle the trigger
|
||||||
|
# is barely resistant; under heavy accel it firms up.
|
||||||
|
resistance_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_THROTTLE_RESISTANCE, MAX_THROTTLE_RESISTANCE)
|
||||||
|
resistance_unscaled = _ewma(resistance_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE)
|
||||||
|
state.last_throttle_resistance = resistance_unscaled
|
||||||
|
state.was_throttle_slipping = False
|
||||||
|
strength = max(
|
||||||
|
MIN_THROTTLE_RESISTANCE,
|
||||||
|
min(MAX_THROTTLE_RESISTANCE, int(round(resistance_unscaled * RIGHT_TRIGGER_INTENSITY))),
|
||||||
|
)
|
||||||
|
controller.right_trigger.effect.feedback(start_position=0, strength=strength)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_brake(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None:
|
||||||
|
"""L2 brake. Port of legacy_Program.cs:281-313 with the bug fix described
|
||||||
|
in the module docstring (Cosmii's inverted comparison operators replaced
|
||||||
|
with Patmagauran's correct logic: vibration when slip > thr AND brake > thr).
|
||||||
|
|
||||||
|
Modes:
|
||||||
|
- In-race + slip: vibration overlay (freq from slip, amp from brake input).
|
||||||
|
Below MIN_BRAKE_VIBRATION freq → revert to feedback-only.
|
||||||
|
- In-race + no slip: continuous feedback resistance scaled by brake input.
|
||||||
|
- Pedal not pressed: trigger off.
|
||||||
|
"""
|
||||||
|
brake = _safe_field(pkt, "brake", 0.0)
|
||||||
|
if brake / 255.0 <= BRAKE_INPUT_THRESHOLD:
|
||||||
controller.left_trigger.effect.off()
|
controller.left_trigger.effect.off()
|
||||||
return
|
return
|
||||||
|
|
||||||
front_slip, rear_slip = _axle_slip(pkt)
|
# Use the worst axle's slip for both detection and the freq curve.
|
||||||
if (
|
# Cosmii's all-4 average misses single-axle ABS events (front locks
|
||||||
front_slip <= BRAKE_FRONT_SLIP_THRESHOLD
|
# first under weight transfer; combined-average can stay below 0.5
|
||||||
and rear_slip <= BRAKE_REAR_SLIP_THRESHOLD
|
# even with the front fully locked). Race-Element variant B uses
|
||||||
):
|
# max-of-axle for the same reason.
|
||||||
return # Race-Element falls through here: trigger keeps prior effect.
|
_, combined_front, combined_rear = _combined_slip(pkt)
|
||||||
|
slip_for_freq = max(combined_front, combined_rear)
|
||||||
|
slipping = slip_for_freq > GRIP_LOSS_VAL and brake > BRAKE_VIBRATION_MODE_START
|
||||||
|
|
||||||
front_coef = min(front_slip * 4, 10)
|
if slipping:
|
||||||
# RACE-ELEMENT BUG: upstream computes the rear coefficient from FRONT slip
|
slip_above = max(0.0, slip_for_freq - GRIP_LOSS_VAL)
|
||||||
# (TriggerHaptics.cs line 36, `slipRatioFront * 2f`). Faithful port keeps
|
slip_range = max(1e-6, 1.0 - GRIP_LOSS_VAL)
|
||||||
# the bug; if you'd rather use rear_slip * 2, change one symbol.
|
freq_raw = MIN_BRAKE_VIBRATION + (
|
||||||
rear_coef = min(front_slip * 2, 7.5)
|
MAX_BRAKE_VIBRATION - MIN_BRAKE_VIBRATION
|
||||||
pct = (front_coef + rear_coef) / BRAKE_PCT_DIVISOR
|
) * min(1.0, slip_above / slip_range)
|
||||||
freq = max(BRAKE_MIN_FREQUENCY, int(BRAKE_MAX_FREQUENCY * pct))
|
amp_raw = _map(brake, 0, 255, MIN_BRAKE_AMP, MAX_BRAKE_AMP)
|
||||||
|
|
||||||
controller.left_trigger.effect.vibration(
|
# Bypass EWMA on slip-event entry; same reasoning as throttle.
|
||||||
start_position=0, amplitude=BRAKE_AMPLITUDE, frequency=freq
|
if not state.was_brake_slipping:
|
||||||
|
state.last_brake_freq = freq_raw
|
||||||
|
state.last_brake_resistance = amp_raw
|
||||||
|
freq_unscaled = _ewma(freq_raw, state.last_brake_freq, EWMA_ALPHA_BRAKE_FREQ)
|
||||||
|
amp_unscaled = _ewma(amp_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE)
|
||||||
|
state.last_brake_freq = freq_unscaled
|
||||||
|
state.last_brake_resistance = amp_unscaled
|
||||||
|
freq_out = freq_unscaled * LEFT_TRIGGER_INTENSITY
|
||||||
|
amp_out = amp_unscaled * LEFT_TRIGGER_INTENSITY
|
||||||
|
state.was_brake_slipping = True
|
||||||
|
|
||||||
|
if freq_out <= MIN_BRAKE_VIBRATION:
|
||||||
|
strength = max(MIN_BRAKE_RESISTANCE, min(MAX_BRAKE_RESISTANCE, int(round(amp_out))))
|
||||||
|
controller.left_trigger.effect.feedback(start_position=0, strength=strength)
|
||||||
|
else:
|
||||||
|
f = max(1, min(255, int(round(freq_out))))
|
||||||
|
a = max(MIN_BRAKE_AMP, min(MAX_BRAKE_AMP, int(round(amp_out))))
|
||||||
|
controller.left_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f)
|
||||||
|
else:
|
||||||
|
resistance_raw = _map(brake, 0, 255, MIN_BRAKE_RESISTANCE, MAX_BRAKE_RESISTANCE)
|
||||||
|
resistance_unscaled = _ewma(resistance_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE)
|
||||||
|
state.last_brake_resistance = resistance_unscaled
|
||||||
|
state.was_brake_slipping = False
|
||||||
|
strength = max(
|
||||||
|
MIN_BRAKE_RESISTANCE,
|
||||||
|
min(MAX_BRAKE_RESISTANCE, int(round(resistance_unscaled * LEFT_TRIGGER_INTENSITY))),
|
||||||
|
)
|
||||||
|
controller.left_trigger.effect.feedback(start_position=0, strength=strength)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Lightbar ---------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def lightbar_color(pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> tuple[int, int, int]:
|
||||||
|
"""Compute the (R, G, B) the lightbar should display this tick.
|
||||||
|
|
||||||
|
In-race: green ∝ RPM ratio, with redline inversion (the green channel is
|
||||||
|
flipped to 255-G so the bar appears red as you approach the rev limit).
|
||||||
|
Out-of-race: car-class color tinted by car performance index.
|
||||||
|
|
||||||
|
Pure function — caller is responsible for actually pushing the color to
|
||||||
|
the controller (skipping if unchanged from `state.last_color`).
|
||||||
|
"""
|
||||||
|
if in_race:
|
||||||
|
rpm = _safe_field(pkt, "current_engine_rpm", 0.0)
|
||||||
|
idle = _safe_field(pkt, "engine_idle_rpm", 0.0)
|
||||||
|
max_rpm = _safe_field(pkt, "engine_max_rpm", 0.0)
|
||||||
|
rpm_range = max(1.0, max_rpm - idle)
|
||||||
|
ratio = max(0.0, min(1.0, (rpm - idle) / rpm_range))
|
||||||
|
green = max(GREEN_FLOOR, int(ratio * 255))
|
||||||
|
red = int(ratio * 255)
|
||||||
|
if ratio >= RPM_REDLINE_RATIO:
|
||||||
|
green = 255 - green
|
||||||
|
return (red, green, 0)
|
||||||
|
|
||||||
|
# Menu mode: car-class color tinted by performance index. We always update
|
||||||
|
# on observation — Cosmii's `> 0` guard treats Class-D (enum value 0) as
|
||||||
|
# "no info" and leaves the previous class color stuck on the lightbar
|
||||||
|
# when switching INTO a Class-D car. fdp always sets these fields from
|
||||||
|
# an fh4/dash unpack, so absence is impossible — only the value matters.
|
||||||
|
state.last_car_class = int(_safe_field(pkt, "car_class", 0.0))
|
||||||
|
state.last_cpi = max(0, min(int(MAX_CPI), int(_safe_field(pkt, "car_performance_index", 0.0))))
|
||||||
|
|
||||||
|
base = CAR_CLASS_COLORS.get(state.last_car_class, COLOR_CLASS_X)
|
||||||
|
if state.last_car_class > 5:
|
||||||
|
# X-class: untinted constant green.
|
||||||
|
return COLOR_CLASS_X
|
||||||
|
cpi_ratio = state.last_cpi / MAX_CPI if state.last_cpi > 0 else 0.0
|
||||||
|
return (
|
||||||
|
int(cpi_ratio * base[0]),
|
||||||
|
int(cpi_ratio * base[1]),
|
||||||
|
int(cpi_ratio * base[2]),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def handle_acceleration(controller: DualSenseController, pkt: ForzaDataPacket) -> None:
|
def apply_lightbar(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> None:
|
||||||
"""Throttle (R2). Mirrors `TriggerHaptics.HandleAcceleration`."""
|
if not LIGHTBAR_ENABLED:
|
||||||
throttle_input = _safe_abs(pkt, "accel") / 255.0
|
|
||||||
if throttle_input <= THROTTLE_INPUT_THRESHOLD:
|
|
||||||
controller.right_trigger.effect.off()
|
|
||||||
return
|
return
|
||||||
|
color = lightbar_color(pkt, state, in_race)
|
||||||
front_slip, rear_slip = _axle_slip(pkt)
|
if color == state.last_color:
|
||||||
if (
|
|
||||||
front_slip <= THROTTLE_FRONT_SLIP_THRESHOLD
|
|
||||||
and rear_slip <= THROTTLE_REAR_SLIP_THRESHOLD
|
|
||||||
):
|
|
||||||
# Throttle path resets to default explicitly when not slipping
|
|
||||||
# (HandleAcceleration line 112, `R2Effect = TriggerEffect.Default`).
|
|
||||||
controller.right_trigger.effect.off()
|
|
||||||
return
|
return
|
||||||
|
state.last_color = color
|
||||||
front_coef = min(front_slip * 3, 5)
|
controller.lightbar.set_color(*color)
|
||||||
# RACE-ELEMENT BUG: same as brake — rear coefficient uses FRONT slip
|
|
||||||
# (TriggerHaptics.cs line 92, `slipRatioFront * 5f`).
|
|
||||||
rear_coef = min(front_slip * 5, 7.5)
|
|
||||||
pct = (front_coef + rear_coef) / THROTTLE_PCT_DIVISOR
|
|
||||||
freq = max(THROTTLE_MIN_FREQUENCY, int(THROTTLE_MAX_FREQUENCY * pct))
|
|
||||||
|
|
||||||
controller.right_trigger.effect.vibration(
|
|
||||||
start_position=0, amplitude=THROTTLE_AMPLITUDE, frequency=freq
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def reset_triggers(controller: DualSenseController) -> None:
|
# --- Reset ------------------------------------------------------------------
|
||||||
"""Default both triggers — called on idle, reconnect, and shutdown."""
|
|
||||||
|
|
||||||
|
def reset_triggers(controller: Optional[DualSenseController]) -> None:
|
||||||
|
"""Default both triggers — called on idle, reconnect, and shutdown.
|
||||||
|
Tolerates a None controller so the shutdown-during-reconnect cleanup
|
||||||
|
path doesn't raise.
|
||||||
|
"""
|
||||||
|
if controller is None:
|
||||||
|
return
|
||||||
controller.left_trigger.effect.off()
|
controller.left_trigger.effect.off()
|
||||||
controller.right_trigger.effect.off()
|
controller.right_trigger.effect.off()
|
||||||
|
|
||||||
|
|
||||||
# --- Connection / hot-plug ---------------------------------------------------
|
def reset_all(controller: Optional[DualSenseController]) -> None:
|
||||||
|
"""reset_triggers + lightbar to (0, 0, 0). Tolerates None controller."""
|
||||||
|
if controller is None:
|
||||||
|
return
|
||||||
|
reset_triggers(controller)
|
||||||
|
if LIGHTBAR_ENABLED:
|
||||||
|
try:
|
||||||
|
controller.lightbar.set_color(0, 0, 0)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _connect_controller() -> DualSenseController | None:
|
# --- Connection / hot-plug --------------------------------------------------
|
||||||
"""Block until a DualSense is reachable. Returns the activated
|
|
||||||
controller, or None if shutdown was requested before one appeared.
|
|
||||||
|
def _connect_controller() -> Optional[DualSenseController]:
|
||||||
|
"""Block until a DualSense is reachable. Returns the activated controller,
|
||||||
|
or None if shutdown was requested before one appeared.
|
||||||
"""
|
"""
|
||||||
LOG.info("opening dualsense controller")
|
LOG.info("opening dualsense controller")
|
||||||
first_failure_logged = False
|
first_failure_logged = False
|
||||||
@@ -302,13 +641,21 @@ def _connect_controller() -> DualSenseController | None:
|
|||||||
|
|
||||||
if not devices:
|
if not devices:
|
||||||
if not first_failure_logged:
|
if not first_failure_logged:
|
||||||
LOG.warning(
|
LOG.warning("no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S)
|
||||||
"no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S
|
|
||||||
)
|
|
||||||
first_failure_logged = True
|
first_failure_logged = True
|
||||||
time.sleep(RECONNECT_BACKOFF_S)
|
time.sleep(RECONNECT_BACKOFF_S)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
||||||
|
global _disconnected
|
||||||
|
# Clear the disconnect flag BEFORE activate() so the HID worker
|
||||||
|
# thread (started by activate()) can't observe a stale-true flag if
|
||||||
|
# it raises immediately after spawn — a known failure mode right
|
||||||
|
# after USB enumeration when the kernel is still re-attaching the
|
||||||
|
# hidraw node. Without this, _on_controller_error fires inside
|
||||||
|
# activate(), sets _disconnected=True, then we'd unconditionally
|
||||||
|
# overwrite it back to False below.
|
||||||
|
_disconnected = False
|
||||||
try:
|
try:
|
||||||
controller = DualSenseController(device_index_or_device_info=devices[0])
|
controller = DualSenseController(device_index_or_device_info=devices[0])
|
||||||
controller.on_error(_on_controller_error)
|
controller.on_error(_on_controller_error)
|
||||||
@@ -323,16 +670,14 @@ def _connect_controller() -> DualSenseController | None:
|
|||||||
# Push a clean initial state so we don't inherit residual effects from
|
# Push a clean initial state so we don't inherit residual effects from
|
||||||
# a previous Steam Input session, prior daemon instance, or stale
|
# a previous Steam Input session, prior daemon instance, or stale
|
||||||
# firmware state.
|
# firmware state.
|
||||||
reset_triggers(controller)
|
reset_all(controller)
|
||||||
global _disconnected
|
|
||||||
_disconnected = False
|
|
||||||
LOG.info("dualsense controller connected (%s)", controller.connection_type)
|
LOG.info("dualsense controller connected (%s)", controller.connection_type)
|
||||||
return controller
|
return controller
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _close_controller(controller: DualSenseController | None) -> None:
|
def _close_controller(controller: Optional[DualSenseController]) -> None:
|
||||||
if controller is None:
|
if controller is None:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
@@ -341,17 +686,56 @@ def _close_controller(controller: DualSenseController | None) -> None:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# --- Main loop ---------------------------------------------------------------
|
# --- UDP socket -------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket:
|
||||||
|
"""Create or inherit the UDP listener socket. Honors LISTEN_FDS for
|
||||||
|
systemd socket activation; falls back to opening a fresh socket bound
|
||||||
|
to (host, port).
|
||||||
|
"""
|
||||||
|
listen_pid = os.environ.get("LISTEN_PID")
|
||||||
|
listen_fds = os.environ.get("LISTEN_FDS")
|
||||||
|
if listen_pid and listen_fds and int(listen_pid) == os.getpid() and int(listen_fds) >= 1:
|
||||||
|
sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
LOG.debug("inherited UDP socket fd=3 from systemd")
|
||||||
|
else:
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
sock.bind((host, port))
|
||||||
|
sock.settimeout(timeout)
|
||||||
|
return sock
|
||||||
|
|
||||||
|
|
||||||
|
_warned_packet_sizes: set[int] = set()
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_packet(data: bytes) -> Optional[ForzaDataPacket]:
|
||||||
|
n = len(data)
|
||||||
|
fmt = PACKET_FORMATS.get(n)
|
||||||
|
if fmt is None:
|
||||||
|
if n not in _warned_packet_sizes:
|
||||||
|
_warned_packet_sizes.add(n)
|
||||||
|
LOG.warning(
|
||||||
|
"ignoring unrecognised %d-byte UDP packet "
|
||||||
|
"(expected 311 [FM7 dash] or 324 [FH4/5]); "
|
||||||
|
"if FM7, switch HUD Data Out to CAR DASH format",
|
||||||
|
n,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return ForzaDataPacket(data, packet_format=fmt)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# --- Main loop --------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
||||||
signal.signal(signal.SIGTERM, _on_termination)
|
signal.signal(signal.SIGTERM, _on_termination)
|
||||||
signal.signal(signal.SIGINT, _on_termination)
|
signal.signal(signal.SIGINT, _on_termination)
|
||||||
|
|
||||||
# Bind the UDP socket BEFORE opening the controller. If the bind fails
|
|
||||||
# (port in use, systemd-passed fd unusable, etc.) we exit cleanly without
|
|
||||||
# having activated the controller's HID stack (which would otherwise leak
|
|
||||||
# an active session). _get_socket raises on bind failure.
|
|
||||||
LOG.info("listening for forza udp on %s:%d", host, port)
|
LOG.info("listening for forza udp on %s:%d", host, port)
|
||||||
sock = _get_socket(host, port)
|
sock = _get_socket(host, port)
|
||||||
|
|
||||||
@@ -360,6 +744,7 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
|||||||
sock.close()
|
sock.close()
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
state = DaemonState()
|
||||||
last_seen = 0.0
|
last_seen = 0.0
|
||||||
have_telemetry = False
|
have_telemetry = False
|
||||||
|
|
||||||
@@ -371,6 +756,7 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
|||||||
controller = _connect_controller()
|
controller = _connect_controller()
|
||||||
if controller is None:
|
if controller is None:
|
||||||
return 0
|
return 0
|
||||||
|
state.reset()
|
||||||
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
try:
|
try:
|
||||||
@@ -382,7 +768,8 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
|||||||
break
|
break
|
||||||
if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
|
if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
|
||||||
LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S)
|
LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S)
|
||||||
reset_triggers(controller)
|
reset_all(controller)
|
||||||
|
state.reset()
|
||||||
have_telemetry = False
|
have_telemetry = False
|
||||||
if exit_on_idle:
|
if exit_on_idle:
|
||||||
LOG.info("exiting on idle")
|
LOG.info("exiting on idle")
|
||||||
@@ -393,11 +780,24 @@ def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
|||||||
if pkt is None:
|
if pkt is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
handle_acceleration(controller, pkt)
|
in_race = is_race_on(pkt, state)
|
||||||
handle_braking(controller, pkt)
|
if in_race:
|
||||||
|
handle_throttle(controller, pkt, state)
|
||||||
|
handle_brake(controller, pkt, state)
|
||||||
|
else:
|
||||||
|
# On the in-race → menu transition, partial-reset state so
|
||||||
|
# the next race resumption gets clean EWMA cells, fresh slip
|
||||||
|
# flags (so the cold-start fix at handle_throttle/brake
|
||||||
|
# re-fires), and a redrawn lightbar. Edge-only — repeated
|
||||||
|
# menu packets shouldn't keep clearing state.
|
||||||
|
if state.last_in_race:
|
||||||
|
state.reset()
|
||||||
|
reset_triggers(controller)
|
||||||
|
state.last_in_race = in_race
|
||||||
|
apply_lightbar(controller, pkt, state, in_race)
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
reset_triggers(controller)
|
reset_all(controller)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
_close_controller(controller)
|
_close_controller(controller)
|
||||||
|
|||||||
745
hosts/yarn/forza-trigger/test_forza_trigger.py
Normal file
745
hosts/yarn/forza-trigger/test_forza_trigger.py
Normal file
@@ -0,0 +1,745 @@
|
|||||||
|
"""Behavioral tests for forza_trigger.py.
|
||||||
|
|
||||||
|
Pure-function tests + integration tests using a FakeController that records
|
||||||
|
every effect call. No hardware required.
|
||||||
|
|
||||||
|
Run: `python -m unittest discover -s hosts/yarn/forza-trigger -p 'test_*.py'`
|
||||||
|
or via the nix derivation in default.nix (runs at build time).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any, Tuple
|
||||||
|
|
||||||
|
# Disable lightbar by default for triggers-only tests; specific lightbar tests
|
||||||
|
# re-enable by mutating module attributes.
|
||||||
|
os.environ.setdefault("FORZA_LIGHTBAR", "0")
|
||||||
|
|
||||||
|
import forza_trigger as ft # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
# --- Fakes ------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class FakeEffect:
|
||||||
|
def __init__(self, log: list, side: str):
|
||||||
|
self.log = log
|
||||||
|
self.side = side
|
||||||
|
|
||||||
|
def off(self):
|
||||||
|
self.log.append((self.side, "off"))
|
||||||
|
|
||||||
|
def feedback(self, start_position=0, strength=0):
|
||||||
|
self.log.append((self.side, "feedback", start_position, strength))
|
||||||
|
|
||||||
|
def vibration(self, start_position=0, amplitude=0, frequency=0):
|
||||||
|
self.log.append((self.side, "vibration", start_position, amplitude, frequency))
|
||||||
|
|
||||||
|
|
||||||
|
class FakeTrigger:
|
||||||
|
def __init__(self, log: list, side: str):
|
||||||
|
self.effect = FakeEffect(log, side)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeLightbar:
|
||||||
|
def __init__(self, log: list):
|
||||||
|
self.log = log
|
||||||
|
|
||||||
|
def set_color(self, r, g, b):
|
||||||
|
self.log.append(("lightbar", "set_color", r, g, b))
|
||||||
|
|
||||||
|
|
||||||
|
class FakeController:
|
||||||
|
def __init__(self):
|
||||||
|
self.calls: list = []
|
||||||
|
self.left_trigger = FakeTrigger(self.calls, "L2")
|
||||||
|
self.right_trigger = FakeTrigger(self.calls, "R2")
|
||||||
|
self.lightbar = FakeLightbar(self.calls)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FakePacket:
|
||||||
|
"""Shape mirrors fdp.ForzaDataPacket. Only the fields handlers read are
|
||||||
|
populated; the rest stay 0.
|
||||||
|
"""
|
||||||
|
accel: float = 0.0
|
||||||
|
brake: float = 0.0
|
||||||
|
is_race_on: float = 1.0
|
||||||
|
current_engine_rpm: float = 3000.0
|
||||||
|
engine_idle_rpm: float = 800.0
|
||||||
|
engine_max_rpm: float = 8000.0
|
||||||
|
power: float = 100.0
|
||||||
|
acceleration_x: float = 0.0
|
||||||
|
acceleration_y: float = 0.0
|
||||||
|
acceleration_z: float = 0.0
|
||||||
|
tire_combined_slip_FL: float = 0.0
|
||||||
|
tire_combined_slip_FR: float = 0.0
|
||||||
|
tire_combined_slip_RL: float = 0.0
|
||||||
|
tire_combined_slip_RR: float = 0.0
|
||||||
|
car_class: float = 0.0
|
||||||
|
car_performance_index: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# --- Pure-function tests ----------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestMap(unittest.TestCase):
|
||||||
|
def test_endpoints(self):
|
||||||
|
self.assertEqual(ft._map(0, 0, 1, 0, 100), 0)
|
||||||
|
self.assertEqual(ft._map(1, 0, 1, 0, 100), 100)
|
||||||
|
|
||||||
|
def test_midpoint(self):
|
||||||
|
self.assertEqual(ft._map(0.5, 0, 1, 0, 100), 50)
|
||||||
|
|
||||||
|
def test_clamps_low(self):
|
||||||
|
self.assertEqual(ft._map(-5, 0, 10, 0, 100), 0)
|
||||||
|
|
||||||
|
def test_clamps_high(self):
|
||||||
|
self.assertEqual(ft._map(50, 0, 10, 0, 100), 100)
|
||||||
|
|
||||||
|
def test_zero_input_range(self):
|
||||||
|
# Degenerate input range — must not divide by zero.
|
||||||
|
self.assertEqual(ft._map(5, 5, 5, 0, 100), 0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEWMA(unittest.TestCase):
|
||||||
|
def test_alpha_one_is_instant(self):
|
||||||
|
# α=1.0 → output exactly equals the input (no smoothing).
|
||||||
|
self.assertEqual(ft._ewma(10, 5, 1.0), 10)
|
||||||
|
|
||||||
|
def test_alpha_zero_is_frozen(self):
|
||||||
|
# α=0.0 → output exactly equals the previous value (no update).
|
||||||
|
self.assertEqual(ft._ewma(10, 5, 0.0), 5)
|
||||||
|
|
||||||
|
def test_alpha_half(self):
|
||||||
|
self.assertEqual(ft._ewma(10, 6, 0.5), 8)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSafeField(unittest.TestCase):
|
||||||
|
def test_missing_attribute_returns_default(self):
|
||||||
|
pkt = FakePacket()
|
||||||
|
self.assertEqual(ft._safe_field(pkt, "nonexistent_field", 42.0), 42.0)
|
||||||
|
|
||||||
|
def test_nan_returns_default(self):
|
||||||
|
pkt = FakePacket(accel=math.nan)
|
||||||
|
self.assertEqual(ft._safe_field(pkt, "accel", 7.0), 7.0)
|
||||||
|
|
||||||
|
def test_inf_returns_default(self):
|
||||||
|
pkt = FakePacket(accel=math.inf)
|
||||||
|
self.assertEqual(ft._safe_field(pkt, "accel", 0.0), 0.0)
|
||||||
|
|
||||||
|
def test_normal_value(self):
|
||||||
|
pkt = FakePacket(accel=128.0)
|
||||||
|
self.assertEqual(ft._safe_field(pkt, "accel"), 128.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCombinedSlip(unittest.TestCase):
|
||||||
|
def test_arithmetic_mean(self):
|
||||||
|
pkt = FakePacket(
|
||||||
|
tire_combined_slip_FL=0.4,
|
||||||
|
tire_combined_slip_FR=0.6,
|
||||||
|
tire_combined_slip_RL=0.2,
|
||||||
|
tire_combined_slip_RR=0.8,
|
||||||
|
)
|
||||||
|
all_, front, rear = ft._combined_slip(pkt)
|
||||||
|
# All four = (0.4+0.6+0.2+0.8)/4 = 0.5
|
||||||
|
self.assertAlmostEqual(all_, 0.5)
|
||||||
|
# Front = (0.4+0.6)/2 = 0.5
|
||||||
|
self.assertAlmostEqual(front, 0.5)
|
||||||
|
# Rear = (0.2+0.8)/2 = 0.5
|
||||||
|
self.assertAlmostEqual(rear, 0.5)
|
||||||
|
|
||||||
|
def test_takes_absolute_values(self):
|
||||||
|
pkt = FakePacket(
|
||||||
|
tire_combined_slip_FL=-0.5,
|
||||||
|
tire_combined_slip_FR=0.5,
|
||||||
|
tire_combined_slip_RL=-0.3,
|
||||||
|
tire_combined_slip_RR=0.3,
|
||||||
|
)
|
||||||
|
all_, front, rear = ft._combined_slip(pkt)
|
||||||
|
self.assertAlmostEqual(all_, 0.4)
|
||||||
|
self.assertAlmostEqual(front, 0.5)
|
||||||
|
self.assertAlmostEqual(rear, 0.3)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Race-on debounce tests -------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsRaceOn(unittest.TestCase):
|
||||||
|
def test_simple_yes(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(is_race_on=1.0, current_engine_rpm=4000, power=200)
|
||||||
|
self.assertTrue(ft.is_race_on(pkt, s))
|
||||||
|
self.assertEqual(s.rpm_accumulator, 0)
|
||||||
|
|
||||||
|
def test_explicit_no(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(is_race_on=0.0)
|
||||||
|
self.assertFalse(ft.is_race_on(pkt, s))
|
||||||
|
|
||||||
|
def test_debounce_overrides_stuck_flag(self):
|
||||||
|
# Simulate FH5's stuck-flag bug: is_race_on=1 but RPM and power show
|
||||||
|
# the car is in a menu (power<=0, RPM unchanged).
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(is_race_on=1.0, current_engine_rpm=800, power=0)
|
||||||
|
s.last_rpm = 800
|
||||||
|
# Pump packets until the accumulator trips.
|
||||||
|
for _ in range(ft.RPM_ACCUMULATOR_TRIGGER_RACE_OFF):
|
||||||
|
self.assertTrue(ft.is_race_on(pkt, s))
|
||||||
|
# One more samples and we cross the threshold.
|
||||||
|
self.assertFalse(ft.is_race_on(pkt, s))
|
||||||
|
|
||||||
|
def test_rpm_change_resets_accumulator(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.rpm_accumulator = 50
|
||||||
|
s.last_rpm = 800
|
||||||
|
pkt = FakePacket(is_race_on=1.0, current_engine_rpm=4000, power=100)
|
||||||
|
ft.is_race_on(pkt, s)
|
||||||
|
self.assertEqual(s.rpm_accumulator, 0)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Trigger handler tests --------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandleThrottle(unittest.TestCase):
|
||||||
|
def test_zero_pedal_turns_off(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
ft.handle_throttle(c, FakePacket(accel=0.0), s)
|
||||||
|
self.assertEqual(c.calls, [("R2", "off")])
|
||||||
|
|
||||||
|
def test_baseline_under_normal_acceleration(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
# No slip, moderate accel, throttle pressed.
|
||||||
|
pkt = FakePacket(accel=128, acceleration_x=0.5, acceleration_z=2.0)
|
||||||
|
ft.handle_throttle(c, pkt, s)
|
||||||
|
# Baseline mode → feedback call (not vibration, not off).
|
||||||
|
self.assertEqual(len(c.calls), 1)
|
||||||
|
self.assertEqual(c.calls[0][1], "feedback")
|
||||||
|
side, mode, start, strength = c.calls[0]
|
||||||
|
self.assertEqual(side, "R2")
|
||||||
|
self.assertEqual(start, 0)
|
||||||
|
# Strength is in valid library range.
|
||||||
|
self.assertGreaterEqual(strength, ft.MIN_THROTTLE_RESISTANCE)
|
||||||
|
self.assertLessEqual(strength, ft.MAX_THROTTLE_RESISTANCE)
|
||||||
|
|
||||||
|
def test_front_slip_triggers_vibration(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
# Front-slip > 0.5 with throttle pressed → vibration mode.
|
||||||
|
# Use heavy accel so amp_raw is large enough that the EWMA-filtered
|
||||||
|
# value crosses the suppression threshold and we see vibration not
|
||||||
|
# feedback fallback.
|
||||||
|
pkt = FakePacket(
|
||||||
|
accel=255,
|
||||||
|
acceleration_z=10.0,
|
||||||
|
tire_combined_slip_FL=0.8,
|
||||||
|
tire_combined_slip_FR=0.8,
|
||||||
|
)
|
||||||
|
# Pre-warm EWMA so first call doesn't get heavily damped.
|
||||||
|
s.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION
|
||||||
|
s.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP
|
||||||
|
ft.handle_throttle(c, pkt, s)
|
||||||
|
self.assertEqual(len(c.calls), 1)
|
||||||
|
self.assertEqual(c.calls[0][1], "vibration")
|
||||||
|
|
||||||
|
def test_rear_slip_below_accelerator_gate_no_vibration(self):
|
||||||
|
# Rear-only slip but accelerator below the gate should NOT trigger
|
||||||
|
# vibration mode (Cosmii's rule, legacy_Program.cs:224).
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(
|
||||||
|
accel=100, # below ACCELERATOR_REAR_SLIP_GATE=200
|
||||||
|
tire_combined_slip_RL=0.8,
|
||||||
|
tire_combined_slip_RR=0.8,
|
||||||
|
)
|
||||||
|
ft.handle_throttle(c, pkt, s)
|
||||||
|
self.assertEqual(c.calls[0][1], "feedback") # baseline mode
|
||||||
|
|
||||||
|
def test_rear_slip_above_accelerator_gate_triggers_vibration(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION
|
||||||
|
s.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP
|
||||||
|
pkt = FakePacket(
|
||||||
|
accel=255, # above ACCELERATOR_REAR_SLIP_GATE=200
|
||||||
|
acceleration_z=10.0,
|
||||||
|
tire_combined_slip_RL=0.8,
|
||||||
|
tire_combined_slip_RR=0.8,
|
||||||
|
)
|
||||||
|
ft.handle_throttle(c, pkt, s)
|
||||||
|
self.assertEqual(c.calls[0][1], "vibration")
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandleBrake(unittest.TestCase):
|
||||||
|
def test_zero_pedal_turns_off(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
ft.handle_brake(c, FakePacket(brake=0.0), s)
|
||||||
|
self.assertEqual(c.calls, [("L2", "off")])
|
||||||
|
|
||||||
|
def test_baseline_under_normal_braking(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(brake=128)
|
||||||
|
ft.handle_brake(c, pkt, s)
|
||||||
|
self.assertEqual(len(c.calls), 1)
|
||||||
|
self.assertEqual(c.calls[0][1], "feedback")
|
||||||
|
# Strength scales with brake input. brake=128 → midpoint of 1..6 = 4ish.
|
||||||
|
_, _, _, strength = c.calls[0]
|
||||||
|
self.assertGreaterEqual(strength, ft.MIN_BRAKE_RESISTANCE)
|
||||||
|
self.assertLessEqual(strength, ft.MAX_BRAKE_RESISTANCE)
|
||||||
|
|
||||||
|
def test_slip_with_heavy_brake_triggers_vibration(self):
|
||||||
|
# Patmagauran/our condition: slip > thr AND brake > thr.
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.last_brake_freq = ft.MAX_BRAKE_VIBRATION
|
||||||
|
s.last_brake_resistance = ft.MAX_BRAKE_AMP
|
||||||
|
pkt = FakePacket(
|
||||||
|
brake=200, # well above BRAKE_VIBRATION_MODE_START=10
|
||||||
|
tire_combined_slip_FL=0.8,
|
||||||
|
tire_combined_slip_FR=0.8,
|
||||||
|
tire_combined_slip_RL=0.8,
|
||||||
|
tire_combined_slip_RR=0.8,
|
||||||
|
)
|
||||||
|
ft.handle_brake(c, pkt, s)
|
||||||
|
self.assertEqual(c.calls[0][1], "vibration")
|
||||||
|
|
||||||
|
def test_slip_with_light_brake_no_vibration(self):
|
||||||
|
# Slip but brake below threshold → baseline mode (Patmagauran's logic,
|
||||||
|
# NOT Cosmii's inverted version).
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(
|
||||||
|
brake=8, # below BRAKE_VIBRATION_MODE_START=10
|
||||||
|
tire_combined_slip_FL=0.8,
|
||||||
|
tire_combined_slip_FR=0.8,
|
||||||
|
)
|
||||||
|
ft.handle_brake(c, pkt, s)
|
||||||
|
self.assertEqual(c.calls[0][1], "feedback") # baseline mode
|
||||||
|
|
||||||
|
|
||||||
|
# --- Lightbar tests ---------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestLightbarColor(unittest.TestCase):
|
||||||
|
def test_in_race_low_rpm_green_floor(self):
|
||||||
|
# At idle RPM the green channel should hit GREEN_FLOOR (50), not 0.
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(
|
||||||
|
is_race_on=1.0,
|
||||||
|
current_engine_rpm=800, # = idle
|
||||||
|
engine_idle_rpm=800,
|
||||||
|
engine_max_rpm=8000,
|
||||||
|
)
|
||||||
|
r, g, b = ft.lightbar_color(pkt, s, in_race=True)
|
||||||
|
self.assertEqual(r, 0)
|
||||||
|
self.assertEqual(g, ft.GREEN_FLOOR)
|
||||||
|
self.assertEqual(b, 0)
|
||||||
|
|
||||||
|
def test_in_race_below_redline_green(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
# 50% RPM ratio, below redline.
|
||||||
|
pkt = FakePacket(
|
||||||
|
is_race_on=1.0,
|
||||||
|
current_engine_rpm=4400,
|
||||||
|
engine_idle_rpm=800,
|
||||||
|
engine_max_rpm=8000,
|
||||||
|
)
|
||||||
|
r, g, b = ft.lightbar_color(pkt, s, in_race=True)
|
||||||
|
# Both red and green should rise; green not inverted yet.
|
||||||
|
self.assertGreater(g, 0)
|
||||||
|
self.assertLess(g, 255)
|
||||||
|
|
||||||
|
def test_in_race_above_redline_inverts_green(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
# 95% RPM ratio, well past 85% redline.
|
||||||
|
pkt = FakePacket(
|
||||||
|
is_race_on=1.0,
|
||||||
|
current_engine_rpm=8000,
|
||||||
|
engine_idle_rpm=800,
|
||||||
|
engine_max_rpm=8000,
|
||||||
|
)
|
||||||
|
r, g, b = ft.lightbar_color(pkt, s, in_race=True)
|
||||||
|
# At 100% ratio, green=255 then inverted to 0; red=255.
|
||||||
|
self.assertEqual(r, 255)
|
||||||
|
self.assertEqual(g, 0)
|
||||||
|
|
||||||
|
def test_menu_class_d_color(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(car_class=0, car_performance_index=255)
|
||||||
|
rgb = ft.lightbar_color(pkt, s, in_race=False)
|
||||||
|
# Class D at full CPI = exact palette color.
|
||||||
|
self.assertEqual(rgb, ft.CAR_CLASS_COLORS[0])
|
||||||
|
|
||||||
|
def test_menu_x_class_constant(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(car_class=7, car_performance_index=999)
|
||||||
|
rgb = ft.lightbar_color(pkt, s, in_race=False)
|
||||||
|
self.assertEqual(rgb, ft.COLOR_CLASS_X)
|
||||||
|
|
||||||
|
def test_menu_class_d_packet_unsticks_previous_class(self):
|
||||||
|
# Switching from S2 → D MUST update the lightbar to D's palette,
|
||||||
|
# not keep showing S2 just because car_class=0 was previously
|
||||||
|
# treated as "no info" by Cosmii's broken `> 0` guard.
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.last_car_class = 5 # S2
|
||||||
|
s.last_cpi = 200
|
||||||
|
d_pkt = FakePacket(car_class=0, car_performance_index=180)
|
||||||
|
rgb = ft.lightbar_color(d_pkt, s, in_race=False)
|
||||||
|
# Expect tinted D-class color, NOT S2.
|
||||||
|
ratio = 180 / 255
|
||||||
|
expected = (
|
||||||
|
int(ratio * ft.CAR_CLASS_COLORS[0][0]),
|
||||||
|
int(ratio * ft.CAR_CLASS_COLORS[0][1]),
|
||||||
|
int(ratio * ft.CAR_CLASS_COLORS[0][2]),
|
||||||
|
)
|
||||||
|
self.assertEqual(rgb, expected)
|
||||||
|
|
||||||
|
|
||||||
|
# --- DaemonState lifecycle --------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestDaemonStateReset(unittest.TestCase):
|
||||||
|
def test_reset_clears_filters_and_accumulator(self):
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.last_throttle_resistance = 99.0
|
||||||
|
s.last_brake_resistance = 99.0
|
||||||
|
s.last_throttle_freq = 99.0
|
||||||
|
s.last_brake_freq = 99.0
|
||||||
|
s.rpm_accumulator = 50
|
||||||
|
s.last_rpm = 4000
|
||||||
|
|
||||||
|
s.reset()
|
||||||
|
|
||||||
|
self.assertEqual(s.last_throttle_resistance, 1.0)
|
||||||
|
self.assertEqual(s.last_brake_resistance, 1.0)
|
||||||
|
self.assertEqual(s.last_throttle_freq, 0.0)
|
||||||
|
self.assertEqual(s.last_brake_freq, 0.0)
|
||||||
|
self.assertEqual(s.rpm_accumulator, 0)
|
||||||
|
self.assertEqual(s.last_rpm, 0.0)
|
||||||
|
|
||||||
|
def test_reset_clears_lightbar_dedup(self):
|
||||||
|
# last_color MUST be cleared on idle reset. The `apply_lightbar`
|
||||||
|
# dedup short-circuits when color == state.last_color; if reset
|
||||||
|
# left the cache holding the in-race color, resuming with the
|
||||||
|
# same color would skip the controller write — leaving the bar
|
||||||
|
# black after the (0,0,0) write reset_all sent at idle.
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.last_color = (1, 2, 3)
|
||||||
|
s.reset()
|
||||||
|
self.assertEqual(s.last_color, (0, 0, 0))
|
||||||
|
|
||||||
|
# --- Intensity attenuation regression ---------------------------------------
|
||||||
|
# Catches the bug where the EWMA cell stored the intensity-scaled value, so
|
||||||
|
# any FORZA_*_INTENSITY < 1 geometrically attenuated the output (steady-state
|
||||||
|
# at intensity=0.5, α=0.01 was ~50× too low). The fix decouples cell storage
|
||||||
|
# from output scaling. This test would have caught it in CI.
|
||||||
|
|
||||||
|
|
||||||
|
class TestIntensityScaling(unittest.TestCase):
|
||||||
|
def _run_steady(self, intensity_attr: str, side: str, pkt_kwargs: dict, n_iter: int = 300) -> int:
|
||||||
|
"""Drive a handler n_iter times with constant input and the named
|
||||||
|
intensity attribute monkey-patched, then return the final integer
|
||||||
|
strength sent on `side` ("L2" or "R2").
|
||||||
|
"""
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
for _ in range(n_iter):
|
||||||
|
if side == "R2":
|
||||||
|
ft.handle_throttle(c, FakePacket(**pkt_kwargs), s)
|
||||||
|
else:
|
||||||
|
ft.handle_brake(c, FakePacket(**pkt_kwargs), s)
|
||||||
|
# Last call's strength.
|
||||||
|
last = c.calls[-1]
|
||||||
|
# ("R2"/"L2", "feedback", start, strength) OR ("R2"/"L2", "vibration", start, amp, freq)
|
||||||
|
return last[3]
|
||||||
|
|
||||||
|
def test_throttle_baseline_intensity_proportional(self):
|
||||||
|
# With intensity=0.5, the output at α=0.01 (very smooth) must
|
||||||
|
# converge to ~0.5× the intensity=1.0 output, NOT to ~1% of it.
|
||||||
|
# We can't monkey-patch module-level constants from a method
|
||||||
|
# cleanly, so we re-import after env tweaks isn't an option in
|
||||||
|
# one process. Instead, pin RIGHT_TRIGGER_INTENSITY directly.
|
||||||
|
original = ft.RIGHT_TRIGGER_INTENSITY
|
||||||
|
try:
|
||||||
|
ft.RIGHT_TRIGGER_INTENSITY = 1.0
|
||||||
|
full = self._run_steady("RIGHT_TRIGGER_INTENSITY", "R2",
|
||||||
|
dict(accel=128, acceleration_z=10.0))
|
||||||
|
ft.RIGHT_TRIGGER_INTENSITY = 0.5
|
||||||
|
half = self._run_steady("RIGHT_TRIGGER_INTENSITY", "R2",
|
||||||
|
dict(accel=128, acceleration_z=10.0))
|
||||||
|
finally:
|
||||||
|
ft.RIGHT_TRIGGER_INTENSITY = original
|
||||||
|
# full strength at this input is MAX=6 (avgAccel=10 → top of range).
|
||||||
|
# half intensity should clamp around 3 (or MIN=1 floor; not <full).
|
||||||
|
self.assertEqual(full, 6)
|
||||||
|
self.assertEqual(half, 3)
|
||||||
|
|
||||||
|
def test_brake_baseline_intensity_proportional(self):
|
||||||
|
original = ft.LEFT_TRIGGER_INTENSITY
|
||||||
|
try:
|
||||||
|
ft.LEFT_TRIGGER_INTENSITY = 1.0
|
||||||
|
full = self._run_steady("LEFT_TRIGGER_INTENSITY", "L2", dict(brake=255))
|
||||||
|
ft.LEFT_TRIGGER_INTENSITY = 0.5
|
||||||
|
half = self._run_steady("LEFT_TRIGGER_INTENSITY", "L2", dict(brake=255))
|
||||||
|
finally:
|
||||||
|
ft.LEFT_TRIGGER_INTENSITY = original
|
||||||
|
self.assertEqual(full, 6)
|
||||||
|
self.assertEqual(half, 3)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Slip transition seeding ------------------------------------------------
|
||||||
|
# The first packet of a slip event MUST produce the unsmoothed peak
|
||||||
|
# vibration so the buzz is felt immediately, not after ~1.7s of EWMA
|
||||||
|
# convergence at α=0.01.
|
||||||
|
|
||||||
|
|
||||||
|
class TestSlipTransitionSeeding(unittest.TestCase):
|
||||||
|
def test_first_throttle_slip_packet_hits_peak(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState() # was_throttle_slipping=False, cells=1.0/0.0
|
||||||
|
pkt = FakePacket(
|
||||||
|
accel=255,
|
||||||
|
acceleration_z=10.0,
|
||||||
|
tire_combined_slip_FL=1.0,
|
||||||
|
tire_combined_slip_FR=1.0,
|
||||||
|
tire_combined_slip_RL=1.0,
|
||||||
|
tire_combined_slip_RR=1.0,
|
||||||
|
)
|
||||||
|
ft.handle_throttle(c, pkt, s)
|
||||||
|
self.assertEqual(c.calls[-1][1], "vibration")
|
||||||
|
_, _, _, amp, freq = c.calls[-1]
|
||||||
|
# Peak slip → MAX_ACCEL_GRIPLOSS_VIBRATION ceiling on freq, MAX
|
||||||
|
# amp from heavy avgAccel. With seeding, BOTH should hit max on
|
||||||
|
# the very first packet.
|
||||||
|
self.assertEqual(freq, ft.MAX_ACCEL_GRIPLOSS_VIBRATION)
|
||||||
|
self.assertEqual(amp, ft.MAX_ACCEL_GRIPLOSS_AMP)
|
||||||
|
|
||||||
|
def test_first_brake_slip_packet_hits_peak(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
pkt = FakePacket(
|
||||||
|
brake=255,
|
||||||
|
tire_combined_slip_FL=1.0,
|
||||||
|
tire_combined_slip_FR=1.0,
|
||||||
|
tire_combined_slip_RL=1.0,
|
||||||
|
tire_combined_slip_RR=1.0,
|
||||||
|
)
|
||||||
|
ft.handle_brake(c, pkt, s)
|
||||||
|
self.assertEqual(c.calls[-1][1], "vibration")
|
||||||
|
_, _, _, amp, freq = c.calls[-1]
|
||||||
|
self.assertEqual(freq, ft.MAX_BRAKE_VIBRATION)
|
||||||
|
self.assertEqual(amp, ft.MAX_BRAKE_AMP)
|
||||||
|
|
||||||
|
def test_throttle_seeding_survives_menu_round_trip(self):
|
||||||
|
# Race 1 ended with was_throttle_slipping=True (player crashed
|
||||||
|
# mid-spin). Player went to menu (in_race=False) — the run-loop's
|
||||||
|
# transition handler MUST clear was_throttle_slipping so race 2's
|
||||||
|
# first slip packet re-seeds the EWMA cell. Without the transition
|
||||||
|
# cleanup, the round-1 cold-start fix doesn't survive a menu
|
||||||
|
# round-trip and the regression returns silently.
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
# Simulate post-menu state with stale slip flags + EWMA cells.
|
||||||
|
s.was_throttle_slipping = False # what run-loop transition handler should leave
|
||||||
|
s.last_throttle_freq = 1.0 # but cells are not stale because run-loop reset them
|
||||||
|
s.last_throttle_resistance = 1.0
|
||||||
|
pkt = FakePacket(
|
||||||
|
accel=255, acceleration_z=10.0,
|
||||||
|
tire_combined_slip_FL=1.0, tire_combined_slip_FR=1.0,
|
||||||
|
tire_combined_slip_RL=1.0, tire_combined_slip_RR=1.0,
|
||||||
|
)
|
||||||
|
ft.handle_throttle(c, pkt, s)
|
||||||
|
_, _, _, amp, freq = c.calls[-1]
|
||||||
|
self.assertEqual(freq, ft.MAX_ACCEL_GRIPLOSS_VIBRATION)
|
||||||
|
self.assertEqual(amp, ft.MAX_ACCEL_GRIPLOSS_AMP)
|
||||||
|
|
||||||
|
def test_brake_seeding_survives_menu_round_trip(self):
|
||||||
|
c = FakeController()
|
||||||
|
s = ft.DaemonState()
|
||||||
|
s.was_brake_slipping = False
|
||||||
|
s.last_brake_freq = 1.0
|
||||||
|
s.last_brake_resistance = 1.0
|
||||||
|
pkt = FakePacket(
|
||||||
|
brake=255,
|
||||||
|
tire_combined_slip_FL=1.0, tire_combined_slip_FR=1.0,
|
||||||
|
tire_combined_slip_RL=1.0, tire_combined_slip_RR=1.0,
|
||||||
|
)
|
||||||
|
ft.handle_brake(c, pkt, s)
|
||||||
|
_, _, _, amp, freq = c.calls[-1]
|
||||||
|
self.assertEqual(freq, ft.MAX_BRAKE_VIBRATION)
|
||||||
|
self.assertEqual(amp, ft.MAX_BRAKE_AMP)
|
||||||
|
|
||||||
|
|
||||||
|
# --- env helpers ------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestReadIntensity(unittest.TestCase):
|
||||||
|
def test_default_when_unset(self):
|
||||||
|
os.environ.pop("FORZA_PROBE_X", None)
|
||||||
|
self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.7), 0.7)
|
||||||
|
|
||||||
|
def test_clamps_above_one(self):
|
||||||
|
os.environ["FORZA_PROBE_X"] = "5.0"
|
||||||
|
self.assertEqual(ft._read_intensity("FORZA_PROBE_X"), 1.0)
|
||||||
|
|
||||||
|
def test_clamps_below_zero(self):
|
||||||
|
os.environ["FORZA_PROBE_X"] = "-1.0"
|
||||||
|
self.assertEqual(ft._read_intensity("FORZA_PROBE_X"), 0.0)
|
||||||
|
|
||||||
|
def test_nan_falls_back_to_default(self):
|
||||||
|
# Without the is_finite check, min(1.0, nan) returns 1.0 silently.
|
||||||
|
os.environ["FORZA_PROBE_X"] = "nan"
|
||||||
|
self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.4), 0.4)
|
||||||
|
|
||||||
|
def test_inf_falls_back_to_default(self):
|
||||||
|
os.environ["FORZA_PROBE_X"] = "inf"
|
||||||
|
self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.4), 0.4)
|
||||||
|
|
||||||
|
def test_garbage_falls_back_to_default(self):
|
||||||
|
os.environ["FORZA_PROBE_X"] = "yes please"
|
||||||
|
self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.5), 0.5)
|
||||||
|
|
||||||
|
|
||||||
|
class TestReadBool(unittest.TestCase):
|
||||||
|
def test_default(self):
|
||||||
|
os.environ.pop("FORZA_PROBE_BOOL", None)
|
||||||
|
self.assertTrue(ft._read_bool("FORZA_PROBE_BOOL", default=True))
|
||||||
|
self.assertFalse(ft._read_bool("FORZA_PROBE_BOOL", default=False))
|
||||||
|
|
||||||
|
def test_disabled_tokens(self):
|
||||||
|
for tok in ("0", "false", "FALSE", "no", "NO", "off", "Off", ""):
|
||||||
|
os.environ["FORZA_PROBE_BOOL"] = tok
|
||||||
|
self.assertFalse(ft._read_bool("FORZA_PROBE_BOOL"), f"token={tok!r}")
|
||||||
|
|
||||||
|
def test_enabled_tokens(self):
|
||||||
|
for tok in ("1", "true", "yes", "on", "Y", "anything-else"):
|
||||||
|
os.environ["FORZA_PROBE_BOOL"] = tok
|
||||||
|
self.assertTrue(ft._read_bool("FORZA_PROBE_BOOL"), f"token={tok!r}")
|
||||||
|
|
||||||
|
|
||||||
|
# --- on_error signature -----------------------------------------------------
|
||||||
|
# The dualsense-controller library dispatches state-change callbacks by
|
||||||
|
# parameter count. _on_controller_error MUST take exactly 1 parameter to
|
||||||
|
# bind to the 1-arg event (which emits new_value). 2-arg would bind to a
|
||||||
|
# 2-arg event that emits (new_value, timestamp_ns), making the log show a
|
||||||
|
# stray clock integer instead of the exception.
|
||||||
|
|
||||||
|
|
||||||
|
class TestOnControllerErrorSignature(unittest.TestCase):
|
||||||
|
def test_takes_exactly_one_argument(self):
|
||||||
|
import inspect
|
||||||
|
sig = inspect.signature(ft._on_controller_error)
|
||||||
|
self.assertEqual(
|
||||||
|
len(sig.parameters), 1,
|
||||||
|
f"_on_controller_error must take 1 arg (got {list(sig.parameters)}); "
|
||||||
|
"the dualsense-controller library dispatches by arity and 2-arg "
|
||||||
|
"would receive (new_value, timestamp_ns) instead of (exc).",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Reset tolerates None controller ----------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestResetNullSafe(unittest.TestCase):
|
||||||
|
def test_reset_triggers_none_no_raise(self):
|
||||||
|
ft.reset_triggers(None) # must not raise
|
||||||
|
|
||||||
|
def test_reset_all_none_no_raise(self):
|
||||||
|
ft.reset_all(None) # must not raise
|
||||||
|
|
||||||
|
|
||||||
|
# --- fdp surface contract ---------------------------------------------------
|
||||||
|
# Catches the field-name regression class. We collect every literal name
|
||||||
|
# the daemon passes to _safe_field/_safe_abs and assert each resolves on
|
||||||
|
# a real fdp.ForzaDataPacket. This would have caught all three of the
|
||||||
|
# bugs in commit 50e5f12 at CI time.
|
||||||
|
|
||||||
|
|
||||||
|
class TestFdpSurfaceContract(unittest.TestCase):
|
||||||
|
@staticmethod
|
||||||
|
def _collect_field_names(src_path: str) -> tuple[set[str], int]:
|
||||||
|
"""Walk the daemon AST for every _safe_field/_safe_abs call.
|
||||||
|
|
||||||
|
Returns (literal_names, non_literal_arg_count). Calls inside the
|
||||||
|
wrapper functions `_safe_abs` and `_safe_field` themselves are
|
||||||
|
excluded — those forward `name` from a parameter to the inner
|
||||||
|
call and do NOT touch the fdp surface directly. Non-zero
|
||||||
|
non-literal count from any OTHER caller indicates the contract
|
||||||
|
isn't fully enforceable.
|
||||||
|
"""
|
||||||
|
import ast
|
||||||
|
with open(src_path) as f:
|
||||||
|
tree = ast.parse(f.read())
|
||||||
|
|
||||||
|
class V(ast.NodeVisitor):
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.names: set[str] = set()
|
||||||
|
self.non_literal: int = 0
|
||||||
|
self._fn_stack: list[str] = []
|
||||||
|
|
||||||
|
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
|
||||||
|
self._fn_stack.append(node.name)
|
||||||
|
self.generic_visit(node)
|
||||||
|
self._fn_stack.pop()
|
||||||
|
|
||||||
|
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
|
||||||
|
self._fn_stack.append(node.name)
|
||||||
|
self.generic_visit(node)
|
||||||
|
self._fn_stack.pop()
|
||||||
|
|
||||||
|
def visit_Call(self, node: ast.Call) -> None:
|
||||||
|
self.generic_visit(node) # nested calls
|
||||||
|
fn = node.func
|
||||||
|
if not isinstance(fn, ast.Name):
|
||||||
|
return
|
||||||
|
if fn.id not in ("_safe_field", "_safe_abs"):
|
||||||
|
return
|
||||||
|
# Skip the forwarding wrappers themselves: those call
|
||||||
|
# _safe_field with a `name` parameter, not a literal.
|
||||||
|
if self._fn_stack and self._fn_stack[-1] in ("_safe_abs", "_safe_field"):
|
||||||
|
return
|
||||||
|
if len(node.args) < 2:
|
||||||
|
self.non_literal += 1
|
||||||
|
return
|
||||||
|
arg = node.args[1]
|
||||||
|
if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
|
||||||
|
self.names.add(arg.value)
|
||||||
|
else:
|
||||||
|
self.non_literal += 1
|
||||||
|
|
||||||
|
v = V()
|
||||||
|
v.visit(tree)
|
||||||
|
return v.names, v.non_literal
|
||||||
|
|
||||||
|
def test_every_daemon_field_resolves_on_real_fdp_packet(self):
|
||||||
|
import fdp
|
||||||
|
# 324-byte zeroed buffer — fh4 unpack will succeed (just yields
|
||||||
|
# zeros for everything) but every documented field will be
|
||||||
|
# setattr'd on the resulting object.
|
||||||
|
pkt = fdp.ForzaDataPacket(b"\x00" * 324, packet_format="fh4")
|
||||||
|
used_names, non_literal = self._collect_field_names(ft.__file__)
|
||||||
|
self.assertGreater(len(used_names), 5, "AST walker found too few names")
|
||||||
|
self.assertEqual(
|
||||||
|
non_literal, 0,
|
||||||
|
"Every _safe_field/_safe_abs call MUST pass a string literal "
|
||||||
|
"field name so this contract test can verify it. Variable / "
|
||||||
|
"kwarg / computed names defeat the AST walker.",
|
||||||
|
)
|
||||||
|
missing = sorted(n for n in used_names if not hasattr(pkt, n))
|
||||||
|
self.assertEqual(
|
||||||
|
missing, [],
|
||||||
|
f"daemon reads fdp fields that don't exist: {missing}. "
|
||||||
|
"Check fdp.ForzaDataPacket.sled_props + dash_props for canonical names.",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main(verbosity=2)
|
||||||
Reference in New Issue
Block a user