Files
nixos/hosts/yarn/forza-trigger/test_forza_trigger.py
Simon Gardling 7cf27f0cda forza-trigger: remove RPM-stuck workaround + bump hysteresis to 120
Two separate causes of stationary trigger pulsing, both fixed:

1. Hysteresis too short. 30 packets (0.5s) was shorter than FH5's
   stationary oscillation period (~1s per state in start-grid/pause
   screen contexts). Bumped to 120 (2s at 60Hz).

2. RPM-stuck workaround removed entirely. Cosmii's RPM_ACCUMULATOR
   (legacy_Program.cs:89-109) forced is_race_on false after 3.3s of
   constant RPM + zero power. While stationary in-race (idle RPM
   constant, power near zero), this would trip, causing a false
   menu transition. Engine idle flutter on power could reset and
   re-trigger it, producing a slow oscillation with clicks on each
   edge. FH5 has been observed to correctly clear is_race_on between
   races (confirmed via live strace), so the workaround is unnecessary.

   Removed: RPM_ACCUMULATOR_TRIGGER_RACE_OFF constant, is_race_on's
   debounce logic, DaemonState.last_rpm and .rpm_accumulator fields.
   is_race_on is now a one-liner: return bool(is_race_on field).

Tests: 57/57 pass. TestIsRaceOn simplified from 4 to 3 tests.
DaemonState reset test no longer checks removed fields.
2026-05-07 17:29:40 -04:00

