# File: nixos/hosts/yarn/forza-trigger/forza_trigger.py
# (Listing metadata from the code viewer: 438 lines, 15 KiB, Python.)

"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.
This is a one-to-one behavioural port of Race-Element's DualSense haptic
overlay (RiddleTime/Race-Element, GPL-3.0). Reference files:
* Race Element.HUD.Common/Overlays/Pitwall/DualSenseInternal/
TriggerHaptics.cs — slip detection + frequency mapping
DsiConfiguration.cs — tuning defaults
DsiJob.cs — per-frame dispatch
## What this daemon does
- Listens for Forza Horizon "Data Out" UDP telemetry.
- On each packet: dispatches `handle_braking` (L2) and `handle_acceleration`
(R2) — same two-function structure as Race-Element's `DsiJob.RunAction`.
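- Each handler reads the relevant input pedal and the four tire slip
ratios; if both pedal and slip exceed their thresholds, computes a
frequency from slip severity and emits a Vibration trigger effect.
A released pedal always resets the trigger to no-resistance. With the
pedal held but slip below threshold, the throttle handler (R2) also
resets, while the brake handler (L2) leaves the previous effect in place
(matching upstream).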
## What this daemon explicitly does NOT do
Compared to the previous implementation, every haptic channel beyond the
two trigger Vibration effects is gone:
- No body LRA rumble (left/right motors). User reported the previous
multi-channel body rumble as "shakes my whole hand"; Race-Element
deliberately keeps the controller body silent so the trigger fingers
carry all information.
- No lightbar effects. Race-Element's DSI overlay leaves the lightbar
untouched (it's at the controller's default).
- No EWMA smoothing. Effects track slip frame-to-frame.
- No event-impulse system (no gear-shift Bow, no collision burst).
- No Machine mode for ABS. Race-Element uses the same Vibration
encoder for everything.
- No SlopeFeedback / cornering Feedback strength. The trigger has
zero resistance when not slipping.
- No surface texture, engine RPM rumble, lateral-G bias, or kerb
floor amplitude.
## Faithful reproduction of upstream
Upstream pin: RiddleTime/Race-Element @ 5bc0eebba64f (2026-04-30).
The slip-coefficient formulas in `_brake_frequency_pct` and
`_throttle_frequency_pct` are byte-faithful ports of Race-Element's
upstream code, including a copy-paste bug in their throttle and brake
paths where the "rear slip coefficient" multiplies `front_slip` instead
of `rear_slip`. The bug is preserved for behavioural parity; see the
inline comments tagged "RACE-ELEMENT BUG".
## Tuning constants
Race-Element exposes these as runtime config sliders. We bake them in as
module constants matching the upstream defaults from `DsiConfiguration`:
| | Brake | Throttle |
|-----------------------|--------|----------|
| input deadzone | 3 % | 3 % |
| front slip threshold | 0.25 | 0.35 |
| rear slip threshold | 0.25 | 0.25 |
| amplitude | 8 | 7 |
| min frequency | 3 Hz | 6 Hz |
| max frequency | 85 Hz | 96 Hz |
## Transport
`dualsense-controller` (yesbotics/dualsense-controller-python) handles
the HID transport, BT/USB framing including BT CRC32, and on-demand
output writes (state-changed-since-last-input-tick gate). Hot-plug
recovery routes through the library's `on_error` callback, which sets a
flag the main loop polls.
"""
import argparse
import logging
import math
import os
import signal
import socket
import sys
import time
from dualsense_controller import DualSenseController
from fdp import ForzaDataPacket
# Module-wide logger; main() configures the level and format via basicConfig.
LOG = logging.getLogger("forza-trigger")
# --- Tuning constants (Race-Element DsiConfiguration defaults) ---------------
# Pedal inputs come from Forza as 0..255 bytes; thresholds are in 0..1 scale.
# The handlers divide the raw pedal value by 255.0 before comparing it with
# the *_INPUT_THRESHOLD values. Slip thresholds are compared against the
# per-axle maximum of the absolute tire_combined_slip_* fields (_axle_slip).
BRAKE_INPUT_THRESHOLD = 0.03
BRAKE_FRONT_SLIP_THRESHOLD = 0.25
BRAKE_REAR_SLIP_THRESHOLD = 0.25
# Passed as `amplitude=` to the L2 vibration effect.
BRAKE_AMPLITUDE = 8
# Frequency is max(MIN, int(MAX * pct)), so MIN is a floor and MAX the value
# reached when pct == 1.
BRAKE_MIN_FREQUENCY = 3
BRAKE_MAX_FREQUENCY = 85
THROTTLE_INPUT_THRESHOLD = 0.03
THROTTLE_FRONT_SLIP_THRESHOLD = 0.35
THROTTLE_REAR_SLIP_THRESHOLD = 0.25
# Passed as `amplitude=` to the R2 vibration effect.
THROTTLE_AMPLITUDE = 7
THROTTLE_MIN_FREQUENCY = 6
THROTTLE_MAX_FREQUENCY = 96
# Slip-to-percentage divisors (Race-Element TriggerHaptics). Each equals
# (front_clip_ceiling + rear_clip_ceiling) for its handler, which makes
# `pct` scale to [0, 1] without an explicit cap downstream:
# brake: front_ceil(10) + rear_ceil(7.5) = 17.5
# throttle: front_ceil(5) + rear_ceil(7.5) = 12.5
BRAKE_PCT_DIVISOR = 17.5
THROTTLE_PCT_DIVISOR = 12.5
# --- Forza UDP packet sizes -> fdp packet_format strings ---------------------
# Keys are datagram lengths in bytes (len(data) in _parse_packet); values are
# handed to ForzaDataPacket as `packet_format`. Any other length is ignored.
PACKET_FORMATS = {
    232: "sled",
    311: "dash",
    324: "fh4",  # FH4 and FH5 share the same layout
}
# --- Daemon lifecycle constants ----------------------------------------------
# Seconds without any UDP datagram after which run() resets both triggers
# (and exits when --exit-on-idle is set).
IDLE_TIMEOUT_S = 3.0
# Seconds _connect_controller sleeps between connection attempts.
RECONNECT_BACKOFF_S = 1.0
# --- Module state (signal + hot-plug flags) ----------------------------------
# Set to True by _on_termination; polled by run() and _connect_controller().
_shutdown = False
# Set to True by _on_controller_error; cleared by _connect_controller();
# polled by run() to trigger a reconnect.
_disconnected = False
def _on_termination(signum: int, frame: object) -> None:
    """SIGTERM/SIGINT handler: request a graceful main-loop shutdown.

    Only flips the module-level ``_shutdown`` flag. ``run`` and
    ``_connect_controller`` poll that flag and unwind on their own, which
    lets the ``finally`` cleanup in ``run`` (trigger reset, controller
    deactivate, socket close) execute.

    Why an explicit handler is needed: systemd stops a unit with SIGTERM by
    default, and CPython installs no handler of its own for SIGTERM, so the
    signal keeps its default disposition. In the normal case that terminates
    the process on the spot without running ``finally`` blocks, which would
    leave the triggers in whatever effect was last written.

    ``signum`` and ``frame`` are the standard signal-handler arguments; both
    are unused.
    """
    global _shutdown
    _shutdown = True
def _on_controller_error(prev: object, exc: Exception) -> None:
    """Error callback registered with the controller via ``on_error``.

    Logs the exception at WARNING and raises the module-level
    ``_disconnected`` flag, which the main loop in ``run`` polls to start a
    reconnect. ``prev`` is the callback's first argument and is not used
    here; ``exc`` is the reported exception.
    """
    global _disconnected
    LOG.warning("dualsense exception: %s", exc)
    _disconnected = True
# --- Helpers -----------------------------------------------------------------
def _safe_abs(pkt: ForzaDataPacket, name: str) -> float:
    """Return ``abs(pkt.<name>)`` as a float, or 0.0 if the value is unusable.

    "Unusable" covers a missing attribute, a value ``float()`` rejects with
    TypeError/ValueError, NaN, and +/-infinity.
    """
    try:
        value = float(getattr(pkt, name, 0.0))
    except (TypeError, ValueError):
        # Non-numeric field (e.g. None or an unparsable string).
        return 0.0
    return abs(value) if math.isfinite(value) else 0.0
def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket:
    """Return the UDP listener socket with ``timeout`` applied.

    When the environment carries systemd socket-activation variables
    addressed to this process (LISTEN_PID equals our pid and LISTEN_FDS is at
    least 1), the socket is built from file descriptor 3 and ``host``/``port``
    are only used in the log line. Otherwise a fresh AF_INET datagram socket
    is created with SO_REUSEADDR and bound to ``(host, port)``; a bind failure
    propagates to the caller.
    """
    env = os.environ
    activation_pid = env.get("LISTEN_PID", "")
    fd_count = env.get("LISTEN_FDS", "0")
    socket_activated = (
        bool(activation_pid)
        and int(activation_pid) == os.getpid()
        and int(fd_count) >= 1
    )

    if socket_activated:
        # fd 3 is the first descriptor passed by systemd socket activation.
        inherited = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM)
        inherited.settimeout(timeout)
        LOG.info("using systemd-pre-bound socket on %s:%d", host, port)
        return inherited

    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.settimeout(timeout)
    return listener
def _parse_packet(data: bytes) -> ForzaDataPacket | None:
    """Decode one UDP datagram into a ``ForzaDataPacket``.

    The packet layout is selected purely by datagram length via
    ``PACKET_FORMATS``. Returns None (after a DEBUG log) for an unknown
    length, and None (after logging the traceback) if the parser raises.
    """
    size = len(data)
    layout = PACKET_FORMATS.get(size)
    if layout is None:
        LOG.debug("ignoring packet of unexpected length %d", size)
        return None

    try:
        packet = ForzaDataPacket(data, packet_format=layout)
    except Exception:
        # A malformed datagram must not take the daemon down; drop it.
        LOG.exception("failed to parse forza packet (len=%d)", size)
        return None
    return packet
def _axle_slip(pkt: ForzaDataPacket) -> tuple[float, float]:
    """Return ``(front_slip, rear_slip)``: the worse tire on each axle.

    Each ``tire_combined_slip_*`` field is read through ``_safe_abs`` (absolute
    value, with missing/NaN/inf mapped to 0.0), then the larger of the two
    tires on an axle is taken. Per the original author's note this mirrors
    Race-Element's ``Math.Max`` over each axle after its NegateIfNegative step.
    """
    front = max(
        _safe_abs(pkt, "tire_combined_slip_FL"),
        _safe_abs(pkt, "tire_combined_slip_FR"),
    )
    rear = max(
        _safe_abs(pkt, "tire_combined_slip_RL"),
        _safe_abs(pkt, "tire_combined_slip_RR"),
    )
    return front, rear
# --- Trigger haptic handlers (1:1 port of TriggerHaptics.cs) -----------------
def handle_braking(controller: DualSenseController, pkt: ForzaDataPacket) -> None:
    """Drive the left trigger (L2) from brake input and tire slip.

    Mirrors `TriggerHaptics.HandleBraking`. Three outcomes:

    * brake at or below ``BRAKE_INPUT_THRESHOLD``: the trigger effect is
      switched off;
    * brake pressed but neither axle above its slip threshold: nothing is
      written, so the trigger keeps whatever effect it had (after an ABS
      event it keeps vibrating until the brake is released). The original
      author documents this as matching upstream, which has no else-branch
      around the slip check;
    * brake pressed and at least one axle slipping: a vibration effect whose
      frequency scales with slip severity.
    """
    pedal = _safe_abs(pkt, "brake") / 255.0
    if pedal <= BRAKE_INPUT_THRESHOLD:
        controller.left_trigger.effect.off()
        return

    front_slip, rear_slip = _axle_slip(pkt)
    front_over = front_slip > BRAKE_FRONT_SLIP_THRESHOLD
    rear_over = rear_slip > BRAKE_REAR_SLIP_THRESHOLD
    if not (front_over or rear_over):
        return  # Race-Element falls through here: trigger keeps prior effect.

    # RACE-ELEMENT BUG: upstream computes the rear coefficient from FRONT slip
    # (TriggerHaptics.cs line 36, `slipRatioFront * 2f`). Faithful port keeps
    # the bug; if you'd rather use rear_slip * 2, change one symbol.
    severity = min(front_slip * 4, 10) + min(front_slip * 2, 7.5)
    pct = severity / BRAKE_PCT_DIVISOR
    hz = max(BRAKE_MIN_FREQUENCY, int(BRAKE_MAX_FREQUENCY * pct))
    controller.left_trigger.effect.vibration(
        start_position=0, amplitude=BRAKE_AMPLITUDE, frequency=hz
    )
def handle_acceleration(controller: DualSenseController, pkt: ForzaDataPacket) -> None:
    """Drive the right trigger (R2) from throttle input and tire slip.

    Mirrors `TriggerHaptics.HandleAcceleration`. The trigger effect is
    switched off when the throttle is at or below
    ``THROTTLE_INPUT_THRESHOLD`` and also when neither axle exceeds its slip
    threshold; otherwise a vibration effect is emitted whose frequency scales
    with slip severity.
    """
    pedal = _safe_abs(pkt, "accel") / 255.0
    if pedal <= THROTTLE_INPUT_THRESHOLD:
        controller.right_trigger.effect.off()
        return

    front_slip, rear_slip = _axle_slip(pkt)
    front_over = front_slip > THROTTLE_FRONT_SLIP_THRESHOLD
    rear_over = rear_slip > THROTTLE_REAR_SLIP_THRESHOLD
    if not (front_over or rear_over):
        # Throttle path resets to default explicitly when not slipping
        # (HandleAcceleration line 112, `R2Effect = TriggerEffect.Default`).
        controller.right_trigger.effect.off()
        return

    # RACE-ELEMENT BUG: same as brake — rear coefficient uses FRONT slip
    # (TriggerHaptics.cs line 92, `slipRatioFront * 5f`).
    severity = min(front_slip * 3, 5) + min(front_slip * 5, 7.5)
    pct = severity / THROTTLE_PCT_DIVISOR
    hz = max(THROTTLE_MIN_FREQUENCY, int(THROTTLE_MAX_FREQUENCY * pct))
    controller.right_trigger.effect.vibration(
        start_position=0, amplitude=THROTTLE_AMPLITUDE, frequency=hz
    )
def reset_triggers(controller: DualSenseController) -> None:
    """Switch off the effect on both triggers, left (L2) first, then right (R2).

    Used wherever the daemon wants a clean trigger state: after connecting,
    on telemetry idle, and during shutdown.
    """
    left = controller.left_trigger
    left.effect.off()
    right = controller.right_trigger
    right.effect.off()
# --- Connection / hot-plug ---------------------------------------------------
def _connect_controller() -> DualSenseController | None:
    """Block until a DualSense is reachable and activated.

    Retries every ``RECONNECT_BACKOFF_S`` seconds while enumeration fails, no
    device is present, or activation raises. Only the first failure of a call
    is logged (at WARNING) so a missing controller does not flood the log.

    Returns:
        The activated controller with both triggers reset, or None if
        ``_shutdown`` was set before a controller could be opened.
    """
    global _disconnected

    LOG.info("opening dualsense controller")
    failure_logged = False

    def _note_failure(msg: str, *args: object) -> None:
        # Log only the first failure of this connect attempt sequence.
        nonlocal failure_logged
        if not failure_logged:
            LOG.warning(msg, *args)
            failure_logged = True

    while not _shutdown:
        try:
            devices = DualSenseController.enumerate_devices()
        except Exception as e:
            _note_failure("hidapi enumeration failed: %s", e)
            time.sleep(RECONNECT_BACKOFF_S)
            continue

        if not devices:
            _note_failure(
                "no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S
            )
            time.sleep(RECONNECT_BACKOFF_S)
            continue

        # Clear the hot-plug flag BEFORE activating. The error callback sets
        # it from the controller's side; clearing it only after activate()
        # returned could overwrite an error reported during activation, and
        # the main loop would then never notice the dead controller.
        _disconnected = False
        try:
            controller = DualSenseController(device_index_or_device_info=devices[0])
            controller.on_error(_on_controller_error)
            controller.activate()
        except Exception as e:
            _note_failure("controller activation failed: %s", e)
            time.sleep(RECONNECT_BACKOFF_S)
            continue

        # Push a clean initial state so we don't inherit residual effects from
        # a previous Steam Input session, prior daemon instance, or stale
        # firmware state.
        reset_triggers(controller)
        LOG.info("dualsense controller connected (%s)", controller.connection_type)
        return controller

    return None
def _close_controller(controller: DualSenseController | None) -> None:
    """Best-effort deactivate of ``controller``; a None controller is a no-op.

    Never raises: a failing ``deactivate()`` (for example on a controller
    that has already gone away) is logged at DEBUG with its traceback instead
    of being silently discarded, so it can still be diagnosed.
    """
    if controller is None:
        return
    try:
        controller.deactivate()
    except Exception:
        # Cleanup path: callers cannot do anything useful with this failure.
        LOG.debug("controller deactivate failed", exc_info=True)
# --- Main loop ---------------------------------------------------------------
def run(host: str, port: int, exit_on_idle: bool = False) -> int:
    """Run the telemetry-to-trigger bridge until shutdown.

    Installs the SIGTERM/SIGINT handlers, obtains the UDP socket, connects
    the controller, then loops: receive one datagram, parse it, and dispatch
    both trigger handlers. A raised ``_disconnected`` flag causes the
    controller to be closed and reconnected. When no datagram has arrived for
    more than ``IDLE_TIMEOUT_S`` seconds after telemetry was seen, both
    triggers are reset.

    Args:
        host: UDP bind address (only logged when the socket is inherited).
        port: UDP bind port.
        exit_on_idle: Leave the loop after the idle reset instead of waiting
            for telemetry to resume.

    Returns:
        Always 0. Exceptions from ``_get_socket`` (e.g. bind failure) and
        from the handlers propagate to the caller.
    """
    signal.signal(signal.SIGTERM, _on_termination)
    signal.signal(signal.SIGINT, _on_termination)

    # Bind the UDP socket BEFORE opening the controller. If the bind fails
    # (port in use, systemd-passed fd unusable, etc.) we exit cleanly without
    # having activated the controller's HID stack (which would otherwise leak
    # an active session). _get_socket raises on bind failure.
    LOG.info("listening for forza udp on %s:%d", host, port)
    sock = _get_socket(host, port)

    controller: DualSenseController | None = None
    last_seen = 0.0
    have_telemetry = False
    try:
        # Connect inside the try so the socket is closed even if this raises.
        controller = _connect_controller()
        if controller is None:
            return 0

        while not _shutdown:
            if _disconnected:
                LOG.warning("dualsense disconnected; reconnecting")
                _close_controller(controller)
                # Drop the stale handle first so the cleanup below never
                # touches an already-closed controller if reconnecting fails.
                controller = None
                controller = _connect_controller()
                if controller is None:
                    return 0

            try:
                data, _ = sock.recvfrom(2048)
            except socket.timeout:
                if _shutdown:
                    break
                # Sample the clock AFTER the blocking receive; a timestamp
                # taken before it is stale by the whole socket timeout and
                # delays the idle reset well past IDLE_TIMEOUT_S.
                idle_for = time.monotonic() - last_seen
                if have_telemetry and idle_for > IDLE_TIMEOUT_S:
                    LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S)
                    reset_triggers(controller)
                    have_telemetry = False
                    if exit_on_idle:
                        LOG.info("exiting on idle")
                        break
                continue

            # Any datagram counts as activity, even one we cannot parse.
            last_seen = time.monotonic()
            have_telemetry = True

            pkt = _parse_packet(data)
            if pkt is None:
                continue
            handle_acceleration(controller, pkt)
            handle_braking(controller, pkt)
    finally:
        if controller is not None:
            try:
                reset_triggers(controller)
            except Exception:
                # Best effort: the controller may already be gone.
                LOG.debug("trigger reset during shutdown failed", exc_info=True)
            _close_controller(controller)
        try:
            sock.close()
        except Exception:
            pass
    return 0
def main() -> int:
    """CLI entry point: parse arguments, configure logging, and call ``run``.

    The log level is taken from the ``FORZA_TRIGGER_LOG`` environment
    variable when it holds a non-empty value; the name is matched
    case-insensitively (``debug`` works like ``DEBUG``). Otherwise ``--debug``
    selects DEBUG and the default is INFO. An unknown level name still makes
    ``logging.basicConfig`` raise ValueError.

    Returns:
        The exit code returned by ``run``.
    """
    parser = argparse.ArgumentParser(
        prog="forza-trigger",
        description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
    parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
    parser.add_argument(
        "--debug", action="store_true", help="log per-packet decisions at DEBUG"
    )
    parser.add_argument(
        "--exit-on-idle",
        action="store_true",
        help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)",
    )
    args = parser.parse_args()

    fallback_level = "DEBUG" if args.debug else "INFO"
    # logging only knows upper-case level names, and an empty string is not a
    # level at all; normalise so "debug" or an empty variable cannot crash
    # start-up.
    env_level = os.environ.get("FORZA_TRIGGER_LOG", "").strip().upper()
    logging.basicConfig(
        level=env_level or fallback_level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    return run(args.host, args.port, args.exit_on_idle)
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())