"""Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline. Every numeric value, every threshold, every map() / EWMA() coefficient, and every output byte sequence has been verified against published sources or against decompiled DSX 1.4.9 itself. Sources, in priority order: 1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31) decompiled with ILSpy. The decompilation revealed: a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev6, MIT) as its trigger-effect encoder. b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's high-level CustomTriggerValueMode names to mode bytes: VibrateResistance -> 6 (Simple_Vibration / 0x06) VibrateResistanceA / AB -> 38 (Vibration / 0x26) VibrateResistanceB -> 6 When the dispatcher hits the `else` branch in DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than 9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into the trigger param region — no bit-packing, no scale conversion. This is why RacingDSX's 0-255-scale stiffness values ARE the actual amplitude bytes that reach the controller's firmware. 2. Nielk1's reverse-engineering gist Rev 6 (MIT, https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db). Source for the canonical Sony bit-packed Feedback (0x21) encoder used for the non-slip path. Identical to the implementation shipped inside DSX 1.4.9. 3. RacingDSX (cosmii02/RacingDSX, GPLv3) — community-tuned defaults for Forza Horizon 4 / 5 since 2022. Specifically: Config/ThrottleSettings.cs, Config/BrakeSettings.cs, GameParsers/Parser.cs. The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is provided by pydualsense (PyPI, MIT). We drive its low-level `triggerL/R.mode` and `triggerL/R.forces[i]` fields directly because pydualsense's high-level setMode/setForce API does not understand any specific mode's parameter encoding — it just shovels bytes into the output report at fixed offsets. That is exactly what we want. ## Documented intentional divergences from RacingDSX Reviewed and revised after a full audit against RacingDSX upstream (`Parser.cs`, `ForzaParser.cs`, `Config/*Settings.cs`) and DSX 1.4.9 decompilation (`Main.cs`, `DualSense_USB_Updated.cs`, `TriggerEffectGenerator.cs`). Several earlier divergences were folded back into upstream behavior; remaining items below are real and source-justified. 1. **Motion gate** (`_is_in_motion()`): slip detection is gated on speed or wheel rotation. RacingDSX `Parser.cs:108-114, 182-186` has no such gate, so locked stationary wheels keep the slip path active forever and the trigger stuck in vibration mode (the original user-reported bug). We gate on `speed > 0.1 m/s` OR any wheel rotation > 0.1 rad/s. 2. **DSX-faithful Feedback** (`_apply_feedback`): when `strength > 8` we return False without writing, identical to DSX's `TriggerEffectGenerator .Resistance` (`TriggerEffectGenerator.cs:193-218`). The trigger holds its last-set physical state until the EWMA-smoothed strength decays into the 1..8 canonical range. Note: an earlier revision clamped to 8, which produced a maximum-stiffness 'brick wall' for ~6-9 frames as the EWMA decayed from slip-stiffness range (175..255 throttle, 5..150 brake) down to Feedback range (0..3 throttle, 0..7 brake) \u2014 that brick wall is what the user reported as 'fights my finger' on burnouts and ABS braking. 3. **Clutch gate** (opt-in, default off): RacingDSX is clutch-blind by design \u2014 `ForzaParser.cs:147-185` `ParsePacket` doesn't read the Clutch field, and `Parser.cs:170-225` doesn't reference it either. When `FORZA_TRIGGER_CLUTCH_GATE=1` is set in the environment we bypass the throttle path when `clutch > 128`. On automatic transmission (default in FH4/FH5) the game blips the clutch byte to ~255 for ~100 ms during every gear change, so this gate produces a felt trigger relaxation on every shift. Manual-transmission users may find it physically accurate; auto-transmission users almost certainly don't. 4. **AutomaticGun (mode 0x26) for slip vibration** instead of RacingDSX's Simple_Vibration (mode 0x06). RacingDSX sends `CustomTriggerValueMode .VibrateResistance` which DSX dispatches to the raw-passthrough branch in `DualSense_USB_Updated.cs::CustomTriggerValues` (mode byte 6, 0..255 raw amplitude). Mode 0x06 is a raw PWM buzzer \u2014 audibly clunky, no resistance characteristic. Mode 0x26 (`TriggerEffectGenerator.AutomaticGun`, `TriggerEffectGenerator.cs:292-326`) uses Sony's bit-packed force layout plus a frequency byte: applies controlled resistance at zones [position..9] while pulsing the motor at `frequency` Hz. The result is 'pulsed resistance' rather than raw buzz \u2014 textural, controlled, GT7-style. 5. **No-op write suppression** on `_TriggerState._last_sent`: each `_write_trigger` compares the new (mode, forces) tuple against the cache. Identical writes are skipped, and `_trigger_dirty` stays False so the next `_flush()` masks the trigger update flag bits in `outReport[1]`. This makes pre-race steady-state, the clutch gate, and the EWMA-stable non-slip path effectively zero-cost at the HID layer. 6. **Disabled pydualsense BG thread + on-demand HID push**: pydualsense's `sendReport` thread writes a complete output report at the USB poll rate (~250 Hz) with the trigger update flags asserted. DSX writes only on UDP packet arrival from RacingDSX (~60 Hz; `Main.cs:7692-7710`, `DualSense_USB_Updated.cs:21-79`). Empirically, ~250 Hz of trigger writes produces buzz/oscillation under finger pressure on Linux that ~60 Hz does not (the user-reported burnout/ABS feel issue). [Inference] the firmware likely re-initializes its PID controller per write at the higher rate; this is not source-confirmed but is the simplest model that fits both observations. We disable the BG thread (`ds.ds_thread = False` after `init`) and push reports ourselves via `_flush()` only on actual state change, masking the trigger flag bits when only the lightbar (RPM gradient) updated. Hot-plug detection moves from `report_thread.is_alive()` to write-failure handling in `_flush()`. ## Threading note pydualsense's `sendReport` background thread is disabled (divergence #6). We call `prepareReport()` + `writeReport()` ourselves on the same main thread that reads UDP and computes effects, so trigger / lightbar / motor field writes are atomic by construction \u2014 no torn-frame mitigation needed. ## System interaction notes **Single-controller assumption.** pydualsense's `__find_device` enumerates all DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps the last one matched (no break in the loop), then opens via `hidapi_open(vid, pid)` without serial/path \u2014 `hid_open` returns the first match, which is not necessarily the one selected. With multiple DualSense controllers the picked controller is non-deterministic. pydualsense's source explicitly notes `# TODO: implement multiple controllers working`. RacingDSX/DSX are also single-controller (DSX's `connectedController` is a singleton). Forza Horizon is single-player so this is fine in practice; if multi-controller selection matters, monkey-patch `__find_device` to filter by `serial_number`. **Steam Input.** When Steam Input's PlayStation Configuration Support is enabled for the game, Steam intercepts hidraw input AND writes its own HID output reports (rumble, lightbar, sometimes triggers). Our daemon writes competing output reports at ~1 kHz; the controller observes whichever wrote last. Effect: trigger oscillates and feels broken. The Nix module's README in `default.nix` instructs users to disable PlayStation Configuration Support for Forza in Steam (Settings \u2192 Controller). **dualsensectl.** Installed in the Nix module for ad-hoc debugging. Single- shot writes from `dualsensectl trigger left feedback ...` get overwritten by our BG thread's next iteration ~4 ms later. Use it only when the daemon is stopped (`systemctl --user stop forza-trigger`). **Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls `ds.report_thread.is_alive()` and reconnects in-process via `_connect_controller()`, which retries `pydualsense.init()` every `RECONNECT_BACKOFF_S` until the controller comes back. The daemon does not depend on systemd or any other supervisor for plug-event recovery; running it directly from a shell handles unplug/replug exactly the same way. """ import argparse import collections import logging import math import os import signal import socket import sys import time from fdp import ForzaDataPacket from pydualsense import TriggerModes, pydualsense def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: """Create or inherit the UDP listener socket. Under systemd socket activation (LISTEN_FDS=1 with LISTEN_PID matching our pid) the socket is already bound by the service manager and passed as fd 3. Otherwise bind normally — this keeps the daemon runnable outside of systemd. """ listen_pid_str = os.environ.get("LISTEN_PID", "") listen_fds_str = os.environ.get("LISTEN_FDS", "0") if ( listen_pid_str and int(listen_pid_str) == os.getpid() and int(listen_fds_str) >= 1 ): sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) LOG.info("using systemd-pre-bound socket on %s:%d", host, port) return sock sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.settimeout(timeout) return sock LOG = logging.getLogger("forza-trigger") # --- Mode bytes --------------------------------------------------------------- # pydualsense's `TriggerModes` IntFlag covers every mode byte we use: # TriggerModes.Off = 0x00 (no-op) # TriggerModes.Rigid_B = 0x05 (canonical Sony Off / Reset) # TriggerModes.Pulse_B = 0x06 (Simple_Vibration / Simple_AutomaticGun) # TriggerModes.Rigid_A = 0x21 (Feedback, canonical) # TriggerModes.Pulse_AB = 0x26 (AutomaticGun / pulsed resistance) DS_MODE_OFF = TriggerModes.Rigid_B DS_MODE_AUTOMATIC_GUN = TriggerModes.Pulse_AB # bit-packed forces + frequency DS_MODE_FEEDBACK = TriggerModes.Rigid_A DS_MODE_MACHINE = TriggerModes(0x27) # two-amplitude alternating pulse, GT7 ABS DS_MODE_BOW = TriggerModes.Pulse_A # 0x22, resist-then-snap impulse for gear-shift events # --- RacingDSX defaults (Config/ThrottleSettings.cs) -------------------------- THROTTLE_GRIP_LOSS = 0.6 THROTTLE_REAR_SLIP_ACCEL_MIN = 200 THROTTLE_VIB_POSITION = 5 # VibrationModeStart THROTTLE_MIN_VIBRATION = 5 # below this freq, fall back to Resistance THROTTLE_MAX_VIBRATION = 55 # peak frequency at slip == 5 THROTTLE_MIN_STIFFNESS = 255 # slip-mode amplitude at avgAccel == 0 THROTTLE_MAX_STIFFNESS = 175 # slip-mode amplitude at avgAccel == AccelerationLimit THROTTLE_MIN_RESISTANCE = 0 # non-slip canonical strength at avgAccel == 0 THROTTLE_MAX_RESISTANCE = 3 # non-slip canonical strength at avgAccel == AccelerationLimit THROTTLE_ACCELERATION_LIMIT = 10 THROTTLE_TURN_ACCEL_SCALE = 0.25 THROTTLE_FORWARD_ACCEL_SCALE = 1.0 THROTTLE_RESISTANCE_SMOOTHING = 0.5 # was RacingDSX 0.9 (over-smoothed by ~10 frames; per GT7-research \u00a74.1) THROTTLE_VIBRATION_SMOOTHING = 0.7 # was 1.0 (raw); reduces slip-freq jitter without killing responsiveness THROTTLE_EFFECT_INTENSITY = 1.0 THROTTLE_LAST_RESISTANCE_INIT = 1 # Parser.lastThrottleResistance # --- RacingDSX defaults (Config/BrakeSettings.cs) ----------------------------- BRAKE_GRIP_LOSS = 0.05 BRAKE_DEADZONE = 100 # Parser literal: data.Brake > 100 BRAKE_VIB_POSITION = 0 # VibrationStart - position for AutomaticGun lockup (whole-range) # Machine 0x27 sets ONLY two zones (start, end), not a range. For ABS bite-point # feel we anchor the alternating pulse in the middle of trigger travel. BRAKE_ABS_MACHINE_START = 2 BRAKE_ABS_MACHINE_END = 8 BRAKE_MIN_VIBRATION = 15 BRAKE_MAX_VIBRATION = 20 BRAKE_MIN_STIFFNESS = 150 # RacingDSX default; the slip-vibration amplitude floor is set by the physical-limit scaler at endpoint BRAKE_MAX_STIFFNESS = 5 BRAKE_MIN_RESISTANCE = 0 BRAKE_MAX_RESISTANCE = 7 BRAKE_RESISTANCE_SMOOTHING = 0.4 BRAKE_VIBRATION_SMOOTHING = 0.7 # was 1.0 (raw); reduces ABS-band jitter (per GT7-research \u00a74.1) BRAKE_EFFECT_INTENSITY = 1.0 BRAKE_LAST_RESISTANCE_INIT = 200 # Parser.lastBrakeResistance # --- Forza UDP packet sizes -> fdp packet_format strings ---------------------- PACKET_FORMATS = { 232: "sled", 311: "dash", 324: "fh4", # FH4 and FH5 share the same layout } # --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) -------- # CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM). # Parser.cs uses an `<=` cascade, so any value > 5 is treated as X. CAR_CLASS_COLORS = [ (107, 185, 236), # ColorClassD (234, 202, 49), # ColorClassC (211, 90, 37), # ColorClassB (187, 59, 34), # ColorClassA (128, 54, 243), # ColorClassS1 (75, 88, 229), # ColorClassS2 (105, 182, 72), # ColorClassX (no CPI tint) ] MAX_CPI = 255 # ForzaParser.MaxCPI RPM_REDLINE_RATIO = 0.9 # Profile.RPMRedlineRatio GREEN_FLOOR = 50 # Math.Max(..., 50) on green channel in non-redline path RACE_OFF_RPM_FRAMES = 200 # ForzaParser.RPMAccumulatorTriggerRaceOff # --- Clutch gate (throttle only, opt-in) ------------------------------------- # Forza emits `clutch` 0..255 (0 = engaged, 255 = disengaged). When the clutch # is disengaged the engine is mechanically disconnected from the wheels and # the throttle pedal can't transmit power, so a 'physically accurate' throttle # trigger has no business resisting. RacingDSX is clutch-blind by design # (`ForzaParser.cs` ParsePacket comments out the Clutch field). On automatic # transmission \u2014 the FH4/FH5 default \u2014 the game blips the clutch byte to ~255 # for ~100 ms during every gear change, so enabling this gate produces a felt # trigger relaxation on every shift. Disabled by default; set the env var # FORZA_TRIGGER_CLUTCH_GATE=1 to enable. CLUTCH_DISENGAGE_THRESHOLD = 128 CLUTCH_GATE_ENABLED = os.environ.get("FORZA_TRIGGER_CLUTCH_GATE", "0") == "1" # --- Reset on idle (UDP timeout) --------------------------------------------- # Not present in RacingDSX; an additional safety so the controller doesn't get # stuck if Forza is killed mid-race or the network drops. IDLE_TIMEOUT_S = 3.0 # --- Hot-plug reconnect backoff ---------------------------------------------- # pydualsense's BG sendReport thread terminates silently on hidraw IOError # (controller unplugged, BT disconnect, USB resuspend). The main loop polls # the thread's liveness and reconnects in-process \u2014 the script is agnostic # of supervisors like systemd. The same backoff governs the initial-connect # wait when the daemon starts before any controller is plugged in. RECONNECT_BACKOFF_S = 1.0 # --- Stationary motion gate -------------------------------------------------- # Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked # wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for # this and end up with the brake (and sometimes throttle) trigger stuck in # Simple_Vibration mode forever, because the slip path keeps firing. We # additionally require either the car or any wheel to be in real motion before # treating slip as a haptic event. STATIONARY_SPEED_MS = 0.1 # m/s; below this the car is considered stopped STATIONARY_WHEEL_RAD_S = 0.1 # rad/s; below this a wheel is considered locked def _is_in_motion(pkt: ForzaDataPacket) -> bool: """True iff the car is moving or any wheel is rotating meaningfully. Used to gate slip-detection: when both car and all four wheels read as stopped, any nonzero `tire_combined_slip` Forza emits is data noise from locked wheels and should not drive haptic vibration. fdp's `sled` and `dash`/`fh4` formats both carry `wheel_rotation_speed_*`; only `dash`/`fh4` adds `speed`. The wheel-rotation check covers both. """ if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS: return True for wheel in ("FL", "FR", "RL", "RR"): if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S: return True return False # --- HID output: dirty-tracking + on-demand flush ---------------------------- # pydualsense's BG `sendReport` thread writes a complete output report at the # USB poll rate (~250 Hz) with the 'set right/left trigger motor' flag bits # (0x04 / 0x08) asserted in `outReport[1] = 0xFF`. The DualSense firmware reads # every such report as a fresh trigger command and re-initializes its internal # PID controller \u2014 manifesting under finger pressure as the buzz/oscillation # the user reported during burnouts and ABS braking. # # We disable the BG thread entirely (see _connect_controller) and call # pydualsense's `prepareReport()` + `writeReport()` ourselves, exactly once per # state change. The Python-level no-op cache on `_TriggerState._last_sent` # (divergence #5) becomes the real gate: when neither trigger nor lightbar # changed since the last flush, no HID write happens and the firmware sees a # truly continuous effect. # # Divergence #9 in the module docstring. _trigger_dirty: bool = False # set when _write_trigger updates trigger state _lightbar_dirty: bool = False # set when _set_lightbar updates RGB _last_lightbar: tuple[int, int, int] | None = None _motors_dirty: bool = False # set when _set_motors updates LRA amplitude _last_motors: tuple[int, int] | None = None # (left, right) 0..255 _lra_smoothed: tuple[float, float] = (0.0, 0.0) # (left, right) EWMA-smoothed amplitude pre-clamp LRA_SMOOTHING_ALPHA = 0.6 # alpha for surface-rumble texture; suppresses 60 Hz machine-gun artifact # pydualsense's `prepareReport()` returns a USB report (outReport[0]==0x02) # with `outReport[1] = 0xFF`. The flag bits we care about: # outReport[1]: bit 0x04 = update right trigger, bit 0x08 = update left # outReport[2]: bit 0x04 = update lightbar (LED strips) # We clear the bits the firmware doesn't need to process so the trigger PID # isn't reset when only the lightbar (RPM gradient) changed. # Bluetooth reports (outReport[0]==0x31) carry a CRC32 at bytes 74-77 that # pydualsense computes inside `prepareReport()`; we don't apply the flag-mask # optimization there because mutating after that point invalidates the CRC. _USB_TRIGGER_FLAGS_BYTE = 1 _USB_LIGHTBAR_FLAGS_BYTE = 2 _TRIGGER_FLAG_BITS = 0x04 | 0x08 # right + left trigger update bits _LIGHTBAR_FLAG_BIT = 0x04 # LED strips update bit def _set_lightbar(ds, r: int, g: int, b: int) -> None: """Set the touchpad lightbar RGB. Marks the lightbar dirty if the color actually changed; otherwise no-op so steady-state colors don't trigger redundant HID writes.""" global _lightbar_dirty, _last_lightbar new = (int(r), int(g), int(b)) if _last_lightbar == new: return ds.light.setColorI(new[0], new[1], new[2]) _last_lightbar = new _lightbar_dirty = True def _set_motors(ds, left: int, right: int) -> None: """Set the body LRA motor amplitudes (0..255 each). Marks motors dirty if the (left, right) tuple actually changed. pydualsense writes these at `outReport[3]` (right) / `outReport[4]` (left) via its existing `prepareReport()`; the rumble flag bits in `outReport[1]` (0x01, 0x02) are already asserted by pydualsense's `0xFF` default. We only need to update `ds.leftMotor` / `ds.rightMotor` and dirty-track here.""" global _motors_dirty, _last_motors new = (max(0, min(255, int(left))), max(0, min(255, int(right)))) if _last_motors == new: return ds.leftMotor = new[0] ds.rightMotor = new[1] _last_motors = new _motors_dirty = True def _flush(ds) -> bool: """Push the current trigger / lightbar state to the controller via one HID write. Idempotent \u2014 no-op if nothing changed since the last flush. The update flags for unchanged subsystems are masked out so the firmware doesn't re-process them. Returns False on write failure (controller unplugged) so the caller can reconnect.""" global _trigger_dirty, _lightbar_dirty, _motors_dirty if not (_trigger_dirty or _lightbar_dirty or _motors_dirty): return True try: out = ds.prepareReport() # Bluetooth reports (outReport[0] == 0x31) carry a CRC32 at bytes # 74-77 that pydualsense computes inside `prepareReport()`. If we # mutate flag bytes after that point the firmware drops every packet # for CRC mismatch \u2014 a previous revision did exactly that and any BT # user got zero trigger updates. The cost of leaving the BT update # flags asserted is one cheap firmware re-read of identical trigger # bytes per frame; the firmware idempotency we rely on for USB # ('don't restart the PID on identical input') applies here too. # USB has no CRC field so we keep the flag-mask optimization there. if out and out[0] == 0x02: if not _trigger_dirty: out[_USB_TRIGGER_FLAGS_BYTE] &= ~_TRIGGER_FLAG_BITS if not _lightbar_dirty: out[_USB_LIGHTBAR_FLAGS_BYTE] &= ~_LIGHTBAR_FLAG_BIT ds.writeReport(out) except (IOError, OSError) as e: LOG.warning("dualsense write failed: %s", e) return False _trigger_dirty = False _lightbar_dirty = False _motors_dirty = False return True def _reset_caches(*states) -> None: """Resync `_TriggerState._last_sent` to the controller's freshly-reset state. After a reconnect or idle-timeout reset, the controller is at mode 0x05 + zero forces. Without resyncing the caches, the next `apply_*` whose target happens to match the pre-reset cached value would skip the write and we'd silently leave the trigger in mode 0x05 instead of the intended state.""" for st in states: st.reset() global _trigger_dirty, _lightbar_dirty, _motors_dirty, _last_lightbar, _last_motors _trigger_dirty = False _lightbar_dirty = False _motors_dirty = False _last_lightbar = (0, 0, 0) _last_motors = (0, 0) global _lra_smoothed _lra_smoothed = (0.0, 0.0) # --- Effect encoders ---------------------------------------------------------- def _write_trigger(trig, mode: TriggerModes, forces: list[int], st: object | None) -> None: """Write mode + forces to a pydualsense trigger, suppressing no-op writes. When `st` is a `_TriggerState`, the (mode, forces) tuple is compared against `st._last_sent`. If identical, the write is skipped and the trigger-dirty was \u2014 the next `_flush()` will not push anything for this trigger. When the tuple differs, the trigger fields are updated and `_trigger_dirty` The next `_flush()` pushes one HID report carrying the new state. Callers that do NOT pass `st` (reset_triggers, shutdown reset) are unconditional \u2014 they always write and always mark dirty. """ global _trigger_dirty if st is not None: new = (int(mode), tuple(forces)) if st._last_sent == new: return for i in range(7): trig.forces[i] = forces[i] trig.mode = mode if st is not None: st._last_sent = (int(mode), tuple(forces)) _trigger_dirty = True def _apply_off(trig, st: object | None = None) -> None: """Canonical Sony Off / Reset — mode byte 0x05, all params 0. Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs (Nielk1 Rev 6), mode 0x05 *actively* returns the trigger stop to the neutral position; mode 0x00 only clears state without retracting the motor. """ _write_trigger(trig, DS_MODE_OFF, [0, 0, 0, 0, 0, 0, 0], st) def _apply_feedback(trig, position: int, strength: int, st: object | None = None) -> bool: """Sony Feedback (mode 0x21), bit-packed. Byte-faithful port of `ExtendInput.DataTools.DualSense.TriggerEffectGenerator .Resistance` from DSX 1.4.9 (cs lines 193-218). Returns False without writing when `strength > 8`, matching DSX exactly: the trigger holds its last-set physical state (Simple_Vibration if mid-slip, Reset if pre-race) until the EWMA-smoothed strength decays into the 1..8 canonical range. The previous implementation clamped strength to 8 and produced a maximum-stiffness 'brick wall' for ~6-9 frames as the EWMA decayed from its slip-stiffness range (175..255 throttle, 5..150 brake) down to the Feedback range (0..3 throttle, 0..7 brake). That brick wall is what the user reported as 'fights my finger' on burnouts and ABS braking.""" if position > 9: return False if strength > 8: # DSX-faithful: silently no-op. The trigger fields and the no-op # cache (`st._last_sent`) keep their previous values; the next # `_flush()` masks the trigger flag bits via `_trigger_dirty=False`. return False if strength <= 0: _apply_off(trig, st) return True force_value = (strength - 1) & 0x07 force_zones = 0 active_zones = 0 for i in range(position, 10): force_zones |= force_value << (3 * i) active_zones |= 1 << i _write_trigger( trig, DS_MODE_FEEDBACK, [ active_zones & 0xFF, (active_zones >> 8) & 0xFF, force_zones & 0xFF, (force_zones >> 8) & 0xFF, (force_zones >> 16) & 0xFF, (force_zones >> 24) & 0xFF, 0, ], st, ) return True def _apply_automatic_gun( trig, position: int, strength: int, frequency: int, st: object | None = None ) -> bool: """Sony AutomaticGun (mode 0x26), bit-packed forces with frequency byte. Verbatim port of `TriggerEffectGenerator.AutomaticGun` (DSX 1.4.9 `TriggerEffectGenerator.cs:292-326`). Same bit-packed force layout as Resistance/Feedback (mode 0x21), plus a frequency byte at offset +9 (forces[6] in pydualsense's mapping). Produces a 'pulsed resistance' effect: the firmware applies bit-packed force at zones [position..9] and pulses the motor at `frequency` Hz. This feels textural / controlled rather than the raw PWM buzz of mode 0x06, and matches what GT7 and other PS5-native racing games use for wheelspin. """ if position > 9: return False if strength > 8: return False # DSX-faithful silent no-op (matches Resistance behavior) if strength <= 0 or frequency <= 0: _apply_off(trig, st) return True force_value = (strength - 1) & 0x07 force_zones = 0 active_zones = 0 for i in range(position, 10): force_zones |= force_value << (3 * i) active_zones |= 1 << i _write_trigger( trig, DS_MODE_AUTOMATIC_GUN, [ active_zones & 0xFF, (active_zones >> 8) & 0xFF, force_zones & 0xFF, (force_zones >> 8) & 0xFF, (force_zones >> 16) & 0xFF, (force_zones >> 24) & 0xFF, frequency & 0xFF, ], st, ) return True def _apply_machine( trig, start: int, end: int, strength_a: int, strength_b: int, frequency: int, period: int, st: object | None = None, ) -> bool: """Sony Machine (mode 0x27), two-amplitude alternating pulse with period. Verbatim port of `TriggerEffectGenerator.Machine` (DSX 1.4.9 `TriggerEffectGenerator.cs:328-368`). Applies bit-packed force at zones {start, end} alternating between strength_a and strength_b at `frequency` Hz on a `period` (in 100ms units) cycle. The 'rhythmic catch-and-release' character is what GT7 uses for ABS \u2014 cannot be produced by mode 0x26 which has only one amplitude. Fits cleanly into pydualsense's forces[] layout: forces[0..4] cover destinationArray[+1..+5] (active zones, strength pair, frequency, period); forces[5..6] are required-zero by Sony's spec. """ if start > 8 or end > 9 or end <= start: return False if strength_a > 7 or strength_b > 7: return False if frequency <= 0: _apply_off(trig, st) return True active_zones = (1 << start) | (1 << end) strength_pair = (strength_a & 0x07) | ((strength_b & 0x07) << 3) _write_trigger( trig, DS_MODE_MACHINE, [ active_zones & 0xFF, (active_zones >> 8) & 0xFF, strength_pair & 0xFF, frequency & 0xFF, period & 0xFF, 0, 0, ], st, ) return True def _apply_bow( trig, start: int, end: int, force: int, snap_force: int, st: object | None = None, ) -> bool: """Sony Bow (mode 0x22). Resists between zones {start, end} then snaps back. Verbatim port of `TriggerEffectGenerator.Bow` (DSX 1.4.9 `TriggerEffectGenerator.cs:167-207`). Used as a 1-frame impulse on gear-change edges \u2014 NFS-Unbound-style shift detent. Activates exactly two zones (start, end) bit-packed; force/snap_force are 3-bit each, encoded as `(force-1) | ((snap_force-1) << 3)`. """ if start > 8 or end > 8 or start >= end: return False if force > 8 or snap_force > 8: return False if end <= 0 or force <= 0 or snap_force <= 0: _apply_off(trig, st) return True active_zones = (1 << start) | (1 << end) pair = ((force - 1) & 0x07) | (((snap_force - 1) & 0x07) << 3) _write_trigger( trig, DS_MODE_BOW, [ active_zones & 0xFF, (active_zones >> 8) & 0xFF, pair & 0xFF, (pair >> 8) & 0xFF, # always 0 for force, snap_force \u2264 8 0, 0, 0, ], st, ) return True def _apply_slope_feedback( trig, start: int, end: int, start_strength: int, end_strength: int, st: object | None = None, ) -> bool: """Sony SlopeFeedback (mode 0x21). Linear strength ramp from `start_strength` at zone `start` to `end_strength` at zone `end`. Built on Nielk1 Rev6 gist's `MultiplePositionFeedback` factory: same byte layout as `Resistance` (mode 0x21) with per-zone strengths instead of uniform. Outside [start, end] zones are inactive (no force). Used for the progressive brake bite-point feel \u2014 light pressure at the top of throw, firmer at the bottom \u2014 mimicking a real hydraulic brake pedal. Stricter validation than `Resistance`: `start_strength` and `end_strength` must each be in [1, 8]. Caller is responsible for short-circuiting to `_apply_off` when the target is 0. """ if start > 8 or end > 9 or end <= start: return False if start_strength < 1 or start_strength > 8: return False if end_strength < 1 or end_strength > 8: return False span = end - start force_zones = 0 active_zones = 0 for i in range(10): if i < start: continue elif i <= end: s = round(start_strength + (end_strength - start_strength) * (i - start) / span) else: s = end_strength s = max(1, min(8, int(s))) force_zones |= ((s - 1) & 0x07) << (3 * i) active_zones |= 1 << i _write_trigger( trig, DS_MODE_FEEDBACK, [ active_zones & 0xFF, (active_zones >> 8) & 0xFF, force_zones & 0xFF, (force_zones >> 8) & 0xFF, (force_zones >> 16) & 0xFF, (force_zones >> 24) & 0xFF, 0, ], st, ) return True def reset_triggers(ds: pydualsense) -> None: """Both triggers to canonical Off (mode 0x05). Actively retracts the motor.""" _apply_off(ds.triggerL) _apply_off(ds.triggerR) def reset_lightbar(ds: pydualsense) -> None: """Lightbar to off (RGB 0,0,0). Used when telemetry has been idle long enough that we should stop asserting a race color \u2014 e.g. Forza exited or hasn't started a session yet. Without this, pydualsense's BG sendReport thread keeps re-publishing whatever `TouchpadColor` we last set, so the controller stays lit indefinitely. """ _set_lightbar(ds, 0, 0, 0) # --- RacingDSX math primitives ------------------------------------------------ def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float: """Mirrors Parser.Map() in RacingDSX, including endpoint clamping.""" if x > in_max: x = in_max elif x < in_min: x = in_min return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min def _ewma(value: float, last: float, alpha: float) -> float: """Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing.""" return alpha * value + (1.0 - alpha) * last def _ewma_int(value: int, last: int, alpha: float) -> int: """Mirrors `Parser.EWMA(int, int, float)` \u2014 floor of float-EWMA.""" return math.floor(alpha * value + (1.0 - alpha) * last) def _slip_to_strength(slip_ratio: float, lo: float, hi: float) -> int: """Map a slip ratio to AutomaticGun / Vibration strength (0..8) on a log curve. Replaces the previous `amplitude >> 5` mapping which collapsed Sony's 8 strength steps to 3 ({5,6,7}) over RacingDSX's 175..255 amplitude range \u2014 burnout-onset and full-burnout felt identical. The log curve matches human trigger-force perception (roughly logarithmic) and uses the full 1..8 range. Below `lo`, returns 0 (caller should route to Off / steady-state). Above `hi`, saturates at 8. """ if slip_ratio < lo: return 0 if slip_ratio >= hi: return 8 t = (math.log(slip_ratio) - math.log(lo)) / (math.log(hi) - math.log(lo)) return max(1, min(8, math.ceil(t * 8))) def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float: """Read a numeric field defensively. Returns default for missing fields, NaN, or +/-inf \u2014 protects every caller from crashing on corrupt UDP packets, fdp parser overflow, or future-game-version field changes.""" v = float(getattr(pkt, name, default)) if not math.isfinite(v): return float(default) return v # --- Per-trigger persistent state for EWMA ------------------------------------ class _TriggerState: __slots__ = ( "last_resistance", "last_freq", "_init_resistance", "_last_sent", "event_remaining_frames", "event_callable", ) def __init__(self, init_resistance: int) -> None: self._init_resistance: int = int(init_resistance) self.last_resistance: int = int(init_resistance) self.last_freq: int = 0 # (mode_int, forces_tuple) of last byte sequence actually written to # the pydualsense trigger fields. None = nothing written yet. Used by # _write_trigger to suppress redundant HID writes \u2014 divergence #5. self._last_sent: tuple[int, tuple[int, ...]] | None = None # Event-impulse override: when event_remaining_frames > 0, apply_*_trigger # calls event_callable(trig, st) instead of computing per-frame state. # Used for gear-shift Bow (Phase 5) and any future short overrides. self.event_remaining_frames: int = 0 self.event_callable = None def reset(self) -> None: """Resync to the controller's freshly-reset state. Called from `_reset_caches` after reconnect / idle reset; also clears the EWMA smoothing state so the first frame of a new race doesn't carry stale slip-amplitude values across the transition.""" self.last_resistance = self._init_resistance self.last_freq = 0 self._last_sent = (int(DS_MODE_OFF), (0, 0, 0, 0, 0, 0, 0)) self.event_remaining_frames = 0 self.event_callable = None # --- Forza game-level persistent state (ForzaParser.cs fields) ---------------- class _ForzaState: """Persistent across packets. Mirrors ForzaParser's instance fields: LastEngineRPM, LastRPMAccumulator, LastValidCarClass, LastValidCarCPI.""" __slots__ = ( "last_engine_rpm", "rpm_accumulator", "last_valid_car_class", "last_valid_car_cpi", "last_gear", ) def __init__(self) -> None: self.last_engine_rpm = 0.0 self.rpm_accumulator = 0 self.last_valid_car_class = 0 self.last_valid_car_cpi = 0 self.last_gear = -1 # sentinel; first packet's gear edge is suppressed def _clamp_byte(v: float) -> int: """Clamp to [0, 255] before writing to a uint8 RGB channel.""" return max(0, min(255, int(v))) def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool: """Mirrors `ForzaParser.IsRaceOn()` verbatim. FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after the player exits a race or pauses. ForzaParser detects the off state by watching for unchanged engine RPM combined with non-positive Power across `RPMAccumulatorTriggerRaceOff` (200) consecutive frames. Power is dash-only, so for sled-format packets it reads as 0; that matches RacingDSX exactly. """ in_race = bool(int(getattr(pkt, "is_race_on", 0))) current_rpm = _safe(pkt, "current_engine_rpm") power = _safe(pkt, "power") if current_rpm == state.last_engine_rpm and power <= 0: state.rpm_accumulator += 1 if state.rpm_accumulator > RACE_OFF_RPM_FRAMES: in_race = False else: state.rpm_accumulator = 0 state.last_engine_rpm = current_rpm return in_race # --- Lightbar (touchpad LED ring) --------------------------------------------- def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None: """Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic. Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`. X-class cars use the fixed ColorClassX without a CPI tint. Car class and CPI fields can briefly read 0 during loading screens, so we cache the last valid value seen \u2014 also matching ForzaParser's behavior.""" car_class = int(_safe(pkt, "car_class")) if car_class > 0: state.last_valid_car_class = car_class car_class = state.last_valid_car_class cpi = int(_safe(pkt, "car_performance_index")) if cpi > 0: state.last_valid_car_cpi = min(cpi, 255) cpi = state.last_valid_car_cpi cpi_ratio = cpi / MAX_CPI if car_class <= 5: cr, cg, cb = CAR_CLASS_COLORS[car_class] r = math.floor(cpi_ratio * cr) g = math.floor(cpi_ratio * cg) b = math.floor(cpi_ratio * cb) else: r, g, b = CAR_CLASS_COLORS[6] _set_lightbar(ds, _clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None: """Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic. Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and green falls linearly with rpm_ratio, with green floored at 50. At or above redline the lightbar goes pure red (255, 0, 0).""" max_rpm = _safe(pkt, "engine_max_rpm") idle_rpm = _safe(pkt, "engine_idle_rpm") current_rpm = _safe(pkt, "current_engine_rpm") engine_range = max_rpm - idle_rpm if engine_range <= 0: rpm_ratio = 0.0 else: rpm_ratio = (current_rpm - idle_rpm) / engine_range if rpm_ratio >= RPM_REDLINE_RATIO: r, g, b = 255, 0, 0 else: r = math.floor(rpm_ratio * 255) g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR) b = 0 _set_lightbar(ds, _clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) # --- Body LRA motors --------------------------------------------------------- # Adaptive triggers are only half of the GT7 racing-haptic vocabulary; the # other half is the body LRAs (left/right linear resonant actuators) feeding # 'feel which side hit the kerb' through outReport[3]/[4]. RacingDSX/DSX # don't drive these for racing telemetry; we extend the daemon to use them. # Mix surface_rumble (road texture, 0..1 per wheel) and wheel_on_rumble_strip # (boolean kerb contact) per side: front-left/rear-left -> left motor, # front-right/rear-right -> right motor. LRA_KERB_FLOOR_AMP = 80 # minimum amplitude when wheel is on a kerb strip def apply_lra(ds: pydualsense, pkt: ForzaDataPacket) -> None: """Drive the body LRA motors from Forza's surface-rumble + kerb fields. Surface-rumble values jitter frame-to-frame on textured surfaces (gravel, kerbs, wet asphalt). Without smoothing the LRAs sound like a machine-gun at 60 Hz. EWMA at alpha=0.6 keeps response responsive (~5 frames to reach >=240 from a step input) while killing single-frame artifacts.""" global _lra_smoothed surface_fl = abs(_safe(pkt, "surface_rumble_FL")) surface_fr = abs(_safe(pkt, "surface_rumble_FR")) surface_rl = abs(_safe(pkt, "surface_rumble_RL")) surface_rr = abs(_safe(pkt, "surface_rumble_RR")) kerb_left = bool(_safe(pkt, "wheel_on_rumble_strip_FL")) or bool( _safe(pkt, "wheel_on_rumble_strip_RL") ) kerb_right = bool(_safe(pkt, "wheel_on_rumble_strip_FR")) or bool( _safe(pkt, "wheel_on_rumble_strip_RR") ) target_left = max(surface_fl, surface_rl) * 255.0 target_right = max(surface_fr, surface_rr) * 255.0 smoothed_left = LRA_SMOOTHING_ALPHA * target_left + (1.0 - LRA_SMOOTHING_ALPHA) * _lra_smoothed[0] smoothed_right = LRA_SMOOTHING_ALPHA * target_right + (1.0 - LRA_SMOOTHING_ALPHA) * _lra_smoothed[1] _lra_smoothed = (smoothed_left, smoothed_right) left_amp = int(smoothed_left) right_amp = int(smoothed_right) if kerb_left: left_amp = max(left_amp, LRA_KERB_FLOOR_AMP) if kerb_right: right_amp = max(right_amp, LRA_KERB_FLOOR_AMP) _set_motors(ds, left_amp, right_amp) # --- Throttle (right trigger) ------------------------------------------------- def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: """Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line, with one optional divergence: the throttle is released when the clutch is disengaged. See divergence #3 in the module docstring.""" # Event impulse override (e.g. gear-shift Bow): runs the scheduled encoder # for `event_remaining_frames` packets, then resumes steady-state. if st.event_remaining_frames > 0 and st.event_callable is not None: st.event_remaining_frames -= 1 st.event_callable(ds.triggerR, st) return if CLUTCH_GATE_ENABLED and int(_safe(pkt, "clutch", 0.0)) > CLUTCH_DISENGAGE_THRESHOLD: _apply_off(ds.triggerR, st) return accel_x = _safe(pkt, "acceleration_x") accel_z = _safe(pkt, "acceleration_z") avg_accel = math.sqrt( THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x) + THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z) ) fl = abs(_safe(pkt, "tire_combined_slip_FL")) fr = abs(_safe(pkt, "tire_combined_slip_FR")) rl = abs(_safe(pkt, "tire_combined_slip_RL")) rr = abs(_safe(pkt, "tire_combined_slip_RR")) front_slip = (fl + fr) * 0.5 rear_slip = (rl + rr) * 0.5 four_wheel_slip = (fl + fr + rl + rr) * 0.25 accelerator = int(_safe(pkt, "accel")) losing_grip = ( front_slip > THROTTLE_GRIP_LOSS or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN) ) and _is_in_motion(pkt) if losing_grip: # Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs). target_freq = math.floor( _map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION) ) target_resistance = math.floor( _map( avg_accel, 0.0, THROTTLE_ACCELERATION_LIMIT, THROTTLE_MIN_STIFFNESS, THROTTLE_MAX_STIFFNESS, ) ) # Floor after EWMA (matches `(int)EWMA(int, int, float)` overload). freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING) resistance = _ewma_int( target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING ) st.last_freq = freq st.last_resistance = resistance # Vibration entry condition matches RacingDSX `Parser.cs:189-195`: # `freq > MinVibration AND accelerator > VibrationModeStart`. Below it, # fall through to Feedback (which DSX-faithfully no-ops on force > 8). if freq > THROTTLE_MIN_VIBRATION and accelerator > THROTTLE_VIB_POSITION: # AutomaticGun (mode 0x26) gives a controlled pulse rather than the # raw-PWM buzz of Simple_Vibration (mode 0x06). Strength is 1-8; # we map RacingDSX's slip-stiffness range (175..255) into it via # >>5 (175\u21925, 200\u21926, 255\u21927 clamped 8). Frequency byte is the # filtered slip-vibration freq from RacingDSX's mapping (0..55). # Strength from slip ratio via log curve (audit Phase 2). Throttle # slip strength uses the dominant axle (max of front/rear) since RWD/AWD # burnouts concentrate slip on the drive wheels; averaging across all 4 # collapses the dominant signal. lo=0.4 means slip-onset at the entry # threshold registers as strength 2-3 rather than 1. slip_metric = max(front_slip, rear_slip) strength = _slip_to_strength(slip_metric, lo=0.4, hi=2.0) if strength == 0: strength = 1 # in slip path => entry threshold passed; floor at 1 _apply_automatic_gun( ds.triggerR, THROTTLE_VIB_POSITION, strength, int(freq * THROTTLE_EFFECT_INTENSITY), st, ) else: # Below threshold: RacingDSX sends `Resistance(0, filteredResistance)` # with slip-stiffness force values (175..255). DSX silently no-ops # via `TriggerEffectGenerator.Resistance` returning False on force > 8; # the trigger holds whatever Vibration state was last set. Our # `_apply_feedback` does the same. _apply_feedback( ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY), st, ) return # Out of slip path entirely. target_resistance = math.floor( _map( avg_accel, 0.0, THROTTLE_ACCELERATION_LIMIT, THROTTLE_MIN_RESISTANCE, THROTTLE_MAX_RESISTANCE, ) ) resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING) st.last_resistance = resistance _apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY), st) # --- Brake (left trigger) ----------------------------------------------------- def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: """Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line.""" fl = abs(_safe(pkt, "tire_combined_slip_FL")) fr = abs(_safe(pkt, "tire_combined_slip_FR")) rl = abs(_safe(pkt, "tire_combined_slip_RL")) rr = abs(_safe(pkt, "tire_combined_slip_RR")) four_wheel_slip = (fl + fr + rl + rr) * 0.25 brake = int(_safe(pkt, "brake")) losing_grip = ( four_wheel_slip > BRAKE_GRIP_LOSS and brake > BRAKE_DEADZONE and _is_in_motion(pkt) ) if losing_grip: # Compute the freq for the AutomaticGun (lock-up) path. Machine uses # its own per-band fixed frequencies per the audit's recommended map. target_freq = math.floor( _map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION) ) target_resistance = math.floor( _map( brake, 0, 255, BRAKE_MAX_STIFFNESS, BRAKE_MIN_STIFFNESS, ) ) freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING) resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) st.last_freq = freq st.last_resistance = resistance # GT7-style: ABS regime uses Machine (mode 0x27, two-amplitude # alternating pulse for the 'rhythmic catch-and-release' character). # Severe lock-up (slip > 0.25) crosses to AutomaticGun at max strength # \u2014 there's no useful catch-release feel beyond ABS, just sustained # heavy buzz. Gate on slip directly rather than the EWMA-smoothed freq # because RacingDSX's freq map only reaches MIN_VIBRATION at slip > 3.7 # which is well past lock-up; gating on freq would make Machine # unreachable in the actual ABS regime (0.05..0.25). if four_wheel_slip > 0.25: _apply_automatic_gun( ds.triggerL, BRAKE_VIB_POSITION, 8, max(20, int(freq * BRAKE_EFFECT_INTENSITY)), st, ) else: if four_wheel_slip < 0.10: amp_a, amp_b, machine_freq, period = 1, 3, 20, 2 elif four_wheel_slip < 0.15: amp_a, amp_b, machine_freq, period = 2, 5, 22, 1 elif four_wheel_slip < 0.20: amp_a, amp_b, machine_freq, period = 3, 6, 25, 1 else: amp_a, amp_b, machine_freq, period = 3, 7, 28, 1 _apply_machine( ds.triggerL, BRAKE_ABS_MACHINE_START, BRAKE_ABS_MACHINE_END, amp_a, amp_b, machine_freq, period, st, ) return # Out of slip path entirely. target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE)) # Slip\u2192non-slip recovery: when last_resistance carries a slip-mode value # (5..150 from the slip-stiffness range), normal EWMA decay holds the trigger # in its prior Vibration/Machine state for ~8 frames (~133 ms) because # `_apply_feedback` silently no-ops on force > 8. Halve the residue each # frame until it's in Feedback range so the user feels brake-release # within ~3 frames of slip exit. if st.last_resistance > 12: st.last_resistance = max(target_resistance, st.last_resistance // 2) resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) st.last_resistance = resistance # SlopeFeedback gives a progressive bite-point: light at the top of throw, # firmer at the bottom \u2014 a real hydraulic brake pedal feel. start=2, end=8 # anchors the ramp in the bite-point band; start_strength=2 keeps the top # easy to engage while end_strength scales with brake input. end_strength = int(resistance * BRAKE_EFFECT_INTENSITY) if end_strength < 1: # No (or trivial) brake: trigger neutral. Same as DSX-faithful Resistance(0,0). _apply_off(ds.triggerL, st) elif end_strength > 8: # Slip-stiffness EWMA decay residue: silently no-op, keep prior state. # Matches the divergence #2 brick-wall-avoidance design (and audit \u00a71.5 # decay accelerator above). return else: _apply_slope_feedback( ds.triggerL, start=2, end=8, start_strength=2, end_strength=end_strength, st=st, ) # --- UDP main loop ------------------------------------------------------------ def parse_packet(data: bytes) -> ForzaDataPacket | None: fmt = PACKET_FORMATS.get(len(data)) if fmt is None: LOG.debug("ignoring packet of unexpected length %d", len(data)) return None try: return ForzaDataPacket(data, packet_format=fmt) except Exception: LOG.exception("failed to parse forza packet (len=%d)", len(data)) return None def _close_controller(ds: pydualsense | None) -> None: """Best-effort close. The HID device may already be gone (unplug, BT drop) in which case `device.close()` raises; we don't care.""" if ds is None: return try: ds.close() except Exception: pass def _connect_controller() -> pydualsense: """Open the DualSense, blocking until one is reachable. `pydualsense.init()` raises when no DualSense is plugged in. That's a normal startup-or-replug condition for us, not a fatal error \u2014 the daemon is meant to live for the whole user session and self-heal across plug events without external supervision. We log the first failure once, then retry quietly every `RECONNECT_BACKOFF_S` seconds. """ LOG.info("opening dualsense controller") first_failure_logged = False while True: ds = pydualsense() try: ds.init() except Exception as e: _close_controller(ds) if not first_failure_logged: LOG.warning( "dualsense not available (%s); retrying every %.1fs", e, RECONNECT_BACKOFF_S, ) first_failure_logged = True time.sleep(RECONNECT_BACKOFF_S) if _shutdown_requested: raise KeyboardInterrupt continue # Stop pydualsense's BG sendReport thread. See divergence #6 in the # rationale near `_set_lightbar` / `_flush`. Without this the firmware # gets a stream of 'set trigger' commands at ~250 Hz, which it treats # as fresh effect commands \u2014 the trigger PID restarts every ~4 ms and # oscillates against the user's finger. ds.ds_thread = False if ds.report_thread.is_alive(): ds.report_thread.join(timeout=2.0) # Push a clean initial state so we don't inherit residual trigger # effects from a previous Steam Input session, prior daemon instance, # or stale firmware state. global _trigger_dirty, _lightbar_dirty, _motors_dirty, _last_lightbar, _last_motors _trigger_dirty = False _lightbar_dirty = False _motors_dirty = False _last_lightbar = None _last_motors = None _apply_off(ds.triggerL) _apply_off(ds.triggerR) _set_lightbar(ds, 0, 0, 0) _set_motors(ds, 0, 0) try: ds.writeReport(ds.prepareReport()) _trigger_dirty = False _lightbar_dirty = False _motors_dirty = False except (IOError, OSError) as e: LOG.warning("initial write failed: %s; retrying connect", e) _close_controller(ds) time.sleep(RECONNECT_BACKOFF_S) if _shutdown_requested: raise KeyboardInterrupt continue LOG.info("dualsense controller connected") return ds # --- Signal handling -------------------------------------------------------- # systemd sends SIGTERM (not SIGINT) by default. Python does not route # SIGTERM to KeyboardInterrupt, so without an explicit handler the process # ignores SIGTERM and systemd waits TimeoutStopSec=90s before SIGKILL. # Meanwhile socket activation can fire a second instance — two daemons # fighting over the same controller. We set a module-level flag that the # main loop polls once per timeout (≤1s) so shutdown is prompt. _shutdown_requested = False def _handle_termination(signum: int, frame: object) -> None: """Signal handler for SIGTERM / SIGINT — unblock the main loop.""" global _shutdown_requested _shutdown_requested = True def run(host: str, port: int, debug: bool, exit_on_idle: bool = False) -> int: # Register signal handlers FIRST so a SIGTERM during the (potentially # blocking) `_connect_controller` retry loop sets the shutdown flag # instead of killing the process with default-handler. The connect loop # checks the flag between retries so the daemon exits cleanly even with # no controller plugged in. signal.signal(signal.SIGTERM, _handle_termination) signal.signal(signal.SIGINT, _handle_termination) ds = _connect_controller() LOG.info("listening for forza udp on %s:%d", host, port) sock = _get_socket(host, port) throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT) brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT) forza_state = _ForzaState() last_seen = 0.0 in_race = False have_telemetry = False # True between the first packet and the IDLE_TIMEOUT_S reset try: while True: if _shutdown_requested: raise KeyboardInterrupt now = time.monotonic() try: data, _ = sock.recvfrom(2048) last_seen = now have_telemetry = True except socket.timeout: if _shutdown_requested: raise KeyboardInterrupt # Reset on telemetry-idle regardless of in_race state. After Forza # exits with the user in its main menu (is_race_on=0 packets just # before exit, so in_race was already False), the old check would # leave the last lightbar/trigger state asserted forever. if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S: LOG.info( "forza idle for %.1fs — resetting controller", IDLE_TIMEOUT_S, ) # One-shot 0x05 to actively retract the trigger motor; the BG # thread will publish it ~12 times in the next 50 ms before # main thread loops back here. reset_triggers(ds) reset_lightbar(ds) _set_motors(ds, 0, 0) if not _flush(ds): LOG.warning("dualsense disconnected; reconnecting") _close_controller(ds) ds = _connect_controller() _reset_caches(throttle_state, brake_state) have_telemetry = False in_race = False if exit_on_idle: LOG.info("exiting on idle") return 0 continue pkt = parse_packet(data) if pkt is None: continue # ForzaParser.IsRaceOn() override: combines packet field with the # FH-specific RPM-accumulator workaround. Must be called once per # packet so the accumulator state stays accurate. in_race = forza_is_race_on(pkt, forza_state) if not in_race: # Pre-race: release both triggers via mode 0x05. The no-op # cache (divergence #5) means subsequent identical frames # don't produce HID writes \u2014 the trigger holds its released # state with zero ongoing motor work. _apply_off(ds.triggerL, brake_state) _apply_off(ds.triggerR, throttle_state) apply_lightbar_pre_race(ds, pkt, forza_state) _set_motors(ds, 0, 0) else: if debug: LOG.debug( "rpm=%.0f/%.0f accel=%d brake=%d " "slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f " "throttle[freq=%d res=%d] brake[freq=%d res=%d]", _safe(pkt, "current_engine_rpm"), _safe(pkt, "engine_max_rpm"), int(_safe(pkt, "accel")), int(_safe(pkt, "brake")), _safe(pkt, "tire_combined_slip_FL"), _safe(pkt, "tire_combined_slip_FR"), _safe(pkt, "tire_combined_slip_RL"), _safe(pkt, "tire_combined_slip_RR"), throttle_state.last_freq, throttle_state.last_resistance, brake_state.last_freq, brake_state.last_resistance, ) apply_lightbar_in_race(ds, pkt) apply_lra(ds, pkt) # Gear-shift event: when the gear field changes between packets, # schedule a 1-frame Bow on R2 \u2014 NFS-Unbound-style shift detent. # Bow zones (4..6) anchor the snap mid-throw; force=8/snap=8. gear = int(_safe(pkt, "gear", -1)) if ( gear != forza_state.last_gear and gear >= 0 and forza_state.last_gear >= 0 ): throttle_state.event_remaining_frames = 1 throttle_state.event_callable = lambda trig, st: _apply_bow( trig, 4, 6, 8, 8, st ) forza_state.last_gear = gear apply_left_trigger(ds, pkt, brake_state) apply_right_trigger(ds, pkt, throttle_state) # Push one HID report per Forza packet (60 Hz max). Idempotent \u2014 # _flush() returns immediately if nothing actually changed since the # last push (the no-op cache on `_TriggerState._last_sent` and on # `_last_lightbar`). On write failure (controller unplugged), reset # the daemon's cached state so the post-reconnect frame writes anew. if not _flush(ds): LOG.warning("dualsense disconnected; reconnecting") _close_controller(ds) ds = _connect_controller() _reset_caches(throttle_state, brake_state) except KeyboardInterrupt: LOG.info("shutting down") finally: try: reset_triggers(ds) _flush(ds) except Exception: pass _close_controller(ds) return 0 def main() -> int: parser = argparse.ArgumentParser( prog="forza-trigger", description="Bridge Forza Horizon UDP telemetry to DualSense adaptive triggers.", ) parser.add_argument("--host", default="127.0.0.1", help="UDP bind address") parser.add_argument("--port", type=int, default=5300, help="UDP bind port") parser.add_argument( "--debug", action="store_true", help="log per-packet telemetry at DEBUG level", ) parser.add_argument( "--exit-on-idle", action="store_true", help="exit after IDLE_TIMEOUT_S of no UDP packets (for systemd socket activation)", ) args = parser.parse_args() level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO") logging.basicConfig( level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) return run(args.host, args.port, args.debug, args.exit_on_idle) if __name__ == "__main__": sys.exit(main())