nixos/hosts/yarn/forza-trigger/forza_trigger.py
Simon Gardling 5798caef37 forza-trigger: hysteresis on is_race_on (real fix for between-race clicks)
Strace post-deploy showed the daemon flipping every other packet between
in-race state (mode 0x21 FEEDBACK + LED green) and out-of-race state
(mode 0x05 OFF + LED black): 8 writes and 8 transitions in 5s, so every
HID OUT report was a state change.

Root cause: FH5 emits packets where the is_race_on field alternates
True/False at packet rate during menu/loading/transition states.
Cosmii's RPM_ACCUMULATOR debounce only handles the 'flag stuck True
when we're really in a menu' case (quirk #1); it does nothing for
'flag flipping at packet rate' (quirk #2).
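
A minimal sketch of why the quirk #1 debounce cannot help with quirk #2. It replays the accumulator logic over a list of (raw_flag, rpm, power) samples; the `debounce` helper and the small `limit` used in the demo are illustrative assumptions, not the daemon's code (the file uses RPM_ACCUMULATOR_TRIGGER_RACE_OFF = 200 and keeps the counter on DaemonState).

```python
def debounce(samples, limit=200):
    """Replay the stuck-True workaround over (raw_flag, rpm, power) samples."""
    last_rpm, stuck, out = 0.0, 0, []
    for raw, rpm, power in samples:
        if abs(rpm - last_rpm) < 1e-3 and power <= 0:
            stuck += 1
            if stuck > limit:
                raw = False          # long run of frozen RPM: treat as menu
        else:
            stuck = 0                # any RPM change or power resets the run
        last_rpm = rpm
        out.append(raw)
    return out


# Quirk #1: flag stuck True while RPM is frozen and power is zero.
stuck_true = debounce([(True, 800.0, 0.0)] * 5, limit=3)

# Quirk #2: flag alternating while RPM moves. The debounce passes it through.
alternating = debounce(
    [(i % 2 == 0, 800.0 + i, 10.0) for i in range(6)], limit=3
)
```

The debounce can only turn a long stuck-True run into False; it never stabilises a flag that flips packet to packet, which is the gap the hysteresis closes.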

Fix: split the read from the hysteresis. is_race_on now returns the
raw flag with quirk #1 applied. commit_in_race applies a packet-count
hysteresis (IN_RACE_HYSTERESIS_PACKETS = 30 ≈ 0.5s at 60Hz) — only
after N consecutive packets of the new value does the committed
in-race state flip. Alternation just keeps the pending counter
oscillating near 0; it never reaches threshold and the run-loop sees
a stable state.
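
A minimal sketch of the packet-count hysteresis, written standalone so the counter's behaviour under alternation is easy to check. The names `Hyst`, `commit`, `replay` and the threshold of 3 are illustrative assumptions. Unlike the daemon's commit_in_race, this sketch stores the committed state itself rather than leaving that to the run-loop.

```python
from dataclasses import dataclass

THRESHOLD = 3  # illustrative; the daemon uses IN_RACE_HYSTERESIS_PACKETS = 30


@dataclass
class Hyst:
    committed: bool = False  # state the run-loop would react to
    pending: int = 0         # consecutive packets disagreeing with `committed`


def commit(raw: bool, h: Hyst) -> bool:
    """Return the committed state after observing one raw flag."""
    if raw == h.committed:
        h.pending = 0            # agreement resets the pending counter
        return h.committed
    h.pending += 1
    if h.pending >= THRESHOLD:   # enough consecutive disagreement: flip
        h.committed = raw
        h.pending = 0
    return h.committed


def replay(flags):
    """Feed a sequence of raw flags and collect the committed states."""
    h = Hyst()
    return [commit(f, h) for f in flags]


# Alternation never commits: pending goes 1, 0, 1, 0, ...
alternating = replay([True, False] * 5)
# Three consecutive True packets commit on the third one.
steady = replay([True, True, True, True])
```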

Architecture: hysteresis in a separate function (not in is_race_on)
because the run-loop must update state.last_in_race AFTER side
effects, not before — otherwise the elif transition-detection
breaks. is_race_on stays pure read; commit_in_race is the gatekeeper;
run-loop sets state.last_in_race once at end of packet handling.
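
A small sketch of the ordering constraint, assuming a simplified run-loop that only records which side effect it would perform. `run_packets` and its `update_first` switch are hypothetical names used to show how the in-race → menu edge is lost when the previous verdict is overwritten too early.

```python
def run_packets(committed_flags, update_first=False):
    """Replay committed in-race verdicts; return the side effects in order."""
    last_in_race = False
    events = []
    for in_race in committed_flags:
        if update_first:
            last_in_race = in_race    # wrong order: previous verdict is gone
        if in_race:
            events.append("drive")    # per-packet trigger/lightbar writes
        elif last_in_race:
            events.append("reset")    # one-shot in-race -> menu edge
        last_in_race = in_race        # correct place: after side effects
    return events


flags = [False, True, True, False, False, False]
correct = run_packets(flags)
broken = run_packets(flags, update_first=True)
```

With the update at the end, exactly one reset follows the last in-race packet; with the update first, the elif never fires and the reset is skipped.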

Tests grew 54→58. New TestCommitInRace covers:
  - stable input (no pending growth)
  - worst-case alternation (never commits)
  - N consecutive packets commit
  - matching raw resets pending counter

TestIsRaceOn renamed to reflect new (narrower) responsibility.
2026-05-07 15:11:35 -04:00

"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.
Port of cosmii02/ForzaDSXlegacy ("ForzaDSX") — variant D in our reference
taxonomy. Replaces the previous Race-Element 1:1 port (variant A) which the
user found "silent unless slipping".
Design (variant D):
- Continuous baseline resistance on both triggers — the trigger always has
*some* feel under the finger, scaled by pedal input (L2) or computed
chassis acceleration (R2). This is the fix for the "no feedback at all"
complaint.
- Vibration overlay during wheel slip / wheelspin (both triggers). Frequency
scales with slip severity, amplitude scales with brake input (L2) or
avg accel (R2).
- RPM-reactive lightbar in-race; redline inverts green→red. In menus, the
lightbar carries the car's class color tinted by its performance index.
- EWMA smoothing on resistance and vibration frequency, separately tunable
per channel. Patmagauran's α_throttle=0.01 preserves the "creamy" throttle
feel; brake gets α=1.0 (instant) so ABS-like flutter isn't smoothed away.
- Per-trigger global intensity scale via FORZA_L2_INTENSITY / FORZA_R2_INTENSITY
env vars (∈ [0, 1]). Default 1.0. Lightbar gated by FORZA_LIGHTBAR (default on).
- IsRaceOn debounce: FH5 sometimes leaves is_race_on=1 after returning to
a menu. Cosmii's workaround is to count consecutive samples where RPM is
stable AND power ≤ 0; once the count exceeds RPM_ACCUMULATOR_TRIGGER, we
override is_race_on to false.
- No body LRA (the "shakes my whole hand" complaint of earlier iterations
came from L/R LRA buzz; this design never touches them).
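
A rough illustration of the two EWMA settings named in the design list, as a step-response sketch. `step_response` is a hypothetical helper; the recurrence is the same one `_ewma` uses further down, and the 0-to-1 step is an arbitrary test input.

```python
def ewma(value, last, alpha):
    # Same recurrence as the module's _ewma helper.
    return alpha * value + (1.0 - alpha) * last


def step_response(alpha, n, start=0.0, target=1.0):
    # Feed `target` n times and return the filter output.
    y = start
    for _ in range(n):
        y = ewma(target, y, alpha)
    return y


throttle_after_100 = step_response(0.01, 100)  # about 63% of the step
brake_after_1 = step_response(1.0, 1)          # alpha=1.0 tracks instantly
```

After 100 samples the throttle filter has covered 1 - 0.99**100 of the step, which is why slip-event entry bypasses it.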
Adaptations for the dualsense-controller library (yesbotics 0.3.1):
- Cosmii drives DSX via JSON-over-UDP at port 6969; we drive the controller
directly via hidraw and bypass DSX. The mode mapping is:
    Cosmii Resistance(start, force ∈ 0-7)        → effect.feedback(start, strength ∈ 0-8)
    Cosmii VibrateResistance(start, freq, stiff) → effect.vibration(start, amp, freq)
      Note: the position-dependent stiffness gradient is lost (per
      static-analysis recommendation, option 1); the vibration carries the
      slip-event signal, which is the dominant feel.
- The library's `effect.vibration` is upstream-flagged "TODO: not working
properly" but our previous Race-Element 1:1 port used it successfully.
If the user reports the vibration mode feels broken, the fallback is raw
HID byte construction using DSX's TriggerEffectGenerator.cs as reference
(decompiled at /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/).
Divergences from upstream Cosmii:
- Cosmii's brake-vibration entry condition is `slip < 0.5 AND brake < 10`,
inverted from Patmagauran's parent fork (`slip > 0.5 AND brake > 10`).
The inversion gates vibration to "very light brake at very low slip"
which produces a constant 35Hz buzz when nothing is happening — almost
certainly a copy-paste bug at the comparison operators when Cosmii forked
Patmagauran. We use Patmagauran's correct condition (slip > thr ∧ brake > thr).
- The vibration-frequency formula `freq = MAX - Map(slip, GRIP_LOSS, 1, 0, MAX)`
used by both Patmagauran and Cosmii is also non-intuitive: it maxes at the
threshold and decays to zero at full lockup. Race-Element's variant B uses
the conventional "more slip = higher freq" mapping. We follow Race-Element.
- We don't use a separate stiffness scale (1-200) for the vibration mode;
the library's vibration amplitude is 0-8 inclusive.
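
A short sketch contrasting the two slip → frequency directions discussed above, using the brake constants from this file (threshold 0.5, 3-35 Hz). `upstream_freq` and `conventional_freq` are illustrative names; the first follows the quoted formula, the second follows the mapping this module uses.

```python
GRIP_LOSS, MIN_HZ, MAX_HZ = 0.5, 3, 35


def clamp_map(x, in_min, in_max, out_min, out_max):
    # Arduino-style map with input clamping, like the module's _map.
    x = max(in_min, min(in_max, x))
    return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min


def upstream_freq(slip):
    # freq = MAX - Map(slip, GRIP_LOSS, 1, 0, MAX): peaks at the threshold.
    return MAX_HZ - clamp_map(slip, GRIP_LOSS, 1.0, 0, MAX_HZ)


def conventional_freq(slip):
    # More slip = higher frequency, from MIN_HZ at threshold to MAX_HZ at 1.0.
    slip_above = max(0.0, slip - GRIP_LOSS)
    return MIN_HZ + (MAX_HZ - MIN_HZ) * min(1.0, slip_above / (1.0 - GRIP_LOSS))
```

At slip 0.5 the upstream formula gives 35 Hz and at full lockup 0 Hz; the conventional mapping gives 3 Hz and 35 Hz respectively.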
References (all decompiled C# on disk):
  /tmp/impls/legacy_Program.cs
      cosmii02/ForzaDSXlegacy (primary)
  /tmp/impls/fds_Program.cs + fds_Settings
      patmagauran/ForzaDualSense (fork parent, defaults)
  /tmp/race-element/Race_Element.HUD.Common__Overlays__Driving__DSX__TriggerHaptics.cs
      Race-Element variant B (slip→freq direction)
  /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/TriggerEffectGenerator.cs
      DSX byte-level packet generator
      (reference for option-3 raw HID fallback)
Setup on the user side: see default.nix.
"""
import argparse
import logging
import math
import os
import signal
import socket
import sys
import time
from dataclasses import dataclass, field
from typing import Optional
from dualsense_controller import DualSenseController
from fdp import ForzaDataPacket
LOG = logging.getLogger("forza-trigger")
# --- Tuning constants (Cosmii / Patmagauran defaults) ------------------------
# Source citations point to legacy_Program.cs (Cosmii) and fds_Settings.cs
# (Patmagauran defaults, inherited by Cosmii via INI file).
# --- L2 brake ---------------------------------------------------------------
# fds_Settings.cs (Cosmii inherits via appsettings.ini).
GRIP_LOSS_VAL = 0.5 # combined-slip threshold for vibration overlay
BRAKE_VIBRATION_MODE_START = 10 # 0-255 brake input scale; gate for slip mode
MIN_BRAKE_VIBRATION = 3 # Hz floor; below → revert to feedback only
MAX_BRAKE_VIBRATION = 35 # Hz ceiling at peak slip
MIN_BRAKE_RESISTANCE = 1 # baseline strength at zero brake (lib 0-8)
MAX_BRAKE_RESISTANCE = 6 # baseline strength at full brake (lib 0-8)
MIN_BRAKE_AMP = 1 # vibration-mode amplitude floor (lib 0-8)
MAX_BRAKE_AMP = 8 # vibration-mode amplitude ceiling
EWMA_ALPHA_BRAKE = 1.0 # 1.0 = instant (no smoothing). Patmagauran's choice.
EWMA_ALPHA_BRAKE_FREQ = 1.0
# --- R2 throttle ------------------------------------------------------------
THROTTLE_GRIP_LOSS_VAL = 0.5
THROTTLE_VIBRATION_MODE_START = 10 # 0-255 accelerator input scale
MIN_ACCEL_GRIPLOSS_VIBRATION = 3
MAX_ACCEL_GRIPLOSS_VIBRATION = 35
TURN_ACCEL_MOD = 0.5 # AccelX² weight (lateral)
FORWARD_ACCEL_MOD = 1.0 # AccelZ² weight (longitudinal)
ACCELERATION_LIMIT = 10.0 # m/s² clamp ceiling
MIN_THROTTLE_RESISTANCE = 1
MAX_THROTTLE_RESISTANCE = 6
MIN_ACCEL_GRIPLOSS_AMP = 1
MAX_ACCEL_GRIPLOSS_AMP = 8
EWMA_ALPHA_THROTTLE = 0.01 # very smooth (Patmagauran's "creamy" tuning)
EWMA_ALPHA_THROTTLE_FREQ = 0.5
# Hardcoded by Cosmii (legacy_Program.cs:224): rear-slip alone counts only
# when accelerator is firmly depressed. Avoids false-positive vibration when
# coasting on light throttle through slick patches.
ACCELERATOR_REAR_SLIP_GATE = 200 # 0-255 scale
# --- Lightbar ---------------------------------------------------------------
# In-race: green channel ∝ RPM ratio with redline inversion to red.
# Out-of-race: car-class color tinted by car performance index.
RPM_REDLINE_RATIO = 0.85 # above this, green→red inversion
GREEN_FLOOR = 50 # min green channel value (lightbar visible at idle)
MAX_CPI = 255.0 # car performance index ceiling for tint
# Cosmii car-class palettes (legacy_Program.cs:38-51). Class IDs are Forza's
# enum: D=0, C=1, B=2, A=3, S1=4, S2=5; X is "above the table".
CAR_CLASS_COLORS = {
    0: (107, 185, 236),  # D — cyan
    1: (234, 202, 49),   # C — gold
    2: (211, 90, 37),    # B — orange
    3: (187, 59, 34),    # A — red
    4: (128, 54, 243),   # S1 — purple
    5: (75, 88, 229),    # S2 — blue
}
COLOR_CLASS_X = (105, 182, 72) # green for above-S2
# --- IsRaceOn debounce ------------------------------------------------------
# FH5 sometimes leaves is_race_on=1 after menu transitions. Count samples
# where RPM is stable AND power ≤ 0; once the count exceeds the threshold,
# override is_race_on to false. ~3.3s at 60Hz packet rate. (legacy_Program.cs:35)
RPM_ACCUMULATOR_TRIGGER_RACE_OFF = 200
# Hysteresis on is_race_on flag flips. FH5 emits packets where the bit
# alternates True/False at packet rate during menu/loading transitions.
# This many consecutive packets of the value OPPOSITE to the committed state
# are required before commit_in_race() commits a flip. ~0.5s at 60Hz: long
# enough to filter alternation, short enough to keep race-start/end responsive.
IN_RACE_HYSTERESIS_PACKETS = 30
# --- User-facing intensity scales (env vars) --------------------------------
# Read once at module import. Process restart picks up new values.
def _read_intensity(name: str, default: float = 1.0) -> float:
    """Parse an intensity env var clamped to [0, 1]. NaN/inf/junk fall back to
    `default` with a warning — silent fallthrough to 1.0 would hide
    misconfigurations (e.g. `FORZA_R2_INTENSITY=nan` is almost certainly
    a typo, not "use full intensity").
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        v = float(raw)
    except ValueError:
        LOG.warning("invalid %s=%r — using %.2f", name, raw, default)
        return default
    if not math.isfinite(v):
        LOG.warning("non-finite %s=%r — using %.2f", name, raw, default)
        return default
    return max(0.0, min(1.0, v))


def _read_bool(name: str, default: bool = True) -> bool:
    """Parse a boolean env var. Accepts the usual disable-tokens
    {0, false, no, off, ""} (case-insensitive) and treats anything else
    as enabled. Mirrors the strict parsing of `_read_intensity` so that
    typos don't silently flip behavior.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("0", "false", "no", "off", "")


LEFT_TRIGGER_INTENSITY = _read_intensity("FORZA_L2_INTENSITY")
RIGHT_TRIGGER_INTENSITY = _read_intensity("FORZA_R2_INTENSITY")
LIGHTBAR_ENABLED = _read_bool("FORZA_LIGHTBAR", default=True)
# --- Forza UDP packet sizes -> fdp packet_format strings --------------------
# Note: the FM7 V1 232-byte sled format does NOT include `accel`, `brake`,
# `power`, `car_class`, or `car_performance_index` — fdp doesn't setattr
# those names from a sled-format unpack, and our handlers depend on them.
# Driving a sled stream would silently leave both triggers off. We don't
# advertise FM7 sled support; if the user ever points an FM7 sled feed at us,
# 232-byte packets are simply dropped at _parse_packet.
PACKET_FORMATS = {
    311: "dash",  # Forza Motorsport 7 V2 car-dash format
    324: "fh4",   # Forza Horizon 4 / 5 (12-byte gap that fdp's fh4 mode patches around)
}
# --- Daemon lifecycle constants ---------------------------------------------
IDLE_TIMEOUT_S = 3.0
RECONNECT_BACKOFF_S = 1.0
# --- Module state (signal + hot-plug flags) ---------------------------------
_shutdown = False
_disconnected = False


def _on_termination(signum: int, frame: object) -> None:
    """SIGTERM/SIGINT handler — sets the main-loop shutdown flag."""
    global _shutdown
    LOG.info("received signal %d — shutting down", signum)
    _shutdown = True


def _on_controller_error(exc: Exception) -> None:
    """dualsense-controller `on_error` callback for HID read-thread failures.

    The library dispatches state-change callbacks by parameter count
    (StateValueCallbackManager.py): a 1-arg callback is bound to
    `_event_name_1_args` which emits `(new_value)`. A 2-arg callback would
    receive `(new_value, timestamp_ns)` instead — meaning a `(prev, exc)`
    handler would log a stray monotonic-clock integer instead of the
    exception. 1-arg keeps observability tight.
    """
    global _disconnected
    LOG.warning("dualsense controller error: %s", exc)
    _disconnected = True
# --- Helpers ----------------------------------------------------------------
def _safe_field(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float:
    """Read a packet field defensively. Returns default for missing fields,
    NaN, or inf. Forza packets are fixed-format but mismatched format strings
    can produce NaN; this filters them so haptic math never receives garbage.
    """
    try:
        v = float(getattr(pkt, name))
    except (AttributeError, TypeError, ValueError):
        return default
    if math.isnan(v) or math.isinf(v):
        return default
    return v


def _safe_abs(pkt: ForzaDataPacket, name: str) -> float:
    return abs(_safe_field(pkt, name, 0.0))


def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float:
    """Cosmii's Map function (legacy_Program.cs:478-484, also fds_Program.cs:276).
    Like Arduino's `map()`: clamp x to the input range, then linearly remap to
    the output range. Used pervasively throughout the references.
    """
    if in_max == in_min:
        return out_min
    x = max(in_min, min(in_max, x))
    return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min


def _ewma(value: float, last: float, alpha: float) -> float:
    """Exponential weighted moving average (legacy_Program.cs:747).
    α=1.0 → instant (output = value), α=0.01 → very smooth (~100-sample lag).
    """
    return alpha * value + (1.0 - alpha) * last


def _combined_slip(pkt: ForzaDataPacket) -> tuple[float, float, float]:
    """(combined_all, combined_front, combined_rear) — Cosmii's slip aggregates.
    Cosmii uses the arithmetic mean per axle (legacy_Program.cs:112-114),
    distinct from Race-Element's max-of-axle. The mean is more conservative
    and reduces single-tire-pulse chatter.
    """
    fl = _safe_abs(pkt, "tire_combined_slip_FL")
    fr = _safe_abs(pkt, "tire_combined_slip_FR")
    rl = _safe_abs(pkt, "tire_combined_slip_RL")
    rr = _safe_abs(pkt, "tire_combined_slip_RR")
    return (fl + fr + rl + rr) / 4.0, (fl + fr) / 2.0, (rl + rr) / 2.0


def _avg_accel(pkt: ForzaDataPacket) -> float:
    """Cosmii's avgAccel (legacy_Program.cs:219). Lateral and longitudinal
    chassis accel combined under per-axis weights. Used as the input to the
    throttle-baseline resistance curve.
    """
    ax = _safe_field(pkt, "acceleration_x", 0.0)
    az = _safe_field(pkt, "acceleration_z", 0.0)
    return math.sqrt(TURN_ACCEL_MOD * ax * ax + FORWARD_ACCEL_MOD * az * az)
# --- Daemon state -----------------------------------------------------------
@dataclass
class DaemonState:
    """Per-session state: EWMA filter cells + IsRaceOn debounce + transition flags.

    EWMA cell semantics are unscaled — they store the raw smoothed input.
    Per-trigger intensity (LEFT/RIGHT_TRIGGER_INTENSITY) is applied at output
    time only. Storing the scaled value back would compound the scale every
    tick (steady-state for α<1 collapses to ~0 with intensity<1).

    `was_throttle_slipping` / `was_brake_slipping` track the previous tick's
    slip state so we can bypass EWMA on transition INTO a slip event. Without
    this, α=0.01 throttle smoothing means the first ~100 packets of a slip
    event (~1.7s at 60Hz) are barely perceptible.

    Initial values from Cosmii's static initializers (legacy_Program.cs:23-26),
    rescaled to the dualsense-controller library's 0-8 strength domain.
    """
    last_throttle_resistance: float = 1.0
    last_throttle_freq: float = 0.0
    last_brake_resistance: float = 1.0
    last_brake_freq: float = 0.0
    was_throttle_slipping: bool = False
    was_brake_slipping: bool = False
    # Tracks the previous tick's in_race verdict; the run-loop uses this
    # to detect in-race → menu transitions and partial-reset the EWMA cells
    # + slip flags so the next race resumption gets a clean cold-start
    # path (otherwise stale slip flags suppress the seeding fix in
    # handle_throttle/handle_brake on the first packet of race 2).
    last_in_race: bool = False
    # is_race_on hysteresis. Forza Horizon emits packets where the
    # is_race_on field alternates True/False at packet rate during
    # menu/loading transitions. Without hysteresis, the run-loop
    # transitions in-race → menu → in-race → menu every other packet,
    # which the controller perceives as periodic clicks (every state
    # change forces a fresh HID OUT report; the firmware reacts on
    # receipt). The hysteresis counter requires N consecutive packets
    # of the OPPOSITE flag value before the committed in-race state
    # (last_in_race) flips.
    in_race_pending_count: int = 0
    # IsRaceOn debounce.
    last_rpm: float = 0.0
    rpm_accumulator: int = 0
    # Last-known valid car class / CPI. We always update on observation —
    # Cosmii's `> 0` guard treats Class-D (enum value 0) as "no info" and
    # leaves the lightbar showing the previous class color, which is wrong.
    last_car_class: int = 0
    last_cpi: int = 0
    # Last lightbar color (skip identical writes). Reset to (0,0,0) on idle
    # so that resuming with the same color re-pushes after the (0,0,0) reset
    # we send to the controller; otherwise the bar stays black.
    last_color: tuple[int, int, int] = (0, 0, 0)

    def reset(self) -> None:
        """Reset filters, debounce, and lightbar dedup — call when telemetry
        resumes after idle so the next packet's effects fire from a clean
        baseline.
        """
        self.last_throttle_resistance = 1.0
        self.last_throttle_freq = 0.0
        self.last_brake_resistance = 1.0
        self.last_brake_freq = 0.0
        self.was_throttle_slipping = False
        self.was_brake_slipping = False
        self.last_rpm = 0.0
        self.rpm_accumulator = 0
        self.in_race_pending_count = 0
        self.last_color = (0, 0, 0)
# --- Race detection ---------------------------------------------------------
def is_race_on(pkt: ForzaDataPacket, state: DaemonState) -> bool:
    """Read the raw is_race_on flag with FH5's RPM-stuck-true workaround
    (Cosmii's RPM_ACCUMULATOR debounce — legacy_Program.cs:89-109) applied.
    Hysteresis on flag flips is the caller's job (see commit_in_race).
    """
    raw_flag = bool(_safe_field(pkt, "is_race_on", 0.0))
    current_rpm = _safe_field(pkt, "current_engine_rpm", 0.0)
    power = _safe_field(pkt, "power", 0.0)
    if abs(current_rpm - state.last_rpm) < 1e-3 and power <= 0:
        state.rpm_accumulator += 1
        if state.rpm_accumulator > RPM_ACCUMULATOR_TRIGGER_RACE_OFF:
            raw_flag = False
    else:
        state.rpm_accumulator = 0
    state.last_rpm = current_rpm
    return raw_flag


def commit_in_race(raw_flag: bool, state: DaemonState) -> bool:
    """Apply per-packet hysteresis to the raw flag. Returns the committed
    in-race state — the value the run-loop should react to.

    FH5 emits packets where is_race_on alternates True/False at packet rate
    during menu transitions and loading screens. Without hysteresis, the
    run-loop transitions every other packet and the controller emits
    audible clicks (every state change forces a fresh HID OUT report; the
    firmware reacts on receipt). The hysteresis counter requires
    IN_RACE_HYSTERESIS_PACKETS consecutive packets of the new value before
    we commit a flip — long enough to filter alternation, short enough
    (~0.5s at 60Hz) that legitimate race-start/end transitions stay
    responsive.

    Note: this function does NOT mutate state.last_in_race. The caller
    must update state.last_in_race AFTER running any transition side
    effects, otherwise the elif state.last_in_race transition-detection
    in the run-loop won't fire.
    """
    if raw_flag == state.last_in_race:
        state.in_race_pending_count = 0
        return state.last_in_race
    state.in_race_pending_count += 1
    if state.in_race_pending_count < IN_RACE_HYSTERESIS_PACKETS:
        return state.last_in_race
    state.in_race_pending_count = 0
    return raw_flag
# --- Trigger handlers -------------------------------------------------------
def handle_throttle(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None:
    """R2 throttle. Port of legacy_Program.cs:216-263.

    Modes:
    - In-race + slip:    vibration overlay (freq from slip, amp from avgAccel).
                         If filtered freq <= MIN_ACCEL_GRIPLOSS_VIBRATION
                         OR accelerator <= THROTTLE_VIBRATION_MODE_START,
                         fall through to feedback-only (avoids audible
                         clicking at very low frequencies).
    - In-race + no slip: continuous feedback resistance scaled by avgAccel.
    - Pedal at rest:     baseline still active at MIN strength so the
                         trigger always has *some* feel under the finger.
                         Trigger is only `effect.off()` when out of race
                         (handled by the run-loop, not here).
    """
    accel = _safe_field(pkt, "accel", 0.0)
    avg_a = _avg_accel(pkt)
    _, combined_front, combined_rear = _combined_slip(pkt)
    losing_grip = (
        combined_front > THROTTLE_GRIP_LOSS_VAL
        or (combined_rear > THROTTLE_GRIP_LOSS_VAL and accel > ACCELERATOR_REAR_SLIP_GATE)
    )
    if losing_grip:
        # Vibration mode. Frequency scales with slip severity (Race-Element's
        # convention: more slip = higher freq, opposite of Patmagauran's
        # decay-from-threshold formula). Amplitude scales with avgAccel so
        # heavier acceleration during slip = stronger buzz.
        # Use the slipping axle's slip for the frequency curve, not the
        # all-4-tire average. Cosmii's `combinedTireSlip` (mean of all 4)
        # collapses to ~slip/2 for one-axle slip events (RWD wheelspin =
        # rear=0.8, front=0 → combined=0.4) which lands below threshold
        # and silently falls through to feedback-only mode. RWD burnouts
        # are the most common Forza wheelspin event; we want vibration there.
        slip_for_freq = max(combined_front, combined_rear)
        slip_above = max(0.0, slip_for_freq - THROTTLE_GRIP_LOSS_VAL)
        slip_range = max(1e-6, 1.0 - THROTTLE_GRIP_LOSS_VAL)
        freq_raw = MIN_ACCEL_GRIPLOSS_VIBRATION + (
            MAX_ACCEL_GRIPLOSS_VIBRATION - MIN_ACCEL_GRIPLOSS_VIBRATION
        ) * min(1.0, slip_above / slip_range)
        amp_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_ACCEL_GRIPLOSS_AMP, MAX_ACCEL_GRIPLOSS_AMP)
        # Bypass EWMA on the first packet of a slip event so the buzz is
        # immediate. With α=0.01 throttle smoothing, the EWMA-warmed first
        # packet would converge over ~1.7s — slip events are sub-second.
        if not state.was_throttle_slipping:
            state.last_throttle_freq = freq_raw
            state.last_throttle_resistance = amp_raw
        freq_unscaled = _ewma(freq_raw, state.last_throttle_freq, EWMA_ALPHA_THROTTLE_FREQ)
        amp_unscaled = _ewma(amp_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE)
        # Store unscaled — applying intensity here would compound the scale
        # every tick (steady-state collapses to ~0 with intensity<1, α<1).
        state.last_throttle_freq = freq_unscaled
        state.last_throttle_resistance = amp_unscaled
        freq_out = freq_unscaled * RIGHT_TRIGGER_INTENSITY
        amp_out = amp_unscaled * RIGHT_TRIGGER_INTENSITY
        state.was_throttle_slipping = True
        if freq_out <= MIN_ACCEL_GRIPLOSS_VIBRATION or accel <= THROTTLE_VIBRATION_MODE_START:
            # Fallback to baseline-style feedback if the computed vibration
            # is too quiet to be useful.
            strength = max(MIN_THROTTLE_RESISTANCE, min(MAX_THROTTLE_RESISTANCE, int(round(amp_out))))
            controller.right_trigger.effect.feedback(start_position=0, strength=strength)
        else:
            f = max(1, min(255, int(round(freq_out))))
            a = max(MIN_ACCEL_GRIPLOSS_AMP, min(MAX_ACCEL_GRIPLOSS_AMP, int(round(amp_out))))
            controller.right_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f)
    else:
        # Baseline mode. Strength scales with avgAccel — at idle the trigger
        # is barely resistant; under heavy accel it firms up.
        resistance_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_THROTTLE_RESISTANCE, MAX_THROTTLE_RESISTANCE)
        resistance_unscaled = _ewma(resistance_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE)
        state.last_throttle_resistance = resistance_unscaled
        state.was_throttle_slipping = False
        strength = max(
            MIN_THROTTLE_RESISTANCE,
            min(MAX_THROTTLE_RESISTANCE, int(round(resistance_unscaled * RIGHT_TRIGGER_INTENSITY))),
        )
        controller.right_trigger.effect.feedback(start_position=0, strength=strength)
def handle_brake(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None:
    """L2 brake. Port of legacy_Program.cs:281-313 with the bug fix described
    in the module docstring (Cosmii's inverted comparison operators replaced
    with Patmagauran's correct logic: vibration when slip > thr AND brake > thr).

    Modes:
    - In-race + slip:    vibration overlay (freq from slip, amp from brake input).
                         Below MIN_BRAKE_VIBRATION freq → revert to feedback-only.
    - In-race + no slip: continuous feedback resistance scaled by brake input.
    - Pedal at rest:     baseline still active at MIN strength so the
                         trigger always has some feel.
    """
    brake = _safe_field(pkt, "brake", 0.0)
    # Use the worst axle's slip for both detection and the freq curve.
    # Cosmii's all-4 average misses single-axle ABS events (front locks
    # first under weight transfer; combined-average can stay below 0.5
    # even with the front fully locked). Race-Element variant B uses
    # max-of-axle for the same reason.
    _, combined_front, combined_rear = _combined_slip(pkt)
    slip_for_freq = max(combined_front, combined_rear)
    slipping = slip_for_freq > GRIP_LOSS_VAL and brake > BRAKE_VIBRATION_MODE_START
    if slipping:
        slip_above = max(0.0, slip_for_freq - GRIP_LOSS_VAL)
        slip_range = max(1e-6, 1.0 - GRIP_LOSS_VAL)
        freq_raw = MIN_BRAKE_VIBRATION + (
            MAX_BRAKE_VIBRATION - MIN_BRAKE_VIBRATION
        ) * min(1.0, slip_above / slip_range)
        amp_raw = _map(brake, 0, 255, MIN_BRAKE_AMP, MAX_BRAKE_AMP)
        # Bypass EWMA on slip-event entry; same reasoning as throttle.
        if not state.was_brake_slipping:
            state.last_brake_freq = freq_raw
            state.last_brake_resistance = amp_raw
        freq_unscaled = _ewma(freq_raw, state.last_brake_freq, EWMA_ALPHA_BRAKE_FREQ)
        amp_unscaled = _ewma(amp_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE)
        state.last_brake_freq = freq_unscaled
        state.last_brake_resistance = amp_unscaled
        freq_out = freq_unscaled * LEFT_TRIGGER_INTENSITY
        amp_out = amp_unscaled * LEFT_TRIGGER_INTENSITY
        state.was_brake_slipping = True
        if freq_out <= MIN_BRAKE_VIBRATION:
            strength = max(MIN_BRAKE_RESISTANCE, min(MAX_BRAKE_RESISTANCE, int(round(amp_out))))
            controller.left_trigger.effect.feedback(start_position=0, strength=strength)
        else:
            f = max(1, min(255, int(round(freq_out))))
            a = max(MIN_BRAKE_AMP, min(MAX_BRAKE_AMP, int(round(amp_out))))
            controller.left_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f)
    else:
        resistance_raw = _map(brake, 0, 255, MIN_BRAKE_RESISTANCE, MAX_BRAKE_RESISTANCE)
        resistance_unscaled = _ewma(resistance_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE)
        state.last_brake_resistance = resistance_unscaled
        state.was_brake_slipping = False
        strength = max(
            MIN_BRAKE_RESISTANCE,
            min(MAX_BRAKE_RESISTANCE, int(round(resistance_unscaled * LEFT_TRIGGER_INTENSITY))),
        )
        controller.left_trigger.effect.feedback(start_position=0, strength=strength)
# --- Lightbar ---------------------------------------------------------------
def lightbar_color(pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> tuple[int, int, int]:
    """Compute the (R, G, B) the lightbar should display this tick.

    In-race: green ∝ RPM ratio, with redline inversion (the green channel is
    flipped to 255-G so the bar appears red as you approach the rev limit).
    Out-of-race: car-class color tinted by car performance index.

    Does no controller I/O; the only side effect is recording the last car
    class / CPI on `state`. The caller is responsible for actually pushing
    the color to the controller (skipping if unchanged from `state.last_color`).
    """
    if in_race:
        rpm = _safe_field(pkt, "current_engine_rpm", 0.0)
        idle = _safe_field(pkt, "engine_idle_rpm", 0.0)
        max_rpm = _safe_field(pkt, "engine_max_rpm", 0.0)
        rpm_range = max(1.0, max_rpm - idle)
        ratio = max(0.0, min(1.0, (rpm - idle) / rpm_range))
        green = max(GREEN_FLOOR, int(ratio * 255))
        red = int(ratio * 255)
        if ratio >= RPM_REDLINE_RATIO:
            green = 255 - green
        return (red, green, 0)
    # Menu mode: car-class color tinted by performance index. We always update
    # on observation — Cosmii's `> 0` guard treats Class-D (enum value 0) as
    # "no info" and leaves the previous class color stuck on the lightbar
    # when switching INTO a Class-D car. fdp always sets these fields from
    # an fh4/dash unpack, so absence is impossible — only the value matters.
    state.last_car_class = int(_safe_field(pkt, "car_class", 0.0))
    state.last_cpi = max(0, min(int(MAX_CPI), int(_safe_field(pkt, "car_performance_index", 0.0))))
    base = CAR_CLASS_COLORS.get(state.last_car_class, COLOR_CLASS_X)
    if state.last_car_class > 5:
        # X-class: untinted constant green.
        return COLOR_CLASS_X
    cpi_ratio = state.last_cpi / MAX_CPI if state.last_cpi > 0 else 0.0
    return (
        int(cpi_ratio * base[0]),
        int(cpi_ratio * base[1]),
        int(cpi_ratio * base[2]),
    )


def apply_lightbar(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> None:
    if not LIGHTBAR_ENABLED:
        return
    color = lightbar_color(pkt, state, in_race)
    if color == state.last_color:
        return
    state.last_color = color
    controller.lightbar.set_color(*color)
# --- Reset ------------------------------------------------------------------
def reset_triggers(controller: Optional[DualSenseController]) -> None:
    """Default both triggers — called on idle, reconnect, and shutdown.
    Tolerates a None controller so the shutdown-during-reconnect cleanup
    path doesn't raise.
    """
    if controller is None:
        return
    controller.left_trigger.effect.off()
    controller.right_trigger.effect.off()


def reset_all(controller: Optional[DualSenseController]) -> None:
    """reset_triggers + lightbar to (0, 0, 0). Tolerates None controller."""
    if controller is None:
        return
    reset_triggers(controller)
    if LIGHTBAR_ENABLED:
        try:
            controller.lightbar.set_color(0, 0, 0)
        except Exception:
            pass
# --- Connection / hot-plug --------------------------------------------------
def _connect_controller() -> Optional[DualSenseController]:
    """Block until a DualSense is reachable. Returns the activated controller,
    or None if shutdown was requested before one appeared.
    """
    global _disconnected
    LOG.info("opening dualsense controller")
    first_failure_logged = False
    while not _shutdown:
        try:
            devices = DualSenseController.enumerate_devices()
        except Exception as e:
            if not first_failure_logged:
                LOG.warning("hidapi enumeration failed: %s", e)
                first_failure_logged = True
            time.sleep(RECONNECT_BACKOFF_S)
            continue
        if not devices:
            if not first_failure_logged:
                LOG.warning("no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S)
                first_failure_logged = True
            time.sleep(RECONNECT_BACKOFF_S)
            continue
        # Clear the disconnect flag BEFORE activate() so that an error raised
        # by the HID worker thread (started by activate()) is not lost. The
        # worker can fail immediately after spawn, a known failure mode right
        # after USB enumeration when the kernel is still re-attaching the
        # hidraw node. If the flag were cleared after activate() instead,
        # _on_controller_error could set _disconnected=True inside activate()
        # and the clear would then overwrite it back to False.
        _disconnected = False
        try:
            controller = DualSenseController(device_index_or_device_info=devices[0])
            controller.on_error(_on_controller_error)
            controller.activate()
        except Exception as e:
            if not first_failure_logged:
                LOG.warning("controller activation failed: %s", e)
                first_failure_logged = True
            time.sleep(RECONNECT_BACKOFF_S)
            continue
        # Push a clean initial state so we don't inherit residual effects from
        # a previous Steam Input session, prior daemon instance, or stale
        # firmware state.
        reset_all(controller)
        LOG.info("dualsense controller connected (%s)", controller.connection_type)
        return controller
    return None


def _close_controller(controller: Optional[DualSenseController]) -> None:
    if controller is None:
        return
    try:
        controller.deactivate()
    except Exception:
        pass
# --- UDP socket -------------------------------------------------------------
def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket:
    """Create or inherit the UDP listener socket. Honors LISTEN_FDS for
    systemd socket activation; falls back to opening a fresh socket bound
    to (host, port).
    """
    listen_pid = os.environ.get("LISTEN_PID")
    listen_fds = os.environ.get("LISTEN_FDS")
    if listen_pid and listen_fds and int(listen_pid) == os.getpid() and int(listen_fds) >= 1:
        sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM)
        LOG.debug("inherited UDP socket fd=3 from systemd")
    else:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
    sock.settimeout(timeout)
    return sock


_warned_packet_sizes: set[int] = set()


def _parse_packet(data: bytes) -> Optional[ForzaDataPacket]:
    n = len(data)
    fmt = PACKET_FORMATS.get(n)
    if fmt is None:
        if n not in _warned_packet_sizes:
            _warned_packet_sizes.add(n)
            LOG.warning(
                "ignoring unrecognised %d-byte UDP packet "
                "(expected 311 [FM7 dash] or 324 [FH4/5]); "
                "if FM7, switch HUD Data Out to CAR DASH format",
                n,
            )
        return None
    try:
        return ForzaDataPacket(data, packet_format=fmt)
    except Exception:
        return None
# --- Main loop --------------------------------------------------------------
def run(host: str, port: int, exit_on_idle: bool = False) -> int:
    signal.signal(signal.SIGTERM, _on_termination)
    signal.signal(signal.SIGINT, _on_termination)
    LOG.info("listening for forza udp on %s:%d", host, port)
    sock = _get_socket(host, port)
    controller = _connect_controller()
    if controller is None:
        sock.close()
        return 0
    state = DaemonState()
    last_seen = 0.0
    have_telemetry = False
    try:
        while not _shutdown:
            if _disconnected:
                LOG.warning("dualsense disconnected; reconnecting")
                _close_controller(controller)
                controller = _connect_controller()
                if controller is None:
                    return 0
                state.reset()
            now = time.monotonic()
            try:
                data, _ = sock.recvfrom(2048)
                last_seen = now
                have_telemetry = True
            except socket.timeout:
                if _shutdown:
                    break
                if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
                    LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S)
                    reset_all(controller)
                    state.reset()
                    have_telemetry = False
                    if exit_on_idle:
                        LOG.info("exiting on idle")
                        break
                continue
            pkt = _parse_packet(data)
            if pkt is None:
                continue
            in_race = commit_in_race(is_race_on(pkt, state), state)
            if in_race:
                handle_throttle(controller, pkt, state)
                handle_brake(controller, pkt, state)
                apply_lightbar(controller, pkt, state, in_race=True)
            elif state.last_in_race:
                # in-race → menu edge: one-shot reset of state, triggers,
                # and lightbar. We do NOT touch the controller again until
                # the next in-race packet. Doing per-packet writes between
                # races causes audible clicks on the trigger actuator —
                # every HID OUT report (even the ones that look idempotent
                # via library dedup) re-encodes the trigger config and the
                # firmware reacts on receipt.
                state.reset()
                reset_all(controller)
            state.last_in_race = in_race
    finally:
        try:
            reset_all(controller)
        except Exception:
            pass
        _close_controller(controller)
        try:
            sock.close()
        except Exception:
            pass
    return 0
def main() -> int:
    parser = argparse.ArgumentParser(
        prog="forza-trigger",
        description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
    parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
    parser.add_argument(
        "--debug", action="store_true", help="log per-packet decisions at DEBUG"
    )
    parser.add_argument(
        "--exit-on-idle",
        action="store_true",
        help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)",
    )
    args = parser.parse_args()
    level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO")
    logging.basicConfig(
        level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s"
    )
    return run(args.host, args.port, args.exit_on_idle)


if __name__ == "__main__":
    sys.exit(main())