Two issues in the deployed daemon:

1. After FH5 exited, the lightbar stayed lit. reset_triggers() touched
   only the triggers; pydualsense's BG sendReport thread kept re-publishing
   whatever TouchpadColor we last set, so the controller stayed in the
   last race color forever.
2. R2 had residual tension in FH5's main menu and on the desktop after
   a race. Pre-race / idle states were emitting RacingDSX's NormalTrigger
   (mode byte 0x00), which per Sony's docs (Nielk1 Rev 6) only clears
   state without retracting the trigger motor; mode 0x05 (canonical Off
   / Reset) actively returns the trigger to neutral. RacingDSX-on-Windows
   gets away with 0x00 because something else (presumably Steam Input or
   the OS) reliably resets the motor on focus loss; on Linux nothing does.

Fixes:

- Drop _apply_normal/DS_MODE_NORMAL. Use _apply_off (mode 0x05) for every
  'release the trigger' intent: pre-race per-frame, idle timeout, mid-race
  zero-strength fallback, shutdown.
- Add reset_lightbar() that writes RGB(0,0,0).
- Track have_telemetry and fire the idle-timeout branch whenever
  telemetry has been silent for IDLE_TIMEOUT_S, regardless of in_race.
  Reset both triggers and lightbar in that branch.

Documented as divergence #4 in the module docstring.
819 lines
33 KiB
Python
"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.

This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline.
Every numeric value, every threshold, every map() / EWMA() coefficient,
and every output byte sequence has been verified against published
sources or against decompiled DSX 1.4.9 itself.

Sources, in priority order:

1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31)
   decompiled with ILSpy. The decompilation revealed:
   a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev 6, MIT) as
      its trigger-effect encoder.
   b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's
      high-level CustomTriggerValueMode names to mode bytes:
        VibrateResistance       -> 6  (Simple_Vibration / 0x06)
        VibrateResistanceA / AB -> 38 (Vibration / 0x26)
        VibrateResistanceB      -> 6
      When the dispatcher hits the `else` branch in
      DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than
      9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into
      the trigger param region: no bit-packing, no scale conversion.
      This is why RacingDSX's 0-255-scale stiffness values ARE the
      actual amplitude bytes that reach the controller's firmware.

2. Nielk1's reverse-engineering gist Rev 6 (MIT,
   https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db).
   Source for the canonical Sony bit-packed Feedback (0x21) encoder used
   for the non-slip path. Identical to the implementation shipped inside
   DSX 1.4.9.

3. RacingDSX (cosmii02/RacingDSX, GPLv3): community-tuned defaults for
   Forza Horizon 4 / 5 since 2022. Specifically:
   Config/ThrottleSettings.cs, Config/BrakeSettings.cs,
   GameParsers/Parser.cs.
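
As a rough illustration of the bit-packed Feedback (0x21) layout referenced in
source 2, the sketch below packs a start zone and a 1..8 strength into the six
parameter bytes. `pack_feedback` is a hypothetical name used only in this
example; the layout follows `_apply_feedback()` in this module.

```python
def pack_feedback(position: int, strength: int) -> list:
    # Hypothetical helper for illustration; mirrors the packing in _apply_feedback().
    # position: first active zone (0..9); strength: 1..8.
    if not 0 <= position <= 9 or not 1 <= strength <= 8:
        raise ValueError("position must be 0..9 and strength 1..8")
    force_value = (strength - 1) & 0x07  # 3 bits per zone
    active_zones = 0                     # 10-bit mask, one bit per zone
    force_zones = 0                      # 30 bits, 3 per zone
    for zone in range(position, 10):
        active_zones |= 1 << zone
        force_zones |= force_value << (3 * zone)
    return [
        active_zones & 0xFF,
        (active_zones >> 8) & 0xFF,
        force_zones & 0xFF,
        (force_zones >> 8) & 0xFF,
        (force_zones >> 16) & 0xFF,
        (force_zones >> 24) & 0xFF,
    ]


print([hex(b) for b in pack_feedback(0, 8)])
```

With every zone active at full strength, all ten zone bits and all thirty
force bits are set, which is why the last byte only fills its low six bits.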

The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is
provided by pydualsense (PyPI, MIT). We drive its low-level
`triggerL/R.mode` and `triggerL/R.forces[i]` fields directly because
pydualsense's high-level setMode/setForce API does not understand any
specific mode's parameter encoding; it just shovels bytes into the
output report at fixed offsets. That is exactly what we want.
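
A minimal sketch of that low-level access, using a stand-in object instead of
real hardware. `FakeTrigger` and `write_simple_vibration` are hypothetical and
only model the two fields named above (`mode`, `forces`); the byte order for
Simple_Vibration (frequency, amplitude, position) is the one used by
`_apply_simple_vibration()` in this module.

```python
class FakeTrigger:
    # Hypothetical stand-in for ds.triggerL / ds.triggerR: only `mode` and `forces`.
    def __init__(self) -> None:
        self.mode = 0x00
        self.forces = [0] * 7


def write_simple_vibration(trig, position, amplitude, frequency):
    # Raw passthrough: parameters go in as plain bytes, no bit-packing.
    trig.forces[0] = frequency & 0xFF
    trig.forces[1] = amplitude & 0xFF
    trig.forces[2] = position & 0xFF
    for i in range(3, 7):
        trig.forces[i] = 0
    trig.mode = 0x06  # Simple_Vibration


trig = FakeTrigger()
write_simple_vibration(trig, position=5, amplitude=200, frequency=40)
print(hex(trig.mode), trig.forces)
```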

## Documented intentional divergences from RacingDSX

1. Motion gate (`_is_in_motion()`): slip detection is gated on speed or wheel
   rotation. RacingDSX has no gate, so locked stationary wheels (e.g. after a
   hard stop with brake held) keep the slip path active forever and the trigger
   stuck in vibration mode. The gate fixes the user-reported bug.

2. Clamp-to-8 in `_apply_feedback()`: DSX's `TriggerEffectGenerator.Resistance`
   silently skips when force > 8, leaving the trigger stuck in whatever mode it
   was in (Simple_Vibration during slip→non-slip transitions). We clamp to 8
   instead so the transition produces a smooth Feedback ramp.

3. Car performance index width: fdp parses `car_performance_index` as Int32 (the
   field's actual width per Forza's spec), while RacingDSX's `FMData.cs` reads
   only `GetUInt8(bytes, 220)`, i.e. the low byte. For any car with CPI > 255
   (B/A/S1/S2/X) the two implementations disagree on the pre-race lightbar's CPI
   tint. We use the full value, clamped to `MAX_CPI` (255) in
   `apply_lightbar_pre_race`, so those cars get the undimmed class color;
   RacingDSX's lightbar is dimmer/inconsistent on those cars. Mask
   `cpi & 0xFF` in `apply_lightbar_pre_race` to match RacingDSX byte-for-byte
   if you want bug-faithful Windows-equivalent dimming.

4. Trigger release uses canonical Off (mode 0x05) instead of Normal (mode 0x00).
   RacingDSX dispatches `TriggerMode.Normal` for pre-race / between-race state,
   which becomes mode byte 0x00. Per Sony's docs (Nielk1 Rev 6), mode 0x00 only
   *clears* state and does not retract the trigger motor; mode 0x05 *actively*
   returns the trigger stop to neutral. On Linux/pydualsense, RacingDSX's 0x00
   leaves R2 with residual tension after a race ends because nothing forces a
   physical reset (Windows DSX or Steam Input must do it on that platform).
   We always emit 0x05 for any "trigger should feel free" intent.
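
To make divergence 3 concrete, this sketch compares the two CPI readings for a
single channel of the pre-race tint. The sample CPI of 800 and the helper name
`tint_channel` are illustrative assumptions; the clamp to `MAX_CPI` and the
floor follow `apply_lightbar_pre_race()` in this module.

```python
import math

MAX_CPI = 255


def tint_channel(channel: int, cpi: int) -> int:
    # Hypothetical helper: scale one RGB channel by cpi / MAX_CPI, floored.
    return math.floor((cpi / MAX_CPI) * channel)


cpi = 800                       # assumed example value above 255
full_width = min(cpi, MAX_CPI)  # Int32 read, clamped as in this module
low_byte = cpi & 0xFF           # single-byte read, as described for RacingDSX
print(full_width, low_byte)
print(tint_channel(187, full_width), tint_channel(187, low_byte))
```

For this sample value the single-byte read keeps only 32 of the 800, so the
red channel of ColorClassA drops from 187 to 23.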

## Threading note

pydualsense's `sendReport` background thread reads `triggerR/L.mode` and
`forces[0..6]` independently; there's no atomic publish primitive. Our
`_apply_*` helpers write `forces[]` first and `mode` last; the BG thread reads
`mode` first, so this ordering keeps the worst-case torn frame to one ~4 ms
HID write at slip↔non-slip mode transitions. Audible as a brief click on
transitions, not stuck state. Without lock/atomic primitives in pydualsense's
API this is the cleanest mitigation available.
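
The write ordering can be sketched with a stand-in that records the order of
field writes. `RecordingTrigger`, `_Forces`, and `apply_off` are hypothetical
names for illustration only; the real helpers are the `_apply_*` functions in
this module.

```python
class _Forces(list):
    # Hypothetical list subclass that logs every item assignment.
    def __init__(self, log):
        super().__init__([0] * 7)
        self._log = log

    def __setitem__(self, index, value):
        self._log.append(f"forces[{index}]")
        super().__setitem__(index, value)


class RecordingTrigger:
    # Hypothetical stand-in that logs the order in which fields are written.
    def __init__(self) -> None:
        self.writes = []
        self.forces = _Forces(self.writes)
        self._mode = 0x00

    @property
    def mode(self):
        return self._mode

    @mode.setter
    def mode(self, value):
        self.writes.append("mode")
        self._mode = value


def apply_off(trig) -> None:
    # Parameters first, mode byte last, as described in the note above.
    for i in range(7):
        trig.forces[i] = 0
    trig.mode = 0x05


trig = RecordingTrigger()
apply_off(trig)
print(trig.writes)
```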

## System interaction notes

**Single-controller assumption.** pydualsense's `__find_device` enumerates all
DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps the
last one matched (no break in the loop), then opens via `hidapi_open(vid, pid)`
without serial/path; `hid_open` returns the first match, which is not
necessarily the one selected. With multiple DualSense controllers the picked
controller is non-deterministic. pydualsense's source explicitly notes
`# TODO: implement multiple controllers working`. RacingDSX/DSX are also
single-controller (DSX's `connectedController` is a singleton). Forza Horizon
is single-player so this is fine in practice; if multi-controller selection
matters, monkey-patch `__find_device` to filter by `serial_number`.

**Steam Input.** When Steam Input's PlayStation Configuration Support is
enabled for the game, Steam intercepts hidraw input AND writes its own HID
output reports (rumble, lightbar, sometimes triggers). Our daemon writes
competing output reports at ~1 kHz; the controller observes whichever wrote
last. Effect: trigger oscillates and feels broken. The Nix module's README
in `default.nix` instructs users to disable PlayStation Configuration Support
for Forza in Steam (Settings → Controller).

**dualsensectl.** Installed in the Nix module for ad-hoc debugging.
Single-shot writes from `dualsensectl trigger left feedback ...` get
overwritten by our BG thread's next iteration ~4 ms later. Use it only when
the daemon is stopped (`systemctl --user stop forza-trigger`).

**Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on
hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls
`ds.report_thread.is_alive()` and reconnects in-process via
`_connect_controller()`, which retries `pydualsense.init()` every
`RECONNECT_BACKOFF_S` until the controller comes back. The daemon does not
depend on systemd or any other supervisor for plug-event recovery; running it
directly from a shell handles unplug/replug exactly the same way.
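
A minimal sketch of the hot-plug retry loop, with the connect and sleep
functions injected so it runs without hardware. `connect_with_retry`,
`fake_connect`, and the injected `sleep` are hypothetical names for this
example; the real logic lives in `_connect_controller()`.

```python
import time


def connect_with_retry(connect, backoff_s=1.0, sleep=time.sleep):
    # Hypothetical helper: call connect() until it stops raising.
    attempts = 0
    while True:
        attempts += 1
        try:
            return connect(), attempts
        except Exception:
            sleep(backoff_s)


# Simulated controller that only appears on the third attempt.
state = {"calls": 0}


def fake_connect():
    state["calls"] += 1
    if state["calls"] < 3:
        raise OSError("no DualSense found")
    return "controller"


naps = []
device, attempts = connect_with_retry(fake_connect, backoff_s=1.0, sleep=naps.append)
print(device, attempts, naps)
```

Injecting `sleep` keeps the example instant; the daemon itself simply sleeps
for `RECONNECT_BACKOFF_S` between attempts.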
"""

from __future__ import annotations

import argparse
import logging
import math
import os
import socket
import sys
import time

from fdp import ForzaDataPacket
from pydualsense import TriggerModes, pydualsense

LOG = logging.getLogger("forza-trigger")

# --- Mode bytes ---------------------------------------------------------------
# pydualsense's IntFlag aliases happen to cover the modes we need:
#   TriggerModes(0x05)   = 0x05  (canonical Sony Off / Reset)
#   TriggerModes.Pulse_B = 0x06  (Simple_Vibration / Simple_AutomaticGun)
#   TriggerModes.Rigid_A = 0x21  (Feedback, canonical)
DS_MODE_OFF = TriggerModes(0x05)
DS_MODE_SIMPLE_VIBRATION = TriggerModes.Pulse_B
DS_MODE_FEEDBACK = TriggerModes.Rigid_A

# --- RacingDSX defaults (Config/ThrottleSettings.cs) --------------------------
THROTTLE_GRIP_LOSS = 0.6
THROTTLE_REAR_SLIP_ACCEL_MIN = 200
THROTTLE_VIB_POSITION = 5  # VibrationModeStart
THROTTLE_MIN_VIBRATION = 5  # below this freq, fall back to Resistance
THROTTLE_MAX_VIBRATION = 55  # peak frequency at slip == 5
THROTTLE_MIN_STIFFNESS = 255  # slip-mode amplitude at avgAccel == 0
THROTTLE_MAX_STIFFNESS = 175  # slip-mode amplitude at avgAccel == AccelerationLimit
THROTTLE_MIN_RESISTANCE = 0  # non-slip canonical strength at avgAccel == 0
THROTTLE_MAX_RESISTANCE = 3  # non-slip canonical strength at avgAccel == AccelerationLimit
THROTTLE_ACCELERATION_LIMIT = 10
THROTTLE_TURN_ACCEL_SCALE = 0.25
THROTTLE_FORWARD_ACCEL_SCALE = 1.0
THROTTLE_RESISTANCE_SMOOTHING = 0.9
THROTTLE_VIBRATION_SMOOTHING = 1.0
THROTTLE_EFFECT_INTENSITY = 1.0
THROTTLE_LAST_RESISTANCE_INIT = 1  # Parser.lastThrottleResistance

# --- RacingDSX defaults (Config/BrakeSettings.cs) -----------------------------
BRAKE_GRIP_LOSS = 0.05
BRAKE_DEADZONE = 100  # Parser literal: data.Brake > 100
BRAKE_VIB_POSITION = 0  # VibrationStart
BRAKE_MIN_VIBRATION = 15
BRAKE_MAX_VIBRATION = 20
BRAKE_MIN_STIFFNESS = 150
BRAKE_MAX_STIFFNESS = 5
BRAKE_MIN_RESISTANCE = 0
BRAKE_MAX_RESISTANCE = 7
BRAKE_RESISTANCE_SMOOTHING = 0.4
BRAKE_VIBRATION_SMOOTHING = 1.0
BRAKE_EFFECT_INTENSITY = 1.0
BRAKE_LAST_RESISTANCE_INIT = 200  # Parser.lastBrakeResistance

# --- Forza UDP packet sizes -> fdp packet_format strings ----------------------
PACKET_FORMATS = {
    232: "sled",
    311: "dash",
    324: "fh4",  # FH4 and FH5 share the same layout
}

# --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) --------
# CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM).
# Parser.cs uses an `<=` cascade, so any value > 5 is treated as X.
CAR_CLASS_COLORS = [
    (107, 185, 236),  # ColorClassD
    (234, 202, 49),  # ColorClassC
    (211, 90, 37),  # ColorClassB
    (187, 59, 34),  # ColorClassA
    (128, 54, 243),  # ColorClassS1
    (75, 88, 229),  # ColorClassS2
    (105, 182, 72),  # ColorClassX (no CPI tint)
]
MAX_CPI = 255  # ForzaParser.MaxCPI
RPM_REDLINE_RATIO = 0.9  # Profile.RPMRedlineRatio
GREEN_FLOOR = 50  # Math.Max(..., 50) on green channel in non-redline path
RACE_OFF_RPM_FRAMES = 200  # ForzaParser.RPMAccumulatorTriggerRaceOff

# --- Reset on idle (UDP timeout) ---------------------------------------------
# Not present in RacingDSX; an additional safety so the controller doesn't get
# stuck if Forza is killed mid-race or the network drops.
IDLE_TIMEOUT_S = 3.0

# --- Hot-plug reconnect backoff ----------------------------------------------
# pydualsense's BG sendReport thread terminates silently on hidraw IOError
# (controller unplugged, BT disconnect, USB resuspend). The main loop polls
# the thread's liveness and reconnects in-process; the script is agnostic
# of supervisors like systemd. The same backoff governs the initial-connect
# wait when the daemon starts before any controller is plugged in.
RECONNECT_BACKOFF_S = 1.0

# --- Stationary motion gate --------------------------------------------------
# Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked
# wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for
# this and end up with the brake (and sometimes throttle) trigger stuck in
# Simple_Vibration mode forever, because the slip path keeps firing. We
# additionally require either the car or any wheel to be in real motion before
# treating slip as a haptic event.
STATIONARY_SPEED_MS = 0.1  # m/s; below this the car is considered stopped
STATIONARY_WHEEL_RAD_S = 0.1  # rad/s; below this a wheel is considered locked


def _is_in_motion(pkt: ForzaDataPacket) -> bool:
    """True iff the car is moving or any wheel is rotating meaningfully.

    Used to gate slip-detection: when both car and all four wheels read as
    stopped, any nonzero `tire_combined_slip` Forza emits is data noise from
    locked wheels and should not drive haptic vibration.
    """
    if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS:
        return True
    for wheel in ("FL", "FR", "RL", "RR"):
        if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S:
            return True
    return False


# --- Effect encoders ----------------------------------------------------------


def _apply_off(trig) -> None:
    """Canonical Sony Off / Reset: mode byte 0x05, all params 0.

    Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs (Nielk1
    Rev 6), mode 0x05 *actively* returns the trigger stop to the neutral
    position; mode 0x00 (TriggerModes.Off, what RacingDSX writes for its
    NormalTrigger pre-race state) only clears state without retracting the
    motor, so the trigger stays in whatever Feedback/Vibration position was
    last applied. We route every "release the trigger" intent here:
    pre-race, idle-timeout, mid-race zero-strength fallback, shutdown.
    """
    # Write forces before mode so pydualsense's BG sendReport thread, which
    # reads mode then forces non-atomically, is more likely to observe a
    # self-consistent (mode, forces) pair. See module-docstring threading note.
    for i in range(7):
        trig.forces[i] = 0
    trig.mode = DS_MODE_OFF


def _apply_feedback(trig, position: int, strength: int) -> bool:
    """Sony Feedback (mode 0x21), bit-packed.

    Verbatim port of `ExtendInput.DataTools.DualSense.TriggerEffectGenerator
    .Resistance` from DSX 1.4.9, with one deliberate divergence.

    DSX's TriggerEffectGenerator.Resistance returns `false` without writing
    when strength > 8, and RacingDSX's fall-through path routinely sends
    5..255-range slip-mode stiffness values into Feedback, hitting that branch
    every transition out of slip. The result observed by the player: "ABS
    feedback continues even when stationary". The trigger remains stuck in
    whatever mode (typically Simple_Vibration) was set before the failed
    Resistance call, sometimes indefinitely if Forza keeps reporting nonzero
    slip on locked wheels.

    We clamp out-of-range strength to 8 instead. The transition out of slip
    now produces a smooth Feedback ramp from full-stiffness down to the
    non-slip target as the EWMA decays, rather than freezing on stale
    Simple_Vibration bytes. The return value (kept for symmetry with DSX's
    bool-returning Resistance) is False on invalid position, True otherwise.
    """
    if position > 9:
        return False
    if strength > 8:
        strength = 8
    if strength <= 0:
        # Sony's algorithm: zero force -> Reset (canonical Off, mode 0x05).
        # DSX's TriggerEffectGenerator.Resistance falls through to Reset()
        # here, so we do the same for byte-perfect parity.
        _apply_off(trig)
        return True

    force_value = (strength - 1) & 0x07
    force_zones = 0
    active_zones = 0
    for i in range(position, 10):
        force_zones |= force_value << (3 * i)
        active_zones |= 1 << i

    trig.forces[0] = active_zones & 0xFF
    trig.forces[1] = (active_zones >> 8) & 0xFF
    trig.forces[2] = force_zones & 0xFF
    trig.forces[3] = (force_zones >> 8) & 0xFF
    trig.forces[4] = (force_zones >> 16) & 0xFF
    trig.forces[5] = (force_zones >> 24) & 0xFF
    trig.forces[6] = 0  # frequency byte unused for Feedback
    trig.mode = DS_MODE_FEEDBACK
    return True


def _apply_simple_vibration(trig, position: int, amplitude: int, frequency: int) -> None:
    """Legacy Simple_Vibration (mode 0x06), raw byte passthrough.

    Mirrors DSX's `else` branch in `DualSense_USB_Updated.cs::CustomTriggerValues`:
        array[11] = TriggerValue1 (= 6 for VibrateResistance)
        array[12] = TriggerValue2 (= frequency)
        array[13] = TriggerValue3 (= amplitude)
        array[14] = TriggerValue4 (= position)
        array[15..17,20] = 0

    Per Nielk1, Simple_Vibration was Sony's pre-firmware-update vibration
    mode: same effect as canonical Vibration (0x26) on every shipped
    DualSense, but takes raw 0-255 amplitude bytes instead of the
    bit-packed 0-8 zone format. RacingDSX/DSX have used it since v1.0; the
    entire Forza-on-DualSense community ships these byte values.
    """
    if amplitude <= 0 or frequency <= 0:
        _apply_off(trig)
        return
    trig.forces[0] = frequency & 0xFF
    trig.forces[1] = amplitude & 0xFF
    trig.forces[2] = position & 0xFF
    trig.forces[3] = 0
    trig.forces[4] = 0
    trig.forces[5] = 0
    trig.forces[6] = 0
    trig.mode = DS_MODE_SIMPLE_VIBRATION


def reset_triggers(ds: pydualsense) -> None:
    """Both triggers to canonical Off (mode 0x05). Actively retracts the motor."""
    _apply_off(ds.triggerL)
    _apply_off(ds.triggerR)


def reset_lightbar(ds: pydualsense) -> None:
    """Lightbar to off (RGB 0,0,0).

    Used when telemetry has been idle long enough that we should stop asserting
    a race color, e.g. Forza exited or hasn't started a session yet. Without
    this, pydualsense's BG sendReport thread keeps re-publishing whatever
    `TouchpadColor` we last set, so the controller stays lit indefinitely.
    """
    ds.light.setColorI(0, 0, 0)


# --- RacingDSX math primitives ------------------------------------------------


def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float:
    """Mirrors Parser.Map() in RacingDSX, including endpoint clamping."""
    if x > in_max:
        x = in_max
    elif x < in_min:
        x = in_min
    return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min


def _ewma(value: float, last: float, alpha: float) -> float:
    """Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing."""
    return alpha * value + (1.0 - alpha) * last


def _ewma_int(value: int, last: int, alpha: float) -> int:
    """Mirrors `Parser.EWMA(int, int, float)`: floor of float-EWMA."""
    return math.floor(alpha * value + (1.0 - alpha) * last)


def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float:
    return float(getattr(pkt, name, default))


# --- Per-trigger persistent state for EWMA ------------------------------------


class _TriggerState:
    __slots__ = ("last_resistance", "last_freq")

    def __init__(self, init_resistance: int) -> None:
        # Mirrors RacingDSX's `int lastThrottleResistance` / `int lastBrakeResistance`.
        self.last_resistance: int = int(init_resistance)
        self.last_freq: int = 0


# --- Forza game-level persistent state (ForzaParser.cs fields) ----------------


class _ForzaState:
    """Persistent across packets. Mirrors ForzaParser's instance fields:
    LastEngineRPM, LastRPMAccumulator, LastValidCarClass, LastValidCarCPI."""

    __slots__ = (
        "last_engine_rpm",
        "rpm_accumulator",
        "last_valid_car_class",
        "last_valid_car_cpi",
    )

    def __init__(self) -> None:
        self.last_engine_rpm = 0.0
        self.rpm_accumulator = 0
        self.last_valid_car_class = 0
        self.last_valid_car_cpi = 0


def _clamp_byte(v: float) -> int:
    """Clamp to [0, 255] before writing to a uint8 RGB channel."""
    return max(0, min(255, int(v)))


def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool:
    """Mirrors `ForzaParser.IsRaceOn()` verbatim.

    FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after
    the player exits a race or pauses. ForzaParser detects the off state by
    watching for unchanged engine RPM combined with non-positive Power across
    `RPMAccumulatorTriggerRaceOff` (200) consecutive frames. Power is dash-only,
    so for sled-format packets it reads as 0; that matches RacingDSX exactly.
    """
    in_race = bool(int(getattr(pkt, "is_race_on", 0)))
    current_rpm = _safe(pkt, "current_engine_rpm")
    power = _safe(pkt, "power")

    if current_rpm == state.last_engine_rpm and power <= 0:
        state.rpm_accumulator += 1
        if state.rpm_accumulator > RACE_OFF_RPM_FRAMES:
            in_race = False
    else:
        state.rpm_accumulator = 0

    state.last_engine_rpm = current_rpm
    return in_race


# --- Lightbar (touchpad LED ring) ---------------------------------------------


def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None:
    """Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic.

    Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`.
    X-class cars use the fixed ColorClassX without a CPI tint. Car class and
    CPI fields can briefly read 0 during loading screens, so we cache the
    last valid value seen, which also matches ForzaParser's behavior."""
    car_class = int(_safe(pkt, "car_class"))
    if car_class > 0:
        state.last_valid_car_class = car_class
    car_class = state.last_valid_car_class

    cpi = int(_safe(pkt, "car_performance_index"))
    if cpi > 0:
        state.last_valid_car_cpi = min(cpi, 255)
    cpi = state.last_valid_car_cpi

    cpi_ratio = cpi / MAX_CPI

    if car_class <= 5:
        cr, cg, cb = CAR_CLASS_COLORS[car_class]
        r = math.floor(cpi_ratio * cr)
        g = math.floor(cpi_ratio * cg)
        b = math.floor(cpi_ratio * cb)
    else:
        r, g, b = CAR_CLASS_COLORS[6]

    ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b))


def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None:
    """Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic.

    Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and
    green falls linearly with rpm_ratio, with green floored at 50. At or
    above redline the lightbar goes pure red (255, 0, 0)."""
    max_rpm = _safe(pkt, "engine_max_rpm")
    idle_rpm = _safe(pkt, "engine_idle_rpm")
    current_rpm = _safe(pkt, "current_engine_rpm")

    engine_range = max_rpm - idle_rpm
    if engine_range <= 0:
        rpm_ratio = 0.0
    else:
        rpm_ratio = (current_rpm - idle_rpm) / engine_range

    if rpm_ratio >= RPM_REDLINE_RATIO:
        r, g, b = 255, 0, 0
    else:
        r = math.floor(rpm_ratio * 255)
        g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR)
        b = 0

    ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b))


# --- Throttle (right trigger) -------------------------------------------------


def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None:
    """Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line."""
    accel_x = _safe(pkt, "acceleration_x")
    accel_z = _safe(pkt, "acceleration_z")
    avg_accel = math.sqrt(
        THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x)
        + THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z)
    )

    fl = abs(_safe(pkt, "tire_combined_slip_FL"))
    fr = abs(_safe(pkt, "tire_combined_slip_FR"))
    rl = abs(_safe(pkt, "tire_combined_slip_RL"))
    rr = abs(_safe(pkt, "tire_combined_slip_RR"))
    front_slip = (fl + fr) * 0.5
    rear_slip = (rl + rr) * 0.5
    four_wheel_slip = (fl + fr + rl + rr) * 0.25

    accelerator = int(_safe(pkt, "accel"))

    losing_grip = (
        front_slip > THROTTLE_GRIP_LOSS
        or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN)
    ) and _is_in_motion(pkt)

    if losing_grip:
        # Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs).
        target_freq = math.floor(
            _map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION)
        )
        target_resistance = math.floor(
            _map(
                avg_accel,
                0.0,
                THROTTLE_ACCELERATION_LIMIT,
                THROTTLE_MIN_STIFFNESS,
                THROTTLE_MAX_STIFFNESS,
            )
        )
        # Floor after EWMA (matches `(int)EWMA(int, int, float)` overload).
        freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING)
        resistance = _ewma_int(
            target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING
        )
        st.last_freq = freq
        st.last_resistance = resistance

        if freq <= THROTTLE_MIN_VIBRATION or accelerator <= THROTTLE_VIB_POSITION:
            # RacingDSX throttle fall-through: sends `Resistance(0, filteredResistance)`
            # where filteredResistance is in slip-mode range (175..255). DSX's
            # TriggerEffectGenerator.Resistance returns false for force > 8 without
            # writing, leaving the trigger stuck in whatever mode (typically
            # Simple_Vibration) was set previously. Our `_apply_feedback` clamps
            # strength to 8 instead, producing a smooth Feedback ramp: a
            # documented divergence that fixes the user-visible "vibration
            # continues briefly after slip ends" symptom.
            _apply_feedback(
                ds.triggerR,
                0,
                int(resistance * THROTTLE_EFFECT_INTENSITY),
            )
        else:
            _apply_simple_vibration(
                ds.triggerR,
                THROTTLE_VIB_POSITION,
                int(resistance * THROTTLE_EFFECT_INTENSITY),
                int(freq * THROTTLE_EFFECT_INTENSITY),
            )
        return

    target_resistance = math.floor(
        _map(
            avg_accel,
            0.0,
            THROTTLE_ACCELERATION_LIMIT,
            THROTTLE_MIN_RESISTANCE,
            THROTTLE_MAX_RESISTANCE,
        )
    )
    resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING)
    st.last_resistance = resistance
    _apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY))


# --- Brake (left trigger) -----------------------------------------------------


def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None:
    """Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line."""
    fl = abs(_safe(pkt, "tire_combined_slip_FL"))
    fr = abs(_safe(pkt, "tire_combined_slip_FR"))
    rl = abs(_safe(pkt, "tire_combined_slip_RL"))
    rr = abs(_safe(pkt, "tire_combined_slip_RR"))
    four_wheel_slip = (fl + fr + rl + rr) * 0.25
    brake = int(_safe(pkt, "brake"))

    losing_grip = (
        four_wheel_slip > BRAKE_GRIP_LOSS
        and brake > BRAKE_DEADZONE
        and _is_in_motion(pkt)
    )

    if losing_grip:
        target_freq = math.floor(
            _map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION)
        )
        target_resistance = math.floor(
            _map(
                brake,
                0,
                255,
                BRAKE_MAX_STIFFNESS,
                BRAKE_MIN_STIFFNESS,
            )
        )
        freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING)
        resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING)
        st.last_freq = freq
        st.last_resistance = resistance

        if freq <= BRAKE_MIN_VIBRATION:
            # RacingDSX brake fall-through (Parser.cs:128) sends Resistance(0, 0)
            # explicitly; strength=0 routes to canonical Off (mode 0x05).
            # Subtle slip while braking should leave the trigger neutral.
            _apply_feedback(ds.triggerL, 0, 0)
        else:
            _apply_simple_vibration(
                ds.triggerL,
                BRAKE_VIB_POSITION,
                int(resistance * BRAKE_EFFECT_INTENSITY),
                int(freq * BRAKE_EFFECT_INTENSITY),
            )
        return

    target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE))
    resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING)
    st.last_resistance = resistance
    _apply_feedback(ds.triggerL, 0, int(resistance * BRAKE_EFFECT_INTENSITY))


# --- UDP main loop ------------------------------------------------------------


def parse_packet(data: bytes) -> ForzaDataPacket | None:
    fmt = PACKET_FORMATS.get(len(data))
    if fmt is None:
        LOG.debug("ignoring packet of unexpected length %d", len(data))
        return None
    try:
        return ForzaDataPacket(data, packet_format=fmt)
    except Exception:
        LOG.exception("failed to parse forza packet (len=%d)", len(data))
        return None


def _close_controller(ds: pydualsense | None) -> None:
    """Best-effort close. The HID device may already be gone (unplug, BT drop)
    in which case `device.close()` raises; we don't care."""
    if ds is None:
        return
    try:
        ds.close()
    except Exception:
        pass


def _connect_controller() -> pydualsense:
    """Open the DualSense, blocking until one is reachable.

    `pydualsense.init()` raises when no DualSense is plugged in. That's a
    normal startup-or-replug condition for us, not a fatal error: the
    daemon is meant to live for the whole user session and self-heal across
    plug events without external supervision. We log the first failure once,
    then retry quietly every `RECONNECT_BACKOFF_S` seconds.
    """
    LOG.info("opening dualsense controller")
    first_failure_logged = False
    while True:
        ds = pydualsense()
        try:
            ds.init()
        except Exception as e:
            _close_controller(ds)
            if not first_failure_logged:
                LOG.warning(
                    "dualsense not available (%s); retrying every %.1fs",
                    e,
                    RECONNECT_BACKOFF_S,
                )
                first_failure_logged = True
            time.sleep(RECONNECT_BACKOFF_S)
            continue
        LOG.info("dualsense controller connected")
        return ds


def run(host: str, port: int, debug: bool) -> int:
    ds = _connect_controller()

    LOG.info("listening for forza udp on %s:%d", host, port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.settimeout(1.0)

    throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT)
    brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT)
    forza_state = _ForzaState()
    last_seen = 0.0
    in_race = False
    have_telemetry = False  # True between the first packet and the IDLE_TIMEOUT_S reset

    try:
        while True:
            # Hot-plug detection: pydualsense's BG sendReport thread terminates
            # silently on hidraw IOError (unplug, BT disconnect, USB resuspend).
            # When it dies our triggerL/R writes go nowhere. Reconnect in-process
            # so the daemon doesn't depend on a supervisor for plug-event recovery.
            if not ds.report_thread.is_alive():
                LOG.warning("dualsense disconnected; reconnecting")
                _close_controller(ds)
                ds = _connect_controller()
            now = time.monotonic()
            try:
                data, _ = sock.recvfrom(2048)
                last_seen = now
                have_telemetry = True
            except socket.timeout:
                # Reset on telemetry-idle regardless of in_race state. After Forza
                # exits with the user in its main menu (is_race_on=0 packets just
                # before exit, so in_race was already False), the old check would
                # leave the last lightbar/trigger state asserted forever.
                if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
                    LOG.info("forza idle for %.1fs \u2014 resetting controller", IDLE_TIMEOUT_S)
                    reset_triggers(ds)
                    reset_lightbar(ds)
                    have_telemetry = False
                    in_race = False
                continue

            pkt = parse_packet(data)
            if pkt is None:
                continue

            # ForzaParser.IsRaceOn() override: combines packet field with the
            # FH-specific RPM-accumulator workaround. Must be called once per
            # packet so the accumulator state stays accurate.
            in_race = forza_is_race_on(pkt, forza_state)

            if not in_race:
                # Pre-race: lightbar -> car class color, both triggers actively
                # released to neutral (mode 0x05, Sony's canonical Off). RacingDSX
                # writes mode 0x00 (Normal) here; see divergence #4 in module docstring.
                _apply_off(ds.triggerL)
                _apply_off(ds.triggerR)
                apply_lightbar_pre_race(ds, pkt, forza_state)
                continue

            if debug:
                LOG.debug(
                    "rpm=%.0f/%.0f accel=%d brake=%d "
                    "slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f "
                    "throttle[freq=%d res=%d] brake[freq=%d res=%d]",
                    _safe(pkt, "current_engine_rpm"),
                    _safe(pkt, "engine_max_rpm"),
                    int(_safe(pkt, "accel")),
                    int(_safe(pkt, "brake")),
                    _safe(pkt, "tire_combined_slip_FL"),
                    _safe(pkt, "tire_combined_slip_FR"),
                    _safe(pkt, "tire_combined_slip_RL"),
                    _safe(pkt, "tire_combined_slip_RR"),
                    throttle_state.last_freq,
                    throttle_state.last_resistance,
                    brake_state.last_freq,
                    brake_state.last_resistance,
                )
            apply_lightbar_in_race(ds, pkt)
            apply_left_trigger(ds, pkt, brake_state)
            apply_right_trigger(ds, pkt, throttle_state)
    except KeyboardInterrupt:
        LOG.info("shutting down")
    finally:
        try:
            reset_triggers(ds)
        except Exception:
            pass
        _close_controller(ds)

    return 0


def main() -> int:
    parser = argparse.ArgumentParser(
        prog="forza-trigger",
        description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
    parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
    parser.add_argument(
        "--debug",
        action="store_true",
        help="log per-packet telemetry at DEBUG level",
    )
    args = parser.parse_args()

    level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO")
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    return run(args.host, args.port, args.debug)


if __name__ == "__main__":
    sys.exit(main())