"""Behavioral tests for forza_trigger.py. Pure-function tests + integration tests using a FakeController that records every effect call. No hardware required. Run: `python -m unittest discover -s hosts/yarn/forza-trigger -p 'test_*.py'` or via the nix derivation in default.nix (runs at build time). """ import math import os import sys import unittest from dataclasses import dataclass, field from typing import Any, Tuple # Disable lightbar by default for triggers-only tests; specific lightbar tests # re-enable by mutating module attributes. os.environ.setdefault("FORZA_LIGHTBAR", "0") import forza_trigger as ft # noqa: E402 # --- Fakes ------------------------------------------------------------------ class FakeEffect: def __init__(self, log: list, side: str): self.log = log self.side = side def off(self): self.log.append((self.side, "off")) def feedback(self, start_position=0, strength=0): self.log.append((self.side, "feedback", start_position, strength)) def vibration(self, start_position=0, amplitude=0, frequency=0): self.log.append((self.side, "vibration", start_position, amplitude, frequency)) class FakeTrigger: def __init__(self, log: list, side: str): self.effect = FakeEffect(log, side) class FakeLightbar: def __init__(self, log: list): self.log = log def set_color(self, r, g, b): self.log.append(("lightbar", "set_color", r, g, b)) class FakeController: def __init__(self): self.calls: list = [] self.left_trigger = FakeTrigger(self.calls, "L2") self.right_trigger = FakeTrigger(self.calls, "R2") self.lightbar = FakeLightbar(self.calls) @dataclass class FakePacket: """Shape mirrors fdp.ForzaDataPacket. Only the fields handlers read are populated; the rest stay 0. """ accel: float = 0.0 brake: float = 0.0 is_race_on: float = 1.0 current_engine_rpm: float = 3000.0 engine_idle_rpm: float = 800.0 engine_max_rpm: float = 8000.0 power: float = 100.0 acceleration_x: float = 0.0 acceleration_y: float = 0.0 acceleration_z: float = 0.0 tire_combined_slip_FL: float = 0.0 tire_combined_slip_FR: float = 0.0 tire_combined_slip_RL: float = 0.0 tire_combined_slip_RR: float = 0.0 car_class: float = 0.0 car_performance_index: float = 0.0 # --- Pure-function tests ---------------------------------------------------- class TestMap(unittest.TestCase): def test_endpoints(self): self.assertEqual(ft._map(0, 0, 1, 0, 100), 0) self.assertEqual(ft._map(1, 0, 1, 0, 100), 100) def test_midpoint(self): self.assertEqual(ft._map(0.5, 0, 1, 0, 100), 50) def test_clamps_low(self): self.assertEqual(ft._map(-5, 0, 10, 0, 100), 0) def test_clamps_high(self): self.assertEqual(ft._map(50, 0, 10, 0, 100), 100) def test_zero_input_range(self): # Degenerate input range — must not divide by zero. self.assertEqual(ft._map(5, 5, 5, 0, 100), 0) class TestEWMA(unittest.TestCase): def test_alpha_one_is_instant(self): # α=1.0 → output exactly equals the input (no smoothing). self.assertEqual(ft._ewma(10, 5, 1.0), 10) def test_alpha_zero_is_frozen(self): # α=0.0 → output exactly equals the previous value (no update). self.assertEqual(ft._ewma(10, 5, 0.0), 5) def test_alpha_half(self): self.assertEqual(ft._ewma(10, 6, 0.5), 8) class TestSafeField(unittest.TestCase): def test_missing_attribute_returns_default(self): pkt = FakePacket() self.assertEqual(ft._safe_field(pkt, "nonexistent_field", 42.0), 42.0) def test_nan_returns_default(self): pkt = FakePacket(accel=math.nan) self.assertEqual(ft._safe_field(pkt, "accel", 7.0), 7.0) def test_inf_returns_default(self): pkt = FakePacket(accel=math.inf) self.assertEqual(ft._safe_field(pkt, "accel", 0.0), 0.0) def test_normal_value(self): pkt = FakePacket(accel=128.0) self.assertEqual(ft._safe_field(pkt, "accel"), 128.0) class TestCombinedSlip(unittest.TestCase): def test_arithmetic_mean(self): pkt = FakePacket( tire_combined_slip_FL=0.4, tire_combined_slip_FR=0.6, tire_combined_slip_RL=0.2, tire_combined_slip_RR=0.8, ) all_, front, rear = ft._combined_slip(pkt) # All four = (0.4+0.6+0.2+0.8)/4 = 0.5 self.assertAlmostEqual(all_, 0.5) # Front = (0.4+0.6)/2 = 0.5 self.assertAlmostEqual(front, 0.5) # Rear = (0.2+0.8)/2 = 0.5 self.assertAlmostEqual(rear, 0.5) def test_takes_absolute_values(self): pkt = FakePacket( tire_combined_slip_FL=-0.5, tire_combined_slip_FR=0.5, tire_combined_slip_RL=-0.3, tire_combined_slip_RR=0.3, ) all_, front, rear = ft._combined_slip(pkt) self.assertAlmostEqual(all_, 0.4) self.assertAlmostEqual(front, 0.5) self.assertAlmostEqual(rear, 0.3) # --- Race-on debounce tests ------------------------------------------------- class TestIsRaceOn(unittest.TestCase): def test_simple_yes(self): s = ft.DaemonState() pkt = FakePacket(is_race_on=1.0, current_engine_rpm=4000, power=200) self.assertTrue(ft.is_race_on(pkt, s)) self.assertEqual(s.rpm_accumulator, 0) def test_explicit_no(self): s = ft.DaemonState() pkt = FakePacket(is_race_on=0.0) self.assertFalse(ft.is_race_on(pkt, s)) def test_debounce_overrides_stuck_flag(self): # Simulate FH5's stuck-flag bug: is_race_on=1 but RPM and power show # the car is in a menu (power<=0, RPM unchanged). s = ft.DaemonState() pkt = FakePacket(is_race_on=1.0, current_engine_rpm=800, power=0) s.last_rpm = 800 # Pump packets until the accumulator trips. for _ in range(ft.RPM_ACCUMULATOR_TRIGGER_RACE_OFF): self.assertTrue(ft.is_race_on(pkt, s)) # One more samples and we cross the threshold. self.assertFalse(ft.is_race_on(pkt, s)) def test_rpm_change_resets_accumulator(self): s = ft.DaemonState() s.rpm_accumulator = 50 s.last_rpm = 800 pkt = FakePacket(is_race_on=1.0, current_engine_rpm=4000, power=100) ft.is_race_on(pkt, s) self.assertEqual(s.rpm_accumulator, 0) # --- Trigger handler tests -------------------------------------------------- class TestHandleThrottle(unittest.TestCase): def test_zero_pedal_baseline_min_feedback(self): # Foot off throttle should NOT turn the trigger off — variant D's # design is "always feels something". Baseline mode still fires # with strength = MIN_THROTTLE_RESISTANCE so the user feels light # constant resistance under the finger. The run-loop is the only # place that calls effect.off() (out-of-race idle reset). c = FakeController() s = ft.DaemonState() ft.handle_throttle(c, FakePacket(accel=0.0), s) self.assertEqual(c.calls, [("R2", "feedback", 0, ft.MIN_THROTTLE_RESISTANCE)]) def test_baseline_under_normal_acceleration(self): c = FakeController() s = ft.DaemonState() # No slip, moderate accel, throttle pressed. pkt = FakePacket(accel=128, acceleration_x=0.5, acceleration_z=2.0) ft.handle_throttle(c, pkt, s) # Baseline mode → feedback call (not vibration, not off). self.assertEqual(len(c.calls), 1) self.assertEqual(c.calls[0][1], "feedback") side, mode, start, strength = c.calls[0] self.assertEqual(side, "R2") self.assertEqual(start, 0) # Strength is in valid library range. self.assertGreaterEqual(strength, ft.MIN_THROTTLE_RESISTANCE) self.assertLessEqual(strength, ft.MAX_THROTTLE_RESISTANCE) def test_front_slip_triggers_vibration(self): c = FakeController() s = ft.DaemonState() # Front-slip > 0.5 with throttle pressed → vibration mode. # Use heavy accel so amp_raw is large enough that the EWMA-filtered # value crosses the suppression threshold and we see vibration not # feedback fallback. pkt = FakePacket( accel=255, acceleration_z=10.0, tire_combined_slip_FL=0.8, tire_combined_slip_FR=0.8, ) # Pre-warm EWMA so first call doesn't get heavily damped. s.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION s.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP ft.handle_throttle(c, pkt, s) self.assertEqual(len(c.calls), 1) self.assertEqual(c.calls[0][1], "vibration") def test_rear_slip_below_accelerator_gate_no_vibration(self): # Rear-only slip but accelerator below the gate should NOT trigger # vibration mode (Cosmii's rule, legacy_Program.cs:224). c = FakeController() s = ft.DaemonState() pkt = FakePacket( accel=100, # below ACCELERATOR_REAR_SLIP_GATE=200 tire_combined_slip_RL=0.8, tire_combined_slip_RR=0.8, ) ft.handle_throttle(c, pkt, s) self.assertEqual(c.calls[0][1], "feedback") # baseline mode def test_rear_slip_above_accelerator_gate_triggers_vibration(self): c = FakeController() s = ft.DaemonState() s.last_throttle_freq = ft.MAX_ACCEL_GRIPLOSS_VIBRATION s.last_throttle_resistance = ft.MAX_ACCEL_GRIPLOSS_AMP pkt = FakePacket( accel=255, # above ACCELERATOR_REAR_SLIP_GATE=200 acceleration_z=10.0, tire_combined_slip_RL=0.8, tire_combined_slip_RR=0.8, ) ft.handle_throttle(c, pkt, s) self.assertEqual(c.calls[0][1], "vibration") class TestHandleBrake(unittest.TestCase): def test_zero_pedal_baseline_min_feedback(self): # Same as throttle — foot off brake = light constant resistance, not off. c = FakeController() s = ft.DaemonState() ft.handle_brake(c, FakePacket(brake=0.0), s) self.assertEqual(c.calls, [("L2", "feedback", 0, ft.MIN_BRAKE_RESISTANCE)]) def test_baseline_under_normal_braking(self): c = FakeController() s = ft.DaemonState() pkt = FakePacket(brake=128) ft.handle_brake(c, pkt, s) self.assertEqual(len(c.calls), 1) self.assertEqual(c.calls[0][1], "feedback") # Strength scales with brake input. brake=128 → midpoint of 1..6 = 4ish. _, _, _, strength = c.calls[0] self.assertGreaterEqual(strength, ft.MIN_BRAKE_RESISTANCE) self.assertLessEqual(strength, ft.MAX_BRAKE_RESISTANCE) def test_slip_with_heavy_brake_triggers_vibration(self): # Patmagauran/our condition: slip > thr AND brake > thr. c = FakeController() s = ft.DaemonState() s.last_brake_freq = ft.MAX_BRAKE_VIBRATION s.last_brake_resistance = ft.MAX_BRAKE_AMP pkt = FakePacket( brake=200, # well above BRAKE_VIBRATION_MODE_START=10 tire_combined_slip_FL=0.8, tire_combined_slip_FR=0.8, tire_combined_slip_RL=0.8, tire_combined_slip_RR=0.8, ) ft.handle_brake(c, pkt, s) self.assertEqual(c.calls[0][1], "vibration") def test_slip_with_light_brake_no_vibration(self): # Slip but brake below threshold → baseline mode (Patmagauran's logic, # NOT Cosmii's inverted version). c = FakeController() s = ft.DaemonState() pkt = FakePacket( brake=8, # below BRAKE_VIBRATION_MODE_START=10 tire_combined_slip_FL=0.8, tire_combined_slip_FR=0.8, ) ft.handle_brake(c, pkt, s) self.assertEqual(c.calls[0][1], "feedback") # baseline mode # --- Lightbar tests --------------------------------------------------------- class TestLightbarColor(unittest.TestCase): def test_in_race_low_rpm_green_floor(self): # At idle RPM the green channel should hit GREEN_FLOOR (50), not 0. s = ft.DaemonState() pkt = FakePacket( is_race_on=1.0, current_engine_rpm=800, # = idle engine_idle_rpm=800, engine_max_rpm=8000, ) r, g, b = ft.lightbar_color(pkt, s, in_race=True) self.assertEqual(r, 0) self.assertEqual(g, ft.GREEN_FLOOR) self.assertEqual(b, 0) def test_in_race_below_redline_green(self): s = ft.DaemonState() # 50% RPM ratio, below redline. pkt = FakePacket( is_race_on=1.0, current_engine_rpm=4400, engine_idle_rpm=800, engine_max_rpm=8000, ) r, g, b = ft.lightbar_color(pkt, s, in_race=True) # Both red and green should rise; green not inverted yet. self.assertGreater(g, 0) self.assertLess(g, 255) def test_in_race_above_redline_inverts_green(self): s = ft.DaemonState() # 95% RPM ratio, well past 85% redline. pkt = FakePacket( is_race_on=1.0, current_engine_rpm=8000, engine_idle_rpm=800, engine_max_rpm=8000, ) r, g, b = ft.lightbar_color(pkt, s, in_race=True) # At 100% ratio, green=255 then inverted to 0; red=255. self.assertEqual(r, 255) self.assertEqual(g, 0) def test_menu_class_d_color(self): s = ft.DaemonState() pkt = FakePacket(car_class=0, car_performance_index=255) rgb = ft.lightbar_color(pkt, s, in_race=False) # Class D at full CPI = exact palette color. self.assertEqual(rgb, ft.CAR_CLASS_COLORS[0]) def test_menu_x_class_constant(self): s = ft.DaemonState() pkt = FakePacket(car_class=7, car_performance_index=999) rgb = ft.lightbar_color(pkt, s, in_race=False) self.assertEqual(rgb, ft.COLOR_CLASS_X) def test_menu_class_d_packet_unsticks_previous_class(self): # Switching from S2 → D MUST update the lightbar to D's palette, # not keep showing S2 just because car_class=0 was previously # treated as "no info" by Cosmii's broken `> 0` guard. s = ft.DaemonState() s.last_car_class = 5 # S2 s.last_cpi = 200 d_pkt = FakePacket(car_class=0, car_performance_index=180) rgb = ft.lightbar_color(d_pkt, s, in_race=False) # Expect tinted D-class color, NOT S2. ratio = 180 / 255 expected = ( int(ratio * ft.CAR_CLASS_COLORS[0][0]), int(ratio * ft.CAR_CLASS_COLORS[0][1]), int(ratio * ft.CAR_CLASS_COLORS[0][2]), ) self.assertEqual(rgb, expected) # --- DaemonState lifecycle -------------------------------------------------- class TestDaemonStateReset(unittest.TestCase): def test_reset_clears_filters_and_accumulator(self): s = ft.DaemonState() s.last_throttle_resistance = 99.0 s.last_brake_resistance = 99.0 s.last_throttle_freq = 99.0 s.last_brake_freq = 99.0 s.rpm_accumulator = 50 s.last_rpm = 4000 s.reset() self.assertEqual(s.last_throttle_resistance, 1.0) self.assertEqual(s.last_brake_resistance, 1.0) self.assertEqual(s.last_throttle_freq, 0.0) self.assertEqual(s.last_brake_freq, 0.0) self.assertEqual(s.rpm_accumulator, 0) self.assertEqual(s.last_rpm, 0.0) def test_reset_clears_lightbar_dedup(self): # last_color MUST be cleared on idle reset. The `apply_lightbar` # dedup short-circuits when color == state.last_color; if reset # left the cache holding the in-race color, resuming with the # same color would skip the controller write — leaving the bar # black after the (0,0,0) write reset_all sent at idle. s = ft.DaemonState() s.last_color = (1, 2, 3) s.reset() self.assertEqual(s.last_color, (0, 0, 0)) # --- Intensity attenuation regression --------------------------------------- # Catches the bug where the EWMA cell stored the intensity-scaled value, so # any FORZA_*_INTENSITY < 1 geometrically attenuated the output (steady-state # at intensity=0.5, α=0.01 was ~50× too low). The fix decouples cell storage # from output scaling. This test would have caught it in CI. class TestIntensityScaling(unittest.TestCase): def _run_steady(self, intensity_attr: str, side: str, pkt_kwargs: dict, n_iter: int = 300) -> int: """Drive a handler n_iter times with constant input and the named intensity attribute monkey-patched, then return the final integer strength sent on `side` ("L2" or "R2"). """ c = FakeController() s = ft.DaemonState() for _ in range(n_iter): if side == "R2": ft.handle_throttle(c, FakePacket(**pkt_kwargs), s) else: ft.handle_brake(c, FakePacket(**pkt_kwargs), s) # Last call's strength. last = c.calls[-1] # ("R2"/"L2", "feedback", start, strength) OR ("R2"/"L2", "vibration", start, amp, freq) return last[3] def test_throttle_baseline_intensity_proportional(self): # With intensity=0.5, the output at α=0.01 (very smooth) must # converge to ~0.5× the intensity=1.0 output, NOT to ~1% of it. # We can't monkey-patch module-level constants from a method # cleanly, so we re-import after env tweaks isn't an option in # one process. Instead, pin RIGHT_TRIGGER_INTENSITY directly. original = ft.RIGHT_TRIGGER_INTENSITY try: ft.RIGHT_TRIGGER_INTENSITY = 1.0 full = self._run_steady("RIGHT_TRIGGER_INTENSITY", "R2", dict(accel=128, acceleration_z=10.0)) ft.RIGHT_TRIGGER_INTENSITY = 0.5 half = self._run_steady("RIGHT_TRIGGER_INTENSITY", "R2", dict(accel=128, acceleration_z=10.0)) finally: ft.RIGHT_TRIGGER_INTENSITY = original # full strength at this input is MAX=6 (avgAccel=10 → top of range). # half intensity should clamp around 3 (or MIN=1 floor; not tuple[set[str], int]: """Walk the daemon AST for every _safe_field/_safe_abs call. Returns (literal_names, non_literal_arg_count). Calls inside the wrapper functions `_safe_abs` and `_safe_field` themselves are excluded — those forward `name` from a parameter to the inner call and do NOT touch the fdp surface directly. Non-zero non-literal count from any OTHER caller indicates the contract isn't fully enforceable. """ import ast with open(src_path) as f: tree = ast.parse(f.read()) class V(ast.NodeVisitor): def __init__(self) -> None: self.names: set[str] = set() self.non_literal: int = 0 self._fn_stack: list[str] = [] def visit_FunctionDef(self, node: ast.FunctionDef) -> None: self._fn_stack.append(node.name) self.generic_visit(node) self._fn_stack.pop() def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: self._fn_stack.append(node.name) self.generic_visit(node) self._fn_stack.pop() def visit_Call(self, node: ast.Call) -> None: self.generic_visit(node) # nested calls fn = node.func if not isinstance(fn, ast.Name): return if fn.id not in ("_safe_field", "_safe_abs"): return # Skip the forwarding wrappers themselves: those call # _safe_field with a `name` parameter, not a literal. if self._fn_stack and self._fn_stack[-1] in ("_safe_abs", "_safe_field"): return if len(node.args) < 2: self.non_literal += 1 return arg = node.args[1] if isinstance(arg, ast.Constant) and isinstance(arg.value, str): self.names.add(arg.value) else: self.non_literal += 1 v = V() v.visit(tree) return v.names, v.non_literal def test_every_daemon_field_resolves_on_real_fdp_packet(self): import fdp # 324-byte zeroed buffer — fh4 unpack will succeed (just yields # zeros for everything) but every documented field will be # setattr'd on the resulting object. pkt = fdp.ForzaDataPacket(b"\x00" * 324, packet_format="fh4") used_names, non_literal = self._collect_field_names(ft.__file__) self.assertGreater(len(used_names), 5, "AST walker found too few names") self.assertEqual( non_literal, 0, "Every _safe_field/_safe_abs call MUST pass a string literal " "field name so this contract test can verify it. Variable / " "kwarg / computed names defeat the AST walker.", ) missing = sorted(n for n in used_names if not hasattr(pkt, n)) self.assertEqual( missing, [], f"daemon reads fdp fields that don't exist: {missing}. " "Check fdp.ForzaDataPacket.sled_props + dash_props for canonical names.", ) if __name__ == "__main__": unittest.main(verbosity=2)