781 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Behavioral tests for forza_trigger.py.
Pure-function tests + integration tests using a FakeController that records
every effect call. No hardware required.
Run: `python -m unittest discover -s hosts/yarn/forza-trigger -p 'test_*.py'`
or via the nix derivation in default.nix (runs at build time).
"""
import math
import os
import sys
import unittest
from dataclasses import dataclass, field
from typing import Any, Tuple
# Disable lightbar by default for triggers-only tests; specific lightbar tests
# re-enable by mutating module attributes.
os.environ.setdefault("FORZA_LIGHTBAR", "0")
import forza_trigger as ft # noqa: E402
# --- Fakes ------------------------------------------------------------------
class FakeEffect:
    """Recording stand-in for a trigger's effect object.

    Nothing is sent to hardware: each call appends one tuple to the shared
    ``log`` list, laid out as ``(side, effect_name, *arguments)``.
    """

    def __init__(self, log: list, side: str):
        self.log = log
        self.side = side

    def _record(self, effect_name: str, *params) -> None:
        # Single place that fixes the tuple layout used by every effect.
        self.log.append((self.side, effect_name, *params))

    def off(self):
        self._record("off")

    def feedback(self, start_position=0, strength=0):
        self._record("feedback", start_position, strength)

    def vibration(self, start_position=0, amplitude=0, frequency=0):
        self._record("vibration", start_position, amplitude, frequency)
class FakeTrigger:
    """Fake trigger whose only attribute, ``effect``, is a FakeEffect that
    writes to the given log under the given side label.
    """

    def __init__(self, log: list, side: str):
        recorder = FakeEffect(log, side)
        self.effect = recorder
class FakeLightbar:
    """Fake lightbar that records ``set_color`` calls in a shared log."""

    def __init__(self, log: list):
        self.log = log

    def set_color(self, r, g, b):
        # Entries are tagged "lightbar" so they can be told apart from the
        # trigger entries that share the same log.
        entry = ("lightbar", "set_color", r, g, b)
        self.log.append(entry)
class FakeController:
    """Fake controller wiring both triggers and the lightbar to one call log.

    ``calls`` receives every effect / lightbar call in the order it was made,
    so tests can assert on the exact sequence.
    """

    def __init__(self):
        shared_log: list = []
        self.calls = shared_log
        self.left_trigger = FakeTrigger(shared_log, "L2")
        self.right_trigger = FakeTrigger(shared_log, "R2")
        self.lightbar = FakeLightbar(shared_log)
@dataclass
class FakePacket:
    """Test double shaped like fdp.ForzaDataPacket.

    Only the fields the handlers read are declared here; everything else is
    simply absent. Field order is part of the positional constructor.
    """

    # Pedal inputs and race flag.
    accel: float = 0.0
    brake: float = 0.0
    is_race_on: float = 1.0

    # Engine.
    current_engine_rpm: float = 3_000.0
    engine_idle_rpm: float = 800.0
    engine_max_rpm: float = 8_000.0
    power: float = 100.0

    # Acceleration components.
    acceleration_x: float = 0.0
    acceleration_y: float = 0.0
    acceleration_z: float = 0.0

    # Per-wheel combined slip (front-left, front-right, rear-left, rear-right).
    tire_combined_slip_FL: float = 0.0
    tire_combined_slip_FR: float = 0.0
    tire_combined_slip_RL: float = 0.0
    tire_combined_slip_RR: float = 0.0

    # Car classification, read by the menu lightbar tests.
    car_class: float = 0.0
    car_performance_index: float = 0.0
# --- Pure-function tests ----------------------------------------------------
class TestMap(unittest.TestCase):
    """ft._map on range endpoints, midpoint, out-of-range and degenerate input."""

    def test_endpoints(self):
        lowest = ft._map(0, 0, 1, 0, 100)
        self.assertEqual(lowest, 0)
        highest = ft._map(1, 0, 1, 0, 100)
        self.assertEqual(highest, 100)

    def test_midpoint(self):
        halfway = ft._map(0.5, 0, 1, 0, 100)
        self.assertEqual(halfway, 50)

    def test_clamps_low(self):
        # Input below the input range pins to the bottom of the output range.
        below = ft._map(-5, 0, 10, 0, 100)
        self.assertEqual(below, 0)

    def test_clamps_high(self):
        # Input above the input range pins to the top of the output range.
        above = ft._map(50, 0, 10, 0, 100)
        self.assertEqual(above, 100)

    def test_zero_input_range(self):
        # A zero-width input range must not raise ZeroDivisionError.
        degenerate = ft._map(5, 5, 5, 0, 100)
        self.assertEqual(degenerate, 0)
class TestEWMA(unittest.TestCase):
    """ft._ewma at the two alpha extremes and at alpha = 0.5."""

    def test_alpha_one_is_instant(self):
        # alpha = 1.0: no smoothing, the result is the new input.
        result = ft._ewma(10, 5, 1.0)
        self.assertEqual(result, 10)

    def test_alpha_zero_is_frozen(self):
        # alpha = 0.0: no update, the result is the previous value.
        result = ft._ewma(10, 5, 0.0)
        self.assertEqual(result, 5)

    def test_alpha_half(self):
        # Halfway between previous (6) and input (10).
        result = ft._ewma(10, 6, 0.5)
        self.assertEqual(result, 8)
class TestSafeField(unittest.TestCase):
    """ft._safe_field falls back to the default for missing or non-finite values."""

    def test_missing_attribute_returns_default(self):
        value = ft._safe_field(FakePacket(), "nonexistent_field", 42.0)
        self.assertEqual(value, 42.0)

    def test_nan_returns_default(self):
        packet = FakePacket(accel=math.nan)
        value = ft._safe_field(packet, "accel", 7.0)
        self.assertEqual(value, 7.0)

    def test_inf_returns_default(self):
        packet = FakePacket(accel=math.inf)
        value = ft._safe_field(packet, "accel", 0.0)
        self.assertEqual(value, 0.0)

    def test_normal_value(self):
        # A finite value is passed through; no explicit default supplied.
        packet = FakePacket(accel=128.0)
        value = ft._safe_field(packet, "accel")
        self.assertEqual(value, 128.0)
class TestCombinedSlip(unittest.TestCase):
    """ft._combined_slip returns (all-wheel, front, rear) mean slip magnitudes."""

    def test_arithmetic_mean(self):
        wheel_slip = dict(
            tire_combined_slip_FL=0.4,
            tire_combined_slip_FR=0.6,
            tire_combined_slip_RL=0.2,
            tire_combined_slip_RR=0.8,
        )
        all_, front, rear = ft._combined_slip(FakePacket(**wheel_slip))
        # (0.4 + 0.6 + 0.2 + 0.8) / 4
        self.assertAlmostEqual(all_, 0.5)
        # (0.4 + 0.6) / 2
        self.assertAlmostEqual(front, 0.5)
        # (0.2 + 0.8) / 2
        self.assertAlmostEqual(rear, 0.5)

    def test_takes_absolute_values(self):
        # Negative slip counts by magnitude; signs must not cancel out.
        wheel_slip = dict(
            tire_combined_slip_FL=-0.5,
            tire_combined_slip_FR=0.5,
            tire_combined_slip_RL=-0.3,
            tire_combined_slip_RR=0.3,
        )
        all_, front, rear = ft._combined_slip(FakePacket(**wheel_slip))
        self.assertAlmostEqual(all_, 0.4)
        self.assertAlmostEqual(front, 0.5)
        self.assertAlmostEqual(rear, 0.3)
# --- Race-on tests ----------------------------------------------------------
class TestIsRaceOn(unittest.TestCase):
    """is_race_on returns the packet's raw flag.

    Hysteresis is handled by commit_in_race. The former RPM-stuck workaround
    is gone because it produced false menu transitions while the car was
    stationary in-race.
    """

    def test_reads_flag_true(self):
        packet = FakePacket(is_race_on=1.0)
        self.assertTrue(ft.is_race_on(packet, ft.DaemonState()))

    def test_reads_flag_false(self):
        packet = FakePacket(is_race_on=0.0)
        self.assertFalse(ft.is_race_on(packet, ft.DaemonState()))

    def test_defaults_to_flag_value(self):
        # FakePacket's default is is_race_on=1.0, so the raw read is truthy.
        packet = FakePacket()
        self.assertTrue(ft.is_race_on(packet, ft.DaemonState()))
class TestCommitInRace(unittest.TestCase):
    """commit_in_race applies hysteresis to the raw is_race_on flag.

    FH5 emits packets whose bit alternates True/False at packet rate during
    menu transitions and loading screens; those alternations must not reach
    the committed state.
    """

    def test_stable_no_pending_count_never_grows(self):
        state = ft.DaemonState()
        state.last_in_race = False
        for _packet in range(100):
            committed = ft.commit_in_race(False, state)
            self.assertFalse(committed)
            self.assertEqual(state.in_race_pending_count, 0)

    def test_alternation_does_not_commit(self):
        # Worst case seen from real FH5: the flag flips on every other
        # packet. The committed (initial) value must be returned throughout.
        state = ft.DaemonState()
        state.last_in_race = False
        for index in range(200):
            raw_flag = index % 2 == 1
            committed = ft.commit_in_race(raw_flag, state)
            self.assertFalse(committed)
            # Each True is followed by a False that resets the counter, so
            # it can never get past 1.
            self.assertLessEqual(state.in_race_pending_count, 1)

    def test_n_consecutive_packets_commit_flip(self):
        state = ft.DaemonState()
        state.last_in_race = False
        # Packets 1 .. N-1 with the new value leave the committed value alone.
        packets_before_commit = ft.IN_RACE_HYSTERESIS_PACKETS - 1
        for _packet in range(packets_before_commit):
            self.assertFalse(ft.commit_in_race(True, state))
        # Packet N commits the flip ...
        self.assertTrue(ft.commit_in_race(True, state))
        # ... and the pending counter starts over.
        self.assertEqual(state.in_race_pending_count, 0)

    def test_committed_value_returned_when_raw_matches(self):
        state = ft.DaemonState()
        state.last_in_race = True
        # A stale pending count is left over from an earlier, unfinished
        # flip attempt. When the raw flag agrees with the committed value,
        # the committed value is returned and the stale count is cleared.
        state.in_race_pending_count = 5
        committed = ft.commit_in_race(True, state)
        self.assertTrue(committed)
        self.assertEqual(state.in_race_pending_count, 0)
# --- Trigger handler tests --------------------------------------------------
class TestHandleThrottle(unittest.TestCase):
    """handle_throttle on R2: baseline feedback versus grip-loss vibration."""

    def test_zero_pedal_baseline_min_feedback(self):
        # Lifting off the throttle must NOT switch the trigger off: variant
        # D's design is "always feels something", so baseline mode still
        # sends feedback at MIN_THROTTLE_RESISTANCE. effect.off() is issued
        # only by the run-loop (out-of-race idle reset).
        controller = FakeController()
        state = ft.DaemonState()
        ft.handle_throttle(controller, FakePacket(accel=0.0), state)
        expected_calls = [("R2", "feedback", 0, ft.MIN_THROTTLE_RESISTANCE)]
        self.assertEqual(controller.calls, expected_calls)

    def test_baseline_under_normal_acceleration(self):
        controller = FakeController()
        state = ft.DaemonState()
        # Throttle pressed, moderate acceleration, no wheel slip.
        packet = FakePacket(accel=128, acceleration_x=0.5, acceleration_z=2.0)
        ft.handle_throttle(controller, packet, state)
        # Exactly one call, and it is feedback (neither vibration nor off).
        self.assertEqual(len(controller.calls), 1)
        only_call = controller.calls[0]
        self.assertEqual(only_call[1], "feedback")
        side, _mode, start, strength = only_call
        self.assertEqual(side, "R2")
        self.assertEqual(start, 0)
        # Strength stays inside the configured resistance range.
        self.assertGreaterEqual(strength, ft.MIN_THROTTLE_RESISTANCE)
        self.assertLessEqual(strength, ft.MAX_THROTTLE_RESISTANCE)

    def test_front_slip_triggers_vibration(self):
        controller = FakeController()
        state = ft.DaemonState()
        # Front slip above 0.5 with the throttle pressed selects vibration
        # mode. Heavy acceleration keeps the raw amplitude large enough that
        # the EWMA-filtered value clears the suppression threshold instead
        # of falling back to feedback.
        packet = FakePacket(
            accel=255,
            acceleration_z=10.0,
            tire_combined_slip_FL=0.8,
            tire_combined_slip_FR=0.8,
        )
        # Pre-warm the EWMA cells so the first call is not heavily damped.
        state.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION
        state.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP
        ft.handle_throttle(controller, packet, state)
        self.assertEqual(len(controller.calls), 1)
        self.assertEqual(controller.calls[0][1], "vibration")

    def test_rear_slip_below_accelerator_gate_no_vibration(self):
        # Rear-only slip with the accelerator under the gate must stay in
        # baseline mode (Cosmii's rule, legacy_Program.cs:224).
        controller = FakeController()
        state = ft.DaemonState()
        packet = FakePacket(
            accel=100,  # below ACCELERATOR_REAR_SLIP_GATE=200
            tire_combined_slip_RL=0.8,
            tire_combined_slip_RR=0.8,
        )
        ft.handle_throttle(controller, packet, state)
        mode = controller.calls[0][1]
        self.assertEqual(mode, "feedback")  # baseline mode

    def test_rear_slip_above_accelerator_gate_triggers_vibration(self):
        controller = FakeController()
        state = ft.DaemonState()
        # Pre-warmed EWMA cells, same reason as the front-slip test.
        state.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION
        state.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP
        packet = FakePacket(
            accel=255,  # above ACCELERATOR_REAR_SLIP_GATE=200
            acceleration_z=10.0,
            tire_combined_slip_RL=0.8,
            tire_combined_slip_RR=0.8,
        )
        ft.handle_throttle(controller, packet, state)
        mode = controller.calls[0][1]
        self.assertEqual(mode, "vibration")
class TestHandleBrake(unittest.TestCase):
    """handle_brake on L2: baseline feedback versus slip vibration."""

    def test_zero_pedal_baseline_min_feedback(self):
        # Mirrors the throttle case: foot off the brake still yields light
        # constant resistance rather than an "off" call.
        controller = FakeController()
        state = ft.DaemonState()
        ft.handle_brake(controller, FakePacket(brake=0.0), state)
        expected_calls = [("L2", "feedback", 0, ft.MIN_BRAKE_RESISTANCE)]
        self.assertEqual(controller.calls, expected_calls)

    def test_baseline_under_normal_braking(self):
        controller = FakeController()
        state = ft.DaemonState()
        ft.handle_brake(controller, FakePacket(brake=128), state)
        self.assertEqual(len(controller.calls), 1)
        only_call = controller.calls[0]
        self.assertEqual(only_call[1], "feedback")
        # Strength follows brake input; brake=128 lands near the middle of
        # the 1..6 range (about 4). Only the bounds are asserted.
        _side, _mode, _start, strength = only_call
        self.assertGreaterEqual(strength, ft.MIN_BRAKE_RESISTANCE)
        self.assertLessEqual(strength, ft.MAX_BRAKE_RESISTANCE)

    def test_slip_with_heavy_brake_triggers_vibration(self):
        # Patmagauran's / our condition: slip above threshold AND brake
        # above threshold.
        controller = FakeController()
        state = ft.DaemonState()
        state.last_brake_freq = ft.MAX_BRAKE_VIBRATION
        state.last_brake_resistance = ft.MAX_BRAKE_AMP
        packet = FakePacket(
            brake=200,  # well above BRAKE_VIBRATION_MODE_START=10
            tire_combined_slip_FL=0.8,
            tire_combined_slip_FR=0.8,
            tire_combined_slip_RL=0.8,
            tire_combined_slip_RR=0.8,
        )
        ft.handle_brake(controller, packet, state)
        mode = controller.calls[0][1]
        self.assertEqual(mode, "vibration")

    def test_slip_with_light_brake_no_vibration(self):
        # Slip with the brake under the threshold stays in baseline mode
        # (Patmagauran's logic, NOT Cosmii's inverted version).
        controller = FakeController()
        state = ft.DaemonState()
        packet = FakePacket(
            brake=8,  # below BRAKE_VIBRATION_MODE_START=10
            tire_combined_slip_FL=0.8,
            tire_combined_slip_FR=0.8,
        )
        ft.handle_brake(controller, packet, state)
        mode = controller.calls[0][1]
        self.assertEqual(mode, "feedback")  # baseline mode
# --- Lightbar tests ---------------------------------------------------------
class TestLightbarColor(unittest.TestCase):
    """lightbar_color: RPM-driven colour in race, car-class colour in menus."""

    def test_in_race_low_rpm_green_floor(self):
        # At idle RPM the green channel sits at GREEN_FLOOR (50), not 0.
        state = ft.DaemonState()
        idle_packet = FakePacket(
            is_race_on=1.0,
            current_engine_rpm=800,  # equal to idle
            engine_idle_rpm=800,
            engine_max_rpm=8000,
        )
        r, g, b = ft.lightbar_color(idle_packet, state, in_race=True)
        self.assertEqual((r, g, b), (0, ft.GREEN_FLOOR, 0))

    def test_in_race_below_redline_green(self):
        state = ft.DaemonState()
        # 4400 RPM is halfway between idle (800) and max (8000), i.e. below
        # the redline.
        mid_packet = FakePacket(
            is_race_on=1.0,
            current_engine_rpm=4400,
            engine_idle_rpm=800,
            engine_max_rpm=8000,
        )
        _, g, _ = ft.lightbar_color(mid_packet, state, in_race=True)
        # Green has risen but is not inverted yet; only green is asserted.
        self.assertGreater(g, 0)
        self.assertLess(g, 255)

    def test_in_race_above_redline_inverts_green(self):
        state = ft.DaemonState()
        # RPM equals max (100% ratio), well past the 85% redline.
        redline_packet = FakePacket(
            is_race_on=1.0,
            current_engine_rpm=8000,
            engine_idle_rpm=800,
            engine_max_rpm=8000,
        )
        r, g, _ = ft.lightbar_color(redline_packet, state, in_race=True)
        # At 100% ratio green reaches 255 and is then inverted to 0; red is 255.
        self.assertEqual(r, 255)
        self.assertEqual(g, 0)

    def test_menu_class_d_color(self):
        state = ft.DaemonState()
        packet = FakePacket(car_class=0, car_performance_index=255)
        colour = ft.lightbar_color(packet, state, in_race=False)
        # Class D at full CPI is the untinted palette colour.
        self.assertEqual(colour, ft.CAR_CLASS_COLORS[0])

    def test_menu_x_class_constant(self):
        state = ft.DaemonState()
        packet = FakePacket(car_class=7, car_performance_index=999)
        colour = ft.lightbar_color(packet, state, in_race=False)
        self.assertEqual(colour, ft.COLOR_CLASS_X)

    def test_menu_class_d_packet_unsticks_previous_class(self):
        # Going from S2 to D MUST repaint the lightbar with D's palette.
        # Cosmii's broken `> 0` guard treated car_class=0 as "no info" and
        # kept showing the previous class.
        state = ft.DaemonState()
        state.last_car_class = 5  # S2
        state.last_cpi = 200
        class_d_packet = FakePacket(car_class=0, car_performance_index=180)
        colour = ft.lightbar_color(class_d_packet, state, in_race=False)
        # Expected: D-class colour tinted by CPI / 255, NOT the S2 colour.
        tint = 180 / 255
        expected = tuple(
            int(tint * ft.CAR_CLASS_COLORS[0][channel]) for channel in range(3)
        )
        self.assertEqual(colour, expected)
# --- DaemonState lifecycle --------------------------------------------------
class TestDaemonStateReset(unittest.TestCase):
    """DaemonState.reset() returns filter cells, counters and caches to defaults."""

    def test_reset_clears_filters_and_accumulator(self):
        state = ft.DaemonState()
        # Dirty every field the reset is expected to touch.
        dirty_values = {
            "last_throttle_resistance": 99.0,
            "last_brake_resistance": 99.0,
            "last_throttle_freq": 99.0,
            "last_brake_freq": 99.0,
            "in_race_pending_count": 50,
        }
        for attr, value in dirty_values.items():
            setattr(state, attr, value)

        state.reset()

        values_after_reset = {
            "last_throttle_resistance": 1.0,
            "last_brake_resistance": 1.0,
            "last_throttle_freq": 0.0,
            "last_brake_freq": 0.0,
            "in_race_pending_count": 0,
        }
        for attr, expected in values_after_reset.items():
            self.assertEqual(getattr(state, attr), expected)

    def test_reset_clears_lightbar_dedup(self):
        # last_color MUST be cleared on idle reset. `apply_lightbar` skips
        # the write when color == state.last_color. If reset kept the
        # in-race colour cached, resuming with that same colour would skip
        # the controller write and leave the bar black after the (0,0,0)
        # write that reset_all sent at idle.
        state = ft.DaemonState()
        state.last_color = (1, 2, 3)
        state.reset()
        self.assertEqual(state.last_color, (0, 0, 0))
# --- Intensity attenuation regression ---------------------------------------
# Catches the bug where the EWMA cell stored the intensity-scaled value, so
# any FORZA_*_INTENSITY < 1 geometrically attenuated the output (steady-state
# at intensity=0.5, α=0.01 was ~50× too low). The fix decouples cell storage
# from output scaling. This test would have caught it in CI.
class TestIntensityScaling(unittest.TestCase):
    """Regression tests: steady-state output must scale in proportion to the
    per-trigger intensity setting rather than being attenuated by the EWMA.
    """

    def _run_steady(
        self,
        intensity_attr: str,
        side: str,
        pkt_kwargs: dict,
        n_iter: int = 300,
        intensity: "float | None" = None,
    ) -> int:
        """Drive one handler ``n_iter`` times with constant input and return
        the last strength (feedback) or amplitude (vibration) sent.

        Args:
            intensity_attr: name of the ``ft`` module attribute to pin, e.g.
                "RIGHT_TRIGGER_INTENSITY". Only read when ``intensity`` is
                given.
            side: "R2" drives ``ft.handle_throttle``; any other value drives
                ``ft.handle_brake``.
            pkt_kwargs: keyword arguments for the FakePacket built on every
                iteration.
            n_iter: number of packets to feed.
            intensity: value to pin ``intensity_attr`` to while the handler
                runs. The previous value is restored afterwards, even if the
                handler raises. ``None`` leaves the module untouched.

        Returns:
            Element 3 of the last recorded call, which is the strength for
            ("R2"/"L2", "feedback", start, strength) and the amplitude for
            ("R2"/"L2", "vibration", start, amp, freq).
        """
        handler = ft.handle_throttle if side == "R2" else ft.handle_brake
        controller = FakeController()
        state = ft.DaemonState()

        def drive() -> int:
            for _ in range(n_iter):
                handler(controller, FakePacket(**pkt_kwargs), state)
            return controller.calls[-1][3]

        if intensity is None:
            return drive()

        previous = getattr(ft, intensity_attr)
        setattr(ft, intensity_attr, intensity)
        try:
            return drive()
        finally:
            setattr(ft, intensity_attr, previous)

    def test_throttle_baseline_intensity_proportional(self):
        # At intensity=0.5 and alpha=0.01 (very smooth) the output must
        # converge to about half of the intensity=1.0 output, NOT to ~1% of
        # it. Re-importing ft after changing the environment is not an
        # option inside one process, so the module attribute is pinned
        # directly for the duration of each run.
        packet_fields = dict(accel=128, acceleration_z=10.0)
        full = self._run_steady(
            "RIGHT_TRIGGER_INTENSITY", "R2", packet_fields, intensity=1.0
        )
        half = self._run_steady(
            "RIGHT_TRIGGER_INTENSITY", "R2", packet_fields, intensity=0.5
        )
        # Full strength for this input is MAX=6 (avgAccel=10 is the top of
        # the range); half intensity must land on 3.
        self.assertEqual(full, 6)
        self.assertEqual(half, 3)

    def test_brake_baseline_intensity_proportional(self):
        packet_fields = dict(brake=255)
        full = self._run_steady(
            "LEFT_TRIGGER_INTENSITY", "L2", packet_fields, intensity=1.0
        )
        half = self._run_steady(
            "LEFT_TRIGGER_INTENSITY", "L2", packet_fields, intensity=0.5
        )
        self.assertEqual(full, 6)
        self.assertEqual(half, 3)
# --- Slip transition seeding ------------------------------------------------
# The first packet of a slip event MUST produce the unsmoothed peak
# vibration so the buzz is felt immediately, not after ~1.7s of EWMA
# convergence at α=0.01.
class TestSlipTransitionSeeding(unittest.TestCase):
    """The first packet of a slip event must already carry peak vibration."""

    @staticmethod
    def _all_wheels_slipping(**pedal_fields):
        """Build a FakePacket with every wheel at slip 1.0 plus the given fields."""
        return FakePacket(
            tire_combined_slip_FL=1.0,
            tire_combined_slip_FR=1.0,
            tire_combined_slip_RL=1.0,
            tire_combined_slip_RR=1.0,
            **pedal_fields,
        )

    def test_first_throttle_slip_packet_hits_peak(self):
        controller = FakeController()
        state = ft.DaemonState()  # was_throttle_slipping=False, cells=1.0/0.0
        packet = self._all_wheels_slipping(accel=255, acceleration_z=10.0)
        ft.handle_throttle(controller, packet, state)
        last_call = controller.calls[-1]
        self.assertEqual(last_call[1], "vibration")
        _side, _mode, _start, amp, freq = last_call
        # Peak slip puts freq at the MAX_ACCEL_GRIPLOSS_VIBRATION ceiling and
        # heavy avgAccel puts amp at its max. With seeding, BOTH must be at
        # max on the very first packet.
        self.assertEqual(freq, ft.MAX_ACCEL_GRIPLOSS_VIBRATION)
        self.assertEqual(amp, ft.MAX_ACCEL_GRIPLOSS_AMP)

    def test_first_brake_slip_packet_hits_peak(self):
        controller = FakeController()
        state = ft.DaemonState()
        packet = self._all_wheels_slipping(brake=255)
        ft.handle_brake(controller, packet, state)
        last_call = controller.calls[-1]
        self.assertEqual(last_call[1], "vibration")
        _side, _mode, _start, amp, freq = last_call
        self.assertEqual(freq, ft.MAX_BRAKE_VIBRATION)
        self.assertEqual(amp, ft.MAX_BRAKE_AMP)

    def test_throttle_seeding_survives_menu_round_trip(self):
        # Scenario: race 1 ended with was_throttle_slipping=True (player
        # crashed mid-spin), then the player went to the menu
        # (in_race=False). The run-loop's transition handler MUST clear
        # was_throttle_slipping so that race 2's first slip packet re-seeds
        # the EWMA cell; otherwise the cold-start fix silently stops working
        # after a menu round-trip.
        #
        # This test does not run the run-loop. It starts from the state the
        # transition handler is expected to leave behind.
        controller = FakeController()
        state = ft.DaemonState()
        state.was_throttle_slipping = False  # cleared by the transition handler
        state.last_throttle_freq = 1.0  # cells as reset by the run-loop
        state.last_throttle_resistance = 1.0
        packet = self._all_wheels_slipping(accel=255, acceleration_z=10.0)
        ft.handle_throttle(controller, packet, state)
        _side, _mode, _start, amp, freq = controller.calls[-1]
        self.assertEqual(freq, ft.MAX_ACCEL_GRIPLOSS_VIBRATION)
        self.assertEqual(amp, ft.MAX_ACCEL_GRIPLOSS_AMP)

    def test_brake_seeding_survives_menu_round_trip(self):
        # Brake-side twin of the throttle round-trip test.
        controller = FakeController()
        state = ft.DaemonState()
        state.was_brake_slipping = False
        state.last_brake_freq = 1.0
        state.last_brake_resistance = 1.0
        packet = self._all_wheels_slipping(brake=255)
        ft.handle_brake(controller, packet, state)
        _side, _mode, _start, amp, freq = controller.calls[-1]
        self.assertEqual(freq, ft.MAX_BRAKE_VIBRATION)
        self.assertEqual(amp, ft.MAX_BRAKE_AMP)
# --- env helpers ------------------------------------------------------------
class TestReadIntensity(unittest.TestCase):
    """ft._read_intensity: env-var parsing with clamping to [0, 1] and a
    fallback to the default for unset, non-finite or unparseable values.
    """

    def setUp(self):
        # Snapshot os.environ and restore it after every test so the probe
        # variable set below cannot leak into other tests, whatever order
        # they run in.
        from unittest import mock

        env_snapshot = mock.patch.dict(os.environ)
        env_snapshot.start()
        self.addCleanup(env_snapshot.stop)

    def test_default_when_unset(self):
        os.environ.pop("FORZA_PROBE_X", None)
        self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.7), 0.7)

    def test_clamps_above_one(self):
        os.environ["FORZA_PROBE_X"] = "5.0"
        self.assertEqual(ft._read_intensity("FORZA_PROBE_X"), 1.0)

    def test_clamps_below_zero(self):
        os.environ["FORZA_PROBE_X"] = "-1.0"
        self.assertEqual(ft._read_intensity("FORZA_PROBE_X"), 0.0)

    def test_nan_falls_back_to_default(self):
        # Without the is_finite check, min(1.0, nan) returns 1.0 silently.
        os.environ["FORZA_PROBE_X"] = "nan"
        self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.4), 0.4)

    def test_inf_falls_back_to_default(self):
        os.environ["FORZA_PROBE_X"] = "inf"
        self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.4), 0.4)

    def test_garbage_falls_back_to_default(self):
        os.environ["FORZA_PROBE_X"] = "yes please"
        self.assertEqual(ft._read_intensity("FORZA_PROBE_X", default=0.5), 0.5)
class TestReadBool(unittest.TestCase):
    """ft._read_bool: env-var parsing where a fixed set of tokens means
    "disabled" and any other value means "enabled".
    """

    def setUp(self):
        # Snapshot os.environ and restore it after every test so the probe
        # variable set below cannot leak into other tests.
        from unittest import mock

        env_snapshot = mock.patch.dict(os.environ)
        env_snapshot.start()
        self.addCleanup(env_snapshot.stop)

    def test_default(self):
        os.environ.pop("FORZA_PROBE_BOOL", None)
        self.assertTrue(ft._read_bool("FORZA_PROBE_BOOL", default=True))
        self.assertFalse(ft._read_bool("FORZA_PROBE_BOOL", default=False))

    def test_disabled_tokens(self):
        for tok in ("0", "false", "FALSE", "no", "NO", "off", "Off", ""):
            # subTest reports every failing token instead of stopping at
            # the first one.
            with self.subTest(token=tok):
                os.environ["FORZA_PROBE_BOOL"] = tok
                self.assertFalse(ft._read_bool("FORZA_PROBE_BOOL"), f"token={tok!r}")

    def test_enabled_tokens(self):
        for tok in ("1", "true", "yes", "on", "Y", "anything-else"):
            with self.subTest(token=tok):
                os.environ["FORZA_PROBE_BOOL"] = tok
                self.assertTrue(ft._read_bool("FORZA_PROBE_BOOL"), f"token={tok!r}")
# --- on_error signature -----------------------------------------------------
# The dualsense-controller library dispatches state-change callbacks by
# parameter count. _on_controller_error MUST take exactly 1 parameter to
# bind to the 1-arg event (which emits new_value). 2-arg would bind to a
# 2-arg event that emits (new_value, timestamp_ns), making the log show a
# stray clock integer instead of the exception.
class TestOnControllerErrorSignature(unittest.TestCase):
    """Pins the arity of ft._on_controller_error to exactly one parameter."""

    def test_takes_exactly_one_argument(self):
        import inspect

        params = inspect.signature(ft._on_controller_error).parameters
        failure_hint = (
            f"_on_controller_error must take 1 arg (got {list(params)}); "
            "the dualsense-controller library dispatches by arity and 2-arg "
            "would receive (new_value, timestamp_ns) instead of (exc)."
        )
        self.assertEqual(len(params), 1, failure_hint)
# --- Reset tolerates None controller ----------------------------------------
class TestResetNullSafe(unittest.TestCase):
    """The reset helpers must accept ``None`` in place of a controller."""

    def test_reset_triggers_none_no_raise(self):
        # Passing means no exception was raised.
        missing_controller = None
        ft.reset_triggers(missing_controller)

    def test_reset_all_none_no_raise(self):
        # Passing means no exception was raised.
        missing_controller = None
        ft.reset_all(missing_controller)
# --- fdp surface contract ---------------------------------------------------
# Catches the field-name regression class. We collect every literal name
# the daemon passes to _safe_field/_safe_abs and assert each resolves on
# a real fdp.ForzaDataPacket. This would have caught all three of the
# bugs in commit 50e5f12 at CI time.
class TestFdpSurfaceContract(unittest.TestCase):
    """Every field name the daemon reads must exist on a real fdp packet."""

    @staticmethod
    def _collect_field_names(src_path: str) -> tuple[set[str], int]:
        """Walk the daemon AST for every _safe_field/_safe_abs call.

        Calls made inside the wrapper functions `_safe_abs` and `_safe_field`
        themselves are excluded: those forward `name` from a parameter to
        the inner call and do NOT touch the fdp surface directly.

        Args:
            src_path: path of the Python source file to scan.

        Returns:
            ``(literal_names, non_literal_arg_count)``. ``literal_names`` is
            the set of string literals passed as the second positional
            argument. ``non_literal_arg_count`` counts calls whose second
            positional argument is missing or is not a string literal; a
            non-zero count means the contract isn't fully enforceable.

        Raises:
            OSError: if ``src_path`` cannot be read.
            SyntaxError: if the file is not valid Python.
        """
        import ast

        # Read bytes and let the parser apply Python's own source-encoding
        # rules (UTF-8 by default, or a coding cookie). Text mode would
        # decode with the locale's default encoding, which can fail on
        # non-ASCII source under a C/ASCII locale.
        with open(src_path, "rb") as f:
            tree = ast.parse(f.read(), filename=src_path)

        wrappers = ("_safe_field", "_safe_abs")

        class V(ast.NodeVisitor):
            def __init__(self) -> None:
                self.names: set[str] = set()
                self.non_literal: int = 0
                # Names of the enclosing function definitions, innermost last.
                self._fn_stack: list[str] = []

            def visit_FunctionDef(self, node) -> None:
                self._fn_stack.append(node.name)
                self.generic_visit(node)
                self._fn_stack.pop()

            # `async def` is tracked exactly like `def`.
            visit_AsyncFunctionDef = visit_FunctionDef

            def visit_Call(self, node: ast.Call) -> None:
                self.generic_visit(node)  # nested calls
                fn = node.func
                if not isinstance(fn, ast.Name) or fn.id not in wrappers:
                    return
                # Skip the forwarding wrappers themselves: those call
                # _safe_field with a `name` parameter, not a literal.
                if self._fn_stack and self._fn_stack[-1] in wrappers:
                    return
                # No second positional argument (e.g. name passed by
                # keyword) counts as non-literal: the walker cannot check it.
                if len(node.args) < 2:
                    self.non_literal += 1
                    return
                arg = node.args[1]
                if isinstance(arg, ast.Constant) and isinstance(arg.value, str):
                    self.names.add(arg.value)
                else:
                    self.non_literal += 1

        v = V()
        v.visit(tree)
        return v.names, v.non_literal

    def test_every_daemon_field_resolves_on_real_fdp_packet(self):
        import fdp

        # 324-byte zeroed buffer: the fh4 unpack will succeed (yielding
        # zeros for everything) and every documented field will be
        # setattr'd on the resulting object.
        pkt = fdp.ForzaDataPacket(b"\x00" * 324, packet_format="fh4")
        used_names, non_literal = self._collect_field_names(ft.__file__)
        self.assertGreater(len(used_names), 5, "AST walker found too few names")
        self.assertEqual(
            non_literal, 0,
            "Every _safe_field/_safe_abs call MUST pass a string literal "
            "field name so this contract test can verify it. Variable / "
            "kwarg / computed names defeat the AST walker.",
        )
        missing = sorted(n for n in used_names if not hasattr(pkt, n))
        self.assertEqual(
            missing, [],
            f"daemon reads fdp fields that don't exist: {missing}. "
            "Check fdp.ForzaDataPacket.sled_props + dash_props for canonical names.",
        )
# Allow running this file directly as a script; verbosity=2 prints each
# test name with its result.
if __name__ == "__main__":
    unittest.main(verbosity=2)