diff --git a/service-configs.nix b/service-configs.nix index 4cb200d..3f20d4a 100644 --- a/service-configs.nix +++ b/service-configs.nix @@ -81,6 +81,12 @@ rec { port = 6011; proto = "tcp"; }; + # Webhook receiver for the Jellyfin-qBittorrent monitor — Jellyfin pushes + # playback events here so throttling reacts without waiting for the poll. + jellyfin_qbittorrent_monitor_webhook = { + port = 9898; + proto = "tcp"; + }; bitmagnet = { port = 3333; proto = "tcp"; diff --git a/services/jellyfin/jellyfin-qbittorrent-monitor.nix b/services/jellyfin/jellyfin-qbittorrent-monitor.nix index 4bf57d7..9b800d1 100644 --- a/services/jellyfin/jellyfin-qbittorrent-monitor.nix +++ b/services/jellyfin/jellyfin-qbittorrent-monitor.nix @@ -5,14 +5,67 @@ lib, ... }: +let + webhookPlugin = import ./jellyfin-webhook-plugin.nix { inherit pkgs lib; }; + jellyfinPort = service_configs.ports.private.jellyfin.port; + webhookPort = service_configs.ports.private.jellyfin_qbittorrent_monitor_webhook.port; +in lib.mkIf config.services.jellyfin.enable { + # Materialise the Jellyfin Webhook plugin into Jellyfin's plugins dir before + # Jellyfin starts. Jellyfin rewrites meta.json at runtime, so a read-only + # nix-store symlink would EACCES — we copy instead. + systemd.services.jellyfin-webhook-install = { + before = [ "jellyfin.service" ]; + wantedBy = [ "jellyfin.service" ]; + serviceConfig = { + Type = "oneshot"; + RemainAfterExit = true; + User = config.services.jellyfin.user; + Group = config.services.jellyfin.group; + ExecStart = webhookPlugin.mkInstallScript { + pluginsDir = "${config.services.jellyfin.dataDir}/plugins"; + }; + }; + }; + + # After Jellyfin starts, POST the plugin configuration so the webhook + # targets the monitor's receiver. Idempotent; runs on every boot. + systemd.services.jellyfin-webhook-configure = { + after = [ "jellyfin.service" ]; + wants = [ "jellyfin.service" ]; + before = [ "jellyfin-qbittorrent-monitor.service" ]; + wantedBy = [ "multi-user.target" ]; + serviceConfig = { + Type = "oneshot"; + RemainAfterExit = true; + DynamicUser = true; + LoadCredential = "jellyfin-api-key:${config.age.secrets.jellyfin-api-key.path}"; + ExecStart = webhookPlugin.mkConfigureScript { + jellyfinUrl = "http://127.0.0.1:${toString jellyfinPort}"; + webhooks = [ + { + name = "qBittorrent Monitor"; + uri = "http://127.0.0.1:${toString webhookPort}/"; + notificationTypes = [ + "PlaybackStart" + "PlaybackProgress" + "PlaybackStop" + ]; + } + ]; + }; + }; + }; + systemd.services."jellyfin-qbittorrent-monitor" = { description = "Monitor Jellyfin streaming and control qBittorrent rate limits"; after = [ "network.target" "jellyfin.service" "qbittorrent.service" + "jellyfin-webhook-configure.service" ]; + wants = [ "jellyfin-webhook-configure.service" ]; wantedBy = [ "multi-user.target" ]; serviceConfig = { @@ -44,7 +97,7 @@ lib.mkIf config.services.jellyfin.enable { }; environment = { - JELLYFIN_URL = "http://localhost:${builtins.toString service_configs.ports.private.jellyfin.port}"; + JELLYFIN_URL = "http://localhost:${builtins.toString jellyfinPort}"; QBITTORRENT_URL = "http://${config.vpnNamespaces.wg.namespaceAddress}:${builtins.toString service_configs.ports.private.torrent.port}"; CHECK_INTERVAL = "30"; # Bandwidth budget configuration @@ -53,6 +106,9 @@ lib.mkIf config.services.jellyfin.enable { DEFAULT_STREAM_BITRATE = "10000000"; # 10 Mbps fallback when bitrate unknown (bps) MIN_TORRENT_SPEED = "100"; # KB/s - below this, pause torrents instead STREAM_BITRATE_HEADROOM = "1.1"; # multiplier per stream for bitrate fluctuations + # Webhook receiver: Jellyfin Webhook plugin POSTs events here to throttle immediately. + WEBHOOK_BIND = "127.0.0.1"; + WEBHOOK_PORT = toString webhookPort; }; }; } diff --git a/services/jellyfin/jellyfin-qbittorrent-monitor.py b/services/jellyfin/jellyfin-qbittorrent-monitor.py index 3c01cee..5c9326b 100644 --- a/services/jellyfin/jellyfin-qbittorrent-monitor.py +++ b/services/jellyfin/jellyfin-qbittorrent-monitor.py @@ -7,6 +7,8 @@ import sys import signal import json import ipaddress +import threading +from http.server import HTTPServer, BaseHTTPRequestHandler logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" @@ -34,6 +36,8 @@ class JellyfinQBittorrentMonitor: default_stream_bitrate=10000000, min_torrent_speed=100, stream_bitrate_headroom=1.1, + webhook_port=0, + webhook_bind="127.0.0.1", ): self.jellyfin_url = jellyfin_url self.qbittorrent_url = qbittorrent_url @@ -57,6 +61,12 @@ class JellyfinQBittorrentMonitor: self.streaming_stop_delay = streaming_stop_delay self.last_state_change = 0 + # Webhook receiver: allows Jellyfin to push events instead of waiting for the poll + self.webhook_port = webhook_port + self.webhook_bind = webhook_bind + self.wake_event = threading.Event() + self.webhook_server = None + # Local network ranges (RFC 1918 private networks + localhost) self.local_networks = [ ipaddress.ip_network("10.0.0.0/8"), @@ -79,9 +89,56 @@ class JellyfinQBittorrentMonitor: def signal_handler(self, signum, frame): logger.info("Received shutdown signal, cleaning up...") self.running = False + if self.webhook_server is not None: + # shutdown() blocks until serve_forever returns; run from a thread so we don't deadlock + threading.Thread(target=self.webhook_server.shutdown, daemon=True).start() self.restore_normal_limits() sys.exit(0) + def wake(self) -> None: + """Signal the main loop to re-evaluate state immediately.""" + self.wake_event.set() + + def sleep_or_wake(self, seconds: float) -> None: + """Wait up to `seconds`, returning early if a webhook wakes the loop.""" + self.wake_event.wait(seconds) + self.wake_event.clear() + + def start_webhook_server(self) -> None: + """Start a background HTTP server that wakes the monitor on any POST.""" + if not self.webhook_port: + return + + monitor = self + + class WebhookHandler(BaseHTTPRequestHandler): + def do_POST(self): # noqa: N802 + length = int(self.headers.get("Content-Length", "0") or "0") + body = self.rfile.read(min(length, 65536)) if length else b"" + event = "unknown" + try: + if body: + event = json.loads(body).get("NotificationType", "unknown") + except (json.JSONDecodeError, ValueError): + pass + logger.info(f"Webhook received: {event}") + self.send_response(204) + self.end_headers() + monitor.wake() + + def log_message(self, format, *args): + return # suppress default access log + + self.webhook_server = HTTPServer( + (self.webhook_bind, self.webhook_port), WebhookHandler + ) + threading.Thread( + target=self.webhook_server.serve_forever, daemon=True, name="webhook-server" + ).start() + logger.info( + f"Webhook receiver listening on http://{self.webhook_bind}:{self.webhook_port}" + ) + def check_jellyfin_sessions(self) -> list[dict]: headers = ( {"X-Emby-Token": self.jellyfin_api_key} if self.jellyfin_api_key else {} @@ -297,10 +354,14 @@ class JellyfinQBittorrentMonitor: logger.info(f"Default stream bitrate: {self.default_stream_bitrate} bps") logger.info(f"Minimum torrent speed: {self.min_torrent_speed} KB/s") logger.info(f"Stream bitrate headroom: {self.stream_bitrate_headroom}x") + if self.webhook_port: + logger.info(f"Webhook receiver: {self.webhook_bind}:{self.webhook_port}") signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) + self.start_webhook_server() + while self.running: try: self.sync_qbittorrent_state() @@ -309,7 +370,7 @@ class JellyfinQBittorrentMonitor: active_streams = self.check_jellyfin_sessions() except ServiceUnavailable: logger.warning("Jellyfin unavailable, maintaining current state") - time.sleep(self.check_interval) + self.sleep_or_wake(self.check_interval) continue streaming_active = len(active_streams) > 0 @@ -394,13 +455,13 @@ class JellyfinQBittorrentMonitor: self.current_state = desired_state self.last_active_streams = active_streams - time.sleep(self.check_interval) + self.sleep_or_wake(self.check_interval) except KeyboardInterrupt: break except Exception as e: logger.error(f"Unexpected error in monitoring loop: {e}") - time.sleep(self.check_interval) + self.sleep_or_wake(self.check_interval) self.restore_normal_limits() logger.info("Monitor stopped") @@ -421,6 +482,8 @@ if __name__ == "__main__": default_stream_bitrate = int(os.getenv("DEFAULT_STREAM_BITRATE", "10000000")) min_torrent_speed = int(os.getenv("MIN_TORRENT_SPEED", "100")) stream_bitrate_headroom = float(os.getenv("STREAM_BITRATE_HEADROOM", "1.1")) + webhook_port = int(os.getenv("WEBHOOK_PORT", "0")) + webhook_bind = os.getenv("WEBHOOK_BIND", "127.0.0.1") monitor = JellyfinQBittorrentMonitor( jellyfin_url=jellyfin_url, @@ -434,6 +497,8 @@ if __name__ == "__main__": default_stream_bitrate=default_stream_bitrate, min_torrent_speed=min_torrent_speed, stream_bitrate_headroom=stream_bitrate_headroom, + webhook_port=webhook_port, + webhook_bind=webhook_bind, ) monitor.run() diff --git a/tests/jellyfin-qbittorrent-monitor.nix b/tests/jellyfin-qbittorrent-monitor.nix index dd6508e..e3c248a 100644 --- a/tests/jellyfin-qbittorrent-monitor.nix +++ b/tests/jellyfin-qbittorrent-monitor.nix @@ -6,6 +6,21 @@ }: let jfLib = import ./jellyfin-test-lib.nix { inherit pkgs lib; }; + webhookPlugin = import ../services/jellyfin/jellyfin-webhook-plugin.nix { inherit pkgs lib; }; + configureWebhook = webhookPlugin.mkConfigureScript { + jellyfinUrl = "http://localhost:8096"; + webhooks = [ + { + name = "qBittorrent Monitor"; + uri = "http://127.0.0.1:9898/"; + notificationTypes = [ + "PlaybackStart" + "PlaybackProgress" + "PlaybackStop" + ]; + } + ]; + }; in pkgs.testers.runNixOSTest { name = "jellyfin-qbittorrent-monitor"; @@ -69,11 +84,30 @@ pkgs.testers.runNixOSTest { } ]; - # Create directories for qBittorrent + # Create directories for qBittorrent. systemd.tmpfiles.rules = [ "d /var/lib/qbittorrent/downloads 0755 qbittorrent qbittorrent" "d /var/lib/qbittorrent/incomplete 0755 qbittorrent qbittorrent" ]; + + # Install the Jellyfin Webhook plugin before Jellyfin starts, mirroring + # the production module. Jellyfin rewrites meta.json at runtime so a + # read-only nix-store symlink would fail — we materialise a writable copy. + systemd.services."jellyfin-webhook-install" = { + description = "Install Jellyfin Webhook plugin files"; + before = [ "jellyfin.service" ]; + wantedBy = [ "jellyfin.service" ]; + serviceConfig = { + Type = "oneshot"; + RemainAfterExit = true; + User = "jellyfin"; + Group = "jellyfin"; + UMask = "0077"; + ExecStart = webhookPlugin.mkInstallScript { + pluginsDir = "/var/lib/jellyfin/plugins"; + }; + }; + }; }; # Public test IP (RFC 5737 TEST-NET-3) so Jellyfin sees it as external @@ -394,6 +428,97 @@ pkgs.testers.runNixOSTest { local_playback["PositionTicks"] = 50000000 server.succeed(f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing/Stopped' -d '{json.dumps(local_playback)}' -H 'Content-Type:application/json' -H 'X-Emby-Authorization:{local_auth}, Token={local_token}'") + # === WEBHOOK TESTS === + # + # Configure the Jellyfin Webhook plugin to target the monitor, then verify + # the real Jellyfin → plugin → monitor path reacts faster than any possible + # poll. CHECK_INTERVAL=30 rules out polling as the cause. + + WEBHOOK_PORT = 9898 + WEBHOOK_CREDS = "/tmp/webhook-creds" + + # Start a webhook-enabled monitor with long poll interval. + server.succeed("systemctl stop monitor-test || true") + time.sleep(1) + server.succeed(f""" + systemd-run --unit=monitor-webhook \ + --setenv=JELLYFIN_URL=http://localhost:8096 \ + --setenv=JELLYFIN_API_KEY={token} \ + --setenv=QBITTORRENT_URL=http://localhost:8080 \ + --setenv=CHECK_INTERVAL=30 \ + --setenv=STREAMING_START_DELAY=1 \ + --setenv=STREAMING_STOP_DELAY=1 \ + --setenv=TOTAL_BANDWIDTH_BUDGET=50000000 \ + --setenv=SERVICE_BUFFER=2000000 \ + --setenv=DEFAULT_STREAM_BITRATE=10000000 \ + --setenv=MIN_TORRENT_SPEED=100 \ + --setenv=WEBHOOK_PORT={WEBHOOK_PORT} \ + --setenv=WEBHOOK_BIND=127.0.0.1 \ + {python} {monitor} + """) + server.wait_until_succeeds(f"ss -ltn | grep -q ':{WEBHOOK_PORT}'", timeout=15) + time.sleep(2) + assert not is_throttled(), "Should start unthrottled" + + # Drop the admin token where the configure script expects it (production uses agenix). + server.succeed(f"mkdir -p {WEBHOOK_CREDS} && echo '{token}' > {WEBHOOK_CREDS}/jellyfin-api-key") + server.succeed( + f"systemd-run --wait --unit=webhook-configure-test " + f"--setenv=CREDENTIALS_DIRECTORY={WEBHOOK_CREDS} " + f"${configureWebhook}" + ) + + with subtest("Real PlaybackStart event throttles via the plugin"): + playback_start = { + "ItemId": movie_id, + "MediaSourceId": media_source_id, + "PlaySessionId": "test-plugin-start", + "CanSeek": True, + "IsPaused": False, + } + start_cmd = f"curl -sf -X POST 'http://{server_ip}:8096/Sessions/Playing' -d '{json.dumps(playback_start)}' -H 'Content-Type:application/json' -H 'X-Emby-Authorization:{client_auth}, Token={client_token}'" + client.succeed(start_cmd) + server.wait_until_succeeds( + "curl -sf http://localhost:8080/api/v2/transfer/speedLimitsMode | grep -q '^1$'", + timeout=5, + ) + # Let STREAMING_STOP_DELAY (1s) elapse so the upcoming stop is not swallowed by hysteresis. + time.sleep(2) + + with subtest("Real PlaybackStop event unthrottles via the plugin"): + playback_stop = { + "ItemId": movie_id, + "MediaSourceId": media_source_id, + "PlaySessionId": "test-plugin-start", + "PositionTicks": 50000000, + } + stop_cmd = f"curl -sf -X POST 'http://{server_ip}:8096/Sessions/Playing/Stopped' -d '{json.dumps(playback_stop)}' -H 'Content-Type:application/json' -H 'X-Emby-Authorization:{client_auth}, Token={client_token}'" + client.succeed(stop_cmd) + server.wait_until_succeeds( + "curl -sf http://localhost:8080/api/v2/transfer/speedLimitsMode | grep -q '^0$'", + timeout=10, + ) + + # Restore fast-polling monitor for the service-restart tests below. + server.succeed("systemctl stop monitor-webhook || true") + time.sleep(1) + server.succeed(f""" + systemd-run --unit=monitor-test \ + --setenv=JELLYFIN_URL=http://localhost:8096 \ + --setenv=JELLYFIN_API_KEY={token} \ + --setenv=QBITTORRENT_URL=http://localhost:8080 \ + --setenv=CHECK_INTERVAL=1 \ + --setenv=STREAMING_START_DELAY=1 \ + --setenv=STREAMING_STOP_DELAY=1 \ + --setenv=TOTAL_BANDWIDTH_BUDGET=50000000 \ + --setenv=SERVICE_BUFFER=2000000 \ + --setenv=DEFAULT_STREAM_BITRATE=10000000 \ + --setenv=MIN_TORRENT_SPEED=100 \ + {python} {monitor} + """) + time.sleep(2) + + # === SERVICE RESTART TESTS === with subtest("qBittorrent restart during throttled state re-applies throttling"):