"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline. Every numeric value, every threshold, every map() / EWMA() coefficient, and every output byte sequence has been verified against published sources or against decompiled DSX 1.4.9 itself. Sources, in priority order: 1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31) decompiled with ILSpy. The decompilation revealed: a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev6, MIT) as its trigger-effect encoder. b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's high-level CustomTriggerValueMode names to mode bytes: VibrateResistance -> 6 (Simple_Vibration / 0x06) VibrateResistanceA / AB -> 38 (Vibration / 0x26) VibrateResistanceB -> 6 When the dispatcher hits the `else` branch in DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than 9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into the trigger param region — no bit-packing, no scale conversion. This is why RacingDSX's 0-255-scale stiffness values ARE the actual amplitude bytes that reach the controller's firmware. 2. Nielk1's reverse-engineering gist Rev 6 (MIT, https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db). Source for the canonical Sony bit-packed Feedback (0x21) encoder used for the non-slip path. Identical to the implementation shipped inside DSX 1.4.9. 3. RacingDSX (cosmii02/RacingDSX, GPLv3) — community-tuned defaults for Forza Horizon 4 / 5 since 2022. Specifically: Config/ThrottleSettings.cs, Config/BrakeSettings.cs, GameParsers/Parser.cs. The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is provided by pydualsense (PyPI, MIT). 
We drive its low-level `triggerL/R.mode` and `triggerL/R.forces[i]` fields
directly because pydualsense's high-level setMode/setForce API does not
understand any specific mode's parameter encoding; it just shovels bytes into
the output report at fixed offsets. That is exactly what we want.

## Documented intentional divergences from RacingDSX

1. Motion gate (`_is_in_motion()`): slip detection is gated on speed or wheel
   rotation. RacingDSX has no gate, so locked stationary wheels (e.g. after a
   hard stop with brake held) keep the slip path active forever and the
   trigger stuck in vibration mode. The gate fixes the user-reported bug.

2. Clamp-to-8 in `_apply_feedback()`: DSX's
   `TriggerEffectGenerator.Resistance` silently skips when force > 8, leaving
   the trigger stuck in whatever mode it was in (Simple_Vibration during
   slip→non-slip transitions). We clamp to 8 instead so the transition
   produces a smooth Feedback ramp.

3. Car performance index width: fdp parses `car_performance_index` as Int32
   (the field's actual width per Forza's spec), while RacingDSX's `FMData.cs`
   reads only `GetUInt8(bytes, 220)`, the low byte. For any car with
   CPI > 255 (B/A/S1/S2/X) the two implementations disagree on the pre-race
   lightbar's CPI tint. We use the full value (clamped to `MAX_CPI`);
   RacingDSX's lightbar is dimmer/inconsistent on those cars. Mask
   `cpi & 0xFF` in `apply_lightbar_pre_race` to match RacingDSX byte-for-byte
   if you want bug-faithful Windows-equivalent dimming.

## Threading note

pydualsense's `sendReport` background thread reads `triggerR/L.mode` and
`forces[0..6]` independently; there's no atomic publish primitive. Our
`_apply_*` helpers write `forces[]` first and `mode` last; the BG thread
reads `mode` first, so this ordering keeps the worst-case torn frame to one
~4 ms HID write at slip↔non-slip mode transitions. This is audible as a brief
click on transitions, not a stuck state. Without lock/atomic primitives in
pydualsense's API this is the cleanest mitigation available.
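
A minimal sketch of the forces-then-mode write ordering described in the
threading note. `FakeTrigger` and `publish` are hypothetical stand-ins (not
pydualsense names) so the snippet runs without a controller; it only shows
the ordering and does not remove the underlying race.

```python
class FakeTrigger:
    # Hypothetical stand-in for a pydualsense trigger object: one mode
    # value plus seven parameter bytes.
    def __init__(self):
        self.mode = 0x00
        self.forces = [0] * 7


def publish(trig, mode, params):
    # Write every parameter byte first, padding unused slots with zero...
    for i in range(7):
        trig.forces[i] = params[i] if i < len(params) else 0
    # ...and only then publish the mode byte.
    trig.mode = mode


trig = FakeTrigger()
# Simple_Vibration (0x06) with frequency, amplitude, position.
publish(trig, 0x06, [30, 200, 5])
```

The parameter values here are arbitrary examples, not RacingDSX defaults.
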

## System interaction notes

**Single-controller assumption.** pydualsense's `__find_device` enumerates
all DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps
the last one matched (no break in the loop), then opens via
`hidapi_open(vid, pid)` without serial/path; `hid_open` returns the first
match, which is not necessarily the one selected. With multiple DualSense
controllers the picked controller is non-deterministic. pydualsense's source
explicitly notes `# TODO: implement multiple controllers working`.
RacingDSX/DSX are also single-controller (DSX's `connectedController` is a
singleton). Forza Horizon is single-player so this is fine in practice; if
multi-controller selection matters, monkey-patch `__find_device` to filter by
`serial_number`.

**Steam Input.** When Steam Input's PlayStation Configuration Support is
enabled for the game, Steam intercepts hidraw input AND writes its own HID
output reports (rumble, lightbar, sometimes triggers). Our daemon writes
competing output reports at ~1 kHz; the controller observes whichever wrote
last. Effect: trigger oscillates and feels broken. The Nix module's README in
`default.nix` instructs users to disable PlayStation Configuration Support
for Forza in Steam (Settings → Controller).

**dualsensectl.** Installed in the Nix module for ad-hoc debugging.
Single-shot writes from `dualsensectl trigger left feedback ...` get
overwritten by our BG thread's next iteration ~4 ms later. Use it only when
the daemon is stopped (`systemctl --user stop forza-trigger`).

**Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on
hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls
`ds.report_thread.is_alive()` and reconnects in-process via
`_connect_controller()`, which retries `pydualsense.init()` every
`RECONNECT_BACKOFF_S` until the controller comes back.
The daemon does not depend on systemd or any other supervisor for plug-event
recovery; running it directly from a shell handles unplug/replug exactly the
same way.
"""

from __future__ import annotations

import argparse
import logging
import math
import os
import socket
import sys
import time

from fdp import ForzaDataPacket
from pydualsense import TriggerModes, pydualsense

LOG = logging.getLogger("forza-trigger")


# --- Mode bytes ---------------------------------------------------------------
# pydualsense's IntFlag aliases happen to cover the modes we need:
#   TriggerModes.Off      = 0x00  (between-race idle; pydualsense's name)
#   TriggerModes(0x05)    = 0x05  (canonical Sony Off / Reset; mid-race zero-force)
#   TriggerModes.Pulse_B  = 0x06  (Simple_Vibration / Simple_AutomaticGun)
#   TriggerModes.Rigid_A  = 0x21  (Feedback, canonical)
DS_MODE_NORMAL = TriggerModes.Off
DS_MODE_OFF = TriggerModes(0x05)
DS_MODE_SIMPLE_VIBRATION = TriggerModes.Pulse_B
DS_MODE_FEEDBACK = TriggerModes.Rigid_A

# --- RacingDSX defaults (Config/ThrottleSettings.cs) --------------------------
THROTTLE_GRIP_LOSS = 0.6
THROTTLE_REAR_SLIP_ACCEL_MIN = 200
THROTTLE_VIB_POSITION = 5  # VibrationModeStart
THROTTLE_MIN_VIBRATION = 5  # below this freq, fall back to Resistance
THROTTLE_MAX_VIBRATION = 55  # peak frequency at slip == 5
THROTTLE_MIN_STIFFNESS = 255  # slip-mode amplitude at avgAccel == 0
THROTTLE_MAX_STIFFNESS = 175  # slip-mode amplitude at avgAccel == AccelerationLimit
THROTTLE_MIN_RESISTANCE = 0  # non-slip canonical strength at avgAccel == 0
THROTTLE_MAX_RESISTANCE = 3  # non-slip canonical strength at avgAccel == AccelerationLimit
THROTTLE_ACCELERATION_LIMIT = 10
THROTTLE_TURN_ACCEL_SCALE = 0.25
THROTTLE_FORWARD_ACCEL_SCALE = 1.0
THROTTLE_RESISTANCE_SMOOTHING = 0.9
THROTTLE_VIBRATION_SMOOTHING = 1.0
THROTTLE_EFFECT_INTENSITY = 1.0
THROTTLE_LAST_RESISTANCE_INIT = 1  # Parser.lastThrottleResistance

# --- RacingDSX defaults (Config/BrakeSettings.cs) -----------------------------
BRAKE_GRIP_LOSS = 0.05
BRAKE_DEADZONE = 100  # Parser literal: data.Brake > 100
BRAKE_VIB_POSITION = 0  # VibrationStart
BRAKE_MIN_VIBRATION = 15
BRAKE_MAX_VIBRATION = 20
BRAKE_MIN_STIFFNESS = 150
BRAKE_MAX_STIFFNESS = 5
BRAKE_MIN_RESISTANCE = 0
BRAKE_MAX_RESISTANCE = 7
BRAKE_RESISTANCE_SMOOTHING = 0.4
BRAKE_VIBRATION_SMOOTHING = 1.0
BRAKE_EFFECT_INTENSITY = 1.0
BRAKE_LAST_RESISTANCE_INIT = 200  # Parser.lastBrakeResistance

# --- Forza UDP packet sizes -> fdp packet_format strings ----------------------
PACKET_FORMATS = {
    232: "sled",
    311: "dash",
    324: "fh4",  # FH4 and FH5 share the same layout
}

# --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) --------
# CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM).
# Parser.cs uses an `<=` cascade, so any value > 5 is treated as X.
CAR_CLASS_COLORS = [
    (107, 185, 236),  # ColorClassD
    (234, 202, 49),  # ColorClassC
    (211, 90, 37),  # ColorClassB
    (187, 59, 34),  # ColorClassA
    (128, 54, 243),  # ColorClassS1
    (75, 88, 229),  # ColorClassS2
    (105, 182, 72),  # ColorClassX (no CPI tint)
]
MAX_CPI = 255  # ForzaParser.MaxCPI
RPM_REDLINE_RATIO = 0.9  # Profile.RPMRedlineRatio
GREEN_FLOOR = 50  # Math.Max(..., 50) on green channel in non-redline path
RACE_OFF_RPM_FRAMES = 200  # ForzaParser.RPMAccumulatorTriggerRaceOff

# --- Reset on idle (UDP timeout) ---------------------------------------------
# Not present in RacingDSX; an additional safety so the controller doesn't get
# stuck if Forza is killed mid-race or the network drops.
IDLE_TIMEOUT_S = 3.0

# --- Hot-plug reconnect backoff ----------------------------------------------
# pydualsense's BG sendReport thread terminates silently on hidraw IOError
# (controller unplugged, BT disconnect, USB resuspend). The main loop polls
# the thread's liveness and reconnects in-process; the script is agnostic
# of supervisors like systemd. The same backoff governs the initial-connect
# wait when the daemon starts before any controller is plugged in.
RECONNECT_BACKOFF_S = 1.0

# --- Stationary motion gate --------------------------------------------------
# Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked
# wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for
# this and end up with the brake (and sometimes throttle) trigger stuck in
# Simple_Vibration mode forever, because the slip path keeps firing. We
# additionally require either the car or any wheel to be in real motion before
# treating slip as a haptic event.
STATIONARY_SPEED_MS = 0.1  # m/s; below this the car is considered stopped
STATIONARY_WHEEL_RAD_S = 0.1  # rad/s; below this a wheel is considered locked


def _is_in_motion(pkt: ForzaDataPacket) -> bool:
    """True iff the car is moving or any wheel is rotating meaningfully.

    Used to gate slip-detection: when both car and all four wheels read as
    stopped, any nonzero `tire_combined_slip` Forza emits is data noise from
    locked wheels and should not drive haptic vibration.
    """
    if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS:
        return True
    for wheel in ("FL", "FR", "RL", "RR"):
        if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S:
            return True
    return False


# --- Effect encoders ----------------------------------------------------------
def _apply_normal(trig) -> None:
    """`TriggerMode.Normal` in DSX's vocabulary: mode byte 0, all params 0.

    Mirrors `DualSense_USB_Updated.cs` `bytes2 = NormalTrigger = 0L` then
    `array[11..17,20] = bytes2[0..7]`. Used between races / on idle, matching
    RacingDSX's `GetPreRaceInstructions()`.
    """
    # Write forces before mode so pydualsense's BG sendReport thread, which
    # reads mode then forces non-atomically (~250 Hz USB / ~1 kHz BT), is more
    # likely to observe a self-consistent (mode, forces) pair. See the
    # threading-hazard note in the module docstring.
    for i in range(7):
        trig.forces[i] = 0
    trig.mode = DS_MODE_NORMAL


def _apply_off(trig) -> None:
    """Canonical Sony Off / Reset: mode byte 0x05, all params 0.

    Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs, mode 5
    actively returns the trigger stop to the neutral position; mode 0 just
    clears state. DSX uses Reset() as the fall-through for `Resistance(0,0)`,
    so we route mid-race zero-strength fallbacks here for byte-perfect parity.
    """
    for i in range(7):
        trig.forces[i] = 0
    trig.mode = DS_MODE_OFF


def _apply_feedback(trig, position: int, strength: int) -> bool:
    """Sony Feedback (mode 0x21), bit-packed.

    Verbatim port of
    `ExtendInput.DataTools.DualSense.TriggerEffectGenerator.Resistance` from
    DSX 1.4.9, with one deliberate divergence.

    DSX's TriggerEffectGenerator.Resistance returns `false` without writing
    when strength > 8, and RacingDSX's fall-through path routinely sends
    5..255-range slip-mode stiffness values into Feedback, hitting that branch
    every transition out of slip. The result observed by the player: "ABS
    feedback continues even when stationary". The trigger remains stuck in
    whatever mode (typically Simple_Vibration) was set before the failed
    Resistance call, sometimes indefinitely if Forza keeps reporting nonzero
    slip on locked wheels.

    We clamp out-of-range strength to 8 instead. The transition out of slip
    now produces a smooth Feedback ramp from full-stiffness down to the
    non-slip target as the EWMA decays, rather than freezing on stale
    Simple_Vibration bytes.

    The return value (kept for symmetry with DSX's bool-returning Resistance)
    is False on invalid position, True otherwise.
    """
    # DSX's `start` is an unsigned byte; reject negatives here too, since a
    # negative shift count below would raise ValueError in Python.
    if position < 0 or position > 9:
        return False
    if strength > 8:
        strength = 8
    if strength <= 0:
        # Sony's algorithm: zero force -> Reset (canonical Off, mode 0x05).
        # DSX's TriggerEffectGenerator.Resistance falls through to Reset()
        # here, so we do the same for byte-perfect parity.
        _apply_off(trig)
        return True

    force_value = (strength - 1) & 0x07
    force_zones = 0
    active_zones = 0
    for i in range(position, 10):
        force_zones |= force_value << (3 * i)
        active_zones |= 1 << i

    trig.forces[0] = active_zones & 0xFF
    trig.forces[1] = (active_zones >> 8) & 0xFF
    trig.forces[2] = force_zones & 0xFF
    trig.forces[3] = (force_zones >> 8) & 0xFF
    trig.forces[4] = (force_zones >> 16) & 0xFF
    trig.forces[5] = (force_zones >> 24) & 0xFF
    trig.forces[6] = 0  # frequency byte unused for Feedback
    trig.mode = DS_MODE_FEEDBACK
    return True


def _apply_simple_vibration(trig, position: int, amplitude: int, frequency: int) -> None:
    """Legacy Simple_Vibration (mode 0x06), raw byte passthrough.

    Mirrors DSX's `else` branch in
    `DualSense_USB_Updated.cs::CustomTriggerValues`:

        array[11]        = TriggerValue1 (= 6 for VibrateResistance)
        array[12]        = TriggerValue2 (= frequency)
        array[13]        = TriggerValue3 (= amplitude)
        array[14]        = TriggerValue4 (= position)
        array[15..17,20] = 0

    Per Nielk1, Simple_Vibration was Sony's pre-firmware-update vibration
    mode: same effect as canonical Vibration (0x26) on every shipped
    DualSense, but it takes raw 0-255 amplitude bytes instead of the
    bit-packed 0-8 zone format. RacingDSX/DSX have used it since v1.0; the
    entire Forza-on-DualSense community ships these byte values.
""" if amplitude <= 0 or frequency <= 0: _apply_off(trig) return trig.forces[0] = frequency & 0xFF trig.forces[1] = amplitude & 0xFF trig.forces[2] = position & 0xFF trig.forces[3] = 0 trig.forces[4] = 0 trig.forces[5] = 0 trig.forces[6] = 0 trig.mode = DS_MODE_SIMPLE_VIBRATION def reset_triggers(ds: pydualsense) -> None: _apply_normal(ds.triggerL) _apply_normal(ds.triggerR) # --- RacingDSX math primitives ------------------------------------------------ def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float: """Mirrors Parser.Map() in RacingDSX, including endpoint clamping.""" if x > in_max: x = in_max elif x < in_min: x = in_min return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min def _ewma(value: float, last: float, alpha: float) -> float: """Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing.""" return alpha * value + (1.0 - alpha) * last def _ewma_int(value: int, last: int, alpha: float) -> int: """Mirrors `Parser.EWMA(int, int, float)` \u2014 floor of float-EWMA.""" return math.floor(alpha * value + (1.0 - alpha) * last) def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float: return float(getattr(pkt, name, default)) # --- Per-trigger persistent state for EWMA ------------------------------------ class _TriggerState: __slots__ = ("last_resistance", "last_freq") def __init__(self, init_resistance: int) -> None: # Mirrors RacingDSX's `int lastThrottleResistance` / `int lastBrakeResistance`. self.last_resistance: int = int(init_resistance) self.last_freq: int = 0 # --- Forza game-level persistent state (ForzaParser.cs fields) ---------------- class _ForzaState: """Persistent across packets. 
    Mirrors ForzaParser's instance fields: LastEngineRPM, LastRPMAccumulator,
    LastValidCarClass, LastValidCarCPI."""

    __slots__ = (
        "last_engine_rpm",
        "rpm_accumulator",
        "last_valid_car_class",
        "last_valid_car_cpi",
    )

    def __init__(self) -> None:
        self.last_engine_rpm = 0.0
        self.rpm_accumulator = 0
        self.last_valid_car_class = 0
        self.last_valid_car_cpi = 0


def _clamp_byte(v: float) -> int:
    """Clamp to [0, 255] before writing to a uint8 RGB channel."""
    return max(0, min(255, int(v)))


def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool:
    """Mirrors `ForzaParser.IsRaceOn()` verbatim.

    FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after
    the player exits a race or pauses. ForzaParser detects the off state by
    watching for unchanged engine RPM combined with non-positive Power across
    `RPMAccumulatorTriggerRaceOff` (200) consecutive frames.

    Power is dash-only, so for sled-format packets it reads as 0; that matches
    RacingDSX exactly.
    """
    in_race = bool(int(getattr(pkt, "is_race_on", 0)))
    current_rpm = _safe(pkt, "current_engine_rpm")
    power = _safe(pkt, "power")

    if current_rpm == state.last_engine_rpm and power <= 0:
        state.rpm_accumulator += 1
        if state.rpm_accumulator > RACE_OFF_RPM_FRAMES:
            in_race = False
    else:
        state.rpm_accumulator = 0

    state.last_engine_rpm = current_rpm
    return in_race


# --- Lightbar (touchpad LED ring) ---------------------------------------------
def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None:
    """Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic.

    Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`.
    X-class cars use the fixed ColorClassX without a CPI tint.
    Car class and CPI fields can briefly read 0 during loading screens, so we
    cache the last valid value seen, also matching ForzaParser's behavior."""
    car_class = int(_safe(pkt, "car_class"))
    if car_class > 0:
        state.last_valid_car_class = car_class
    car_class = state.last_valid_car_class

    cpi = int(_safe(pkt, "car_performance_index"))
    if cpi > 0:
        state.last_valid_car_cpi = min(cpi, 255)
    cpi = state.last_valid_car_cpi

    cpi_ratio = cpi / MAX_CPI
    if car_class <= 5:
        cr, cg, cb = CAR_CLASS_COLORS[car_class]
        r = math.floor(cpi_ratio * cr)
        g = math.floor(cpi_ratio * cg)
        b = math.floor(cpi_ratio * cb)
    else:
        r, g, b = CAR_CLASS_COLORS[6]
    ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b))


def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None:
    """Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic.

    Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and
    green falls linearly with rpm_ratio, with green floored at 50. At or above
    redline the lightbar goes pure red (255, 0, 0)."""
    max_rpm = _safe(pkt, "engine_max_rpm")
    idle_rpm = _safe(pkt, "engine_idle_rpm")
    current_rpm = _safe(pkt, "current_engine_rpm")

    engine_range = max_rpm - idle_rpm
    if engine_range <= 0:
        rpm_ratio = 0.0
    else:
        rpm_ratio = (current_rpm - idle_rpm) / engine_range

    if rpm_ratio >= RPM_REDLINE_RATIO:
        r, g, b = 255, 0, 0
    else:
        r = math.floor(rpm_ratio * 255)
        g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR)
        b = 0
    ds.light.setColorI(_clamp_byte(r), _clamp_byte(g), _clamp_byte(b))


# --- Throttle (right trigger) -------------------------------------------------
def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None:
    """Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line."""
    accel_x = _safe(pkt, "acceleration_x")
    accel_z = _safe(pkt, "acceleration_z")
    avg_accel = math.sqrt(
        THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x)
        + THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z)
    )

    fl = abs(_safe(pkt,
"tire_combined_slip_FL")) fr = abs(_safe(pkt, "tire_combined_slip_FR")) rl = abs(_safe(pkt, "tire_combined_slip_RL")) rr = abs(_safe(pkt, "tire_combined_slip_RR")) front_slip = (fl + fr) * 0.5 rear_slip = (rl + rr) * 0.5 four_wheel_slip = (fl + fr + rl + rr) * 0.25 accelerator = int(_safe(pkt, "accel")) losing_grip = ( front_slip > THROTTLE_GRIP_LOSS or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN) ) and _is_in_motion(pkt) if losing_grip: # Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs). target_freq = math.floor( _map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION) ) target_resistance = math.floor( _map( avg_accel, 0.0, THROTTLE_ACCELERATION_LIMIT, THROTTLE_MIN_STIFFNESS, THROTTLE_MAX_STIFFNESS, ) ) # Floor after EWMA (matches `(int)EWMA(int, int, float)` overload). freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING) resistance = _ewma_int( target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING ) st.last_freq = freq st.last_resistance = resistance if freq <= THROTTLE_MIN_VIBRATION or accelerator <= THROTTLE_VIB_POSITION: # RacingDSX throttle fall-through: sends `Resistance(0, filteredResistance)` # where filteredResistance is in slip-mode range (175..255). DSX's # TriggerEffectGenerator.Resistance returns false for force > 8 without # writing, leaving the trigger stuck in whatever mode (typically # Simple_Vibration) was set previously. Our `_apply_feedback` clamps # strength to 8 instead, producing a smooth Feedback ramp \u2014 a # documented divergence that fixes the user-visible \"vibration # continues briefly after slip ends\" symptom. 
            _apply_feedback(
                ds.triggerR,
                0,
                int(resistance * THROTTLE_EFFECT_INTENSITY),
            )
        else:
            _apply_simple_vibration(
                ds.triggerR,
                THROTTLE_VIB_POSITION,
                int(resistance * THROTTLE_EFFECT_INTENSITY),
                int(freq * THROTTLE_EFFECT_INTENSITY),
            )
        return

    target_resistance = math.floor(
        _map(
            avg_accel,
            0.0,
            THROTTLE_ACCELERATION_LIMIT,
            THROTTLE_MIN_RESISTANCE,
            THROTTLE_MAX_RESISTANCE,
        )
    )
    resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING)
    st.last_resistance = resistance
    _apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY))


# --- Brake (left trigger) -----------------------------------------------------
def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None:
    """Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line."""
    fl = abs(_safe(pkt, "tire_combined_slip_FL"))
    fr = abs(_safe(pkt, "tire_combined_slip_FR"))
    rl = abs(_safe(pkt, "tire_combined_slip_RL"))
    rr = abs(_safe(pkt, "tire_combined_slip_RR"))
    four_wheel_slip = (fl + fr + rl + rr) * 0.25
    brake = int(_safe(pkt, "brake"))

    losing_grip = (
        four_wheel_slip > BRAKE_GRIP_LOSS
        and brake > BRAKE_DEADZONE
        and _is_in_motion(pkt)
    )

    if losing_grip:
        target_freq = math.floor(
            _map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION)
        )
        target_resistance = math.floor(
            _map(
                brake,
                0,
                255,
                BRAKE_MAX_STIFFNESS,
                BRAKE_MIN_STIFFNESS,
            )
        )
        freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING)
        resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING)
        st.last_freq = freq
        st.last_resistance = resistance

        if freq <= BRAKE_MIN_VIBRATION:
            # RacingDSX brake fall-through (Parser.cs:128) sends Resistance(0, 0)
            # explicitly; strength=0 routes to canonical Off (mode 0x05).
            # Subtle slip while braking should leave the trigger neutral.
            _apply_feedback(ds.triggerL, 0, 0)
        else:
            _apply_simple_vibration(
                ds.triggerL,
                BRAKE_VIB_POSITION,
                int(resistance * BRAKE_EFFECT_INTENSITY),
                int(freq * BRAKE_EFFECT_INTENSITY),
            )
        return

    target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE))
    resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING)
    st.last_resistance = resistance
    _apply_feedback(ds.triggerL, 0, int(resistance * BRAKE_EFFECT_INTENSITY))


# --- UDP main loop ------------------------------------------------------------
def parse_packet(data: bytes) -> ForzaDataPacket | None:
    fmt = PACKET_FORMATS.get(len(data))
    if fmt is None:
        LOG.debug("ignoring packet of unexpected length %d", len(data))
        return None
    try:
        return ForzaDataPacket(data, packet_format=fmt)
    except Exception:
        LOG.exception("failed to parse forza packet (len=%d)", len(data))
        return None


def _close_controller(ds: pydualsense | None) -> None:
    """Best-effort close. The HID device may already be gone (unplug, BT drop)
    in which case `device.close()` raises; we don't care."""
    if ds is None:
        return
    try:
        ds.close()
    except Exception:
        pass


def _connect_controller() -> pydualsense:
    """Open the DualSense, blocking until one is reachable.

    `pydualsense.init()` raises when no DualSense is plugged in. That's a
    normal startup-or-replug condition for us, not a fatal error: the daemon
    is meant to live for the whole user session and self-heal across plug
    events without external supervision. We log the first failure once, then
    retry quietly every `RECONNECT_BACKOFF_S` seconds.
""" LOG.info("opening dualsense controller") first_failure_logged = False while True: ds = pydualsense() try: ds.init() except Exception as e: _close_controller(ds) if not first_failure_logged: LOG.warning( "dualsense not available (%s); retrying every %.1fs", e, RECONNECT_BACKOFF_S, ) first_failure_logged = True time.sleep(RECONNECT_BACKOFF_S) continue LOG.info("dualsense controller connected") return ds def run(host: str, port: int, debug: bool) -> int: ds = _connect_controller() LOG.info("listening for forza udp on %s:%d", host, port) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.settimeout(1.0) throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT) brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT) forza_state = _ForzaState() last_seen = 0.0 in_race = False try: while True: # Hot-plug detection: pydualsense's BG sendReport thread terminates # silently on hidraw IOError (unplug, BT disconnect, USB resuspend). # When it dies our triggerL/R writes go nowhere. Reconnect in-process # so the daemon doesn't depend on a supervisor for plug-event recovery. if not ds.report_thread.is_alive(): LOG.warning("dualsense disconnected; reconnecting") _close_controller(ds) ds = _connect_controller() now = time.monotonic() try: data, _ = sock.recvfrom(2048) last_seen = now except socket.timeout: if in_race and (now - last_seen) > IDLE_TIMEOUT_S: LOG.info("forza idle for %.1fs \u2014 resetting triggers", IDLE_TIMEOUT_S) reset_triggers(ds) in_race = False continue pkt = parse_packet(data) if pkt is None: continue # ForzaParser.IsRaceOn() override: combines packet field with the # FH-specific RPM-accumulator workaround. Must be called once per # packet so the accumulator state stays accurate. 
            in_race = forza_is_race_on(pkt, forza_state)

            if not in_race:
                # GetPreRaceInstructions: lightbar -> car class color, both
                # triggers -> Normal (mode 0x00). Re-asserted every frame to
                # mirror RacingDSX's per-packet emission.
                _apply_normal(ds.triggerL)
                _apply_normal(ds.triggerR)
                apply_lightbar_pre_race(ds, pkt, forza_state)
                continue

            if debug:
                LOG.debug(
                    "rpm=%.0f/%.0f accel=%d brake=%d "
                    "slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f "
                    "throttle[freq=%d res=%d] brake[freq=%d res=%d]",
                    _safe(pkt, "current_engine_rpm"),
                    _safe(pkt, "engine_max_rpm"),
                    int(_safe(pkt, "accel")),
                    int(_safe(pkt, "brake")),
                    _safe(pkt, "tire_combined_slip_FL"),
                    _safe(pkt, "tire_combined_slip_FR"),
                    _safe(pkt, "tire_combined_slip_RL"),
                    _safe(pkt, "tire_combined_slip_RR"),
                    throttle_state.last_freq,
                    throttle_state.last_resistance,
                    brake_state.last_freq,
                    brake_state.last_resistance,
                )

            apply_lightbar_in_race(ds, pkt)
            apply_left_trigger(ds, pkt, brake_state)
            apply_right_trigger(ds, pkt, throttle_state)
    except KeyboardInterrupt:
        LOG.info("shutting down")
    finally:
        try:
            reset_triggers(ds)
        except Exception:
            pass
        _close_controller(ds)
    return 0


def main() -> int:
    parser = argparse.ArgumentParser(
        prog="forza-trigger",
        description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="UDP bind address")
    parser.add_argument("--port", type=int, default=5300, help="UDP bind port")
    parser.add_argument(
        "--debug",
        action="store_true",
        help="log per-packet telemetry at DEBUG level",
    )
    args = parser.parse_args()

    level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO")
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )
    return run(args.host, args.port, args.debug)


if __name__ == "__main__":
    sys.exit(main())