User report: with the clutch in (pedal pressed, engine disconnected from wheels), steering left still produced resistance on R2. The throttle shouldn't have any feel when it's mechanically irrelevant. RacingDSX's throttle resistance formula is `avgAccel = sqrt(0.25*X^2 + 1.0*Z^2)` derived from the accelerometer alone. It never checks clutch state, so cornering G-forces keep producing trigger resistance even while the clutch pedal is floored. Bug. Fix: when Forza's clutch byte > 128 (clutch fully or mostly disengaged) bypass the entire throttle path \u2014 slip detection and non-slip Feedback both \u2014 and release the trigger. Uses the same one-shot 0x05 (active retract) on transition + steady-state 0x00 (no-op) pattern as the in-race \u2192 not-in-race transition (divergence #4) so we don't get the trigger-motor whine from re-asserting 0x05 every frame. Brake is unaffected: brake calipers operate independently of clutch state, so ABS feel during clutch-in is still correct. For auto-clutch users this also produces brief (~100 ms) trigger relaxations during shifts \u2014 physically accurate (the engine *is* momentarily disconnected during a shift) and matches the haptic feel of a real manual transmission. Documented as divergence #5 in the module docstring.
899 lines
38 KiB
Python
899 lines
38 KiB
Python
"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.
|
|
|
|
This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline.
|
|
Every numeric value, every threshold, every map() / EWMA() coefficient,
|
|
and every output byte sequence has been verified against published
|
|
sources or against decompiled DSX 1.4.9 itself.
|
|
|
|
Sources, in priority order:
|
|
|
|
1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31)
|
|
decompiled with ILSpy. The decompilation revealed:
|
|
a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev6, MIT) as
|
|
its trigger-effect encoder.
|
|
b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's
|
|
high-level CustomTriggerValueMode names to mode bytes:
|
|
VibrateResistance -> 6 (Simple_Vibration / 0x06)
|
|
VibrateResistanceA / AB -> 38 (Vibration / 0x26)
|
|
VibrateResistanceB -> 6
|
|
When the dispatcher hits the `else` branch in
|
|
DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than
|
|
9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into
|
|
the trigger param region — no bit-packing, no scale conversion.
|
|
This is why RacingDSX's 0-255-scale stiffness values ARE the
|
|
actual amplitude bytes that reach the controller's firmware.
|
|
|
|
2. Nielk1's reverse-engineering gist Rev 6 (MIT,
|
|
https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db).
|
|
Source for the canonical Sony bit-packed Feedback (0x21) encoder used
|
|
for the non-slip path. Identical to the implementation shipped inside
|
|
DSX 1.4.9.
|
|
|
|
3. RacingDSX (cosmii02/RacingDSX, GPLv3) — community-tuned defaults for
|
|
Forza Horizon 4 / 5 since 2022. Specifically:
|
|
Config/ThrottleSettings.cs, Config/BrakeSettings.cs,
|
|
GameParsers/Parser.cs.
|
|
|
|
The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is
|
|
provided by pydualsense (PyPI, MIT). We drive its low-level
|
|
`triggerL/R.mode` and `triggerL/R.forces[i]` fields directly because
|
|
pydualsense's high-level setMode/setForce API does not understand any
|
|
specific mode's parameter encoding — it just shovels bytes into the
|
|
output report at fixed offsets. That is exactly what we want.
|
|
|
|
## Documented intentional divergences from RacingDSX
|
|
|
|
1. Motion gate (`_is_in_motion()`): slip detection is gated on speed or wheel
|
|
rotation. RacingDSX has no gate, so locked stationary wheels (e.g. after a
|
|
hard stop with brake held) keep the slip path active forever and the trigger
|
|
stuck in vibration mode. The gate fixes the user-reported bug.
|
|
|
|
2. Clamp-to-8 in `_apply_feedback()`: DSX's `TriggerEffectGenerator.Resistance`
|
|
silently skips when force > 8, leaving the trigger stuck in whatever mode it
|
|
was in (Simple_Vibration during slip\u2192non-slip transitions). We clamp to 8
|
|
instead so the transition produces a smooth Feedback ramp.
|
|
|
|
3. Car performance index width: fdp parses `car_performance_index` as Int32 (the
|
|
field's actual width per Forza's spec), while RacingDSX's `FMData.cs` reads
|
|
only `GetUInt8(bytes, 220)` \u2014 the low byte. For any car with CPI > 255 (B/A/
|
|
S1/S2/X) the two implementations disagree on the pre-race lightbar's CPI
|
|
tint. We use the correct value; RacingDSX's lightbar is dimmer/inconsistent
|
|
on those cars. Mask `cpi & 0xFF` in `apply_lightbar_pre_race` to match
|
|
RacingDSX byte-for-byte if you want bug-faithful Windows-equivalent dimming.
|
|
|
|
4. Trigger release combines mode 0x05 (active) with mode 0x00 (steady-state).
|
|
RacingDSX dispatches `TriggerMode.Normal` for pre-race / between-race state,
|
|
which becomes mode byte 0x00. Per Sony's docs (Nielk1 Rev 6), mode 0x00 only
|
|
*clears* state and does not retract the trigger motor; mode 0x05 *actively*
|
|
returns the trigger stop to neutral. RacingDSX-on-Windows gets away with
|
|
0x00 because something on Windows (Steam Input or the OS) reliably resets
|
|
the motor on focus loss; on Linux nothing does, and R2 keeps residual
|
|
tension after a race ends. But re-asserting 0x05 every frame in steady-state
|
|
pre-race causes the trigger motor to audibly whine as the firmware repeatedly
|
|
snaps the (already-neutral) trigger back to neutral. So we use 0x05 as a
|
|
one-shot on the in-race \u2192 not-in-race transition (and on the telemetry-idle
|
|
timeout), then mode 0x00 for steady-state pre-race / idle frames \u2014 motor
|
|
stays released, no continuous retraction noise.
|
|
|
|
5. Throttle gated on clutch state. Forza emits a `clutch` byte (0..255). When
|
|
the clutch is disengaged (byte > 128) the engine is mechanically disconnected
|
|
from the wheels and the throttle pedal can't transmit power; the trigger has
|
|
no business resisting. RacingDSX's throttle resistance formula is
|
|
`avgAccel = sqrt(0.25*X\u00b2 + 1.0*Z\u00b2)` derived from the accelerometer alone
|
|
with no clutch check, so the trigger keeps producing resistance from
|
|
cornering G-forces while the clutch is in. We bypass the throttle path
|
|
entirely when clutch > 128, releasing the trigger using the same one-shot-
|
|
then-steady pattern as divergence #4. Auto-clutch users will notice ~100 ms
|
|
trigger relaxations during shifts; that's actually physically accurate \u2014
|
|
the engine *is* momentarily disconnected during a shift.
|
|
|
|
## Threading note
|
|
|
|
pydualsense's `sendReport` background thread reads `triggerR/L.mode` and
|
|
`forces[0..6]` independently \u2014 there's no atomic publish primitive. Our
|
|
`_apply_*` helpers write `forces[]` first and `mode` last; the BG thread reads
|
|
`mode` first, so this ordering keeps the worst-case torn frame to one ~4 ms
|
|
HID write at slip\u2194non-slip mode transitions. Audible as a brief click on
|
|
transitions, not stuck state. Without lock/atomic primitives in pydualsense's
|
|
API this is the cleanest mitigation available.
|
|
|
|
## System interaction notes
|
|
|
|
**Single-controller assumption.** pydualsense's `__find_device` enumerates all
|
|
DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps the
|
|
last one matched (no break in the loop), then opens via `hidapi_open(vid, pid)`
|
|
without serial/path \u2014 `hid_open` returns the first match, which is not
|
|
necessarily the one selected. With multiple DualSense controllers the picked
|
|
controller is non-deterministic. pydualsense's source explicitly notes
|
|
`# TODO: implement multiple controllers working`. RacingDSX/DSX are also
|
|
single-controller (DSX's `connectedController` is a singleton). Forza Horizon
|
|
is single-player so this is fine in practice; if multi-controller selection
|
|
matters, monkey-patch `__find_device` to filter by `serial_number`.
|
|
|
|
**Steam Input.** When Steam Input's PlayStation Configuration Support is
|
|
enabled for the game, Steam intercepts hidraw input AND writes its own HID
|
|
output reports (rumble, lightbar, sometimes triggers). Our daemon writes
|
|
competing output reports at ~1 kHz; the controller observes whichever wrote
|
|
last. Effect: trigger oscillates and feels broken. The Nix module's README
|
|
in `default.nix` instructs users to disable PlayStation Configuration Support
|
|
for Forza in Steam (Settings \u2192 Controller).
|
|
|
|
**dualsensectl.** Installed in the Nix module for ad-hoc debugging. Single-
|
|
shot writes from `dualsensectl trigger left feedback ...` get overwritten by
|
|
our BG thread's next iteration ~4 ms later. Use it only when the daemon is
|
|
stopped (`systemctl --user stop forza-trigger`).
|
|
|
|
**Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on
|
|
hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls
|
|
`ds.report_thread.is_alive()` and reconnects in-process via
|
|
`_connect_controller()`, which retries `pydualsense.init()` every
|
|
`RECONNECT_BACKOFF_S` until the controller comes back. The daemon does not
|
|
depend on systemd or any other supervisor for plug-event recovery; running it
|
|
directly from a shell handles unplug/replug exactly the same way.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import math
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
|
|
from fdp import ForzaDataPacket
|
|
from pydualsense import TriggerModes, pydualsense
|
|
|
|
LOG = logging.getLogger("forza-trigger")
|
|
|
|
# --- Mode bytes ---------------------------------------------------------------
|
|
# pydualsense's IntFlag aliases happen to cover the modes we need:
|
|
# TriggerModes.Off = 0x00 (no-op; clears command without retracting motor)
|
|
# TriggerModes(0x05) = 0x05 (canonical Sony Off / Reset)
|
|
# TriggerModes.Pulse_B = 0x06 (Simple_Vibration / Simple_AutomaticGun)
|
|
# TriggerModes.Rigid_A = 0x21 (Feedback, canonical)
|
|
DS_MODE_NORMAL = TriggerModes.Off # 0x00 "clear command"; motor stays in last-set physical state
|
|
DS_MODE_OFF = TriggerModes(0x05)
|
|
DS_MODE_SIMPLE_VIBRATION = TriggerModes.Pulse_B
|
|
DS_MODE_FEEDBACK = TriggerModes.Rigid_A
|
|
|
|
# --- RacingDSX defaults (Config/ThrottleSettings.cs) --------------------------
|
|
THROTTLE_GRIP_LOSS = 0.6
|
|
THROTTLE_REAR_SLIP_ACCEL_MIN = 200
|
|
THROTTLE_VIB_POSITION = 5 # VibrationModeStart
|
|
THROTTLE_MIN_VIBRATION = 5 # below this freq, fall back to Resistance
|
|
THROTTLE_MAX_VIBRATION = 55 # peak frequency at slip == 5
|
|
THROTTLE_MIN_STIFFNESS = 255 # slip-mode amplitude at avgAccel == 0
|
|
THROTTLE_MAX_STIFFNESS = 175 # slip-mode amplitude at avgAccel == AccelerationLimit
|
|
THROTTLE_MIN_RESISTANCE = 0 # non-slip canonical strength at avgAccel == 0
|
|
THROTTLE_MAX_RESISTANCE = 3 # non-slip canonical strength at avgAccel == AccelerationLimit
|
|
THROTTLE_ACCELERATION_LIMIT = 10
|
|
THROTTLE_TURN_ACCEL_SCALE = 0.25
|
|
THROTTLE_FORWARD_ACCEL_SCALE = 1.0
|
|
THROTTLE_RESISTANCE_SMOOTHING = 0.9
|
|
THROTTLE_VIBRATION_SMOOTHING = 1.0
|
|
THROTTLE_EFFECT_INTENSITY = 1.0
|
|
THROTTLE_LAST_RESISTANCE_INIT = 1 # Parser.lastThrottleResistance
|
|
|
|
# --- RacingDSX defaults (Config/BrakeSettings.cs) -----------------------------
|
|
BRAKE_GRIP_LOSS = 0.05
|
|
BRAKE_DEADZONE = 100 # Parser literal: data.Brake > 100
|
|
BRAKE_VIB_POSITION = 0 # VibrationStart
|
|
BRAKE_MIN_VIBRATION = 15
|
|
BRAKE_MAX_VIBRATION = 20
|
|
BRAKE_MIN_STIFFNESS = 150
|
|
BRAKE_MAX_STIFFNESS = 5
|
|
BRAKE_MIN_RESISTANCE = 0
|
|
BRAKE_MAX_RESISTANCE = 7
|
|
BRAKE_RESISTANCE_SMOOTHING = 0.4
|
|
BRAKE_VIBRATION_SMOOTHING = 1.0
|
|
BRAKE_EFFECT_INTENSITY = 1.0
|
|
BRAKE_LAST_RESISTANCE_INIT = 200 # Parser.lastBrakeResistance
|
|
|
|
# --- Forza UDP packet sizes -> fdp packet_format strings ----------------------
|
|
PACKET_FORMATS = {
|
|
232: "sled",
|
|
311: "dash",
|
|
324: "fh4", # FH4 and FH5 share the same layout
|
|
}
|
|
|
|
# --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) --------
|
|
# CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM).
|
|
# Parser.cs uses an `<=` cascade, so any value > 5 is treated as X.
|
|
CAR_CLASS_COLORS = [
|
|
(107, 185, 236), # ColorClassD
|
|
(234, 202, 49), # ColorClassC
|
|
(211, 90, 37), # ColorClassB
|
|
(187, 59, 34), # ColorClassA
|
|
(128, 54, 243), # ColorClassS1
|
|
(75, 88, 229), # ColorClassS2
|
|
(105, 182, 72), # ColorClassX (no CPI tint)
|
|
]
|
|
MAX_CPI = 255 # ForzaParser.MaxCPI
|
|
RPM_REDLINE_RATIO = 0.9 # Profile.RPMRedlineRatio
|
|
GREEN_FLOOR = 50 # Math.Max(..., 50) on green channel in non-redline path
|
|
RACE_OFF_RPM_FRAMES = 200 # ForzaParser.RPMAccumulatorTriggerRaceOff
|
|
|
|
# --- Clutch gate (throttle only) ---------------------------------------------
|
|
# Forza emits `clutch` 0..255 (0 = pedal up / engaged / engine connected to
|
|
# wheels, 255 = pedal floored / fully disengaged). With the clutch disengaged
|
|
# the throttle pedal is mechanically irrelevant \u2014 pressing it just revs the
|
|
# engine without transmitting power. RacingDSX has no clutch gate, so its
|
|
# `avgAccel = sqrt(0.25*X\u00b2 + 1.0*Z\u00b2)` formula keeps producing throttle
|
|
# resistance from cornering G-forces even while the clutch is in.
|
|
CLUTCH_DISENGAGE_THRESHOLD = 128
|
|
|
|
|
|
# --- Reset on idle (UDP timeout) ---------------------------------------------
|
|
# Not present in RacingDSX; an additional safety so the controller doesn't get
|
|
# stuck if Forza is killed mid-race or the network drops.
|
|
IDLE_TIMEOUT_S = 3.0
|
|
|
|
# --- Hot-plug reconnect backoff ----------------------------------------------
|
|
# pydualsense's BG sendReport thread terminates silently on hidraw IOError
|
|
# (controller unplugged, BT disconnect, USB resuspend). The main loop polls
|
|
# the thread's liveness and reconnects in-process \u2014 the script is agnostic
|
|
# of supervisors like systemd. The same backoff governs the initial-connect
|
|
# wait when the daemon starts before any controller is plugged in.
|
|
RECONNECT_BACKOFF_S = 1.0
|
|
|
|
# --- Stationary motion gate --------------------------------------------------
|
|
# Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked
|
|
# wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for
|
|
# this and end up with the brake (and sometimes throttle) trigger stuck in
|
|
# Simple_Vibration mode forever, because the slip path keeps firing. We
|
|
# additionally require either the car or any wheel to be in real motion before
|
|
# treating slip as a haptic event.
|
|
STATIONARY_SPEED_MS = 0.1 # m/s; below this the car is considered stopped
|
|
STATIONARY_WHEEL_RAD_S = 0.1 # rad/s; below this a wheel is considered locked
|
|
|
|
|
|
def _is_in_motion(pkt: ForzaDataPacket) -> bool:
|
|
"""True iff the car is moving or any wheel is rotating meaningfully.
|
|
|
|
Used to gate slip-detection: when both car and all four wheels read as
|
|
stopped, any nonzero `tire_combined_slip` Forza emits is data noise from
|
|
locked wheels and should not drive haptic vibration.
|
|
"""
|
|
if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS:
|
|
return True
|
|
for wheel in ("FL", "FR", "RL", "RR"):
|
|
if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S:
|
|
return True
|
|
return False
|
|
# --- Effect encoders ----------------------------------------------------------
|
|
|
|
|
|
|
|
def _apply_off(trig) -> None:
|
|
"""Canonical Sony Off / Reset \u2014 mode byte 0x05, all params 0.
|
|
|
|
Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs (Nielk1
|
|
Rev 6), mode 0x05 *actively* returns the trigger stop to the neutral
|
|
position; mode 0x00 (TriggerModes.Off, what RacingDSX writes for its
|
|
NormalTrigger pre-race state) only clears state without retracting the
|
|
motor, so the trigger stays in whatever Feedback/Vibration position was
|
|
last applied. We route every \"release the trigger\" intent here \u2014
|
|
pre-race, idle-timeout, mid-race zero-strength fallback, shutdown.
|
|
"""
|
|
# Write forces before mode so pydualsense's BG sendReport thread, which
|
|
# reads mode then forces non-atomically, is more likely to observe a
|
|
# self-consistent (mode, forces) pair. See module-docstring threading note.
|
|
for i in range(7):
|
|
trig.forces[i] = 0
|
|
trig.mode = DS_MODE_OFF
|
|
|
|
|
|
def _apply_feedback(trig, position: int, strength: int) -> bool:
|
|
"""Sony Feedback (mode 0x21), bit-packed.
|
|
|
|
Verbatim port of `ExtendInput.DataTools.DualSense.TriggerEffectGenerator
|
|
.Resistance` from DSX 1.4.9 \u2014 with one deliberate divergence.
|
|
|
|
DSX's TriggerEffectGenerator.Resistance returns `false` without writing
|
|
when strength > 8, and RacingDSX's fall-through path routinely sends 5..255-
|
|
range slip-mode stiffness values into Feedback, hitting that branch every
|
|
transition out of slip. The result observed by the player: \"ABS feedback
|
|
continues even when stationary\" \u2014 the trigger remains stuck in whatever
|
|
mode (typically Simple_Vibration) was set before the failed Resistance
|
|
call, sometimes indefinitely if Forza keeps reporting nonzero slip on
|
|
locked wheels.
|
|
|
|
We clamp out-of-range strength to 8 instead. The transition out of slip
|
|
now produces a smooth Feedback ramp from full-stiffness down to the
|
|
non-slip target as the EWMA decays, rather than freezing on stale
|
|
Simple_Vibration bytes. The return value (kept for symmetry with DSX's
|
|
bool-returning Resistance) is False on invalid position, True otherwise.
|
|
"""
|
|
if position > 9:
|
|
return False
|
|
if strength > 8:
|
|
strength = 8
|
|
if strength <= 0:
|
|
# Sony's algorithm: zero force -> Reset (canonical Off, mode 0x05).
|
|
# DSX's TriggerEffectGenerator.Resistance falls through to Reset()
|
|
# here, so we do the same for byte-perfect parity.
|
|
_apply_off(trig)
|
|
return True
|
|
|
|
force_value = (strength - 1) & 0x07
|
|
force_zones = 0
|
|
active_zones = 0
|
|
for i in range(position, 10):
|
|
force_zones |= force_value << (3 * i)
|
|
active_zones |= 1 << i
|
|
|
|
trig.forces[0] = active_zones & 0xFF
|
|
trig.forces[1] = (active_zones >> 8) & 0xFF
|
|
trig.forces[2] = force_zones & 0xFF
|
|
trig.forces[3] = (force_zones >> 8) & 0xFF
|
|
trig.forces[4] = (force_zones >> 16) & 0xFF
|
|
trig.forces[5] = (force_zones >> 24) & 0xFF
|
|
trig.forces[6] = 0 # frequency byte unused for Feedback
|
|
trig.mode = DS_MODE_FEEDBACK
|
|
return True
|
|
|
|
|
|
def _apply_simple_vibration(trig, position: int, amplitude: int, frequency: int) -> None:
|
|
"""Legacy Simple_Vibration (mode 0x06), raw byte passthrough.
|
|
|
|
Mirrors DSX's `else` branch in `DualSense_USB_Updated.cs::CustomTriggerValues`:
|
|
array[11] = TriggerValue1 (= 6 for VibrateResistance)
|
|
array[12] = TriggerValue2 (= frequency)
|
|
array[13] = TriggerValue3 (= amplitude)
|
|
array[14] = TriggerValue4 (= position)
|
|
array[15..17,20] = 0
|
|
|
|
Per Nielk1, Simple_Vibration was Sony's pre-firmware-update vibration
|
|
mode — same effect as canonical Vibration (0x26) on every shipped
|
|
DualSense, but takes raw 0-255 amplitude bytes instead of the bit-
|
|
packed 0-8 zone format. RacingDSX/DSX have used it since v1.0; the
|
|
entire Forza-on-DualSense community ships these byte values.
|
|
"""
|
|
if amplitude <= 0 or frequency <= 0:
|
|
_apply_off(trig)
|
|
return
|
|
trig.forces[0] = frequency & 0xFF
|
|
trig.forces[1] = amplitude & 0xFF
|
|
trig.forces[2] = position & 0xFF
|
|
trig.forces[3] = 0
|
|
trig.forces[4] = 0
|
|
trig.forces[5] = 0
|
|
trig.forces[6] = 0
|
|
trig.mode = DS_MODE_SIMPLE_VIBRATION
|
|
|
|
|
|
|
|
def _apply_normal(trig) -> None:
|
|
"""Mode 0x00 (TriggerModes.Off) + zero forces.
|
|
|
|
Per Sony's docs (Nielk1 Rev 6) mode 0x00 is a *clear/no-op* command \u2014 the
|
|
firmware's last-set physical effect persists. We use this for steady-state
|
|
pre-race / idle frames after `_apply_off` has already retracted the motor
|
|
via mode 0x05. Re-asserting 0x05 every frame causes the motor to audibly
|
|
whine as the firmware repeatedly snaps the (already-neutral) trigger back
|
|
to neutral.
|
|
"""
|
|
for i in range(7):
|
|
trig.forces[i] = 0
|
|
trig.mode = DS_MODE_NORMAL
|
|
|
|
|
|
def reset_triggers(ds: pydualsense) -> None:
|
|
"""Both triggers to canonical Off (mode 0x05). Actively retracts the motor."""
|
|
_apply_off(ds.triggerL)
|
|
_apply_off(ds.triggerR)
|
|
|
|
|
|
def reset_lightbar(ds: pydualsense) -> None:
|
|
"""Lightbar to off (RGB 0,0,0).
|
|
|
|
Used when telemetry has been idle long enough that we should stop asserting
|
|
a race color \u2014 e.g. Forza exited or hasn't started a session yet. Without
|
|
this, pydualsense's BG sendReport thread keeps re-publishing whatever
|
|
`TouchpadColor` we last set, so the controller stays lit indefinitely.
|
|
"""
|
|
ds.light.setColorI(0, 0, 0)
|
|
|
|
|
|
# --- RacingDSX math primitives ------------------------------------------------
|
|
|
|
|
|
def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float:
|
|
"""Mirrors Parser.Map() in RacingDSX, including endpoint clamping."""
|
|
if x > in_max:
|
|
x = in_max
|
|
elif x < in_min:
|
|
x = in_min
|
|
return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
|
|
|
|
|
|
def _ewma(value: float, last: float, alpha: float) -> float:
|
|
"""Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing."""
|
|
return alpha * value + (1.0 - alpha) * last
|
|
|
|
|
|
def _ewma_int(value: int, last: int, alpha: float) -> int:
|
|
"""Mirrors `Parser.EWMA(int, int, float)` \u2014 floor of float-EWMA."""
|
|
return math.floor(alpha * value + (1.0 - alpha) * last)
|
|
|
|
|
|
def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float:
|
|
return float(getattr(pkt, name, default))
|
|
|
|
|
|
# --- Per-trigger persistent state for EWMA ------------------------------------
|
|
|
|
|
|
class _TriggerState:
|
|
__slots__ = ("last_resistance", "last_freq", "prev_clutched")
|
|
|
|
def __init__(self, init_resistance: int) -> None:
|
|
# Mirrors RacingDSX's `int lastThrottleResistance` / `int lastBrakeResistance`.
|
|
self.last_resistance: int = int(init_resistance)
|
|
self.last_freq: int = 0
|
|
# Throttle only: tracks last frame's clutch state so the throttle path
|
|
# can fire one-shot 0x05 (active retract) on the engaged-\u2192-disengaged
|
|
# transition and 0x00 (no-op) for steady-state held-in. See
|
|
# CLUTCH_DISENGAGE_THRESHOLD and divergence #5 in the module docstring.
|
|
self.prev_clutched: bool = False
|
|
|
|
|
|
# --- Forza game-level persistent state (ForzaParser.cs fields) ----------------
|
|
|
|
|
|
class _ForzaState:
|
|
"""Persistent across packets. Mirrors ForzaParser's instance fields:
|
|
LastEngineRPM, LastRPMAccumulator, LastValidCarClass, LastValidCarCPI."""
|
|
|
|
__slots__ = (
|
|
"last_engine_rpm",
|
|
"rpm_accumulator",
|
|
"last_valid_car_class",
|
|
"last_valid_car_cpi",
|
|
)
|
|
|
|
def __init__(self) -> None:
|
|
self.last_engine_rpm = 0.0
|
|
self.rpm_accumulator = 0
|
|
self.last_valid_car_class = 0
|
|
self.last_valid_car_cpi = 0
|
|
|
|
|
|
def _clamp_byte(v: float) -> int:
|
|
"""Clamp to [0, 255] before writing to a uint8 RGB channel."""
|
|
return max(0, min(255, int(v)))
|
|
|
|
|
|
def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool:
|
|
"""Mirrors `ForzaParser.IsRaceOn()` verbatim.
|
|
|
|
FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after
|
|
the player exits a race or pauses. ForzaParser detects the off state by
|
|
watching for unchanged engine RPM combined with non-positive Power across
|
|
`RPMAccumulatorTriggerRaceOff` (200) consecutive frames. Power is dash-only,
|
|
so for sled-format packets it reads as 0; that matches RacingDSX exactly.
|
|
"""
|
|
in_race = bool(int(getattr(pkt, "is_race_on", 0)))
|
|
current_rpm = _safe(pkt, "current_engine_rpm")
|
|
power = _safe(pkt, "power")
|
|
|
|
if current_rpm == state.last_engine_rpm and power <= 0:
|
|
state.rpm_accumulator += 1
|
|
if state.rpm_accumulator > RACE_OFF_RPM_FRAMES:
|
|
in_race = False
|
|
else:
|
|
state.rpm_accumulator = 0
|
|
|
|
state.last_engine_rpm = current_rpm
|
|
return in_race
|
|
|
|
|
|
# --- Lightbar (touchpad LED ring) ---------------------------------------------
|
|
|
|
|
|
def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None:
|
|
"""Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic.
|
|
|
|
Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`.
|
|
X-class cars use the fixed ColorClassX without a CPI tint. Car class and
|
|
CPI fields can briefly read 0 during loading screens, so we cache the
|
|
last valid value seen \u2014 also matching ForzaParser's behavior."""
|
|
car_class = int(_safe(pkt, "car_class"))
|
|
if car_class > 0:
|
|
state.last_valid_car_class = car_class
|
|
car_class = state.last_valid_car_class
|
|
|
|
cpi = int(_safe(pkt, "car_performance_index"))
|
|
if cpi > 0:
|
|
state.last_valid_car_cpi = min(cpi, 255)
|
|
cpi = state.last_valid_car_cpi
|
|
|
|
cpi_ratio = cpi / MAX_CPI
|
|
|
|
if car_class <= 5:
|
|
cr, cg, cb = CAR_CLASS_COLORS[car_class]
|
|
r = math.floor(cpi_ratio * cr)
|
|
g = math.floor(cpi_ratio * cg)
|
|
b = math.floor(cpi_ratio * cb)
|
|
else:
|
|
r, g, b = CAR_CLASS_COLORS[6]
|
|
|
|
ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b))
|
|
|
|
|
|
def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None:
|
|
"""Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic.
|
|
|
|
Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and
|
|
green falls linearly with rpm_ratio, with green floored at 50. At or
|
|
above redline the lightbar goes pure red (255, 0, 0)."""
|
|
max_rpm = _safe(pkt, "engine_max_rpm")
|
|
idle_rpm = _safe(pkt, "engine_idle_rpm")
|
|
current_rpm = _safe(pkt, "current_engine_rpm")
|
|
|
|
engine_range = max_rpm - idle_rpm
|
|
if engine_range <= 0:
|
|
rpm_ratio = 0.0
|
|
else:
|
|
rpm_ratio = (current_rpm - idle_rpm) / engine_range
|
|
|
|
if rpm_ratio >= RPM_REDLINE_RATIO:
|
|
r, g, b = 255, 0, 0
|
|
else:
|
|
r = math.floor(rpm_ratio * 255)
|
|
g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR)
|
|
b = 0
|
|
|
|
ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b))
|
|
|
|
|
|
# --- Throttle (right trigger) -------------------------------------------------
|
|
|
|
|
|
def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None:
|
|
"""Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line, with
|
|
one divergence: the throttle is released when the clutch is disengaged.
|
|
See divergence #5 in the module docstring."""
|
|
# Clutch gate: 0..255, byte > 128 means "clutch fully or mostly pressed";
|
|
# engine is mechanically disconnected, so the throttle pedal can't transmit
|
|
# power and shouldn't have any feel. One-shot 0x05 on transition into the
|
|
# clutched state, then steady-state 0x00 to avoid the trigger-motor whine
|
|
# described in divergence #4.
|
|
if int(_safe(pkt, "clutch", 0.0)) > CLUTCH_DISENGAGE_THRESHOLD:
|
|
if not st.prev_clutched:
|
|
_apply_off(ds.triggerR)
|
|
else:
|
|
_apply_normal(ds.triggerR)
|
|
st.prev_clutched = True
|
|
return
|
|
st.prev_clutched = False
|
|
|
|
accel_x = _safe(pkt, "acceleration_x")
|
|
accel_z = _safe(pkt, "acceleration_z")
|
|
avg_accel = math.sqrt(
|
|
THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x)
|
|
+ THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z)
|
|
)
|
|
|
|
fl = abs(_safe(pkt, "tire_combined_slip_FL"))
|
|
fr = abs(_safe(pkt, "tire_combined_slip_FR"))
|
|
rl = abs(_safe(pkt, "tire_combined_slip_RL"))
|
|
rr = abs(_safe(pkt, "tire_combined_slip_RR"))
|
|
front_slip = (fl + fr) * 0.5
|
|
rear_slip = (rl + rr) * 0.5
|
|
four_wheel_slip = (fl + fr + rl + rr) * 0.25
|
|
|
|
accelerator = int(_safe(pkt, "accel"))
|
|
|
|
losing_grip = (
|
|
front_slip > THROTTLE_GRIP_LOSS
|
|
or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN)
|
|
) and _is_in_motion(pkt)
|
|
|
|
if losing_grip:
|
|
# Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs).
|
|
target_freq = math.floor(
|
|
_map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION)
|
|
)
|
|
target_resistance = math.floor(
|
|
_map(
|
|
avg_accel,
|
|
0.0,
|
|
THROTTLE_ACCELERATION_LIMIT,
|
|
THROTTLE_MIN_STIFFNESS,
|
|
THROTTLE_MAX_STIFFNESS,
|
|
)
|
|
)
|
|
# Floor after EWMA (matches `(int)EWMA(int, int, float)` overload).
|
|
freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING)
|
|
resistance = _ewma_int(
|
|
target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING
|
|
)
|
|
st.last_freq = freq
|
|
st.last_resistance = resistance
|
|
|
|
if freq <= THROTTLE_MIN_VIBRATION or accelerator <= THROTTLE_VIB_POSITION:
|
|
# RacingDSX throttle fall-through: sends `Resistance(0, filteredResistance)`
|
|
# where filteredResistance is in slip-mode range (175..255). DSX's
|
|
# TriggerEffectGenerator.Resistance returns false for force > 8 without
|
|
# writing, leaving the trigger stuck in whatever mode (typically
|
|
# Simple_Vibration) was set previously. Our `_apply_feedback` clamps
|
|
# strength to 8 instead, producing a smooth Feedback ramp \u2014 a
|
|
# documented divergence that fixes the user-visible \"vibration
|
|
# continues briefly after slip ends\" symptom.
|
|
_apply_feedback(
|
|
ds.triggerR,
|
|
0,
|
|
int(resistance * THROTTLE_EFFECT_INTENSITY),
|
|
)
|
|
else:
|
|
_apply_simple_vibration(
|
|
ds.triggerR,
|
|
THROTTLE_VIB_POSITION,
|
|
int(resistance * THROTTLE_EFFECT_INTENSITY),
|
|
int(freq * THROTTLE_EFFECT_INTENSITY),
|
|
)
|
|
return
|
|
|
|
target_resistance = math.floor(
|
|
_map(
|
|
avg_accel,
|
|
0.0,
|
|
THROTTLE_ACCELERATION_LIMIT,
|
|
THROTTLE_MIN_RESISTANCE,
|
|
THROTTLE_MAX_RESISTANCE,
|
|
)
|
|
)
|
|
resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING)
|
|
st.last_resistance = resistance
|
|
_apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY))
|
|
|
|
|
|
# --- Brake (left trigger) -----------------------------------------------------
|
|
|
|
|
|
def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None:
|
|
"""Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line."""
|
|
fl = abs(_safe(pkt, "tire_combined_slip_FL"))
|
|
fr = abs(_safe(pkt, "tire_combined_slip_FR"))
|
|
rl = abs(_safe(pkt, "tire_combined_slip_RL"))
|
|
rr = abs(_safe(pkt, "tire_combined_slip_RR"))
|
|
four_wheel_slip = (fl + fr + rl + rr) * 0.25
|
|
brake = int(_safe(pkt, "brake"))
|
|
|
|
losing_grip = (
|
|
four_wheel_slip > BRAKE_GRIP_LOSS
|
|
and brake > BRAKE_DEADZONE
|
|
and _is_in_motion(pkt)
|
|
)
|
|
|
|
if losing_grip:
|
|
target_freq = math.floor(
|
|
_map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION)
|
|
)
|
|
target_resistance = math.floor(
|
|
_map(
|
|
brake,
|
|
0,
|
|
255,
|
|
BRAKE_MAX_STIFFNESS,
|
|
BRAKE_MIN_STIFFNESS,
|
|
)
|
|
)
|
|
freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING)
|
|
resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING)
|
|
st.last_freq = freq
|
|
st.last_resistance = resistance
|
|
|
|
if freq <= BRAKE_MIN_VIBRATION:
|
|
# RacingDSX brake fall-through (Parser.cs:128) sends Resistance(0, 0)
|
|
# explicitly \u2014 strength=0 routes to canonical Off (mode 0x05).
|
|
# Subtle slip while braking should leave the trigger neutral.
|
|
_apply_feedback(ds.triggerL, 0, 0)
|
|
else:
|
|
_apply_simple_vibration(
|
|
ds.triggerL,
|
|
BRAKE_VIB_POSITION,
|
|
int(resistance * BRAKE_EFFECT_INTENSITY),
|
|
int(freq * BRAKE_EFFECT_INTENSITY),
|
|
)
|
|
return
|
|
|
|
target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE))
|
|
resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING)
|
|
st.last_resistance = resistance
|
|
_apply_feedback(ds.triggerL, 0, int(resistance * BRAKE_EFFECT_INTENSITY))
|
|
|
|
|
|
# --- UDP main loop ------------------------------------------------------------
|
|
|
|
|
|
def parse_packet(data: bytes) -> ForzaDataPacket | None:
|
|
fmt = PACKET_FORMATS.get(len(data))
|
|
if fmt is None:
|
|
LOG.debug("ignoring packet of unexpected length %d", len(data))
|
|
return None
|
|
try:
|
|
return ForzaDataPacket(data, packet_format=fmt)
|
|
except Exception:
|
|
LOG.exception("failed to parse forza packet (len=%d)", len(data))
|
|
return None
|
|
|
|
|
|
def _close_controller(ds: pydualsense | None) -> None:
|
|
"""Best-effort close. The HID device may already be gone (unplug, BT drop)
|
|
in which case `device.close()` raises; we don't care."""
|
|
if ds is None:
|
|
return
|
|
try:
|
|
ds.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _connect_controller() -> pydualsense:
|
|
"""Open the DualSense, blocking until one is reachable.
|
|
|
|
`pydualsense.init()` raises when no DualSense is plugged in. That's a
|
|
normal startup-or-replug condition for us, not a fatal error \u2014 the
|
|
daemon is meant to live for the whole user session and self-heal across
|
|
plug events without external supervision. We log the first failure once,
|
|
then retry quietly every `RECONNECT_BACKOFF_S` seconds.
|
|
"""
|
|
LOG.info("opening dualsense controller")
|
|
first_failure_logged = False
|
|
while True:
|
|
ds = pydualsense()
|
|
try:
|
|
ds.init()
|
|
except Exception as e:
|
|
_close_controller(ds)
|
|
if not first_failure_logged:
|
|
LOG.warning(
|
|
"dualsense not available (%s); retrying every %.1fs",
|
|
e,
|
|
RECONNECT_BACKOFF_S,
|
|
)
|
|
first_failure_logged = True
|
|
time.sleep(RECONNECT_BACKOFF_S)
|
|
continue
|
|
LOG.info("dualsense controller connected")
|
|
return ds
|
|
|
|
|
|
def run(host: str, port: int, debug: bool) -> int:
|
|
ds = _connect_controller()
|
|
|
|
LOG.info("listening for forza udp on %s:%d", host, port)
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
sock.bind((host, port))
|
|
sock.settimeout(1.0)
|
|
|
|
throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT)
|
|
brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT)
|
|
forza_state = _ForzaState()
|
|
last_seen = 0.0
|
|
in_race = False
|
|
prev_in_race = False # for transition detection \u2014 see _apply_normal docstring
|
|
have_telemetry = False # True between the first packet and the IDLE_TIMEOUT_S reset
|
|
|
|
try:
|
|
while True:
|
|
# Hot-plug detection: pydualsense's BG sendReport thread terminates
|
|
# silently on hidraw IOError (unplug, BT disconnect, USB resuspend).
|
|
# When it dies our triggerL/R writes go nowhere. Reconnect in-process
|
|
# so the daemon doesn't depend on a supervisor for plug-event recovery.
|
|
if not ds.report_thread.is_alive():
|
|
LOG.warning("dualsense disconnected; reconnecting")
|
|
_close_controller(ds)
|
|
ds = _connect_controller()
|
|
now = time.monotonic()
|
|
try:
|
|
data, _ = sock.recvfrom(2048)
|
|
last_seen = now
|
|
have_telemetry = True
|
|
except socket.timeout:
|
|
# Reset on telemetry-idle regardless of in_race state. After Forza
|
|
# exits with the user in its main menu (is_race_on=0 packets just
|
|
# before exit, so in_race was already False), the old check would
|
|
# leave the last lightbar/trigger state asserted forever.
|
|
if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
|
|
LOG.info("forza idle for %.1fs \u2014 resetting controller", IDLE_TIMEOUT_S)
|
|
# One-shot 0x05 to actively retract the trigger motor; the BG
|
|
# thread will publish it ~12 times in the next 50ms before main
|
|
# thread loops back here. Subsequent idle iterations don't
|
|
# re-enter this branch (have_telemetry is now False).
|
|
reset_triggers(ds)
|
|
reset_lightbar(ds)
|
|
have_telemetry = False
|
|
in_race = False
|
|
prev_in_race = False
|
|
continue
|
|
|
|
pkt = parse_packet(data)
|
|
if pkt is None:
|
|
continue
|
|
|
|
# ForzaParser.IsRaceOn() override: combines packet field with the
|
|
# FH-specific RPM-accumulator workaround. Must be called once per
|
|
# packet so the accumulator state stays accurate.
|
|
in_race = forza_is_race_on(pkt, forza_state)
|
|
|
|
if not in_race:
|
|
# Transition into pre-race: one-shot mode 0x05 to actively
|
|
# retract the trigger motor. Subsequent steady-state frames
|
|
# send mode 0x00 (no command); re-asserting 0x05 every frame
|
|
# makes the firmware audibly whine retracting an already-
|
|
# neutral trigger. Divergence #4 in the module docstring.
|
|
if prev_in_race:
|
|
_apply_off(ds.triggerL)
|
|
_apply_off(ds.triggerR)
|
|
else:
|
|
_apply_normal(ds.triggerL)
|
|
_apply_normal(ds.triggerR)
|
|
apply_lightbar_pre_race(ds, pkt, forza_state)
|
|
prev_in_race = False
|
|
continue
|
|
|
|
if debug:
|
|
LOG.debug(
|
|
"rpm=%.0f/%.0f accel=%d brake=%d "
|
|
"slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f "
|
|
"throttle[freq=%d res=%d] brake[freq=%d res=%d]",
|
|
_safe(pkt, "current_engine_rpm"),
|
|
_safe(pkt, "engine_max_rpm"),
|
|
int(_safe(pkt, "accel")),
|
|
int(_safe(pkt, "brake")),
|
|
_safe(pkt, "tire_combined_slip_FL"),
|
|
_safe(pkt, "tire_combined_slip_FR"),
|
|
_safe(pkt, "tire_combined_slip_RL"),
|
|
_safe(pkt, "tire_combined_slip_RR"),
|
|
throttle_state.last_freq,
|
|
throttle_state.last_resistance,
|
|
brake_state.last_freq,
|
|
brake_state.last_resistance,
|
|
)
|
|
apply_lightbar_in_race(ds, pkt)
|
|
apply_left_trigger(ds, pkt, brake_state)
|
|
apply_right_trigger(ds, pkt, throttle_state)
|
|
prev_in_race = True
|
|
except KeyboardInterrupt:
|
|
LOG.info("shutting down")
|
|
finally:
|
|
try:
|
|
reset_triggers(ds)
|
|
except Exception:
|
|
pass
|
|
_close_controller(ds)
|
|
|
|
return 0
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
prog="forza-trigger",
|
|
description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
|
|
)
|
|
parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
|
|
parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
|
|
parser.add_argument(
|
|
"--debug",
|
|
action="store_true",
|
|
help="log per-packet telemetry at DEBUG level",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO")
|
|
logging.basicConfig(
|
|
level=level,
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
|
)
|
|
return run(args.host, args.port, args.debug)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|