"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. This is a one-to-one behavioural port of Race-Element's DualSense haptic overlay (RiddleTime/Race-Element, GPL-3.0). Reference files: * Race Element.HUD.Common/Overlays/Pitwall/DualSenseInternal/ TriggerHaptics.cs — slip detection + frequency mapping DsiConfiguration.cs — tuning defaults DsiJob.cs — per-frame dispatch ## What this daemon does - Listens for Forza Horizon "Data Out" UDP telemetry. - On each packet: dispatches `handle_braking` (L2) and `handle_acceleration` (R2) — same two-function structure as Race-Element's `DsiJob.RunAction`. - Each handler reads the relevant input pedal and the four tire slip ratios; if both pedal and slip exceed their thresholds, computes a frequency from slip severity and emits a Vibration trigger effect. Otherwise resets the trigger to no-resistance. ## What this daemon explicitly does NOT do Compared to the previous implementation, every haptic channel beyond the two trigger Vibration effects is gone: - No body LRA rumble (left/right motors). User reported the previous multi-channel body rumble as "shakes my whole hand"; Race-Element deliberately keeps the controller body silent so the trigger fingers carry all information. - No lightbar effects. Race-Element's DSI overlay leaves the lightbar untouched (it's at the controller's default). - No EWMA smoothing. Effects track slip frame-to-frame. - No event-impulse system (no gear-shift Bow, no collision burst). - No Machine mode for ABS. Race-Element uses the same Vibration encoder for everything. - No SlopeFeedback / cornering Feedback strength. The trigger has zero resistance when not slipping. - No surface texture, engine RPM rumble, lateral-G bias, or kerb floor amplitude. ## Faithful reproduction of upstream Upstream pin: RiddleTime/Race-Element @ 5bc0eebba64f (2026-04-30). The slip-coefficient formulas in `_brake_frequency_pct` and `_throttle_frequency_pct` are byte-faithful ports of Race-Element's upstream code, including a copy-paste bug in their throttle and brake paths where the "rear slip coefficient" multiplies `front_slip` instead of `rear_slip`. The bug is preserved for behavioural parity; see the inline comments tagged "RACE-ELEMENT BUG". ## Tuning constants Race-Element exposes these as runtime config sliders. We bake them in as module constants matching the upstream defaults from `DsiConfiguration`: | | Brake | Throttle | |-----------------------|--------|----------| | input deadzone | 3 % | 3 % | | front slip threshold | 0.25 | 0.35 | | rear slip threshold | 0.25 | 0.25 | | amplitude | 8 | 7 | | min frequency | 3 Hz | 6 Hz | | max frequency | 85 Hz | 96 Hz | ## Transport `dualsense-controller` (yesbotics/dualsense-controller-python) handles the HID transport, BT/USB framing including BT CRC32, and on-demand output writes (state-changed-since-last-input-tick gate). Hot-plug recovery routes through the library's `on_error` callback, which sets a flag the main loop polls. """ import argparse import logging import math import os import signal import socket import sys import time from dualsense_controller import DualSenseController from fdp import ForzaDataPacket LOG = logging.getLogger("forza-trigger") # --- Tuning constants (Race-Element DsiConfiguration defaults) --------------- # Pedal inputs come from Forza as 0..255 bytes; thresholds are in 0..1 scale. BRAKE_INPUT_THRESHOLD = 0.03 BRAKE_FRONT_SLIP_THRESHOLD = 0.25 BRAKE_REAR_SLIP_THRESHOLD = 0.25 BRAKE_AMPLITUDE = 8 BRAKE_MIN_FREQUENCY = 3 BRAKE_MAX_FREQUENCY = 85 THROTTLE_INPUT_THRESHOLD = 0.03 THROTTLE_FRONT_SLIP_THRESHOLD = 0.35 THROTTLE_REAR_SLIP_THRESHOLD = 0.25 THROTTLE_AMPLITUDE = 7 THROTTLE_MIN_FREQUENCY = 6 THROTTLE_MAX_FREQUENCY = 96 # Slip-to-percentage divisors (Race-Element TriggerHaptics). Each equals # (front_clip_ceiling + rear_clip_ceiling) for its handler, which makes # `pct` scale to [0, 1] without an explicit cap downstream: # brake: front_ceil(10) + rear_ceil(7.5) = 17.5 # throttle: front_ceil(5) + rear_ceil(7.5) = 12.5 BRAKE_PCT_DIVISOR = 17.5 THROTTLE_PCT_DIVISOR = 12.5 # --- Forza UDP packet sizes -> fdp packet_format strings --------------------- PACKET_FORMATS = { 232: "sled", 311: "dash", 324: "fh4", # FH4 and FH5 share the same layout } # --- Daemon lifecycle constants ---------------------------------------------- IDLE_TIMEOUT_S = 3.0 RECONNECT_BACKOFF_S = 1.0 # --- Module state (signal + hot-plug flags) ---------------------------------- _shutdown = False _disconnected = False def _on_termination(signum: int, frame: object) -> None: """SIGTERM/SIGINT handler — sets the main-loop shutdown flag. systemd sends SIGTERM by default; without an explicit handler Python ignores it and systemd waits TimeoutStopSec=90s before SIGKILL. """ global _shutdown _shutdown = True def _on_controller_error(prev: object, exc: Exception) -> None: """dualsense-controller `on_error` callback for HID read-thread failures.""" global _disconnected LOG.warning("dualsense exception: %s", exc) _disconnected = True # --- Helpers ----------------------------------------------------------------- def _safe_abs(pkt: ForzaDataPacket, name: str) -> float: """Read a packet field defensively. Returns 0.0 for missing fields, NaN, or +/-inf; otherwise returns the absolute value of the float.""" try: v = float(getattr(pkt, name, 0.0)) except (TypeError, ValueError): return 0.0 if not math.isfinite(v): return 0.0 return abs(v) def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: """Create or inherit the UDP listener socket. Under systemd socket activation (LISTEN_FDS=1, LISTEN_PID == ours) fd 3 is the pre-bound socket. Otherwise bind normally. """ listen_pid = os.environ.get("LISTEN_PID", "") listen_fds = os.environ.get("LISTEN_FDS", "0") if listen_pid and int(listen_pid) == os.getpid() and int(listen_fds) >= 1: sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) LOG.info("using systemd-pre-bound socket on %s:%d", host, port) return sock sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.settimeout(timeout) return sock def _parse_packet(data: bytes) -> ForzaDataPacket | None: fmt = PACKET_FORMATS.get(len(data)) if fmt is None: LOG.debug("ignoring packet of unexpected length %d", len(data)) return None try: return ForzaDataPacket(data, packet_format=fmt) except Exception: LOG.exception("failed to parse forza packet (len=%d)", len(data)) return None def _axle_slip(pkt: ForzaDataPacket) -> tuple[float, float]: """(front_slip, rear_slip) — dominant tire of each axle. Race-Element uses `Math.Max` over each axle's two tires to surface the worst-slipping wheel, since slip on either side counts. fdp emits `tire_combined_slip_*` as signed floats; `_safe_abs` filters NaN/inf and takes the absolute value (Race-Element calls this NegateIfNegative). """ fl = _safe_abs(pkt, "tire_combined_slip_FL") fr = _safe_abs(pkt, "tire_combined_slip_FR") rl = _safe_abs(pkt, "tire_combined_slip_RL") rr = _safe_abs(pkt, "tire_combined_slip_RR") return max(fl, fr), max(rl, rr) # --- Trigger haptic handlers (1:1 port of TriggerHaptics.cs) ----------------- def handle_braking(controller: DualSenseController, pkt: ForzaDataPacket) -> None: """Brake (L2). Mirrors `TriggerHaptics.HandleBraking`. Note: Race-Element's brake path leaves the trigger in its prior effect when brake is engaged but slip is below threshold — i.e. you can hold the brake without slip after an ABS event and the trigger keeps vibrating until you release the brake. This matches the upstream code exactly (no else-branch around the slip check). """ brake_input = _safe_abs(pkt, "brake") / 255.0 if brake_input <= BRAKE_INPUT_THRESHOLD: controller.left_trigger.effect.off() return front_slip, rear_slip = _axle_slip(pkt) if ( front_slip <= BRAKE_FRONT_SLIP_THRESHOLD and rear_slip <= BRAKE_REAR_SLIP_THRESHOLD ): return # Race-Element falls through here: trigger keeps prior effect. front_coef = min(front_slip * 4, 10) # RACE-ELEMENT BUG: upstream computes the rear coefficient from FRONT slip # (TriggerHaptics.cs line 36, `slipRatioFront * 2f`). Faithful port keeps # the bug; if you'd rather use rear_slip * 2, change one symbol. rear_coef = min(front_slip * 2, 7.5) pct = (front_coef + rear_coef) / BRAKE_PCT_DIVISOR freq = max(BRAKE_MIN_FREQUENCY, int(BRAKE_MAX_FREQUENCY * pct)) controller.left_trigger.effect.vibration( start_position=0, amplitude=BRAKE_AMPLITUDE, frequency=freq ) def handle_acceleration(controller: DualSenseController, pkt: ForzaDataPacket) -> None: """Throttle (R2). Mirrors `TriggerHaptics.HandleAcceleration`.""" throttle_input = _safe_abs(pkt, "accel") / 255.0 if throttle_input <= THROTTLE_INPUT_THRESHOLD: controller.right_trigger.effect.off() return front_slip, rear_slip = _axle_slip(pkt) if ( front_slip <= THROTTLE_FRONT_SLIP_THRESHOLD and rear_slip <= THROTTLE_REAR_SLIP_THRESHOLD ): # Throttle path resets to default explicitly when not slipping # (HandleAcceleration line 112, `R2Effect = TriggerEffect.Default`). controller.right_trigger.effect.off() return front_coef = min(front_slip * 3, 5) # RACE-ELEMENT BUG: same as brake — rear coefficient uses FRONT slip # (TriggerHaptics.cs line 92, `slipRatioFront * 5f`). rear_coef = min(front_slip * 5, 7.5) pct = (front_coef + rear_coef) / THROTTLE_PCT_DIVISOR freq = max(THROTTLE_MIN_FREQUENCY, int(THROTTLE_MAX_FREQUENCY * pct)) controller.right_trigger.effect.vibration( start_position=0, amplitude=THROTTLE_AMPLITUDE, frequency=freq ) def reset_triggers(controller: DualSenseController) -> None: """Default both triggers — called on idle, reconnect, and shutdown.""" controller.left_trigger.effect.off() controller.right_trigger.effect.off() # --- Connection / hot-plug --------------------------------------------------- def _connect_controller() -> DualSenseController | None: """Block until a DualSense is reachable. Returns the activated controller, or None if shutdown was requested before one appeared. """ LOG.info("opening dualsense controller") first_failure_logged = False while not _shutdown: try: devices = DualSenseController.enumerate_devices() except Exception as e: if not first_failure_logged: LOG.warning("hidapi enumeration failed: %s", e) first_failure_logged = True time.sleep(RECONNECT_BACKOFF_S) continue if not devices: if not first_failure_logged: LOG.warning( "no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S ) first_failure_logged = True time.sleep(RECONNECT_BACKOFF_S) continue try: controller = DualSenseController(device_index_or_device_info=devices[0]) controller.on_error(_on_controller_error) controller.activate() except Exception as e: if not first_failure_logged: LOG.warning("controller activation failed: %s", e) first_failure_logged = True time.sleep(RECONNECT_BACKOFF_S) continue # Push a clean initial state so we don't inherit residual effects from # a previous Steam Input session, prior daemon instance, or stale # firmware state. reset_triggers(controller) global _disconnected _disconnected = False LOG.info("dualsense controller connected (%s)", controller.connection_type) return controller return None def _close_controller(controller: DualSenseController | None) -> None: if controller is None: return try: controller.deactivate() except Exception: pass # --- Main loop --------------------------------------------------------------- def run(host: str, port: int, exit_on_idle: bool = False) -> int: signal.signal(signal.SIGTERM, _on_termination) signal.signal(signal.SIGINT, _on_termination) # Bind the UDP socket BEFORE opening the controller. If the bind fails # (port in use, systemd-passed fd unusable, etc.) we exit cleanly without # having activated the controller's HID stack (which would otherwise leak # an active session). _get_socket raises on bind failure. LOG.info("listening for forza udp on %s:%d", host, port) sock = _get_socket(host, port) controller = _connect_controller() if controller is None: sock.close() return 0 last_seen = 0.0 have_telemetry = False try: while not _shutdown: if _disconnected: LOG.warning("dualsense disconnected; reconnecting") _close_controller(controller) controller = _connect_controller() if controller is None: return 0 now = time.monotonic() try: data, _ = sock.recvfrom(2048) last_seen = now have_telemetry = True except socket.timeout: if _shutdown: break if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S: LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S) reset_triggers(controller) have_telemetry = False if exit_on_idle: LOG.info("exiting on idle") break continue pkt = _parse_packet(data) if pkt is None: continue handle_acceleration(controller, pkt) handle_braking(controller, pkt) finally: try: reset_triggers(controller) except Exception: pass _close_controller(controller) try: sock.close() except Exception: pass return 0 def main() -> int: parser = argparse.ArgumentParser( prog="forza-trigger", description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.", ) parser.add_argument("--host", default="127.0.0.1", help="UDP bind address") parser.add_argument("--port", type=int, default=5300, help="UDP bind port") parser.add_argument( "--debug", action="store_true", help="log per-packet decisions at DEBUG" ) parser.add_argument( "--exit-on-idle", action="store_true", help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)", ) args = parser.parse_args() level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO") logging.basicConfig( level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s" ) return run(args.host, args.port, args.exit_on_idle) if __name__ == "__main__": sys.exit(main())