diff --git a/hosts/yarn/default.nix b/hosts/yarn/default.nix index 91f34e0..40daac1 100644 --- a/hosts/yarn/default.nix +++ b/hosts/yarn/default.nix @@ -171,7 +171,6 @@ }; }; - # PS5 DualSense adaptive triggers in Forza Horizon 4 / 5. services.forzaTrigger.enable = true; } diff --git a/hosts/yarn/forza-trigger/default.nix b/hosts/yarn/forza-trigger/default.nix index 51c4568..7aa1354 100644 --- a/hosts/yarn/forza-trigger/default.nix +++ b/hosts/yarn/forza-trigger/default.nix @@ -10,7 +10,7 @@ # Forza emits a fixed-format UDP telemetry stream ("Data Out") at 60 Hz on a # user-configured port. We listen on that port, parse each packet via fdp # (nettrom/forza_motorsport, MIT), and drive the PS5 DualSense's adaptive -# triggers via pydualsense (PyPI, MIT) which talks HID over hidraw. +# triggers via dualsense-controller (PyPI, MIT) which talks HID over hidraw. # # Setup on the user side, once enabled here: # - plug the DualSense in over USB and disable Steam Input for the @@ -23,11 +23,11 @@ let cfg = config.services.forzaTrigger; pythonPackages = import ./python-packages.nix { inherit lib pkgs; }; - inherit (pythonPackages) pydualsense fdp; + inherit (pythonPackages) dualsense-controller fdp; forzaTrigger = pkgs.writers.writePython3Bin "forza-trigger" { libraries = [ - pydualsense + dualsense-controller fdp ]; # The wrapped binary doesn't need style enforcement — readability of diff --git a/hosts/yarn/forza-trigger/forza_trigger.py b/hosts/yarn/forza-trigger/forza_trigger.py index 9a085c9..349b3c2 100644 --- a/hosts/yarn/forza-trigger/forza_trigger.py +++ b/hosts/yarn/forza-trigger/forza_trigger.py @@ -1,161 +1,78 @@ """Bridge Forza Horizon 4/5 telemetry to DualSense adaptive triggers. -This is a faithful Linux port of the RacingDSX -> DSX -> DualSense pipeline. -Every numeric value, every threshold, every map() / EWMA() coefficient, -and every output byte sequence has been verified against published -sources or against decompiled DSX 1.4.9 itself. +This is a one-to-one behavioural port of Race-Element's DualSense haptic +overlay (RiddleTime/Race-Element, GPL-3.0). Reference files: -Sources, in priority order: + * Race Element.HUD.Common/Overlays/Pitwall/DualSenseInternal/ + TriggerHaptics.cs — slip detection + frequency mapping + DsiConfiguration.cs — tuning defaults + DsiJob.cs — per-frame dispatch - 1. DSX 1.4.9 binary (Paliverse/DualSenseX, GitHub release, archived 2021-12-31) - decompiled with ILSpy. The decompilation revealed: - a. DSX bundles ExtendInput.DataTools.DualSense (Nielk1 Rev6, MIT) as - its trigger-effect encoder. - b. The UDP/JSON dispatcher in DualSenseX/Main.cs maps RacingDSX's - high-level CustomTriggerValueMode names to mode bytes: - VibrateResistance -> 6 (Simple_Vibration / 0x06) - VibrateResistanceA / AB -> 38 (Vibration / 0x26) - VibrateResistanceB -> 6 - When the dispatcher hits the `else` branch in - DualSense_USB_Updated.cs (any CustomTriggerValueIndex other than - 9/11/13/15/17/19) it writes the eight TriggerValue bytes RAW into - the trigger param region — no bit-packing, no scale conversion. - This is why RacingDSX's 0-255-scale stiffness values ARE the - actual amplitude bytes that reach the controller's firmware. +## What this daemon does - 2. Nielk1's reverse-engineering gist Rev 6 (MIT, - https://gist.github.com/Nielk1/6d54cc2c00d2201ccb8c2720ad7538db). - Source for the canonical Sony bit-packed Feedback (0x21) encoder used - for the non-slip path. Identical to the implementation shipped inside - DSX 1.4.9. + - Listens for Forza Horizon "Data Out" UDP telemetry. + - On each packet: dispatches `handle_braking` (L2) and `handle_acceleration` + (R2) — same two-function structure as Race-Element's `DsiJob.RunAction`. + - Each handler reads the relevant input pedal and the four tire slip + ratios; if both pedal and slip exceed their thresholds, computes a + frequency from slip severity and emits a Vibration trigger effect. + Otherwise resets the trigger to no-resistance. - 3. RacingDSX (cosmii02/RacingDSX, GPLv3) — community-tuned defaults for - Forza Horizon 4 / 5 since 2022. Specifically: - Config/ThrottleSettings.cs, Config/BrakeSettings.cs, - GameParsers/Parser.cs. +## What this daemon explicitly does NOT do -The HID transport (BT/USB framing, CRC32, ~1 kHz sendReport thread) is -provided by pydualsense (PyPI, MIT). We drive its low-level -`triggerL/R.mode` and `triggerL/R.forces[i]` fields directly because -pydualsense's high-level setMode/setForce API does not understand any -specific mode's parameter encoding — it just shovels bytes into the -output report at fixed offsets. That is exactly what we want. +Compared to the previous implementation, every haptic channel beyond the +two trigger Vibration effects is gone: -## Documented intentional divergences from RacingDSX + - No body LRA rumble (left/right motors). User reported the previous + multi-channel body rumble as "shakes my whole hand"; Race-Element + deliberately keeps the controller body silent so the trigger fingers + carry all information. + - No lightbar effects. Race-Element's DSI overlay leaves the lightbar + untouched (it's at the controller's default). + - No EWMA smoothing. Effects track slip frame-to-frame. + - No event-impulse system (no gear-shift Bow, no collision burst). + - No Machine mode for ABS. Race-Element uses the same Vibration + encoder for everything. + - No SlopeFeedback / cornering Feedback strength. The trigger has + zero resistance when not slipping. + - No surface texture, engine RPM rumble, lateral-G bias, or kerb + floor amplitude. -Reviewed and revised after a full audit against RacingDSX upstream -(`Parser.cs`, `ForzaParser.cs`, `Config/*Settings.cs`) and DSX 1.4.9 -decompilation (`Main.cs`, `DualSense_USB_Updated.cs`, -`TriggerEffectGenerator.cs`). Several earlier divergences were folded back -into upstream behavior; remaining items below are real and source-justified. +## Faithful reproduction of upstream -1. **Motion gate** (`_is_in_motion()`): slip detection is gated on speed or - wheel rotation. RacingDSX `Parser.cs:108-114, 182-186` has no such gate, - so locked stationary wheels keep the slip path active forever and the - trigger stuck in vibration mode (the original user-reported bug). We gate - on `speed > 0.1 m/s` OR any wheel rotation > 0.1 rad/s. +Upstream pin: RiddleTime/Race-Element @ 5bc0eebba64f (2026-04-30). -2. **DSX-faithful Feedback** (`_apply_feedback`): when `strength > 8` we - return False without writing, identical to DSX's `TriggerEffectGenerator - .Resistance` (`TriggerEffectGenerator.cs:193-218`). The trigger holds its - last-set physical state until the EWMA-smoothed strength decays into the - 1..8 canonical range. Note: an earlier revision clamped to 8, which - produced a maximum-stiffness 'brick wall' for ~6-9 frames as the EWMA - decayed from slip-stiffness range (175..255 throttle, 5..150 brake) down - to Feedback range (0..3 throttle, 0..7 brake) \u2014 that brick wall is what - the user reported as 'fights my finger' on burnouts and ABS braking. +The slip-coefficient formulas in `_brake_frequency_pct` and +`_throttle_frequency_pct` are byte-faithful ports of Race-Element's +upstream code, including a copy-paste bug in their throttle and brake +paths where the "rear slip coefficient" multiplies `front_slip` instead +of `rear_slip`. The bug is preserved for behavioural parity; see the +inline comments tagged "RACE-ELEMENT BUG". -3. **Clutch gate** (opt-in, default off): RacingDSX is clutch-blind by - design \u2014 `ForzaParser.cs:147-185` `ParsePacket` doesn't read the Clutch - field, and `Parser.cs:170-225` doesn't reference it either. When - `FORZA_TRIGGER_CLUTCH_GATE=1` is set in the environment we bypass the - throttle path when `clutch > 128`. On automatic transmission (default in - FH4/FH5) the game blips the clutch byte to ~255 for ~100 ms during every - gear change, so this gate produces a felt trigger relaxation on every - shift. Manual-transmission users may find it physically accurate; - auto-transmission users almost certainly don't. +## Tuning constants -4. **AutomaticGun (mode 0x26) for slip vibration** instead of RacingDSX's - Simple_Vibration (mode 0x06). RacingDSX sends `CustomTriggerValueMode - .VibrateResistance` which DSX dispatches to the raw-passthrough branch in - `DualSense_USB_Updated.cs::CustomTriggerValues` (mode byte 6, 0..255 raw - amplitude). Mode 0x06 is a raw PWM buzzer \u2014 audibly clunky, no resistance - characteristic. Mode 0x26 (`TriggerEffectGenerator.AutomaticGun`, - `TriggerEffectGenerator.cs:292-326`) uses Sony's bit-packed force layout - plus a frequency byte: applies controlled resistance at zones [position..9] - while pulsing the motor at `frequency` Hz. The result is 'pulsed - resistance' rather than raw buzz \u2014 textural, controlled, GT7-style. +Race-Element exposes these as runtime config sliders. We bake them in as +module constants matching the upstream defaults from `DsiConfiguration`: -5. **No-op write suppression** on `_TriggerState._last_sent`: each - `_write_trigger` compares the new (mode, forces) tuple against the cache. - Identical writes are skipped, and `_trigger_dirty` stays False so the - next `_flush()` masks the trigger update flag bits in `outReport[1]`. - This makes pre-race steady-state, the clutch gate, and the EWMA-stable - non-slip path effectively zero-cost at the HID layer. +| | Brake | Throttle | +|-----------------------|--------|----------| +| input deadzone | 3 % | 3 % | +| front slip threshold | 0.25 | 0.35 | +| rear slip threshold | 0.25 | 0.25 | +| amplitude | 8 | 7 | +| min frequency | 3 Hz | 6 Hz | +| max frequency | 85 Hz | 96 Hz | +## Transport - -6. **Disabled pydualsense BG thread + on-demand HID push**: pydualsense's - `sendReport` thread writes a complete output report at the USB poll rate - (~250 Hz) with the trigger update flags asserted. DSX writes only on UDP - packet arrival from RacingDSX (~60 Hz; `Main.cs:7692-7710`, - `DualSense_USB_Updated.cs:21-79`). Empirically, ~250 Hz of trigger writes - produces buzz/oscillation under finger pressure on Linux that ~60 Hz - does not (the user-reported burnout/ABS feel issue). [Inference] the - firmware likely re-initializes its PID controller per write at the higher - rate; this is not source-confirmed but is the simplest model that fits - both observations. We disable the BG thread (`ds.ds_thread = False` after - `init`) and push reports ourselves via `_flush()` only on actual state - change, masking the trigger flag bits when only the lightbar (RPM - gradient) updated. Hot-plug detection moves from `report_thread.is_alive()` - to write-failure handling in `_flush()`. - - - -## Threading note - -pydualsense's `sendReport` background thread is disabled (divergence #6). -We call `prepareReport()` + `writeReport()` ourselves on the same main thread -that reads UDP and computes effects, so trigger / lightbar / motor field -writes are atomic by construction \u2014 no torn-frame mitigation needed. - -## System interaction notes - -**Single-controller assumption.** pydualsense's `__find_device` enumerates all -DualSense devices (vid 0x054C, pid 0x0CE6 standard / 0x0DF2 Edge), keeps the -last one matched (no break in the loop), then opens via `hidapi_open(vid, pid)` -without serial/path \u2014 `hid_open` returns the first match, which is not -necessarily the one selected. With multiple DualSense controllers the picked -controller is non-deterministic. pydualsense's source explicitly notes -`# TODO: implement multiple controllers working`. RacingDSX/DSX are also -single-controller (DSX's `connectedController` is a singleton). Forza Horizon -is single-player so this is fine in practice; if multi-controller selection -matters, monkey-patch `__find_device` to filter by `serial_number`. - -**Steam Input.** When Steam Input's PlayStation Configuration Support is -enabled for the game, Steam intercepts hidraw input AND writes its own HID -output reports (rumble, lightbar, sometimes triggers). Our daemon writes -competing output reports at ~1 kHz; the controller observes whichever wrote -last. Effect: trigger oscillates and feels broken. The Nix module's README -in `default.nix` instructs users to disable PlayStation Configuration Support -for Forza in Steam (Settings \u2192 Controller). - -**dualsensectl.** Installed in the Nix module for ad-hoc debugging. Single- -shot writes from `dualsensectl trigger left feedback ...` get overwritten by -our BG thread's next iteration ~4 ms later. Use it only when the daemon is -stopped (`systemctl --user stop forza-trigger`). - -**Hot-plug.** pydualsense's BG `sendReport` thread terminates silently on -hidraw IOError (unplug, BT disconnect, USB resuspend). The main loop polls -`ds.report_thread.is_alive()` and reconnects in-process via -`_connect_controller()`, which retries `pydualsense.init()` every -`RECONNECT_BACKOFF_S` until the controller comes back. The daemon does not -depend on systemd or any other supervisor for plug-event recovery; running it -directly from a shell handles unplug/replug exactly the same way. +`dualsense-controller` (yesbotics/dualsense-controller-python) handles +the HID transport, BT/USB framing including BT CRC32, and on-demand +output writes (state-changed-since-last-input-tick gate). Hot-plug +recovery routes through the library's `on_error` callback, which sets a +flag the main loop polls. """ import argparse -import collections import logging import math import os @@ -164,24 +81,95 @@ import socket import sys import time +from dualsense_controller import DualSenseController from fdp import ForzaDataPacket -from pydualsense import TriggerModes, pydualsense + + +LOG = logging.getLogger("forza-trigger") + + +# --- Tuning constants (Race-Element DsiConfiguration defaults) --------------- +# Pedal inputs come from Forza as 0..255 bytes; thresholds are in 0..1 scale. +BRAKE_INPUT_THRESHOLD = 0.03 +BRAKE_FRONT_SLIP_THRESHOLD = 0.25 +BRAKE_REAR_SLIP_THRESHOLD = 0.25 +BRAKE_AMPLITUDE = 8 +BRAKE_MIN_FREQUENCY = 3 +BRAKE_MAX_FREQUENCY = 85 + +THROTTLE_INPUT_THRESHOLD = 0.03 +THROTTLE_FRONT_SLIP_THRESHOLD = 0.35 +THROTTLE_REAR_SLIP_THRESHOLD = 0.25 +THROTTLE_AMPLITUDE = 7 +THROTTLE_MIN_FREQUENCY = 6 +THROTTLE_MAX_FREQUENCY = 96 + +# Slip-to-percentage divisors (Race-Element TriggerHaptics). Each equals +# (front_clip_ceiling + rear_clip_ceiling) for its handler, which makes +# `pct` scale to [0, 1] without an explicit cap downstream: +# brake: front_ceil(10) + rear_ceil(7.5) = 17.5 +# throttle: front_ceil(5) + rear_ceil(7.5) = 12.5 +BRAKE_PCT_DIVISOR = 17.5 +THROTTLE_PCT_DIVISOR = 12.5 + +# --- Forza UDP packet sizes -> fdp packet_format strings --------------------- +PACKET_FORMATS = { + 232: "sled", + 311: "dash", + 324: "fh4", # FH4 and FH5 share the same layout +} + +# --- Daemon lifecycle constants ---------------------------------------------- +IDLE_TIMEOUT_S = 3.0 +RECONNECT_BACKOFF_S = 1.0 + + +# --- Module state (signal + hot-plug flags) ---------------------------------- +_shutdown = False +_disconnected = False + + +def _on_termination(signum: int, frame: object) -> None: + """SIGTERM/SIGINT handler — sets the main-loop shutdown flag. + + systemd sends SIGTERM by default; without an explicit handler Python + ignores it and systemd waits TimeoutStopSec=90s before SIGKILL. + """ + global _shutdown + _shutdown = True + + +def _on_controller_error(prev: object, exc: Exception) -> None: + """dualsense-controller `on_error` callback for HID read-thread failures.""" + global _disconnected + LOG.warning("dualsense exception: %s", exc) + _disconnected = True + + +# --- Helpers ----------------------------------------------------------------- + + +def _safe_abs(pkt: ForzaDataPacket, name: str) -> float: + """Read a packet field defensively. Returns 0.0 for missing fields, + NaN, or +/-inf; otherwise returns the absolute value of the float.""" + try: + v = float(getattr(pkt, name, 0.0)) + except (TypeError, ValueError): + return 0.0 + if not math.isfinite(v): + return 0.0 + return abs(v) def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: """Create or inherit the UDP listener socket. - Under systemd socket activation (LISTEN_FDS=1 with LISTEN_PID matching our - pid) the socket is already bound by the service manager and passed as fd 3. - Otherwise bind normally — this keeps the daemon runnable outside of systemd. + Under systemd socket activation (LISTEN_FDS=1, LISTEN_PID == ours) + fd 3 is the pre-bound socket. Otherwise bind normally. """ - listen_pid_str = os.environ.get("LISTEN_PID", "") - listen_fds_str = os.environ.get("LISTEN_FDS", "0") - if ( - listen_pid_str - and int(listen_pid_str) == os.getpid() - and int(listen_fds_str) >= 1 - ): + listen_pid = os.environ.get("LISTEN_PID", "") + listen_fds = os.environ.get("LISTEN_FDS", "0") + if listen_pid and int(listen_pid) == os.getpid() and int(listen_fds) >= 1: sock = socket.fromfd(3, socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) LOG.info("using systemd-pre-bound socket on %s:%d", host, port) @@ -193,1049 +181,8 @@ def _get_socket(host: str, port: int, timeout: float = 1.0) -> socket.socket: sock.settimeout(timeout) return sock -LOG = logging.getLogger("forza-trigger") -# --- Mode bytes --------------------------------------------------------------- -# pydualsense's `TriggerModes` IntFlag covers every mode byte we use: -# TriggerModes.Off = 0x00 (no-op) -# TriggerModes.Rigid_B = 0x05 (canonical Sony Off / Reset) -# TriggerModes.Pulse_B = 0x06 (Simple_Vibration / Simple_AutomaticGun) -# TriggerModes.Rigid_A = 0x21 (Feedback, canonical) -# TriggerModes.Pulse_AB = 0x26 (AutomaticGun / pulsed resistance) -DS_MODE_OFF = TriggerModes.Rigid_B -DS_MODE_AUTOMATIC_GUN = TriggerModes.Pulse_AB # bit-packed forces + frequency -DS_MODE_FEEDBACK = TriggerModes.Rigid_A -DS_MODE_MACHINE = TriggerModes(0x27) # two-amplitude alternating pulse, GT7 ABS -DS_MODE_BOW = TriggerModes.Pulse_A # 0x22, resist-then-snap impulse for gear-shift events - -# --- RacingDSX defaults (Config/ThrottleSettings.cs) -------------------------- -THROTTLE_GRIP_LOSS = 0.6 -THROTTLE_REAR_SLIP_ACCEL_MIN = 200 -THROTTLE_VIB_POSITION = 5 # VibrationModeStart -THROTTLE_MIN_VIBRATION = 5 # below this freq, fall back to Resistance - -THROTTLE_MAX_VIBRATION = 55 # peak frequency at slip == 5 -THROTTLE_MIN_STIFFNESS = 255 # slip-mode amplitude at avgAccel == 0 -THROTTLE_MAX_STIFFNESS = 175 # slip-mode amplitude at avgAccel == AccelerationLimit -THROTTLE_MIN_RESISTANCE = 0 # non-slip canonical strength at avgAccel == 0 -THROTTLE_MAX_RESISTANCE = 3 # non-slip canonical strength at avgAccel == AccelerationLimit -THROTTLE_ACCELERATION_LIMIT = 10 -THROTTLE_TURN_ACCEL_SCALE = 0.25 -THROTTLE_FORWARD_ACCEL_SCALE = 1.0 -THROTTLE_RESISTANCE_SMOOTHING = 0.5 # was RacingDSX 0.9 (over-smoothed by ~10 frames; per GT7-research \u00a74.1) -THROTTLE_VIBRATION_SMOOTHING = 0.7 # was 1.0 (raw); reduces slip-freq jitter without killing responsiveness -THROTTLE_EFFECT_INTENSITY = 1.0 -THROTTLE_LAST_RESISTANCE_INIT = 1 # Parser.lastThrottleResistance - -# --- RacingDSX defaults (Config/BrakeSettings.cs) ----------------------------- -BRAKE_GRIP_LOSS = 0.05 -BRAKE_DEADZONE = 100 # Parser literal: data.Brake > 100 -BRAKE_VIB_POSITION = 0 # VibrationStart - position for AutomaticGun lockup (whole-range) -# Machine 0x27 sets ONLY two zones (start, end), not a range. For ABS bite-point -# feel we anchor the alternating pulse in the middle of trigger travel. -BRAKE_ABS_MACHINE_START = 2 -BRAKE_ABS_MACHINE_END = 8 -BRAKE_MIN_VIBRATION = 15 - -BRAKE_MAX_VIBRATION = 20 -BRAKE_MIN_STIFFNESS = 150 # RacingDSX default; the slip-vibration amplitude floor is set by the physical-limit scaler at endpoint -BRAKE_MAX_STIFFNESS = 5 -BRAKE_MIN_RESISTANCE = 0 -BRAKE_MAX_RESISTANCE = 7 -BRAKE_RESISTANCE_SMOOTHING = 0.4 -BRAKE_VIBRATION_SMOOTHING = 0.7 # was 1.0 (raw); reduces ABS-band jitter (per GT7-research \u00a74.1) -BRAKE_EFFECT_INTENSITY = 1.0 -BRAKE_LAST_RESISTANCE_INIT = 200 # Parser.lastBrakeResistance - -# --- Forza UDP packet sizes -> fdp packet_format strings ---------------------- -PACKET_FORMATS = { - 232: "sled", - 311: "dash", - 324: "fh4", # FH4 and FH5 share the same layout -} - -# --- ForzaParser state-machine constants (GameParsers/ForzaParser.cs) -------- -# CarClass field maps as 0=D, 1=C, 2=B, 3=A, 4=S1, 5=S2, 6=X (FH) / 7=X (FM). -# Parser.cs uses an `<=` cascade, so any value > 5 is treated as X. -CAR_CLASS_COLORS = [ - (107, 185, 236), # ColorClassD - (234, 202, 49), # ColorClassC - (211, 90, 37), # ColorClassB - (187, 59, 34), # ColorClassA - (128, 54, 243), # ColorClassS1 - (75, 88, 229), # ColorClassS2 - (105, 182, 72), # ColorClassX (no CPI tint) -] -MAX_CPI = 255 # ForzaParser.MaxCPI -RPM_REDLINE_RATIO = 0.9 # Profile.RPMRedlineRatio -GREEN_FLOOR = 50 # Math.Max(..., 50) on green channel in non-redline path -RACE_OFF_RPM_FRAMES = 200 # ForzaParser.RPMAccumulatorTriggerRaceOff - -# --- Clutch gate (throttle only, opt-in) ------------------------------------- -# Forza emits `clutch` 0..255 (0 = engaged, 255 = disengaged). When the clutch -# is disengaged the engine is mechanically disconnected from the wheels and -# the throttle pedal can't transmit power, so a 'physically accurate' throttle -# trigger has no business resisting. RacingDSX is clutch-blind by design -# (`ForzaParser.cs` ParsePacket comments out the Clutch field). On automatic -# transmission \u2014 the FH4/FH5 default \u2014 the game blips the clutch byte to ~255 -# for ~100 ms during every gear change, so enabling this gate produces a felt -# trigger relaxation on every shift. Disabled by default; set the env var -# FORZA_TRIGGER_CLUTCH_GATE=1 to enable. -CLUTCH_DISENGAGE_THRESHOLD = 128 -CLUTCH_GATE_ENABLED = os.environ.get("FORZA_TRIGGER_CLUTCH_GATE", "0") == "1" - - -# --- Reset on idle (UDP timeout) --------------------------------------------- -# Not present in RacingDSX; an additional safety so the controller doesn't get -# stuck if Forza is killed mid-race or the network drops. -IDLE_TIMEOUT_S = 3.0 - -# --- Hot-plug reconnect backoff ---------------------------------------------- -# pydualsense's BG sendReport thread terminates silently on hidraw IOError -# (controller unplugged, BT disconnect, USB resuspend). The main loop polls -# the thread's liveness and reconnects in-process \u2014 the script is agnostic -# of supervisors like systemd. The same backoff governs the initial-connect -# wait when the daemon starts before any controller is plugged in. -RECONNECT_BACKOFF_S = 1.0 - -# --- Stationary motion gate -------------------------------------------------- -# Forza reports nonzero `tire_combined_slip_*` on a stationary car with locked -# wheels (e.g. after coming to a hard stop). RacingDSX/DSX have no gate for -# this and end up with the brake (and sometimes throttle) trigger stuck in -# Simple_Vibration mode forever, because the slip path keeps firing. We -# additionally require either the car or any wheel to be in real motion before -# treating slip as a haptic event. -STATIONARY_SPEED_MS = 0.1 # m/s; below this the car is considered stopped -STATIONARY_WHEEL_RAD_S = 0.1 # rad/s; below this a wheel is considered locked - - -def _is_in_motion(pkt: ForzaDataPacket) -> bool: - """True iff the car is moving or any wheel is rotating meaningfully. - - Used to gate slip-detection: when both car and all four wheels read as - stopped, any nonzero `tire_combined_slip` Forza emits is data noise from - locked wheels and should not drive haptic vibration. - - fdp's `sled` and `dash`/`fh4` formats both carry `wheel_rotation_speed_*`; - only `dash`/`fh4` adds `speed`. The wheel-rotation check covers both. - """ - if abs(_safe(pkt, "speed")) > STATIONARY_SPEED_MS: - return True - for wheel in ("FL", "FR", "RL", "RR"): - if abs(_safe(pkt, f"wheel_rotation_speed_{wheel}")) > STATIONARY_WHEEL_RAD_S: - return True - return False -# --- HID output: dirty-tracking + on-demand flush ---------------------------- -# pydualsense's BG `sendReport` thread writes a complete output report at the -# USB poll rate (~250 Hz) with the 'set right/left trigger motor' flag bits -# (0x04 / 0x08) asserted in `outReport[1] = 0xFF`. The DualSense firmware reads -# every such report as a fresh trigger command and re-initializes its internal -# PID controller \u2014 manifesting under finger pressure as the buzz/oscillation -# the user reported during burnouts and ABS braking. -# -# We disable the BG thread entirely (see _connect_controller) and call -# pydualsense's `prepareReport()` + `writeReport()` ourselves, exactly once per -# state change. The Python-level no-op cache on `_TriggerState._last_sent` -# (divergence #5) becomes the real gate: when neither trigger nor lightbar -# changed since the last flush, no HID write happens and the firmware sees a -# truly continuous effect. -# -# Divergence #9 in the module docstring. -_trigger_dirty: bool = False # set when _write_trigger updates trigger state -_lightbar_dirty: bool = False # set when _set_lightbar updates RGB -_last_lightbar: tuple[int, int, int] | None = None -_motors_dirty: bool = False # set when _set_motors updates LRA amplitude -_last_motors: tuple[int, int] | None = None # (left, right) 0..255 -_lra_smoothed: tuple[float, float] = (0.0, 0.0) # (left, right) EWMA-smoothed amplitude pre-clamp -LRA_SMOOTHING_ALPHA = 0.6 # alpha for surface-rumble texture; suppresses 60 Hz machine-gun artifact - -# pydualsense's `prepareReport()` returns a USB report (outReport[0]==0x02) -# with `outReport[1] = 0xFF`. The flag bits we care about: -# outReport[1]: bit 0x04 = update right trigger, bit 0x08 = update left -# outReport[2]: bit 0x04 = update lightbar (LED strips) -# We clear the bits the firmware doesn't need to process so the trigger PID -# isn't reset when only the lightbar (RPM gradient) changed. -# Bluetooth reports (outReport[0]==0x31) carry a CRC32 at bytes 74-77 that -# pydualsense computes inside `prepareReport()`; we don't apply the flag-mask -# optimization there because mutating after that point invalidates the CRC. -_USB_TRIGGER_FLAGS_BYTE = 1 -_USB_LIGHTBAR_FLAGS_BYTE = 2 -_TRIGGER_FLAG_BITS = 0x04 | 0x08 # right + left trigger update bits -_LIGHTBAR_FLAG_BIT = 0x04 # LED strips update bit - - -def _set_lightbar(ds, r: int, g: int, b: int) -> None: - """Set the touchpad lightbar RGB. Marks the lightbar dirty if the color - actually changed; otherwise no-op so steady-state colors don't trigger - redundant HID writes.""" - global _lightbar_dirty, _last_lightbar - new = (int(r), int(g), int(b)) - if _last_lightbar == new: - return - ds.light.setColorI(new[0], new[1], new[2]) - _last_lightbar = new - _lightbar_dirty = True - - -def _set_motors(ds, left: int, right: int) -> None: - """Set the body LRA motor amplitudes (0..255 each). Marks motors dirty if - the (left, right) tuple actually changed. - - pydualsense writes these at `outReport[3]` (right) / `outReport[4]` (left) - via its existing `prepareReport()`; the rumble flag bits in `outReport[1]` - (0x01, 0x02) are already asserted by pydualsense's `0xFF` default. We only - need to update `ds.leftMotor` / `ds.rightMotor` and dirty-track here.""" - global _motors_dirty, _last_motors - new = (max(0, min(255, int(left))), max(0, min(255, int(right)))) - if _last_motors == new: - return - ds.leftMotor = new[0] - ds.rightMotor = new[1] - _last_motors = new - _motors_dirty = True - - - -def _flush(ds) -> bool: - """Push the current trigger / lightbar state to the controller via one HID - write. Idempotent \u2014 no-op if nothing changed since the last flush. The - update flags for unchanged subsystems are masked out so the firmware doesn't - re-process them. Returns False on write failure (controller unplugged) so - the caller can reconnect.""" - global _trigger_dirty, _lightbar_dirty, _motors_dirty - if not (_trigger_dirty or _lightbar_dirty or _motors_dirty): - return True - try: - out = ds.prepareReport() - # Bluetooth reports (outReport[0] == 0x31) carry a CRC32 at bytes - # 74-77 that pydualsense computes inside `prepareReport()`. If we - # mutate flag bytes after that point the firmware drops every packet - # for CRC mismatch \u2014 a previous revision did exactly that and any BT - # user got zero trigger updates. The cost of leaving the BT update - # flags asserted is one cheap firmware re-read of identical trigger - # bytes per frame; the firmware idempotency we rely on for USB - # ('don't restart the PID on identical input') applies here too. - # USB has no CRC field so we keep the flag-mask optimization there. - if out and out[0] == 0x02: - if not _trigger_dirty: - out[_USB_TRIGGER_FLAGS_BYTE] &= ~_TRIGGER_FLAG_BITS - if not _lightbar_dirty: - out[_USB_LIGHTBAR_FLAGS_BYTE] &= ~_LIGHTBAR_FLAG_BIT - ds.writeReport(out) - except (IOError, OSError) as e: - LOG.warning("dualsense write failed: %s", e) - return False - _trigger_dirty = False - _lightbar_dirty = False - _motors_dirty = False - return True - - -def _reset_caches(*states) -> None: - """Resync `_TriggerState._last_sent` to the controller's freshly-reset state. - - After a reconnect or idle-timeout reset, the controller is at mode 0x05 + - zero forces. Without resyncing the caches, the next `apply_*` whose target - happens to match the pre-reset cached value would skip the write and we'd - silently leave the trigger in mode 0x05 instead of the intended state.""" - for st in states: - st.reset() - global _trigger_dirty, _lightbar_dirty, _motors_dirty, _last_lightbar, _last_motors - _trigger_dirty = False - _lightbar_dirty = False - _motors_dirty = False - _last_lightbar = (0, 0, 0) - _last_motors = (0, 0) - global _lra_smoothed - _lra_smoothed = (0.0, 0.0) - - - -# --- Effect encoders ---------------------------------------------------------- - - -def _write_trigger(trig, mode: TriggerModes, forces: list[int], st: object | None) -> None: - """Write mode + forces to a pydualsense trigger, suppressing no-op writes. - - When `st` is a `_TriggerState`, the (mode, forces) tuple is compared against - `st._last_sent`. If identical, the write is skipped and the trigger-dirty - was \u2014 the next `_flush()` will not push anything for this trigger. - - When the tuple differs, the trigger fields are updated and `_trigger_dirty` - The next `_flush()` pushes one HID report carrying the new state. - - Callers that do NOT pass `st` (reset_triggers, shutdown reset) are - unconditional \u2014 they always write and always mark dirty. - """ - global _trigger_dirty - if st is not None: - new = (int(mode), tuple(forces)) - if st._last_sent == new: - return - for i in range(7): - trig.forces[i] = forces[i] - trig.mode = mode - if st is not None: - st._last_sent = (int(mode), tuple(forces)) - _trigger_dirty = True - - -def _apply_off(trig, st: object | None = None) -> None: - """Canonical Sony Off / Reset — mode byte 0x05, all params 0. - - Mirrors `TriggerEffectGenerator.Reset()` in DSX. Per Sony's docs (Nielk1 - Rev 6), mode 0x05 *actively* returns the trigger stop to the neutral - position; mode 0x00 only clears state without retracting the motor. - """ - _write_trigger(trig, DS_MODE_OFF, [0, 0, 0, 0, 0, 0, 0], st) - - -def _apply_feedback(trig, position: int, strength: int, st: object | None = None) -> bool: - """Sony Feedback (mode 0x21), bit-packed. - - Byte-faithful port of `ExtendInput.DataTools.DualSense.TriggerEffectGenerator - .Resistance` from DSX 1.4.9 (cs lines 193-218). Returns False without - writing when `strength > 8`, matching DSX exactly: the trigger holds its - last-set physical state (Simple_Vibration if mid-slip, Reset if pre-race) - until the EWMA-smoothed strength decays into the 1..8 canonical range. - - The previous implementation clamped strength to 8 and produced a - maximum-stiffness 'brick wall' for ~6-9 frames as the EWMA decayed from - its slip-stiffness range (175..255 throttle, 5..150 brake) down to the - Feedback range (0..3 throttle, 0..7 brake). That brick wall is what the - user reported as 'fights my finger' on burnouts and ABS braking.""" - if position > 9: - return False - if strength > 8: - # DSX-faithful: silently no-op. The trigger fields and the no-op - # cache (`st._last_sent`) keep their previous values; the next - # `_flush()` masks the trigger flag bits via `_trigger_dirty=False`. - return False - if strength <= 0: - _apply_off(trig, st) - return True - - force_value = (strength - 1) & 0x07 - force_zones = 0 - active_zones = 0 - for i in range(position, 10): - force_zones |= force_value << (3 * i) - active_zones |= 1 << i - - _write_trigger( - trig, - DS_MODE_FEEDBACK, - [ - active_zones & 0xFF, - (active_zones >> 8) & 0xFF, - force_zones & 0xFF, - (force_zones >> 8) & 0xFF, - (force_zones >> 16) & 0xFF, - (force_zones >> 24) & 0xFF, - 0, - ], - st, - ) - return True - - - -def _apply_automatic_gun( - trig, position: int, strength: int, frequency: int, st: object | None = None -) -> bool: - """Sony AutomaticGun (mode 0x26), bit-packed forces with frequency byte. - - Verbatim port of `TriggerEffectGenerator.AutomaticGun` (DSX 1.4.9 - `TriggerEffectGenerator.cs:292-326`). Same bit-packed force layout as - Resistance/Feedback (mode 0x21), plus a frequency byte at offset +9 - (forces[6] in pydualsense's mapping). - - Produces a 'pulsed resistance' effect: the firmware applies bit-packed - force at zones [position..9] and pulses the motor at `frequency` Hz. This - feels textural / controlled rather than the raw PWM buzz of mode 0x06, - and matches what GT7 and other PS5-native racing games use for wheelspin. - """ - if position > 9: - return False - if strength > 8: - return False # DSX-faithful silent no-op (matches Resistance behavior) - if strength <= 0 or frequency <= 0: - _apply_off(trig, st) - return True - - force_value = (strength - 1) & 0x07 - force_zones = 0 - active_zones = 0 - for i in range(position, 10): - force_zones |= force_value << (3 * i) - active_zones |= 1 << i - - _write_trigger( - trig, - DS_MODE_AUTOMATIC_GUN, - [ - active_zones & 0xFF, - (active_zones >> 8) & 0xFF, - force_zones & 0xFF, - (force_zones >> 8) & 0xFF, - (force_zones >> 16) & 0xFF, - (force_zones >> 24) & 0xFF, - frequency & 0xFF, - ], - st, - ) - return True - - - -def _apply_machine( - trig, - start: int, - end: int, - strength_a: int, - strength_b: int, - frequency: int, - period: int, - st: object | None = None, -) -> bool: - """Sony Machine (mode 0x27), two-amplitude alternating pulse with period. - - Verbatim port of `TriggerEffectGenerator.Machine` (DSX 1.4.9 - `TriggerEffectGenerator.cs:328-368`). Applies bit-packed force at zones - {start, end} alternating between strength_a and strength_b at `frequency` - Hz on a `period` (in 100ms units) cycle. The 'rhythmic catch-and-release' - character is what GT7 uses for ABS \u2014 cannot be produced by mode 0x26 - which has only one amplitude. Fits cleanly into pydualsense's forces[] - layout: forces[0..4] cover destinationArray[+1..+5] (active zones, - strength pair, frequency, period); forces[5..6] are required-zero by - Sony's spec. - """ - if start > 8 or end > 9 or end <= start: - return False - if strength_a > 7 or strength_b > 7: - return False - if frequency <= 0: - _apply_off(trig, st) - return True - active_zones = (1 << start) | (1 << end) - strength_pair = (strength_a & 0x07) | ((strength_b & 0x07) << 3) - _write_trigger( - trig, - DS_MODE_MACHINE, - [ - active_zones & 0xFF, - (active_zones >> 8) & 0xFF, - strength_pair & 0xFF, - frequency & 0xFF, - period & 0xFF, - 0, - 0, - ], - st, - ) - return True - - - -def _apply_bow( - trig, - start: int, - end: int, - force: int, - snap_force: int, - st: object | None = None, -) -> bool: - """Sony Bow (mode 0x22). Resists between zones {start, end} then snaps back. - - Verbatim port of `TriggerEffectGenerator.Bow` (DSX 1.4.9 - `TriggerEffectGenerator.cs:167-207`). Used as a 1-frame impulse on - gear-change edges \u2014 NFS-Unbound-style shift detent. Activates exactly two - zones (start, end) bit-packed; force/snap_force are 3-bit each, encoded - as `(force-1) | ((snap_force-1) << 3)`. - """ - if start > 8 or end > 8 or start >= end: - return False - if force > 8 or snap_force > 8: - return False - if end <= 0 or force <= 0 or snap_force <= 0: - _apply_off(trig, st) - return True - active_zones = (1 << start) | (1 << end) - pair = ((force - 1) & 0x07) | (((snap_force - 1) & 0x07) << 3) - _write_trigger( - trig, - DS_MODE_BOW, - [ - active_zones & 0xFF, - (active_zones >> 8) & 0xFF, - pair & 0xFF, - (pair >> 8) & 0xFF, # always 0 for force, snap_force \u2264 8 - 0, - 0, - 0, - ], - st, - ) - return True - - - -def _apply_slope_feedback( - trig, - start: int, - end: int, - start_strength: int, - end_strength: int, - st: object | None = None, -) -> bool: - """Sony SlopeFeedback (mode 0x21). Linear strength ramp from `start_strength` - at zone `start` to `end_strength` at zone `end`. - - Built on Nielk1 Rev6 gist's `MultiplePositionFeedback` factory: same byte - layout as `Resistance` (mode 0x21) with per-zone strengths instead of - uniform. Outside [start, end] zones are inactive (no force). Used for the - progressive brake bite-point feel \u2014 light pressure at the top of throw, - firmer at the bottom \u2014 mimicking a real hydraulic brake pedal. - - Stricter validation than `Resistance`: `start_strength` and `end_strength` - must each be in [1, 8]. Caller is responsible for short-circuiting to - `_apply_off` when the target is 0. - """ - if start > 8 or end > 9 or end <= start: - return False - if start_strength < 1 or start_strength > 8: - return False - if end_strength < 1 or end_strength > 8: - return False - span = end - start - force_zones = 0 - active_zones = 0 - for i in range(10): - if i < start: - continue - elif i <= end: - s = round(start_strength + (end_strength - start_strength) * (i - start) / span) - else: - s = end_strength - s = max(1, min(8, int(s))) - force_zones |= ((s - 1) & 0x07) << (3 * i) - active_zones |= 1 << i - _write_trigger( - trig, - DS_MODE_FEEDBACK, - [ - active_zones & 0xFF, - (active_zones >> 8) & 0xFF, - force_zones & 0xFF, - (force_zones >> 8) & 0xFF, - (force_zones >> 16) & 0xFF, - (force_zones >> 24) & 0xFF, - 0, - ], - st, - ) - return True - - - - -def reset_triggers(ds: pydualsense) -> None: - """Both triggers to canonical Off (mode 0x05). Actively retracts the motor.""" - _apply_off(ds.triggerL) - _apply_off(ds.triggerR) - - -def reset_lightbar(ds: pydualsense) -> None: - """Lightbar to off (RGB 0,0,0). - - Used when telemetry has been idle long enough that we should stop asserting - a race color \u2014 e.g. Forza exited or hasn't started a session yet. Without - this, pydualsense's BG sendReport thread keeps re-publishing whatever - `TouchpadColor` we last set, so the controller stays lit indefinitely. - """ - _set_lightbar(ds, 0, 0, 0) - - -# --- RacingDSX math primitives ------------------------------------------------ - - -def _map(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float: - """Mirrors Parser.Map() in RacingDSX, including endpoint clamping.""" - if x > in_max: - x = in_max - elif x < in_min: - x = in_min - return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min - - -def _ewma(value: float, last: float, alpha: float) -> float: - """Mirrors `Parser.EWMA(float, float, float)`. alpha=1.0 disables smoothing.""" - return alpha * value + (1.0 - alpha) * last - - -def _ewma_int(value: int, last: int, alpha: float) -> int: - """Mirrors `Parser.EWMA(int, int, float)` \u2014 floor of float-EWMA.""" - return math.floor(alpha * value + (1.0 - alpha) * last) - - -def _slip_to_strength(slip_ratio: float, lo: float, hi: float) -> int: - """Map a slip ratio to AutomaticGun / Vibration strength (0..8) on a log curve. - - Replaces the previous `amplitude >> 5` mapping which collapsed Sony's 8 - strength steps to 3 ({5,6,7}) over RacingDSX's 175..255 amplitude range \u2014 - burnout-onset and full-burnout felt identical. - - The log curve matches human trigger-force perception (roughly logarithmic) - and uses the full 1..8 range. Below `lo`, returns 0 (caller should route - to Off / steady-state). Above `hi`, saturates at 8. - """ - if slip_ratio < lo: - return 0 - if slip_ratio >= hi: - return 8 - t = (math.log(slip_ratio) - math.log(lo)) / (math.log(hi) - math.log(lo)) - return max(1, min(8, math.ceil(t * 8))) - - -def _safe(pkt: ForzaDataPacket, name: str, default: float = 0.0) -> float: - """Read a numeric field defensively. Returns default for missing fields, - NaN, or +/-inf \u2014 protects every caller from crashing on corrupt UDP - packets, fdp parser overflow, or future-game-version field changes.""" - v = float(getattr(pkt, name, default)) - if not math.isfinite(v): - return float(default) - return v - - -# --- Per-trigger persistent state for EWMA ------------------------------------ - - -class _TriggerState: - __slots__ = ( - "last_resistance", - "last_freq", - "_init_resistance", - "_last_sent", - "event_remaining_frames", - "event_callable", - ) - - def __init__(self, init_resistance: int) -> None: - self._init_resistance: int = int(init_resistance) - self.last_resistance: int = int(init_resistance) - self.last_freq: int = 0 - # (mode_int, forces_tuple) of last byte sequence actually written to - # the pydualsense trigger fields. None = nothing written yet. Used by - # _write_trigger to suppress redundant HID writes \u2014 divergence #5. - self._last_sent: tuple[int, tuple[int, ...]] | None = None - # Event-impulse override: when event_remaining_frames > 0, apply_*_trigger - # calls event_callable(trig, st) instead of computing per-frame state. - # Used for gear-shift Bow (Phase 5) and any future short overrides. - self.event_remaining_frames: int = 0 - self.event_callable = None - - def reset(self) -> None: - """Resync to the controller's freshly-reset state. Called from - `_reset_caches` after reconnect / idle reset; also clears the EWMA - smoothing state so the first frame of a new race doesn't carry stale - slip-amplitude values across the transition.""" - self.last_resistance = self._init_resistance - self.last_freq = 0 - self._last_sent = (int(DS_MODE_OFF), (0, 0, 0, 0, 0, 0, 0)) - self.event_remaining_frames = 0 - self.event_callable = None - - -# --- Forza game-level persistent state (ForzaParser.cs fields) ---------------- - - -class _ForzaState: - """Persistent across packets. Mirrors ForzaParser's instance fields: - LastEngineRPM, LastRPMAccumulator, LastValidCarClass, LastValidCarCPI.""" - - __slots__ = ( - "last_engine_rpm", - "rpm_accumulator", - "last_valid_car_class", - "last_valid_car_cpi", - "last_gear", - ) - - def __init__(self) -> None: - self.last_engine_rpm = 0.0 - self.rpm_accumulator = 0 - self.last_valid_car_class = 0 - self.last_valid_car_cpi = 0 - self.last_gear = -1 # sentinel; first packet's gear edge is suppressed - - -def _clamp_byte(v: float) -> int: - """Clamp to [0, 255] before writing to a uint8 RGB channel.""" - return max(0, min(255, int(v))) - - -def forza_is_race_on(pkt: ForzaDataPacket, state: _ForzaState) -> bool: - """Mirrors `ForzaParser.IsRaceOn()` verbatim. - - FH4/FH5's `is_race_on` field is unreliable: it sometimes stays True after - the player exits a race or pauses. ForzaParser detects the off state by - watching for unchanged engine RPM combined with non-positive Power across - `RPMAccumulatorTriggerRaceOff` (200) consecutive frames. Power is dash-only, - so for sled-format packets it reads as 0; that matches RacingDSX exactly. - """ - in_race = bool(int(getattr(pkt, "is_race_on", 0))) - current_rpm = _safe(pkt, "current_engine_rpm") - power = _safe(pkt, "power") - - if current_rpm == state.last_engine_rpm and power <= 0: - state.rpm_accumulator += 1 - if state.rpm_accumulator > RACE_OFF_RPM_FRAMES: - in_race = False - else: - state.rpm_accumulator = 0 - - state.last_engine_rpm = current_rpm - return in_race - - -# --- Lightbar (touchpad LED ring) --------------------------------------------- - - -def apply_lightbar_pre_race(ds: pydualsense, pkt: ForzaDataPacket, state: _ForzaState) -> None: - """Mirrors `ForzaParser.GetPreRaceInstructions()` lightbar logic. - - Sets the lightbar to the car's class color, dimmed by `cpi/MAX_CPI`. - X-class cars use the fixed ColorClassX without a CPI tint. Car class and - CPI fields can briefly read 0 during loading screens, so we cache the - last valid value seen \u2014 also matching ForzaParser's behavior.""" - car_class = int(_safe(pkt, "car_class")) - if car_class > 0: - state.last_valid_car_class = car_class - car_class = state.last_valid_car_class - - cpi = int(_safe(pkt, "car_performance_index")) - if cpi > 0: - state.last_valid_car_cpi = min(cpi, 255) - cpi = state.last_valid_car_cpi - - cpi_ratio = cpi / MAX_CPI - - if car_class <= 5: - cr, cg, cb = CAR_CLASS_COLORS[car_class] - r = math.floor(cpi_ratio * cr) - g = math.floor(cpi_ratio * cg) - b = math.floor(cpi_ratio * cb) - else: - r, g, b = CAR_CLASS_COLORS[6] - - _set_lightbar(ds, _clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) - - -def apply_lightbar_in_race(ds: pydualsense, pkt: ForzaDataPacket) -> None: - """Mirrors `Parser.GetInRaceLightbarInstruction()` RPM-gradient logic. - - Below the redline ratio (Profile.RPMRedlineRatio = 0.9), red rises and - green falls linearly with rpm_ratio, with green floored at 50. At or - above redline the lightbar goes pure red (255, 0, 0).""" - max_rpm = _safe(pkt, "engine_max_rpm") - idle_rpm = _safe(pkt, "engine_idle_rpm") - current_rpm = _safe(pkt, "current_engine_rpm") - - engine_range = max_rpm - idle_rpm - if engine_range <= 0: - rpm_ratio = 0.0 - else: - rpm_ratio = (current_rpm - idle_rpm) / engine_range - - if rpm_ratio >= RPM_REDLINE_RATIO: - r, g, b = 255, 0, 0 - else: - r = math.floor(rpm_ratio * 255) - g = max(math.floor((1.0 - rpm_ratio) * 255), GREEN_FLOOR) - b = 0 - - _set_lightbar(ds, _clamp_byte(r), _clamp_byte(g), _clamp_byte(b)) - - -# --- Body LRA motors --------------------------------------------------------- -# Adaptive triggers are only half of the GT7 racing-haptic vocabulary; the -# other half is the body LRAs (left/right linear resonant actuators) feeding -# 'feel which side hit the kerb' through outReport[3]/[4]. RacingDSX/DSX -# don't drive these for racing telemetry; we extend the daemon to use them. -# Mix surface_rumble (road texture, 0..1 per wheel) and wheel_on_rumble_strip -# (boolean kerb contact) per side: front-left/rear-left -> left motor, -# front-right/rear-right -> right motor. -LRA_KERB_FLOOR_AMP = 80 # minimum amplitude when wheel is on a kerb strip - - -def apply_lra(ds: pydualsense, pkt: ForzaDataPacket) -> None: - """Drive the body LRA motors from Forza's surface-rumble + kerb fields. - - Surface-rumble values jitter frame-to-frame on textured surfaces (gravel, - kerbs, wet asphalt). Without smoothing the LRAs sound like a machine-gun - at 60 Hz. EWMA at alpha=0.6 keeps response responsive (~5 frames to reach - >=240 from a step input) while killing single-frame artifacts.""" - global _lra_smoothed - surface_fl = abs(_safe(pkt, "surface_rumble_FL")) - surface_fr = abs(_safe(pkt, "surface_rumble_FR")) - surface_rl = abs(_safe(pkt, "surface_rumble_RL")) - surface_rr = abs(_safe(pkt, "surface_rumble_RR")) - kerb_left = bool(_safe(pkt, "wheel_on_rumble_strip_FL")) or bool( - _safe(pkt, "wheel_on_rumble_strip_RL") - ) - kerb_right = bool(_safe(pkt, "wheel_on_rumble_strip_FR")) or bool( - _safe(pkt, "wheel_on_rumble_strip_RR") - ) - - target_left = max(surface_fl, surface_rl) * 255.0 - target_right = max(surface_fr, surface_rr) * 255.0 - smoothed_left = LRA_SMOOTHING_ALPHA * target_left + (1.0 - LRA_SMOOTHING_ALPHA) * _lra_smoothed[0] - smoothed_right = LRA_SMOOTHING_ALPHA * target_right + (1.0 - LRA_SMOOTHING_ALPHA) * _lra_smoothed[1] - _lra_smoothed = (smoothed_left, smoothed_right) - - left_amp = int(smoothed_left) - right_amp = int(smoothed_right) - if kerb_left: - left_amp = max(left_amp, LRA_KERB_FLOOR_AMP) - if kerb_right: - right_amp = max(right_amp, LRA_KERB_FLOOR_AMP) - - _set_motors(ds, left_amp, right_amp) - - - -# --- Throttle (right trigger) ------------------------------------------------- - - -def apply_right_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: - """Mirrors `Parser.GetInRaceRightTriggerInstruction()` line for line, with - one optional divergence: the throttle is released when the clutch is - disengaged. See divergence #3 in the module docstring.""" - # Event impulse override (e.g. gear-shift Bow): runs the scheduled encoder - # for `event_remaining_frames` packets, then resumes steady-state. - if st.event_remaining_frames > 0 and st.event_callable is not None: - st.event_remaining_frames -= 1 - st.event_callable(ds.triggerR, st) - return - if CLUTCH_GATE_ENABLED and int(_safe(pkt, "clutch", 0.0)) > CLUTCH_DISENGAGE_THRESHOLD: - _apply_off(ds.triggerR, st) - return - - accel_x = _safe(pkt, "acceleration_x") - accel_z = _safe(pkt, "acceleration_z") - avg_accel = math.sqrt( - THROTTLE_TURN_ACCEL_SCALE * (accel_x * accel_x) - + THROTTLE_FORWARD_ACCEL_SCALE * (accel_z * accel_z) - ) - - fl = abs(_safe(pkt, "tire_combined_slip_FL")) - fr = abs(_safe(pkt, "tire_combined_slip_FR")) - rl = abs(_safe(pkt, "tire_combined_slip_RL")) - rr = abs(_safe(pkt, "tire_combined_slip_RR")) - front_slip = (fl + fr) * 0.5 - rear_slip = (rl + rr) * 0.5 - four_wheel_slip = (fl + fr + rl + rr) * 0.25 - - accelerator = int(_safe(pkt, "accel")) - - losing_grip = ( - front_slip > THROTTLE_GRIP_LOSS - or (rear_slip > THROTTLE_GRIP_LOSS and accelerator > THROTTLE_REAR_SLIP_ACCEL_MIN) - ) and _is_in_motion(pkt) - - if losing_grip: - # Floor after Map (matches `(int)Math.Floor(Map(...))` in Parser.cs). - target_freq = math.floor( - _map(four_wheel_slip, THROTTLE_GRIP_LOSS, 5.0, 0.0, THROTTLE_MAX_VIBRATION) - ) - target_resistance = math.floor( - _map( - avg_accel, - 0.0, - THROTTLE_ACCELERATION_LIMIT, - THROTTLE_MIN_STIFFNESS, - THROTTLE_MAX_STIFFNESS, - ) - ) - # Floor after EWMA (matches `(int)EWMA(int, int, float)` overload). - freq = _ewma_int(target_freq, st.last_freq, THROTTLE_VIBRATION_SMOOTHING) - resistance = _ewma_int( - target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING - ) - st.last_freq = freq - st.last_resistance = resistance - - # Vibration entry condition matches RacingDSX `Parser.cs:189-195`: - # `freq > MinVibration AND accelerator > VibrationModeStart`. Below it, - # fall through to Feedback (which DSX-faithfully no-ops on force > 8). - if freq > THROTTLE_MIN_VIBRATION and accelerator > THROTTLE_VIB_POSITION: - # AutomaticGun (mode 0x26) gives a controlled pulse rather than the - # raw-PWM buzz of Simple_Vibration (mode 0x06). Strength is 1-8; - # we map RacingDSX's slip-stiffness range (175..255) into it via - # >>5 (175\u21925, 200\u21926, 255\u21927 clamped 8). Frequency byte is the - # filtered slip-vibration freq from RacingDSX's mapping (0..55). - # Strength from slip ratio via log curve (audit Phase 2). Throttle - # slip strength uses the dominant axle (max of front/rear) since RWD/AWD - # burnouts concentrate slip on the drive wheels; averaging across all 4 - # collapses the dominant signal. lo=0.4 means slip-onset at the entry - # threshold registers as strength 2-3 rather than 1. - slip_metric = max(front_slip, rear_slip) - strength = _slip_to_strength(slip_metric, lo=0.4, hi=2.0) - if strength == 0: - strength = 1 # in slip path => entry threshold passed; floor at 1 - _apply_automatic_gun( - ds.triggerR, - THROTTLE_VIB_POSITION, - strength, - int(freq * THROTTLE_EFFECT_INTENSITY), - st, - ) - else: - # Below threshold: RacingDSX sends `Resistance(0, filteredResistance)` - # with slip-stiffness force values (175..255). DSX silently no-ops - # via `TriggerEffectGenerator.Resistance` returning False on force > 8; - # the trigger holds whatever Vibration state was last set. Our - # `_apply_feedback` does the same. - _apply_feedback( - ds.triggerR, - 0, - int(resistance * THROTTLE_EFFECT_INTENSITY), - st, - ) - return - - # Out of slip path entirely. - - - target_resistance = math.floor( - _map( - avg_accel, - 0.0, - THROTTLE_ACCELERATION_LIMIT, - THROTTLE_MIN_RESISTANCE, - THROTTLE_MAX_RESISTANCE, - ) - ) - resistance = _ewma_int(target_resistance, st.last_resistance, THROTTLE_RESISTANCE_SMOOTHING) - st.last_resistance = resistance - _apply_feedback(ds.triggerR, 0, int(resistance * THROTTLE_EFFECT_INTENSITY), st) - - -# --- Brake (left trigger) ----------------------------------------------------- - - -def apply_left_trigger(ds: pydualsense, pkt: ForzaDataPacket, st: _TriggerState) -> None: - """Mirrors `Parser.GetInRaceLeftTriggerInstruction()` line for line.""" - fl = abs(_safe(pkt, "tire_combined_slip_FL")) - fr = abs(_safe(pkt, "tire_combined_slip_FR")) - rl = abs(_safe(pkt, "tire_combined_slip_RL")) - rr = abs(_safe(pkt, "tire_combined_slip_RR")) - four_wheel_slip = (fl + fr + rl + rr) * 0.25 - brake = int(_safe(pkt, "brake")) - - losing_grip = ( - four_wheel_slip > BRAKE_GRIP_LOSS - and brake > BRAKE_DEADZONE - and _is_in_motion(pkt) - ) - - if losing_grip: - # Compute the freq for the AutomaticGun (lock-up) path. Machine uses - # its own per-band fixed frequencies per the audit's recommended map. - target_freq = math.floor( - _map(four_wheel_slip, BRAKE_GRIP_LOSS, 5.0, 0.0, BRAKE_MAX_VIBRATION) - ) - target_resistance = math.floor( - _map( - brake, - 0, - 255, - BRAKE_MAX_STIFFNESS, - BRAKE_MIN_STIFFNESS, - ) - ) - freq = _ewma_int(target_freq, st.last_freq, BRAKE_VIBRATION_SMOOTHING) - resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) - st.last_freq = freq - st.last_resistance = resistance - - # GT7-style: ABS regime uses Machine (mode 0x27, two-amplitude - # alternating pulse for the 'rhythmic catch-and-release' character). - # Severe lock-up (slip > 0.25) crosses to AutomaticGun at max strength - # \u2014 there's no useful catch-release feel beyond ABS, just sustained - # heavy buzz. Gate on slip directly rather than the EWMA-smoothed freq - # because RacingDSX's freq map only reaches MIN_VIBRATION at slip > 3.7 - # which is well past lock-up; gating on freq would make Machine - # unreachable in the actual ABS regime (0.05..0.25). - if four_wheel_slip > 0.25: - _apply_automatic_gun( - ds.triggerL, - BRAKE_VIB_POSITION, - 8, - max(20, int(freq * BRAKE_EFFECT_INTENSITY)), - st, - ) - else: - if four_wheel_slip < 0.10: - amp_a, amp_b, machine_freq, period = 1, 3, 20, 2 - elif four_wheel_slip < 0.15: - amp_a, amp_b, machine_freq, period = 2, 5, 22, 1 - elif four_wheel_slip < 0.20: - amp_a, amp_b, machine_freq, period = 3, 6, 25, 1 - else: - amp_a, amp_b, machine_freq, period = 3, 7, 28, 1 - _apply_machine( - ds.triggerL, - BRAKE_ABS_MACHINE_START, - BRAKE_ABS_MACHINE_END, - amp_a, - amp_b, - machine_freq, - period, - st, - ) - return - - # Out of slip path entirely. - - - target_resistance = math.floor(_map(brake, 0, 255, BRAKE_MIN_RESISTANCE, BRAKE_MAX_RESISTANCE)) - # Slip\u2192non-slip recovery: when last_resistance carries a slip-mode value - # (5..150 from the slip-stiffness range), normal EWMA decay holds the trigger - # in its prior Vibration/Machine state for ~8 frames (~133 ms) because - # `_apply_feedback` silently no-ops on force > 8. Halve the residue each - # frame until it's in Feedback range so the user feels brake-release - # within ~3 frames of slip exit. - if st.last_resistance > 12: - st.last_resistance = max(target_resistance, st.last_resistance // 2) - resistance = _ewma_int(target_resistance, st.last_resistance, BRAKE_RESISTANCE_SMOOTHING) - st.last_resistance = resistance - - - - # SlopeFeedback gives a progressive bite-point: light at the top of throw, - # firmer at the bottom \u2014 a real hydraulic brake pedal feel. start=2, end=8 - # anchors the ramp in the bite-point band; start_strength=2 keeps the top - # easy to engage while end_strength scales with brake input. - end_strength = int(resistance * BRAKE_EFFECT_INTENSITY) - if end_strength < 1: - # No (or trivial) brake: trigger neutral. Same as DSX-faithful Resistance(0,0). - _apply_off(ds.triggerL, st) - elif end_strength > 8: - # Slip-stiffness EWMA decay residue: silently no-op, keep prior state. - # Matches the divergence #2 brick-wall-avoidance design (and audit \u00a71.5 - # decay accelerator above). - return - else: - _apply_slope_feedback( - ds.triggerL, start=2, end=8, start_strength=2, end_strength=end_strength, st=st, - ) - - -# --- UDP main loop ------------------------------------------------------------ - - -def parse_packet(data: bytes) -> ForzaDataPacket | None: +def _parse_packet(data: bytes) -> ForzaDataPacket | None: fmt = PACKET_FORMATS.get(len(data)) if fmt is None: LOG.debug("ignoring packet of unexpected length %d", len(data)) @@ -1247,124 +194,183 @@ def parse_packet(data: bytes) -> ForzaDataPacket | None: return None -def _close_controller(ds: pydualsense | None) -> None: - """Best-effort close. The HID device may already be gone (unplug, BT drop) - in which case `device.close()` raises; we don't care.""" - if ds is None: +def _axle_slip(pkt: ForzaDataPacket) -> tuple[float, float]: + """(front_slip, rear_slip) — dominant tire of each axle. + + Race-Element uses `Math.Max` over each axle's two tires to surface the + worst-slipping wheel, since slip on either side counts. fdp emits + `tire_combined_slip_*` as signed floats; `_safe_abs` filters NaN/inf + and takes the absolute value (Race-Element calls this NegateIfNegative). + """ + fl = _safe_abs(pkt, "tire_combined_slip_FL") + fr = _safe_abs(pkt, "tire_combined_slip_FR") + rl = _safe_abs(pkt, "tire_combined_slip_RL") + rr = _safe_abs(pkt, "tire_combined_slip_RR") + return max(fl, fr), max(rl, rr) + + +# --- Trigger haptic handlers (1:1 port of TriggerHaptics.cs) ----------------- + + +def handle_braking(controller: DualSenseController, pkt: ForzaDataPacket) -> None: + """Brake (L2). Mirrors `TriggerHaptics.HandleBraking`. + + Note: Race-Element's brake path leaves the trigger in its prior effect + when brake is engaged but slip is below threshold — i.e. you can hold + the brake without slip after an ABS event and the trigger keeps + vibrating until you release the brake. This matches the upstream code + exactly (no else-branch around the slip check). + """ + brake_input = _safe_abs(pkt, "brake") / 255.0 + if brake_input <= BRAKE_INPUT_THRESHOLD: + controller.left_trigger.effect.off() + return + + front_slip, rear_slip = _axle_slip(pkt) + if ( + front_slip <= BRAKE_FRONT_SLIP_THRESHOLD + and rear_slip <= BRAKE_REAR_SLIP_THRESHOLD + ): + return # Race-Element falls through here: trigger keeps prior effect. + + front_coef = min(front_slip * 4, 10) + # RACE-ELEMENT BUG: upstream computes the rear coefficient from FRONT slip + # (TriggerHaptics.cs line 36, `slipRatioFront * 2f`). Faithful port keeps + # the bug; if you'd rather use rear_slip * 2, change one symbol. + rear_coef = min(front_slip * 2, 7.5) + pct = (front_coef + rear_coef) / BRAKE_PCT_DIVISOR + freq = max(BRAKE_MIN_FREQUENCY, int(BRAKE_MAX_FREQUENCY * pct)) + + controller.left_trigger.effect.vibration( + start_position=0, amplitude=BRAKE_AMPLITUDE, frequency=freq + ) + + +def handle_acceleration(controller: DualSenseController, pkt: ForzaDataPacket) -> None: + """Throttle (R2). Mirrors `TriggerHaptics.HandleAcceleration`.""" + throttle_input = _safe_abs(pkt, "accel") / 255.0 + if throttle_input <= THROTTLE_INPUT_THRESHOLD: + controller.right_trigger.effect.off() + return + + front_slip, rear_slip = _axle_slip(pkt) + if ( + front_slip <= THROTTLE_FRONT_SLIP_THRESHOLD + and rear_slip <= THROTTLE_REAR_SLIP_THRESHOLD + ): + # Throttle path resets to default explicitly when not slipping + # (HandleAcceleration line 112, `R2Effect = TriggerEffect.Default`). + controller.right_trigger.effect.off() + return + + front_coef = min(front_slip * 3, 5) + # RACE-ELEMENT BUG: same as brake — rear coefficient uses FRONT slip + # (TriggerHaptics.cs line 92, `slipRatioFront * 5f`). + rear_coef = min(front_slip * 5, 7.5) + pct = (front_coef + rear_coef) / THROTTLE_PCT_DIVISOR + freq = max(THROTTLE_MIN_FREQUENCY, int(THROTTLE_MAX_FREQUENCY * pct)) + + controller.right_trigger.effect.vibration( + start_position=0, amplitude=THROTTLE_AMPLITUDE, frequency=freq + ) + + +def reset_triggers(controller: DualSenseController) -> None: + """Default both triggers — called on idle, reconnect, and shutdown.""" + controller.left_trigger.effect.off() + controller.right_trigger.effect.off() + + +# --- Connection / hot-plug --------------------------------------------------- + + +def _connect_controller() -> DualSenseController | None: + """Block until a DualSense is reachable. Returns the activated + controller, or None if shutdown was requested before one appeared. + """ + LOG.info("opening dualsense controller") + first_failure_logged = False + while not _shutdown: + try: + devices = DualSenseController.enumerate_devices() + except Exception as e: + if not first_failure_logged: + LOG.warning("hidapi enumeration failed: %s", e) + first_failure_logged = True + time.sleep(RECONNECT_BACKOFF_S) + continue + + if not devices: + if not first_failure_logged: + LOG.warning( + "no DualSense found; retrying every %.1fs", RECONNECT_BACKOFF_S + ) + first_failure_logged = True + time.sleep(RECONNECT_BACKOFF_S) + continue + + try: + controller = DualSenseController(device_index_or_device_info=devices[0]) + controller.on_error(_on_controller_error) + controller.activate() + except Exception as e: + if not first_failure_logged: + LOG.warning("controller activation failed: %s", e) + first_failure_logged = True + time.sleep(RECONNECT_BACKOFF_S) + continue + + # Push a clean initial state so we don't inherit residual effects from + # a previous Steam Input session, prior daemon instance, or stale + # firmware state. + reset_triggers(controller) + global _disconnected + _disconnected = False + LOG.info("dualsense controller connected (%s)", controller.connection_type) + return controller + + return None + + +def _close_controller(controller: DualSenseController | None) -> None: + if controller is None: return try: - ds.close() + controller.deactivate() except Exception: pass -def _connect_controller() -> pydualsense: - """Open the DualSense, blocking until one is reachable. - - `pydualsense.init()` raises when no DualSense is plugged in. That's a - normal startup-or-replug condition for us, not a fatal error \u2014 the - daemon is meant to live for the whole user session and self-heal across - plug events without external supervision. We log the first failure once, - then retry quietly every `RECONNECT_BACKOFF_S` seconds. - """ - LOG.info("opening dualsense controller") - first_failure_logged = False - while True: - ds = pydualsense() - try: - ds.init() - except Exception as e: - _close_controller(ds) - if not first_failure_logged: - LOG.warning( - "dualsense not available (%s); retrying every %.1fs", - e, - RECONNECT_BACKOFF_S, - ) - first_failure_logged = True - time.sleep(RECONNECT_BACKOFF_S) - if _shutdown_requested: - raise KeyboardInterrupt - continue - # Stop pydualsense's BG sendReport thread. See divergence #6 in the - # rationale near `_set_lightbar` / `_flush`. Without this the firmware - # gets a stream of 'set trigger' commands at ~250 Hz, which it treats - # as fresh effect commands \u2014 the trigger PID restarts every ~4 ms and - # oscillates against the user's finger. - ds.ds_thread = False - if ds.report_thread.is_alive(): - ds.report_thread.join(timeout=2.0) - # Push a clean initial state so we don't inherit residual trigger - # effects from a previous Steam Input session, prior daemon instance, - # or stale firmware state. - global _trigger_dirty, _lightbar_dirty, _motors_dirty, _last_lightbar, _last_motors - _trigger_dirty = False - _lightbar_dirty = False - _motors_dirty = False - _last_lightbar = None - _last_motors = None - _apply_off(ds.triggerL) - _apply_off(ds.triggerR) - _set_lightbar(ds, 0, 0, 0) - _set_motors(ds, 0, 0) - try: - ds.writeReport(ds.prepareReport()) - _trigger_dirty = False - _lightbar_dirty = False - _motors_dirty = False - except (IOError, OSError) as e: - LOG.warning("initial write failed: %s; retrying connect", e) - _close_controller(ds) - time.sleep(RECONNECT_BACKOFF_S) - if _shutdown_requested: - raise KeyboardInterrupt - continue - LOG.info("dualsense controller connected") - return ds +# --- Main loop --------------------------------------------------------------- -# --- Signal handling -------------------------------------------------------- -# systemd sends SIGTERM (not SIGINT) by default. Python does not route -# SIGTERM to KeyboardInterrupt, so without an explicit handler the process -# ignores SIGTERM and systemd waits TimeoutStopSec=90s before SIGKILL. -# Meanwhile socket activation can fire a second instance — two daemons -# fighting over the same controller. We set a module-level flag that the -# main loop polls once per timeout (≤1s) so shutdown is prompt. -_shutdown_requested = False - - -def _handle_termination(signum: int, frame: object) -> None: - """Signal handler for SIGTERM / SIGINT — unblock the main loop.""" - global _shutdown_requested - _shutdown_requested = True - - -def run(host: str, port: int, debug: bool, exit_on_idle: bool = False) -> int: - # Register signal handlers FIRST so a SIGTERM during the (potentially - # blocking) `_connect_controller` retry loop sets the shutdown flag - # instead of killing the process with default-handler. The connect loop - # checks the flag between retries so the daemon exits cleanly even with - # no controller plugged in. - signal.signal(signal.SIGTERM, _handle_termination) - signal.signal(signal.SIGINT, _handle_termination) - ds = _connect_controller() - +def run(host: str, port: int, exit_on_idle: bool = False) -> int: + signal.signal(signal.SIGTERM, _on_termination) + signal.signal(signal.SIGINT, _on_termination) + # Bind the UDP socket BEFORE opening the controller. If the bind fails + # (port in use, systemd-passed fd unusable, etc.) we exit cleanly without + # having activated the controller's HID stack (which would otherwise leak + # an active session). _get_socket raises on bind failure. LOG.info("listening for forza udp on %s:%d", host, port) sock = _get_socket(host, port) - throttle_state = _TriggerState(init_resistance=THROTTLE_LAST_RESISTANCE_INIT) - brake_state = _TriggerState(init_resistance=BRAKE_LAST_RESISTANCE_INIT) - forza_state = _ForzaState() - last_seen = 0.0 - in_race = False + controller = _connect_controller() + if controller is None: + sock.close() + return 0 - have_telemetry = False # True between the first packet and the IDLE_TIMEOUT_S reset + last_seen = 0.0 + have_telemetry = False try: - while True: - if _shutdown_requested: - raise KeyboardInterrupt + while not _shutdown: + if _disconnected: + LOG.warning("dualsense disconnected; reconnecting") + _close_controller(controller) + controller = _connect_controller() + if controller is None: + return 0 now = time.monotonic() try: @@ -1372,114 +378,33 @@ def run(host: str, port: int, debug: bool, exit_on_idle: bool = False) -> int: last_seen = now have_telemetry = True except socket.timeout: - if _shutdown_requested: - raise KeyboardInterrupt - - # Reset on telemetry-idle regardless of in_race state. After Forza - # exits with the user in its main menu (is_race_on=0 packets just - # before exit, so in_race was already False), the old check would - # leave the last lightbar/trigger state asserted forever. + if _shutdown: + break if have_telemetry and (now - last_seen) > IDLE_TIMEOUT_S: - LOG.info( - "forza idle for %.1fs — resetting controller", - IDLE_TIMEOUT_S, - ) - # One-shot 0x05 to actively retract the trigger motor; the BG - # thread will publish it ~12 times in the next 50 ms before - # main thread loops back here. - reset_triggers(ds) - reset_lightbar(ds) - _set_motors(ds, 0, 0) - if not _flush(ds): - LOG.warning("dualsense disconnected; reconnecting") - _close_controller(ds) - ds = _connect_controller() - _reset_caches(throttle_state, brake_state) + LOG.info("forza idle for %.1fs — resetting", IDLE_TIMEOUT_S) + reset_triggers(controller) have_telemetry = False - in_race = False - if exit_on_idle: LOG.info("exiting on idle") - return 0 + break continue - pkt = parse_packet(data) + pkt = _parse_packet(data) if pkt is None: continue - # ForzaParser.IsRaceOn() override: combines packet field with the - # FH-specific RPM-accumulator workaround. Must be called once per - # packet so the accumulator state stays accurate. - in_race = forza_is_race_on(pkt, forza_state) - - if not in_race: - # Pre-race: release both triggers via mode 0x05. The no-op - # cache (divergence #5) means subsequent identical frames - # don't produce HID writes \u2014 the trigger holds its released - # state with zero ongoing motor work. - _apply_off(ds.triggerL, brake_state) - _apply_off(ds.triggerR, throttle_state) - apply_lightbar_pre_race(ds, pkt, forza_state) - _set_motors(ds, 0, 0) - - else: - if debug: - LOG.debug( - "rpm=%.0f/%.0f accel=%d brake=%d " - "slip[FL,FR,RL,RR]=%.2f,%.2f,%.2f,%.2f " - "throttle[freq=%d res=%d] brake[freq=%d res=%d]", - _safe(pkt, "current_engine_rpm"), - _safe(pkt, "engine_max_rpm"), - int(_safe(pkt, "accel")), - int(_safe(pkt, "brake")), - _safe(pkt, "tire_combined_slip_FL"), - _safe(pkt, "tire_combined_slip_FR"), - _safe(pkt, "tire_combined_slip_RL"), - _safe(pkt, "tire_combined_slip_RR"), - throttle_state.last_freq, - throttle_state.last_resistance, - brake_state.last_freq, - brake_state.last_resistance, - ) - apply_lightbar_in_race(ds, pkt) - apply_lra(ds, pkt) - # Gear-shift event: when the gear field changes between packets, - # schedule a 1-frame Bow on R2 \u2014 NFS-Unbound-style shift detent. - # Bow zones (4..6) anchor the snap mid-throw; force=8/snap=8. - gear = int(_safe(pkt, "gear", -1)) - if ( - gear != forza_state.last_gear - and gear >= 0 - and forza_state.last_gear >= 0 - ): - throttle_state.event_remaining_frames = 1 - throttle_state.event_callable = lambda trig, st: _apply_bow( - trig, 4, 6, 8, 8, st - ) - forza_state.last_gear = gear - apply_left_trigger(ds, pkt, brake_state) - apply_right_trigger(ds, pkt, throttle_state) - - - # Push one HID report per Forza packet (60 Hz max). Idempotent \u2014 - # _flush() returns immediately if nothing actually changed since the - # last push (the no-op cache on `_TriggerState._last_sent` and on - # `_last_lightbar`). On write failure (controller unplugged), reset - # the daemon's cached state so the post-reconnect frame writes anew. - if not _flush(ds): - LOG.warning("dualsense disconnected; reconnecting") - _close_controller(ds) - ds = _connect_controller() - _reset_caches(throttle_state, brake_state) - except KeyboardInterrupt: - LOG.info("shutting down") + handle_acceleration(controller, pkt) + handle_braking(controller, pkt) finally: try: - reset_triggers(ds) - _flush(ds) + reset_triggers(controller) + except Exception: + pass + _close_controller(controller) + try: + sock.close() except Exception: pass - _close_controller(ds) return 0 @@ -1492,9 +417,7 @@ def main() -> int: parser.add_argument("--host", default="127.0.0.1", help="UDP bind address") parser.add_argument("--port", type=int, default=5300, help="UDP bind port") parser.add_argument( - "--debug", - action="store_true", - help="log per-packet telemetry at DEBUG level", + "--debug", action="store_true", help="log per-packet decisions at DEBUG" ) parser.add_argument( "--exit-on-idle", @@ -1505,12 +428,10 @@ def main() -> int: level = os.environ.get("FORZA_TRIGGER_LOG", "DEBUG" if args.debug else "INFO") logging.basicConfig( - level=level, - format="%(asctime)s %(levelname)s %(name)s %(message)s", + level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s" ) - return run(args.host, args.port, args.debug, args.exit_on_idle) + return run(args.host, args.port, args.exit_on_idle) if __name__ == "__main__": sys.exit(main()) - diff --git a/hosts/yarn/forza-trigger/python-packages.nix b/hosts/yarn/forza-trigger/python-packages.nix index 2afb336..619ffd7 100644 --- a/hosts/yarn/forza-trigger/python-packages.nix +++ b/hosts/yarn/forza-trigger/python-packages.nix @@ -12,65 +12,64 @@ let py = python.pkgs; in rec { - # CFFI bindings to libhidapi (flok/hidapi-cffi on PyPI). pydualsense's - # `import hidapi` resolves to this — nixpkgs' python3Packages.hidapi is the - # Cython wrapper from trezor/cython-hidapi which exposes a different - # `import hid` API and can't satisfy pydualsense. - hidapi-usb = py.buildPythonPackage rec { - pname = "hidapi-usb"; - version = "0.3.2"; - format = "setuptools"; + # DualSense controller library (yesbotics/dualsense-controller-python). + # Bundles its own hidapi-cffi binding; replaces the previous pydualsense + + # hidapi-usb pair. The library's transport model writes the HID output + # report on every input report tick BUT only when an output state actually + # changed since the last write — so we get on-demand HID writes for free + # without disabling any background thread (the thing pydualsense made us + # work around at ~250 Hz). BT CRC32 is computed correctly inside the + # library's Bt31OutReport.to_bytes. + dualsense-controller = py.buildPythonPackage rec { + pname = "dualsense-controller"; + version = "0.3.1"; + pyproject = true; - # PyPI's project URL slug uses a hyphen (`hidapi-usb`) but the sdist file - # itself is PEP-625-normalized to an underscore (`hidapi_usb-…`). Stock - # fetchPypi assumes they match — they don't here, so fetch by direct URL. - src = pkgs.fetchurl { - url = "https://files.pythonhosted.org/packages/55/80/960ae94b615e26a7d1aeebe8e9fefda2f25608bf1016f9aec268b328c35e/hidapi_usb-${version}.tar.gz"; - hash = "sha256-oxp+2i+qqYd1uwiS2Dh8/PzO62iYQQXpR936MnDIFk0="; + # PyPI normalizes the sdist filename to underscore form (PEP 625) but + # the project URL slug is dasherized. fetchPypi assumes they match; + # passing the underscored pname matches the on-disk filename. + src = py.fetchPypi { + pname = "dualsense_controller"; + inherit version; + hash = "sha256-yy1MQeRPqaLvoXaAigQd3gPFsFLbwKqrD4mP2zQqcFw="; }; - propagatedBuildInputs = [ py.cffi ]; + nativeBuildInputs = [ py.poetry-core ]; - # Upstream's hidapi.py walks a tuple of soname strings via ffi.dlopen() - # until one resolves. Pin the two Linux hidraw entries to absolute store - # paths so the wrapped Python in our writePython3Bin closure finds them - # without LD_LIBRARY_PATH wrapping. The libusb / iohidmanager / dylib / - # dll entries are dead code on Linux. --replace-fail makes a rename in + # Upstream's pyproject pins pyee ^11 and cffi ^1.15. nixpkgs ships + # pyee 13 and cffi 2. The library only uses stable APIs — pyee + # `EventEmitter`/`on`/`emit` and cffi `FFI`/`cdef`/`dlopen`/`new`/ + # `buffer` — that work unchanged on the newer versions. Relax both. + pythonRelaxDeps = [ + "pyee" + "cffi" + ]; + + propagatedBuildInputs = with py; [ + pyee + cffi + deprecated + ]; + + # Same hidapi-binding pattern as our previous hidapi-usb derivation. + # The library walks a tuple of soname strings via ffi.dlopen() until + # one resolves; pin the two Linux hidraw entries to absolute store + # paths so the wrapped Python in writePython3Bin doesn't need + # LD_LIBRARY_PATH wrapping. The libusb / iohidmanager / dylib / dll + # entries are dead code on Linux. --replace-fail makes a rename in # upstream's tuple a loud build error rather than a silent ImportError # at runtime. postPatch = '' - substituteInPlace hidapi.py \ + substituteInPlace src/dualsense_controller/core/hidapi/hidapi.py \ --replace-fail "'libhidapi-hidraw.so'," "'${pkgs.hidapi}/lib/libhidapi-hidraw.so'," \ --replace-fail "'libhidapi-hidraw.so.0'," "'${pkgs.hidapi}/lib/libhidapi-hidraw.so.0'," ''; - pythonImportsCheck = [ "hidapi" ]; + pythonImportsCheck = [ "dualsense_controller" ]; meta = { - description = "CFFI wrapper for hidapi (used by pydualsense)"; - homepage = "https://github.com/flok/hidapi-cffi"; - license = lib.licenses.bsd3; - }; - }; - - pydualsense = py.buildPythonPackage rec { - pname = "pydualsense"; - version = "0.7.5"; - format = "pyproject"; - - src = py.fetchPypi { - inherit pname version; - hash = "sha256-YgX8AJE4f8p7geKT3xlCD0Mlh1GcyHpBz4rEIqdwKgs="; - }; - - nativeBuildInputs = [ py.poetry-core ]; - propagatedBuildInputs = [ hidapi-usb ]; - - pythonImportsCheck = [ "pydualsense" ]; - - meta = { - description = "Control your PS5 DualSense controller from Python"; - homepage = "https://github.com/flok/pydualsense"; + description = "Use DualSense Controller with Python"; + homepage = "https://github.com/yesbotics/dualsense-controller-python"; license = lib.licenses.mit; }; };