User reports periodic clicks and LED color changes on the controller "between races".

Theory: out-of-race, the run loop was calling reset_triggers(controller) AND apply_lightbar(controller, ...) on every Forza packet (60Hz). Both call sites land in library code that nominally dedupes "same state" writes, but in practice any state change anywhere (including the lightbar dropping a green channel value as RPM coasts down to idle) triggers a full HID OUT report rewrite. The OUT report carries the trigger configuration too; the controller firmware reacts on receipt and produces an audible click each time.

Fix: the out-of-race path becomes edge-triggered. On the in-race → menu transition we run state.reset() + reset_all() once (turning both triggers off and setting the lightbar to (0,0,0)). Subsequent menu packets make no controller calls at all until in_race flips back to True. The first in-race packet then re-engages the handlers and the RPM-driven lightbar.

Side effect: the menu-mode car-class lightbar coloring is gone; the bar stays black between races. If we want it back later, it should be one-shot on the menu transition (NOT updated per-packet). For now keep it simple: in-race only.

Build clean; tests unchanged (54/54 still pass; they exercise handlers directly, not the run loop).
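The edge-triggered behaviour described in this message can be sketched roughly as follows. This is a minimal illustration, not the actual run loop: `FakeController`, `LoopState`, `apply_effects` and `on_packet` are hypothetical names, and the fake controller only records calls instead of writing HID reports.

```python
from dataclasses import dataclass, field


@dataclass
class FakeController:
    """Hypothetical stand-in: records calls instead of writing HID reports."""
    calls: list = field(default_factory=list)

    def reset_all(self) -> None:
        self.calls.append("reset_all")

    def apply_effects(self) -> None:
        self.calls.append("apply_effects")


@dataclass
class LoopState:
    """Hypothetical subset of the daemon state: only the previous verdict."""
    last_in_race: bool = False


def on_packet(controller: FakeController, state: LoopState, in_race: bool) -> None:
    """Reset once on the in-race -> menu edge, then stay silent in menus."""
    if in_race:
        # Per-packet path: trigger handlers plus the RPM-driven lightbar.
        controller.apply_effects()
    elif state.last_in_race:
        # Falling edge: exactly one reset (triggers off, lightbar black).
        controller.reset_all()
    # Any further menu packet makes no controller call at all.
    state.last_in_race = in_race


controller, state = FakeController(), LoopState()
for in_race in (True, True, False, False, False, True):
    on_packet(controller, state, in_race)
print(controller.calls)
```

With the sequence above, the three consecutive menu packets produce a single `reset_all` call, which is the property that should stop the per-packet OUT report rewrites between races.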
832 lines
36 KiB
Python
"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.

Port of cosmii02/ForzaDSXlegacy ("ForzaDSX"), variant D in our reference
taxonomy. Replaces the previous Race-Element 1:1 port (variant A) which the
user found "silent unless slipping".

Design (variant D):
  - Continuous baseline resistance on both triggers: the trigger always has
    *some* feel under the finger, scaled by pedal input (L2) or computed
    chassis acceleration (R2). This is the fix for the "no feedback at all"
    complaint.
  - Vibration overlay during wheel slip / wheelspin (both triggers). Frequency
    scales with slip severity, amplitude scales with brake input (L2) or
    avg accel (R2).
  - RPM-reactive lightbar in-race; redline inverts green→red. In menus, the
    lightbar carries the car's class color tinted by its performance index.
  - EWMA smoothing on resistance and vibration frequency, separately tunable
    per channel. Patmagauran's α_throttle=0.01 preserves the "creamy" throttle
    feel; brake gets α=1.0 (instant) so ABS-like flutter isn't smoothed away.
  - Per-trigger global intensity scale via FORZA_L2_INTENSITY / FORZA_R2_INTENSITY
    env vars (∈ [0, 1]). Default 1.0. Lightbar gated by FORZA_LIGHTBAR (default on).
  - IsRaceOn debounce: FH5 sometimes leaves is_race_on=1 after returning to
    a menu. Cosmii's workaround is to count consecutive samples where RPM is
    stable AND power ≤ 0; once the count exceeds
    RPM_ACCUMULATOR_TRIGGER_RACE_OFF, we override is_race_on to false.
  - No body LRA (the "shakes my whole hand" complaint of earlier iterations
    came from L/R LRA buzz; this design never touches them).

Adaptations for the dualsense-controller library (yesbotics 0.3.1):
  - Cosmii drives DSX via JSON-over-UDP at port 6969; we drive the controller
    directly via hidraw and bypass DSX. The mode mapping is:
      Cosmii Resistance(start, force ∈ 0-7)        → effect.feedback(start, strength ∈ 0-8)
      Cosmii VibrateResistance(start, freq, stiff) → effect.vibration(start, amp, freq)
                                                     Note: the position-dependent
                                                     stiffness gradient is lost
                                                     (per static-analysis
                                                     recommendation, option 1);
                                                     the vibration carries the
                                                     slip-event signal which is
                                                     the dominant feel.
  - The library's `effect.vibration` is upstream-flagged "TODO: not working
    properly" but our previous Race-Element 1:1 port used it successfully.
    If the user reports the vibration mode feels broken, the fallback is raw
    HID byte construction using DSX's TriggerEffectGenerator.cs as reference
    (decompiled at /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/).

Divergences from upstream Cosmii:
  - Cosmii's brake-vibration entry condition is `slip < 0.5 AND brake < 10`,
    inverted from Patmagauran's parent fork (`slip > 0.5 AND brake > 10`).
    The inversion gates vibration to "very light brake at very low slip"
    which produces a constant 35Hz buzz when nothing is happening, almost
    certainly a copy-paste bug at the comparison operators when Cosmii forked
    Patmagauran. We use Patmagauran's correct condition (slip > thr ∧ brake > thr).
  - The vibration-frequency formula `freq = MAX - Map(slip, GRIP_LOSS, 1, 0, MAX)`
    used by both Patmagauran and Cosmii is also non-intuitive: it maxes at the
    threshold and decays to zero at full lockup. Race-Element's variant B uses
    the conventional "more slip = higher freq" mapping. We follow Race-Element.
  - We don't use a separate stiffness scale (1-200) for the vibration mode;
    the library's vibration amplitude is 0-8 inclusive.

References (all decompiled C# on disk):
  /tmp/impls/legacy_Program.cs                cosmii02/ForzaDSXlegacy (primary)
  /tmp/impls/fds_Program.cs + fds_Settings    patmagauran/ForzaDualSense (fork parent, defaults)
  /tmp/race-element/Race_Element.HUD.Common__Overlays__Driving__DSX__TriggerHaptics.cs
                                              Race-Element variant B (slip→freq direction)
  /tmp/dsx/decompiled/ExtendInput.DataTools.DualSense/TriggerEffectGenerator.cs
                                              DSX byte-level packet generator
                                              (reference for option-3 raw HID fallback)

Setup on the user side: see default.nix.
"""

import argparse
import logging
import math
import os
import signal
import socket
import sys
import time
from dataclasses import dataclass, field
from typing import Optional

from dualsense_controller import DualSenseController
from fdp import ForzaDataPacket


LOG = logging.getLogger("forza-trigger")


# --- Tuning constants (Cosmii / Patmagauran defaults) ------------------------
# Source citations point to legacy_Program.cs (Cosmii) and fds_Settings.cs
# (Patmagauran defaults, inherited by Cosmii via INI file).

# --- L2 brake ---------------------------------------------------------------
# fds_Settings.cs (Cosmii inherits via appsettings.ini).
GRIP_LOSS_VAL = 0.5               # combined-slip threshold for vibration overlay
BRAKE_VIBRATION_MODE_START = 10   # 0-255 brake input scale; gate for slip mode
MIN_BRAKE_VIBRATION = 3           # Hz floor; below → revert to feedback only
MAX_BRAKE_VIBRATION = 35          # Hz ceiling at peak slip
MIN_BRAKE_RESISTANCE = 1          # baseline strength at zero brake (lib 0-8)
MAX_BRAKE_RESISTANCE = 6          # baseline strength at full brake (lib 0-8)
MIN_BRAKE_AMP = 1                 # vibration-mode amplitude floor (lib 0-8)
MAX_BRAKE_AMP = 8                 # vibration-mode amplitude ceiling
EWMA_ALPHA_BRAKE = 1.0            # 1.0 = instant (no smoothing). Patmagauran's choice.
EWMA_ALPHA_BRAKE_FREQ = 1.0

# --- R2 throttle ------------------------------------------------------------
THROTTLE_GRIP_LOSS_VAL = 0.5
THROTTLE_VIBRATION_MODE_START = 10  # 0-255 accelerator input scale
MIN_ACCEL_GRIPLOSS_VIBRATION = 3
MAX_ACCEL_GRIPLOSS_VIBRATION = 35
TURN_ACCEL_MOD = 0.5              # AccelX² weight (lateral)
FORWARD_ACCEL_MOD = 1.0           # AccelZ² weight (longitudinal)
ACCELERATION_LIMIT = 10.0         # m/s² clamp ceiling
MIN_THROTTLE_RESISTANCE = 1
MAX_THROTTLE_RESISTANCE = 6
MIN_ACCEL_GRIPLOSS_AMP = 1
MAX_ACCEL_GRIPLOSS_AMP = 8
EWMA_ALPHA_THROTTLE = 0.01        # very smooth (Patmagauran's "creamy" tuning)
EWMA_ALPHA_THROTTLE_FREQ = 0.5

# Hardcoded by Cosmii (legacy_Program.cs:224): rear-slip alone counts only
# when accelerator is firmly depressed. Avoids false-positive vibration when
# coasting on light throttle through slick patches.
ACCELERATOR_REAR_SLIP_GATE = 200  # 0-255 scale

# --- Lightbar ---------------------------------------------------------------
# In-race: green channel ∝ RPM ratio with redline inversion to red.
# Out-of-race: car-class color tinted by car performance index.
RPM_REDLINE_RATIO = 0.85          # above this, green→red inversion
GREEN_FLOOR = 50                  # min green channel value (lightbar visible at idle)
MAX_CPI = 255.0                   # car performance index ceiling for tint

# Cosmii car-class palettes (legacy_Program.cs:38-51). Class IDs are Forza's
# enum: D=0, C=1, B=2, A=3, S1=4, S2=5; X is "above the table".
CAR_CLASS_COLORS = {
    0: (107, 185, 236),  # D: cyan
    1: (234, 202, 49),   # C: gold
    2: (211, 90, 37),    # B: orange
    3: (187, 59, 34),    # A: red
    4: (128, 54, 243),   # S1: purple
    5: (75, 88, 229),    # S2: blue
}
COLOR_CLASS_X = (105, 182, 72)    # green for above-S2

# --- IsRaceOn debounce ------------------------------------------------------
# FH5 sometimes leaves is_race_on=1 after menu transitions. Count samples
# where RPM is stable AND power ≤ 0; once the count exceeds the threshold,
# override is_race_on to false. ~3.3s at 60Hz packet rate. (legacy_Program.cs:35)
RPM_ACCUMULATOR_TRIGGER_RACE_OFF = 200

# --- User-facing intensity scales (env vars) --------------------------------
# Read once at module import. Process restart picks up new values.

def _read_intensity(name: str, default: float = 1.0) -> float:
    """Parse an intensity env var clamped to [0, 1]. NaN/inf/junk fall back to
    `default` with a warning; silent fallthrough to 1.0 would hide
    misconfigurations (e.g. `FORZA_R2_INTENSITY=nan` is almost certainly
    a typo, not "use full intensity").
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        v = float(raw)
    except ValueError:
        LOG.warning("invalid %s=%r — using %.2f", name, raw, default)
        return default
    if not math.isfinite(v):
        LOG.warning("non-finite %s=%r — using %.2f", name, raw, default)
        return default
    return max(0.0, min(1.0, v))


def _read_bool(name: str, default: bool = True) -> bool:
    """Parse a boolean env var. Accepts the usual disable-tokens
    {0, false, no, off, ""} (case-insensitive) and treats anything else
    as enabled. Mirrors the strict parsing of `_read_intensity` so that
    typos don't silently flip behavior.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("0", "false", "no", "off", "")


LEFT_TRIGGER_INTENSITY = _read_intensity("FORZA_L2_INTENSITY")
RIGHT_TRIGGER_INTENSITY = _read_intensity("FORZA_R2_INTENSITY")
LIGHTBAR_ENABLED = _read_bool("FORZA_LIGHTBAR", default=True)

# --- Forza UDP packet sizes -> fdp packet_format strings --------------------
# Note: the FM7 V1 232-byte sled format does NOT include `accel`, `brake`,
# `power`, `car_class`, or `car_performance_index`; fdp doesn't setattr
# those names from a sled-format unpack, and our handlers depend on them.
# Driving a sled stream would silently leave both triggers off. We don't
# advertise FM7 sled support; if the user ever points an FM7 sled feed at us,
# 232-byte packets are simply dropped at _parse_packet.
PACKET_FORMATS = {
    311: "dash",  # Forza Motorsport 7 V2 car-dash format
    324: "fh4",   # Forza Horizon 4 / 5 (12-byte gap that fdp's fh4 mode patches around)
}

# --- Daemon lifecycle constants ---------------------------------------------
IDLE_TIMEOUT_S = 3.0
RECONNECT_BACKOFF_S = 1.0


# --- Module state (signal + hot-plug flags) ---------------------------------
_shutdown = False
_disconnected = False


def _on_termination(signum: int, frame: object) -> None:
    """SIGTERM/SIGINT handler: sets the main-loop shutdown flag."""
    global _shutdown
    LOG.info("received signal %d — shutting down", signum)
    _shutdown = True


def _on_controller_error(exc: Exception) -> None:
    """dualsense-controller `on_error` callback for HID read-thread failures.

    The library dispatches state-change callbacks by parameter count
    (StateValueCallbackManager.py): a 1-arg callback is bound to
    `_event_name_1_args` which emits `(new_value)`. A 2-arg callback would
    receive `(new_value, timestamp_ns)` instead, meaning a `(prev, exc)`
    handler would log a stray monotonic-clock integer instead of the
    exception. 1-arg keeps observability tight.
    """
    global _disconnected
    LOG.warning("dualsense controller error: %s", exc)
    _disconnected = True


# --- Helpers ----------------------------------------------------------------


def _safe_field(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float:
    """Read a packet field defensively. Returns default for missing fields,
    NaN, or inf. Forza packets are fixed-format but mismatched format strings
    can produce NaN; this filters them so haptic math never receives garbage.
    """
    try:
        v = float(getattr(pkt, name))
    except (AttributeError, TypeError, ValueError):
        return default
    if math.isnan(v) or math.isinf(v):
        return default
    return v


def _safe_abs(pkt: ForzaDataPacket, name: str) -> float:
    return abs(_safe_field(pkt, name, 0.0))


def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float:
    """Cosmii's Map function (legacy_Program.cs:478-484, also fds_Program.cs:276).

    Like Arduino's `map()`: clamp x to the input range, then linearly remap to
    the output range. Used pervasively throughout the references.
    """
    if in_max == in_min:
        return out_min
    x = max(in_min, min(in_max, x))
    return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min


def _ewma(value: float, last: float, alpha: float) -> float:
    """Exponential weighted moving average (legacy_Program.cs:747).

    α=1.0 → instant (output = value), α=0.01 → very smooth (~100-sample lag).
    """
    return alpha * value + (1.0 - alpha) * last


def _combined_slip(pkt: ForzaDataPacket) -> tuple[float, float, float]:
    """(combined_all, combined_front, combined_rear): Cosmii's slip aggregates.

    Cosmii uses the arithmetic mean per axle (legacy_Program.cs:112-114),
    distinct from Race-Element's max-of-axle. The mean is more conservative
    and reduces single-tire-pulse chatter.
    """
    fl = _safe_abs(pkt, "tire_combined_slip_FL")
    fr = _safe_abs(pkt, "tire_combined_slip_FR")
    rl = _safe_abs(pkt, "tire_combined_slip_RL")
    rr = _safe_abs(pkt, "tire_combined_slip_RR")
    return (fl + fr + rl + rr) / 4.0, (fl + fr) / 2.0, (rl + rr) / 2.0


def _avg_accel(pkt: ForzaDataPacket) -> float:
    """Cosmii's avgAccel (legacy_Program.cs:219). Lateral and longitudinal
    chassis accel combined under per-axis weights. Used as the input to the
    throttle-baseline resistance curve.
    """
    ax = _safe_field(pkt, "acceleration_x", 0.0)
    az = _safe_field(pkt, "acceleration_z", 0.0)
    return math.sqrt(TURN_ACCEL_MOD * ax * ax + FORWARD_ACCEL_MOD * az * az)


# --- Daemon state -----------------------------------------------------------


@dataclass
class DaemonState:
    """Per-session state: EWMA filter cells + IsRaceOn debounce + transition flags.

    EWMA cell semantics are unscaled: they store the raw smoothed input.
    Per-trigger intensity (LEFT/RIGHT_TRIGGER_INTENSITY) is applied at output
    time only. Storing the scaled value back would compound the scale every
    tick (steady-state for α<1 collapses to ~0 with intensity<1).

    `was_throttle_slipping` / `was_brake_slipping` track the previous tick's
    slip state so we can bypass EWMA on transition INTO a slip event. Without
    this, α=0.01 throttle smoothing means the first ~100 packets of a slip
    event (~1.7s at 60Hz) feel barely audible.

    Initial values from Cosmii's static initializers (legacy_Program.cs:23-26),
    rescaled to the dualsense-controller library's 0-8 strength domain.
    """
    last_throttle_resistance: float = 1.0
    last_throttle_freq: float = 0.0
    last_brake_resistance: float = 1.0
    last_brake_freq: float = 0.0
    was_throttle_slipping: bool = False
    was_brake_slipping: bool = False
    # Tracks the previous tick's in_race verdict; the run-loop uses this
    # to detect in-race → menu transitions and partial-reset the EWMA cells
    # + slip flags so the next race resumption gets a clean cold-start
    # path (otherwise stale slip flags suppress the seeding fix in
    # handle_throttle/handle_brake on the first packet of race 2).
    last_in_race: bool = False
    # IsRaceOn debounce.
    last_rpm: float = 0.0
    rpm_accumulator: int = 0
    # Last-known valid car class / CPI. We always update on observation:
    # Cosmii's `> 0` guard treats Class-D (enum value 0) as "no info" and
    # leaves the lightbar showing the previous class color, which is wrong.
    last_car_class: int = 0
    last_cpi: int = 0
    # Last lightbar color (skip identical writes). Reset to (0,0,0) on idle
    # so that resuming with the same color re-pushes after the (0,0,0) reset
    # we send to the controller; otherwise the bar stays black.
    last_color: tuple[int, int, int] = (0, 0, 0)

    def reset(self) -> None:
        """Reset filters, debounce, and lightbar dedup; call when telemetry
        resumes after idle so the next packet's effects fire from a clean
        baseline.
        """
        self.last_throttle_resistance = 1.0
        self.last_throttle_freq = 0.0
        self.last_brake_resistance = 1.0
        self.last_brake_freq = 0.0
        self.was_throttle_slipping = False
        self.was_brake_slipping = False
        self.last_rpm = 0.0
        self.rpm_accumulator = 0
        self.last_color = (0, 0, 0)


# --- Race detection ---------------------------------------------------------


def is_race_on(pkt: ForzaDataPacket, state: DaemonState) -> bool:
    """Cosmii's IsRaceOn debounce (legacy_Program.cs:89-109).

    Returns True iff the car is actively being driven. Combines the explicit
    is_race_on flag with the RPM-stability + zero-power workaround for FH5's
    unreliable flag.
    """
    flag = bool(_safe_field(pkt, "is_race_on", 0.0))
    current_rpm = _safe_field(pkt, "current_engine_rpm", 0.0)
    power = _safe_field(pkt, "power", 0.0)

    if abs(current_rpm - state.last_rpm) < 1e-3 and power <= 0:
        state.rpm_accumulator += 1
        if state.rpm_accumulator > RPM_ACCUMULATOR_TRIGGER_RACE_OFF:
            flag = False
    else:
        state.rpm_accumulator = 0

    state.last_rpm = current_rpm
    return flag


# --- Trigger handlers -------------------------------------------------------


def handle_throttle(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None:
    """R2 throttle. Port of legacy_Program.cs:216-263.

    Modes:
      - In-race + slip:    vibration overlay (freq from slip, amp from avgAccel).
                           If filtered freq <= MIN_ACCEL_GRIPLOSS_VIBRATION
                           OR accelerator <= THROTTLE_VIBRATION_MODE_START,
                           fall through to feedback-only (avoids audible
                           clicking at very low frequencies).
      - In-race + no slip: continuous feedback resistance scaled by avgAccel.
      - Pedal at rest:     baseline still active at MIN strength so the
                           trigger always has *some* feel under the finger.
                           Trigger is only `effect.off()` when out of race
                           (handled by the run-loop, not here).
    """
    accel = _safe_field(pkt, "accel", 0.0)

    avg_a = _avg_accel(pkt)
    _, combined_front, combined_rear = _combined_slip(pkt)

    losing_grip = (
        combined_front > THROTTLE_GRIP_LOSS_VAL
        or (combined_rear > THROTTLE_GRIP_LOSS_VAL and accel > ACCELERATOR_REAR_SLIP_GATE)
    )

    if losing_grip:
        # Vibration mode. Frequency scales with slip severity (Race-Element's
        # convention: more slip = higher freq, opposite of Patmagauran's
        # decay-from-threshold formula). Amplitude scales with avgAccel so
        # heavier acceleration during slip = stronger buzz.
        # Use the slipping axle's slip for the frequency curve, not the
        # all-4-tire average. Cosmii's `combinedTireSlip` (mean of all 4)
        # collapses to ~slip/2 for one-axle slip events (RWD wheelspin =
        # rear=0.8, front=0 → combined=0.4) which lands below threshold
        # and silently falls through to feedback-only mode. RWD burnouts
        # are the most common Forza wheelspin event; we want vibration there.
        slip_for_freq = max(combined_front, combined_rear)
        slip_above = max(0.0, slip_for_freq - THROTTLE_GRIP_LOSS_VAL)
        slip_range = max(1e-6, 1.0 - THROTTLE_GRIP_LOSS_VAL)
        freq_raw = MIN_ACCEL_GRIPLOSS_VIBRATION + (
            MAX_ACCEL_GRIPLOSS_VIBRATION - MIN_ACCEL_GRIPLOSS_VIBRATION
        ) * min(1.0, slip_above / slip_range)
        amp_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_ACCEL_GRIPLOSS_AMP, MAX_ACCEL_GRIPLOSS_AMP)

        # Bypass EWMA on the first packet of a slip event so the buzz is
        # immediate. With α=0.01 throttle smoothing, the EWMA-warmed first
        # packet would converge over ~1.7s; slip events are sub-second.
        if not state.was_throttle_slipping:
            state.last_throttle_freq = freq_raw
            state.last_throttle_resistance = amp_raw
        freq_unscaled = _ewma(freq_raw, state.last_throttle_freq, EWMA_ALPHA_THROTTLE_FREQ)
        amp_unscaled = _ewma(amp_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE)
        # Store unscaled: applying intensity here would compound the scale
        # every tick (steady-state collapses to ~0 with intensity<1, α<1).
        state.last_throttle_freq = freq_unscaled
        state.last_throttle_resistance = amp_unscaled
        freq_out = freq_unscaled * RIGHT_TRIGGER_INTENSITY
        amp_out = amp_unscaled * RIGHT_TRIGGER_INTENSITY
        state.was_throttle_slipping = True

        if freq_out <= MIN_ACCEL_GRIPLOSS_VIBRATION or accel <= THROTTLE_VIBRATION_MODE_START:
            # Fallback to baseline-style feedback if the computed vibration
            # is too quiet to be useful.
            strength = max(MIN_THROTTLE_RESISTANCE, min(MAX_THROTTLE_RESISTANCE, int(round(amp_out))))
            controller.right_trigger.effect.feedback(start_position=0, strength=strength)
        else:
            f = max(1, min(255, int(round(freq_out))))
            a = max(MIN_ACCEL_GRIPLOSS_AMP, min(MAX_ACCEL_GRIPLOSS_AMP, int(round(amp_out))))
            controller.right_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f)
    else:
        # Baseline mode. Strength scales with avgAccel: at idle the trigger
        # is barely resistant; under heavy accel it firms up.
        resistance_raw = _map(avg_a, 0, ACCELERATION_LIMIT, MIN_THROTTLE_RESISTANCE, MAX_THROTTLE_RESISTANCE)
        resistance_unscaled = _ewma(resistance_raw, state.last_throttle_resistance, EWMA_ALPHA_THROTTLE)
        state.last_throttle_resistance = resistance_unscaled
        state.was_throttle_slipping = False
        strength = max(
            MIN_THROTTLE_RESISTANCE,
            min(MAX_THROTTLE_RESISTANCE, int(round(resistance_unscaled * RIGHT_TRIGGER_INTENSITY))),
        )
        controller.right_trigger.effect.feedback(start_position=0, strength=strength)


def handle_brake(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState) -> None:
    """L2 brake. Port of legacy_Program.cs:281-313 with the bug fix described
    in the module docstring (Cosmii's inverted comparison operators replaced
    with Patmagauran's correct logic: vibration when slip > thr AND brake > thr).

    Modes:
      - In-race + slip:    vibration overlay (freq from slip, amp from brake input).
                           Below MIN_BRAKE_VIBRATION freq → revert to feedback-only.
      - In-race + no slip: continuous feedback resistance scaled by brake input.
      - Pedal at rest:     baseline still active at MIN strength so the
                           trigger always has some feel.
    """
    brake = _safe_field(pkt, "brake", 0.0)

    # Use the worst axle's slip for both detection and the freq curve.
    # Cosmii's all-4 average misses single-axle ABS events (front locks
    # first under weight transfer; combined-average can stay below 0.5
    # even with the front fully locked). Race-Element variant B uses
    # max-of-axle for the same reason.
    _, combined_front, combined_rear = _combined_slip(pkt)
    slip_for_freq = max(combined_front, combined_rear)
    slipping = slip_for_freq > GRIP_LOSS_VAL and brake > BRAKE_VIBRATION_MODE_START

    if slipping:
        slip_above = max(0.0, slip_for_freq - GRIP_LOSS_VAL)
        slip_range = max(1e-6, 1.0 - GRIP_LOSS_VAL)
        freq_raw = MIN_BRAKE_VIBRATION + (
            MAX_BRAKE_VIBRATION - MIN_BRAKE_VIBRATION
        ) * min(1.0, slip_above / slip_range)
        amp_raw = _map(brake, 0, 255, MIN_BRAKE_AMP, MAX_BRAKE_AMP)

        # Bypass EWMA on slip-event entry; same reasoning as throttle.
        if not state.was_brake_slipping:
            state.last_brake_freq = freq_raw
            state.last_brake_resistance = amp_raw
        freq_unscaled = _ewma(freq_raw, state.last_brake_freq, EWMA_ALPHA_BRAKE_FREQ)
        amp_unscaled = _ewma(amp_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE)
        state.last_brake_freq = freq_unscaled
        state.last_brake_resistance = amp_unscaled
        freq_out = freq_unscaled * LEFT_TRIGGER_INTENSITY
        amp_out = amp_unscaled * LEFT_TRIGGER_INTENSITY
        state.was_brake_slipping = True

        if freq_out <= MIN_BRAKE_VIBRATION:
            strength = max(MIN_BRAKE_RESISTANCE, min(MAX_BRAKE_RESISTANCE, int(round(amp_out))))
            controller.left_trigger.effect.feedback(start_position=0, strength=strength)
        else:
            f = max(1, min(255, int(round(freq_out))))
            a = max(MIN_BRAKE_AMP, min(MAX_BRAKE_AMP, int(round(amp_out))))
            controller.left_trigger.effect.vibration(start_position=0, amplitude=a, frequency=f)
    else:
        resistance_raw = _map(brake, 0, 255, MIN_BRAKE_RESISTANCE, MAX_BRAKE_RESISTANCE)
        resistance_unscaled = _ewma(resistance_raw, state.last_brake_resistance, EWMA_ALPHA_BRAKE)
        state.last_brake_resistance = resistance_unscaled
        state.was_brake_slipping = False
        strength = max(
            MIN_BRAKE_RESISTANCE,
            min(MAX_BRAKE_RESISTANCE, int(round(resistance_unscaled * LEFT_TRIGGER_INTENSITY))),
        )
        controller.left_trigger.effect.feedback(start_position=0, strength=strength)


# --- Lightbar ---------------------------------------------------------------


def lightbar_color(pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> tuple[int, int, int]:
    """Compute the (R, G, B) the lightbar should display this tick.

    In-race: green ∝ RPM ratio, with redline inversion (the green channel is
    flipped to 255-G so the bar appears red as you approach the rev limit).
    Out-of-race: car-class color tinted by car performance index.

    Does no controller I/O (the menu branch only caches the observed car
    class / CPI on `state`). The caller is responsible for actually pushing
    the color to the controller (skipping if unchanged from `state.last_color`).
    """
    if in_race:
        rpm = _safe_field(pkt, "current_engine_rpm", 0.0)
        idle = _safe_field(pkt, "engine_idle_rpm", 0.0)
        max_rpm = _safe_field(pkt, "engine_max_rpm", 0.0)
        rpm_range = max(1.0, max_rpm - idle)
        ratio = max(0.0, min(1.0, (rpm - idle) / rpm_range))
        green = max(GREEN_FLOOR, int(ratio * 255))
        red = int(ratio * 255)
        if ratio >= RPM_REDLINE_RATIO:
            green = 255 - green
        return (red, green, 0)

    # Menu mode: car-class color tinted by performance index. We always update
    # on observation: Cosmii's `> 0` guard treats Class-D (enum value 0) as
    # "no info" and leaves the previous class color stuck on the lightbar
    # when switching INTO a Class-D car. fdp always sets these fields from
    # an fh4/dash unpack, so absence is impossible; only the value matters.
    state.last_car_class = int(_safe_field(pkt, "car_class", 0.0))
    state.last_cpi = max(0, min(int(MAX_CPI), int(_safe_field(pkt, "car_performance_index", 0.0))))

    base = CAR_CLASS_COLORS.get(state.last_car_class, COLOR_CLASS_X)
    if state.last_car_class > 5:
        # X-class: untinted constant green.
        return COLOR_CLASS_X
    cpi_ratio = state.last_cpi / MAX_CPI if state.last_cpi > 0 else 0.0
    return (
        int(cpi_ratio * base[0]),
        int(cpi_ratio * base[1]),
        int(cpi_ratio * base[2]),
    )


def apply_lightbar(controller: DualSenseController, pkt: ForzaDataPacket, state: DaemonState, in_race: bool) -> None:
    if not LIGHTBAR_ENABLED:
        return
    color = lightbar_color(pkt, state, in_race)
    if color == state.last_color:
        return
    state.last_color = color
    controller.lightbar.set_color(*color)


# --- Reset ------------------------------------------------------------------


def reset_triggers(controller: Optional[DualSenseController]) -> None:
    """Default both triggers: called on idle, reconnect, and shutdown.
    Tolerates a None controller so the shutdown-during-reconnect cleanup
    path doesn't raise.
    """
    if controller is None:
        return
    controller.left_trigger.effect.off()
    controller.right_trigger.effect.off()


def reset_all(controller: Optional[DualSenseController]) -> None:
    """reset_triggers + lightbar to (0, 0, 0). Tolerates None controller."""
    if controller is None:
        return
    reset_triggers(controller)
    if LIGHTBAR_ENABLED:
        try:
            controller.lightbar.set_color(0, 0, 0)
        except Exception:
            pass


# --- Connection / hot-plug --------------------------------------------------
|
||
|
||
|
||
def _connect_controller() -> Optional[DualSenseController]:
|
||
"""Block until a DualSense is reachable. Returns the activated controller,
|
||
or None if shutdown was requested before one appeared.
|
||
"""
|
||
LOG.info("opening dualsense controller")
|
||
first_failure_logged = False
|
||
while not _shutdown:
|
||
try:
|
||
devices = DualSenseController.enumerate_devices()
|
||
except Exception as e:
|
||
if not first_failure_logged:
|
||
LOG.warning("hidapi enumeration failed: %s", e)
|
||
first_failure_logged = True
|
||
time.sleep(RECONNECT_BACKOFF_S)
|
||
continue
|
||
|
||
if not devices:
|
||
if not first_failure_logged:
|
||
LOG.warning("no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S)
|
||
first_failure_logged = True
|
||
time.sleep(RECONNECT_BACKOFF_S)
|
||
continue
|
||
|
||
|
||
global _disconnected
|
||
# Clear the disconnect flag BEFORE activate() so the HID worker
|
||
# thread (started by activate()) can't observe a stale-true flag if
|
||
# it raises immediately after spawn — a known failure mode right
|
||
# after USB enumeration when the kernel is still re-attaching the
|
||
# hidraw node. Without this, _on_controller_error fires inside
|
||
# activate(), sets _disconnected=True, then we'd unconditionally
|
||
# overwrite it back to False below.
|
||
_disconnected = False
|
||
        try:
            controller = DualSenseController(device_index_or_device_info=devices[0])
            controller.on_error(_on_controller_error)
            controller.activate()
        except Exception as e:
            if not first_failure_logged:
                LOG.warning("controller activation failed: %s", e)
                first_failure_logged = True
            time.sleep(RECONNECT_BACKOFF_S)
            continue

        # Push a clean initial state so we don't inherit residual effects from
        # a previous Steam Input session, prior daemon instance, or stale
        # firmware state.
        reset_all(controller)
        LOG.info("dualsense controller connected (%s)", controller.connection_type)
        return controller

    return None


def _close_controller(controller: Optional[DualSenseController]) -> None:
    if controller is None:
        return
    try:
        controller.deactivate()
    except Exception:
        pass


# --- UDP socket -------------------------------------------------------------


def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket:
    """Create or inherit the UDP listener socket. Honors LISTEN_FDS for
    systemd socket activation; falls back to opening a fresh socket bound
    to (host, port).
    """
    listen_pid = os.environ.get("LISTEN_PID")
    listen_fds = os.environ.get("LISTEN_FDS")
    if listen_pid and listen_fds and int(listen_pid) == os.getpid() and int(listen_fds) >= 1:
        sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM)
        LOG.debug("inherited UDP socket fd=3 from systemd")
    else:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
    sock.settimeout(timeout)
    return sock


_warned_packet_sizes: set[int] = set()


def _parse_packet(data: bytes) -> Optional[ForzaDataPacket]:
    n = len(data)
    fmt = PACKET_FORMATS.get(n)
    if fmt is None:
        if n not in _warned_packet_sizes:
            _warned_packet_sizes.add(n)
            LOG.warning(
                "ignoring unrecognised %d-byte UDP packet "
                "(expected 311 [FM7 dash] or 324 [FH4/5]); "
                "if FM7, switch HUD Data Out to CAR DASH format",
                n,
            )
        return None
    try:
        return ForzaDataPacket(data, packet_format=fmt)
    except Exception:
        return None


# --- Main loop --------------------------------------------------------------


def run(host: str, port: int, exit_on_idle: bool = False) -> int:
    signal.signal(signal.SIGTERM, _on_termination)
    signal.signal(signal.SIGINT, _on_termination)

    LOG.info("listening for forza udp on %s:%d", host, port)
    sock = _get_socket(host, port)

    controller = _connect_controller()
    if controller is None:
        sock.close()
        return 0

    state = DaemonState()
    last_seen = 0.0
    have_telemetry = False

    try:
        while not _shutdown:
            if _disconnected:
                LOG.warning("dualsense disconnected; reconnecting")
                _close_controller(controller)
                controller = _connect_controller()
                if controller is None:
                    return 0
                state.reset()

            now = time.monotonic()
            try:
                data, _ = sock.recvfrom(2048)
                last_seen = now
                have_telemetry = True
            except socket.timeout:
                if _shutdown:
                    break
                if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
                    LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S)
                    reset_all(controller)
                    state.reset()
                    have_telemetry = False
                    if exit_on_idle:
                        LOG.info("exiting on idle")
                        break
                continue

            pkt = _parse_packet(data)
            if pkt is None:
                continue

            in_race = is_race_on(pkt, state)
            if in_race:
                handle_throttle(controller, pkt, state)
                handle_brake(controller, pkt, state)
                apply_lightbar(controller, pkt, state, in_race=True)
            elif state.last_in_race:
                # in-race → menu edge: one-shot reset of state, triggers,
                # and lightbar. We do NOT touch the controller again until
                # the next in-race packet. Doing per-packet writes between
                # races causes audible clicks on the trigger actuator:
                # every HID OUT report (even the ones that look idempotent
                # via library dedup) re-encodes the trigger config and the
                # firmware reacts on receipt.
                state.reset()
                reset_all(controller)
            state.last_in_race = in_race
    finally:
        try:
            reset_all(controller)
        except Exception:
            pass
        _close_controller(controller)
        try:
            sock.close()
        except Exception:
            pass

    return 0


def main() -> int:
    parser = argparse.ArgumentParser(
        prog="forza-trigger",
        description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
    parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
    parser.add_argument(
        "--debug", action="store_true", help="log per-packet decisions at DEBUG"
    )
    parser.add_argument(
        "--exit-on-idle",
        action="store_true",
        help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)",
    )
    args = parser.parse_args()

    level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO")
    logging.basicConfig(
        level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s"
    )
    return run(args.host, args.port, args.exit_on_idle)


if __name__ == "__main__":
    sys.exit(main())