438 lines
15 KiB
Python
438 lines
15 KiB
Python
"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers.
|
|
|
|
This is a one-to-one behavioural port of Race-Element's DualSense haptic
|
|
overlay (RiddleTime/Race-Element, GPL-3.0). Reference files:
|
|
|
|
* Race Element.HUD.Common/Overlays/Pitwall/DualSenseInternal/
|
|
TriggerHaptics.cs — slip detection + frequency mapping
|
|
DsiConfiguration.cs — tuning defaults
|
|
DsiJob.cs — per-frame dispatch
|
|
|
|
## What this daemon does
|
|
|
|
- Listens for Forza Horizon "Data Out" UDP telemetry.
|
|
- On each packet: dispatches `handle_braking` (L2) and `handle_acceleration`
|
|
(R2) — same two-function structure as Race-Element's `DsiJob.RunAction`.
|
|
- Each handler reads the relevant input pedal and the four tire slip
|
|
ratios; if both pedal and slip exceed their thresholds, computes a
|
|
frequency from slip severity and emits a Vibration trigger effect.
|
|
Otherwise resets the trigger to no-resistance.
|
|
|
|
## What this daemon explicitly does NOT do
|
|
|
|
Compared to the previous implementation, every haptic channel beyond the
|
|
two trigger Vibration effects is gone:
|
|
|
|
- No body LRA rumble (left/right motors). User reported the previous
|
|
multi-channel body rumble as "shakes my whole hand"; Race-Element
|
|
deliberately keeps the controller body silent so the trigger fingers
|
|
carry all information.
|
|
- No lightbar effects. Race-Element's DSI overlay leaves the lightbar
|
|
untouched (it's at the controller's default).
|
|
- No EWMA smoothing. Effects track slip frame-to-frame.
|
|
- No event-impulse system (no gear-shift Bow, no collision burst).
|
|
- No Machine mode for ABS. Race-Element uses the same Vibration
|
|
encoder for everything.
|
|
- No SlopeFeedback / cornering Feedback strength. The trigger has
|
|
zero resistance when not slipping.
|
|
- No surface texture, engine RPM rumble, lateral-G bias, or kerb
|
|
floor amplitude.
|
|
|
|
## Faithful reproduction of upstream
|
|
|
|
Upstream pin: RiddleTime/Race-Element @ 5bc0eebba64f (2026-04-30).
|
|
|
|
The slip-coefficient formulas in `_brake_frequency_pct` and
|
|
`_throttle_frequency_pct` are byte-faithful ports of Race-Element's
|
|
upstream code, including a copy-paste bug in their throttle and brake
|
|
paths where the "rear slip coefficient" multiplies `front_slip` instead
|
|
of `rear_slip`. The bug is preserved for behavioural parity; see the
|
|
inline comments tagged "RACE-ELEMENT BUG".
|
|
|
|
## Tuning constants
|
|
|
|
Race-Element exposes these as runtime config sliders. We bake them in as
|
|
module constants matching the upstream defaults from `DsiConfiguration`:
|
|
|
|
| | Brake | Throttle |
|
|
|-----------------------|--------|----------|
|
|
| input deadzone | 3 % | 3 % |
|
|
| front slip threshold | 0.25 | 0.35 |
|
|
| rear slip threshold | 0.25 | 0.25 |
|
|
| amplitude | 8 | 7 |
|
|
| min frequency | 3 Hz | 6 Hz |
|
|
| max frequency | 85 Hz | 96 Hz |
|
|
|
|
## Transport
|
|
|
|
`dualsense-controller` (yesbotics/dualsense-controller-python) handles
|
|
the HID transport, BT/USB framing including BT CRC32, and on-demand
|
|
output writes (state-changed-since-last-input-tick gate). Hot-plug
|
|
recovery routes through the library's `on_error` callback, which sets a
|
|
flag the main loop polls.
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import math
|
|
import os
|
|
import signal
|
|
import socket
|
|
import sys
|
|
import time
|
|
|
|
from dualsense_controller import DualSenseController
|
|
from fdp import ForzaDataPacket
|
|
|
|
|
|
LOG = logging.getLogger("forza-trigger")
|
|
|
|
|
|
# --- Tuning constants (Race-Element DsiConfiguration defaults) ---------------
|
|
# Pedal inputs come from Forza as 0..255 bytes; thresholds are in 0..1 scale.
|
|
BRAKE_INPUT_THRESHOLD = 0.03
|
|
BRAKE_FRONT_SLIP_THRESHOLD = 0.25
|
|
BRAKE_REAR_SLIP_THRESHOLD = 0.25
|
|
BRAKE_AMPLITUDE = 8
|
|
BRAKE_MIN_FREQUENCY = 3
|
|
BRAKE_MAX_FREQUENCY = 85
|
|
|
|
THROTTLE_INPUT_THRESHOLD = 0.03
|
|
THROTTLE_FRONT_SLIP_THRESHOLD = 0.35
|
|
THROTTLE_REAR_SLIP_THRESHOLD = 0.25
|
|
THROTTLE_AMPLITUDE = 7
|
|
THROTTLE_MIN_FREQUENCY = 6
|
|
THROTTLE_MAX_FREQUENCY = 96
|
|
|
|
# Slip-to-percentage divisors (Race-Element TriggerHaptics). Each equals
|
|
# (front_clip_ceiling + rear_clip_ceiling) for its handler, which makes
|
|
# `pct` scale to [0, 1] without an explicit cap downstream:
|
|
# brake: front_ceil(10) + rear_ceil(7.5) = 17.5
|
|
# throttle: front_ceil(5) + rear_ceil(7.5) = 12.5
|
|
BRAKE_PCT_DIVISOR = 17.5
|
|
THROTTLE_PCT_DIVISOR = 12.5
|
|
|
|
# --- Forza UDP packet sizes -> fdp packet_format strings ---------------------
|
|
PACKET_FORMATS = {
|
|
232: "sled",
|
|
311: "dash",
|
|
324: "fh4", # FH4 and FH5 share the same layout
|
|
}
|
|
|
|
# --- Daemon lifecycle constants ----------------------------------------------
|
|
IDLE_TIMEOUT_S = 3.0
|
|
RECONNECT_BACKOFF_S = 1.0
|
|
|
|
|
|
# --- Module state (signal + hot-plug flags) ----------------------------------
|
|
_shutdown = False
|
|
_disconnected = False
|
|
|
|
|
|
def _on_termination(signum: int, frame: object) -> None:
|
|
"""SIGTERM/SIGINT handler — sets the main-loop shutdown flag.
|
|
|
|
systemd sends SIGTERM by default; without an explicit handler Python
|
|
ignores it and systemd waits TimeoutStopSec=90s before SIGKILL.
|
|
"""
|
|
global _shutdown
|
|
_shutdown = True
|
|
|
|
|
|
def _on_controller_error(prev: object, exc: Exception) -> None:
|
|
"""dualsense-controller `on_error` callback for HID read-thread failures."""
|
|
global _disconnected
|
|
LOG.warning("dualsense exception: %s", exc)
|
|
_disconnected = True
|
|
|
|
|
|
# --- Helpers -----------------------------------------------------------------
|
|
|
|
|
|
def _safe_abs(pkt: ForzaDataPacket, name: str) -> float:
|
|
"""Read a packet field defensively. Returns 0.0 for missing fields,
|
|
NaN, or +/-inf; otherwise returns the absolute value of the float."""
|
|
try:
|
|
v = float(getattr(pkt, name, 0.0))
|
|
except (TypeError, ValueError):
|
|
return 0.0
|
|
if not math.isfinite(v):
|
|
return 0.0
|
|
return abs(v)
|
|
|
|
|
|
def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket:
|
|
"""Create or inherit the UDP listener socket.
|
|
|
|
Under systemd socket activation (LISTEN_FDS=1, LISTEN_PID == ours)
|
|
fd 3 is the pre-bound socket. Otherwise bind normally.
|
|
"""
|
|
listen_pid = os.environ.get("LISTEN_PID", "")
|
|
listen_fds = os.environ.get("LISTEN_FDS", "0")
|
|
if listen_pid and int(listen_pid) == os.getpid() and int(listen_fds) >= 1:
|
|
sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.settimeout(timeout)
|
|
LOG.info("using systemd-pre-bound socket on %s:%d", host, port)
|
|
return sock
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
sock.bind((host, port))
|
|
sock.settimeout(timeout)
|
|
return sock
|
|
|
|
|
|
def _parse_packet(data: bytes) -> ForzaDataPacket | None:
|
|
fmt = PACKET_FORMATS.get(len(data))
|
|
if fmt is None:
|
|
LOG.debug("ignoring packet of unexpected length %d", len(data))
|
|
return None
|
|
try:
|
|
return ForzaDataPacket(data, packet_format=fmt)
|
|
except Exception:
|
|
LOG.exception("failed to parse forza packet (len=%d)", len(data))
|
|
return None
|
|
|
|
|
|
def _axle_slip(pkt: ForzaDataPacket) -> tuple[float, float]:
|
|
"""(front_slip, rear_slip) — dominant tire of each axle.
|
|
|
|
Race-Element uses `Math.Max` over each axle's two tires to surface the
|
|
worst-slipping wheel, since slip on either side counts. fdp emits
|
|
`tire_combined_slip_*` as signed floats; `_safe_abs` filters NaN/inf
|
|
and takes the absolute value (Race-Element calls this NegateIfNegative).
|
|
"""
|
|
fl = _safe_abs(pkt, "tire_combined_slip_FL")
|
|
fr = _safe_abs(pkt, "tire_combined_slip_FR")
|
|
rl = _safe_abs(pkt, "tire_combined_slip_RL")
|
|
rr = _safe_abs(pkt, "tire_combined_slip_RR")
|
|
return max(fl, fr), max(rl, rr)
|
|
|
|
|
|
# --- Trigger haptic handlers (1:1 port of TriggerHaptics.cs) -----------------
|
|
|
|
|
|
def handle_braking(controller: DualSenseController, pkt: ForzaDataPacket) -> None:
|
|
"""Brake (L2). Mirrors `TriggerHaptics.HandleBraking`.
|
|
|
|
Note: Race-Element's brake path leaves the trigger in its prior effect
|
|
when brake is engaged but slip is below threshold — i.e. you can hold
|
|
the brake without slip after an ABS event and the trigger keeps
|
|
vibrating until you release the brake. This matches the upstream code
|
|
exactly (no else-branch around the slip check).
|
|
"""
|
|
brake_input = _safe_abs(pkt, "brake") / 255.0
|
|
if brake_input <= BRAKE_INPUT_THRESHOLD:
|
|
controller.left_trigger.effect.off()
|
|
return
|
|
|
|
front_slip, rear_slip = _axle_slip(pkt)
|
|
if (
|
|
front_slip <= BRAKE_FRONT_SLIP_THRESHOLD
|
|
and rear_slip <= BRAKE_REAR_SLIP_THRESHOLD
|
|
):
|
|
return # Race-Element falls through here: trigger keeps prior effect.
|
|
|
|
front_coef = min(front_slip * 4, 10)
|
|
# RACE-ELEMENT BUG: upstream computes the rear coefficient from FRONT slip
|
|
# (TriggerHaptics.cs line 36, `slipRatioFront * 2f`). Faithful port keeps
|
|
# the bug; if you'd rather use rear_slip * 2, change one symbol.
|
|
rear_coef = min(front_slip * 2, 7.5)
|
|
pct = (front_coef + rear_coef) / BRAKE_PCT_DIVISOR
|
|
freq = max(BRAKE_MIN_FREQUENCY, int(BRAKE_MAX_FREQUENCY * pct))
|
|
|
|
controller.left_trigger.effect.vibration(
|
|
start_position=0, amplitude=BRAKE_AMPLITUDE, frequency=freq
|
|
)
|
|
|
|
|
|
def handle_acceleration(controller: DualSenseController, pkt: ForzaDataPacket) -> None:
|
|
"""Throttle (R2). Mirrors `TriggerHaptics.HandleAcceleration`."""
|
|
throttle_input = _safe_abs(pkt, "accel") / 255.0
|
|
if throttle_input <= THROTTLE_INPUT_THRESHOLD:
|
|
controller.right_trigger.effect.off()
|
|
return
|
|
|
|
front_slip, rear_slip = _axle_slip(pkt)
|
|
if (
|
|
front_slip <= THROTTLE_FRONT_SLIP_THRESHOLD
|
|
and rear_slip <= THROTTLE_REAR_SLIP_THRESHOLD
|
|
):
|
|
# Throttle path resets to default explicitly when not slipping
|
|
# (HandleAcceleration line 112, `R2Effect = TriggerEffect.Default`).
|
|
controller.right_trigger.effect.off()
|
|
return
|
|
|
|
front_coef = min(front_slip * 3, 5)
|
|
# RACE-ELEMENT BUG: same as brake — rear coefficient uses FRONT slip
|
|
# (TriggerHaptics.cs line 92, `slipRatioFront * 5f`).
|
|
rear_coef = min(front_slip * 5, 7.5)
|
|
pct = (front_coef + rear_coef) / THROTTLE_PCT_DIVISOR
|
|
freq = max(THROTTLE_MIN_FREQUENCY, int(THROTTLE_MAX_FREQUENCY * pct))
|
|
|
|
controller.right_trigger.effect.vibration(
|
|
start_position=0, amplitude=THROTTLE_AMPLITUDE, frequency=freq
|
|
)
|
|
|
|
|
|
def reset_triggers(controller: DualSenseController) -> None:
|
|
"""Default both triggers — called on idle, reconnect, and shutdown."""
|
|
controller.left_trigger.effect.off()
|
|
controller.right_trigger.effect.off()
|
|
|
|
|
|
# --- Connection / hot-plug ---------------------------------------------------
|
|
|
|
|
|
def _connect_controller() -> DualSenseController | None:
|
|
"""Block until a DualSense is reachable. Returns the activated
|
|
controller, or None if shutdown was requested before one appeared.
|
|
"""
|
|
LOG.info("opening dualsense controller")
|
|
first_failure_logged = False
|
|
while not _shutdown:
|
|
try:
|
|
devices = DualSenseController.enumerate_devices()
|
|
except Exception as e:
|
|
if not first_failure_logged:
|
|
LOG.warning("hidapi enumeration failed: %s", e)
|
|
first_failure_logged = True
|
|
time.sleep(RECONNECT_BACKOFF_S)
|
|
continue
|
|
|
|
if not devices:
|
|
if not first_failure_logged:
|
|
LOG.warning(
|
|
"no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S
|
|
)
|
|
first_failure_logged = True
|
|
time.sleep(RECONNECT_BACKOFF_S)
|
|
continue
|
|
|
|
try:
|
|
controller = DualSenseController(device_index_or_device_info=devices[0])
|
|
controller.on_error(_on_controller_error)
|
|
controller.activate()
|
|
except Exception as e:
|
|
if not first_failure_logged:
|
|
LOG.warning("controller activation failed: %s", e)
|
|
first_failure_logged = True
|
|
time.sleep(RECONNECT_BACKOFF_S)
|
|
continue
|
|
|
|
# Push a clean initial state so we don't inherit residual effects from
|
|
# a previous Steam Input session, prior daemon instance, or stale
|
|
# firmware state.
|
|
reset_triggers(controller)
|
|
global _disconnected
|
|
_disconnected = False
|
|
LOG.info("dualsense controller connected (%s)", controller.connection_type)
|
|
return controller
|
|
|
|
return None
|
|
|
|
|
|
def _close_controller(controller: DualSenseController | None) -> None:
|
|
if controller is None:
|
|
return
|
|
try:
|
|
controller.deactivate()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# --- Main loop ---------------------------------------------------------------
|
|
|
|
|
|
def run(host: str, port: int, exit_on_idle: bool = False) -> int:
|
|
signal.signal(signal.SIGTERM, _on_termination)
|
|
signal.signal(signal.SIGINT, _on_termination)
|
|
|
|
# Bind the UDP socket BEFORE opening the controller. If the bind fails
|
|
# (port in use, systemd-passed fd unusable, etc.) we exit cleanly without
|
|
# having activated the controller's HID stack (which would otherwise leak
|
|
# an active session). _get_socket raises on bind failure.
|
|
LOG.info("listening for forza udp on %s:%d", host, port)
|
|
sock = _get_socket(host, port)
|
|
|
|
controller = _connect_controller()
|
|
if controller is None:
|
|
sock.close()
|
|
return 0
|
|
|
|
last_seen = 0.0
|
|
have_telemetry = False
|
|
|
|
try:
|
|
while not _shutdown:
|
|
if _disconnected:
|
|
LOG.warning("dualsense disconnected; reconnecting")
|
|
_close_controller(controller)
|
|
controller = _connect_controller()
|
|
if controller is None:
|
|
return 0
|
|
|
|
now = time.monotonic()
|
|
try:
|
|
data, _ = sock.recvfrom(2048)
|
|
last_seen = now
|
|
have_telemetry = True
|
|
except socket.timeout:
|
|
if _shutdown:
|
|
break
|
|
if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S:
|
|
LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S)
|
|
reset_triggers(controller)
|
|
have_telemetry = False
|
|
if exit_on_idle:
|
|
LOG.info("exiting on idle")
|
|
break
|
|
continue
|
|
|
|
pkt = _parse_packet(data)
|
|
if pkt is None:
|
|
continue
|
|
|
|
handle_acceleration(controller, pkt)
|
|
handle_braking(controller, pkt)
|
|
finally:
|
|
try:
|
|
reset_triggers(controller)
|
|
except Exception:
|
|
pass
|
|
_close_controller(controller)
|
|
try:
|
|
sock.close()
|
|
except Exception:
|
|
pass
|
|
|
|
return 0
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
prog="forza-trigger",
|
|
description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
|
|
)
|
|
parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
|
|
parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
|
|
parser.add_argument(
|
|
"--debug", action="store_true", help="log per-packet decisions at DEBUG"
|
|
)
|
|
parser.add_argument(
|
|
"--exit-on-idle",
|
|
action="store_true",
|
|
help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO")
|
|
logging.basicConfig(
|
|
level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s"
|
|
)
|
|
return run(args.host, args.port, args.exit_on_idle)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|