jellyfin-qbittorrent-monitor: add webhook receiver for instant throttling
Some checks failed
Build and Deploy / deploy (push) Failing after 2m9s

This commit is contained in:
2026-04-17 19:47:29 -04:00
parent 48ac68c297
commit 1403c9d3bc
4 changed files with 257 additions and 5 deletions

View File

@@ -5,14 +5,67 @@
lib,
...
}:
let
webhookPlugin = import ./jellyfin-webhook-plugin.nix { inherit pkgs lib; };
jellyfinPort = service_configs.ports.private.jellyfin.port;
webhookPort = service_configs.ports.private.jellyfin_qbittorrent_monitor_webhook.port;
in
lib.mkIf config.services.jellyfin.enable {
# Materialise the Jellyfin Webhook plugin into Jellyfin's plugins dir before
# Jellyfin starts. Jellyfin rewrites meta.json at runtime, so a read-only
# nix-store symlink would EACCES — we copy instead.
systemd.services.jellyfin-webhook-install = {
before = [ "jellyfin.service" ];
wantedBy = [ "jellyfin.service" ];
serviceConfig = {
Type = "oneshot";
RemainAfterExit = true;
User = config.services.jellyfin.user;
Group = config.services.jellyfin.group;
ExecStart = webhookPlugin.mkInstallScript {
pluginsDir = "${config.services.jellyfin.dataDir}/plugins";
};
};
};
# After Jellyfin starts, POST the plugin configuration so the webhook
# targets the monitor's receiver. Idempotent; runs on every boot.
systemd.services.jellyfin-webhook-configure = {
after = [ "jellyfin.service" ];
wants = [ "jellyfin.service" ];
before = [ "jellyfin-qbittorrent-monitor.service" ];
wantedBy = [ "multi-user.target" ];
serviceConfig = {
Type = "oneshot";
RemainAfterExit = true;
DynamicUser = true;
LoadCredential = "jellyfin-api-key:${config.age.secrets.jellyfin-api-key.path}";
ExecStart = webhookPlugin.mkConfigureScript {
jellyfinUrl = "http://127.0.0.1:${toString jellyfinPort}";
webhooks = [
{
name = "qBittorrent Monitor";
uri = "http://127.0.0.1:${toString webhookPort}/";
notificationTypes = [
"PlaybackStart"
"PlaybackProgress"
"PlaybackStop"
];
}
];
};
};
};
systemd.services."jellyfin-qbittorrent-monitor" = {
description = "Monitor Jellyfin streaming and control qBittorrent rate limits";
after = [
"network.target"
"jellyfin.service"
"qbittorrent.service"
"jellyfin-webhook-configure.service"
];
wants = [ "jellyfin-webhook-configure.service" ];
wantedBy = [ "multi-user.target" ];
serviceConfig = {
@@ -44,7 +97,7 @@ lib.mkIf config.services.jellyfin.enable {
};
environment = {
JELLYFIN_URL = "http://localhost:${builtins.toString service_configs.ports.private.jellyfin.port}";
JELLYFIN_URL = "http://localhost:${builtins.toString jellyfinPort}";
QBITTORRENT_URL = "http://${config.vpnNamespaces.wg.namespaceAddress}:${builtins.toString service_configs.ports.private.torrent.port}";
CHECK_INTERVAL = "30";
# Bandwidth budget configuration
@@ -53,6 +106,9 @@ lib.mkIf config.services.jellyfin.enable {
DEFAULT_STREAM_BITRATE = "10000000"; # 10 Mbps fallback when bitrate unknown (bps)
MIN_TORRENT_SPEED = "100"; # KB/s - below this, pause torrents instead
STREAM_BITRATE_HEADROOM = "1.1"; # multiplier per stream for bitrate fluctuations
# Webhook receiver: Jellyfin Webhook plugin POSTs events here to throttle immediately.
WEBHOOK_BIND = "127.0.0.1";
WEBHOOK_PORT = toString webhookPort;
};
};
}

View File

@@ -7,6 +7,8 @@ import sys
import signal
import json
import ipaddress
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -34,6 +36,8 @@ class JellyfinQBittorrentMonitor:
default_stream_bitrate=10000000,
min_torrent_speed=100,
stream_bitrate_headroom=1.1,
webhook_port=0,
webhook_bind="127.0.0.1",
):
self.jellyfin_url = jellyfin_url
self.qbittorrent_url = qbittorrent_url
@@ -57,6 +61,12 @@ class JellyfinQBittorrentMonitor:
self.streaming_stop_delay = streaming_stop_delay
self.last_state_change = 0
# Webhook receiver: allows Jellyfin to push events instead of waiting for the poll
self.webhook_port = webhook_port
self.webhook_bind = webhook_bind
self.wake_event = threading.Event()
self.webhook_server = None
# Local network ranges (RFC 1918 private networks + localhost)
self.local_networks = [
ipaddress.ip_network("10.0.0.0/8"),
@@ -79,9 +89,56 @@ class JellyfinQBittorrentMonitor:
def signal_handler(self, signum, frame):
logger.info("Received shutdown signal, cleaning up...")
self.running = False
if self.webhook_server is not None:
# shutdown() blocks until serve_forever returns; run from a thread so we don't deadlock
threading.Thread(target=self.webhook_server.shutdown, daemon=True).start()
self.restore_normal_limits()
sys.exit(0)
def wake(self) -> None:
"""Signal the main loop to re-evaluate state immediately."""
self.wake_event.set()
def sleep_or_wake(self, seconds: float) -> None:
"""Wait up to `seconds`, returning early if a webhook wakes the loop."""
self.wake_event.wait(seconds)
self.wake_event.clear()
def start_webhook_server(self) -> None:
"""Start a background HTTP server that wakes the monitor on any POST."""
if not self.webhook_port:
return
monitor = self
class WebhookHandler(BaseHTTPRequestHandler):
def do_POST(self): # noqa: N802
length = int(self.headers.get("Content-Length", "0") or "0")
body = self.rfile.read(min(length, 65536)) if length else b""
event = "unknown"
try:
if body:
event = json.loads(body).get("NotificationType", "unknown")
except (json.JSONDecodeError, ValueError):
pass
logger.info(f"Webhook received: {event}")
self.send_response(204)
self.end_headers()
monitor.wake()
def log_message(self, format, *args):
return # suppress default access log
self.webhook_server = HTTPServer(
(self.webhook_bind, self.webhook_port), WebhookHandler
)
threading.Thread(
target=self.webhook_server.serve_forever, daemon=True, name="webhook-server"
).start()
logger.info(
f"Webhook receiver listening on http://{self.webhook_bind}:{self.webhook_port}"
)
def check_jellyfin_sessions(self) -> list[dict]:
headers = (
{"X-Emby-Token": self.jellyfin_api_key} if self.jellyfin_api_key else {}
@@ -297,10 +354,14 @@ class JellyfinQBittorrentMonitor:
logger.info(f"Default stream bitrate: {self.default_stream_bitrate} bps")
logger.info(f"Minimum torrent speed: {self.min_torrent_speed} KB/s")
logger.info(f"Stream bitrate headroom: {self.stream_bitrate_headroom}x")
if self.webhook_port:
logger.info(f"Webhook receiver: {self.webhook_bind}:{self.webhook_port}")
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
self.start_webhook_server()
while self.running:
try:
self.sync_qbittorrent_state()
@@ -309,7 +370,7 @@ class JellyfinQBittorrentMonitor:
active_streams = self.check_jellyfin_sessions()
except ServiceUnavailable:
logger.warning("Jellyfin unavailable, maintaining current state")
time.sleep(self.check_interval)
self.sleep_or_wake(self.check_interval)
continue
streaming_active = len(active_streams) > 0
@@ -394,13 +455,13 @@ class JellyfinQBittorrentMonitor:
self.current_state = desired_state
self.last_active_streams = active_streams
time.sleep(self.check_interval)
self.sleep_or_wake(self.check_interval)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(f"Unexpected error in monitoring loop: {e}")
time.sleep(self.check_interval)
self.sleep_or_wake(self.check_interval)
self.restore_normal_limits()
logger.info("Monitor stopped")
@@ -421,6 +482,8 @@ if __name__ == "__main__":
default_stream_bitrate = int(os.getenv("DEFAULT_STREAM_BITRATE", "10000000"))
min_torrent_speed = int(os.getenv("MIN_TORRENT_SPEED", "100"))
stream_bitrate_headroom = float(os.getenv("STREAM_BITRATE_HEADROOM", "1.1"))
webhook_port = int(os.getenv("WEBHOOK_PORT", "0"))
webhook_bind = os.getenv("WEBHOOK_BIND", "127.0.0.1")
monitor = JellyfinQBittorrentMonitor(
jellyfin_url=jellyfin_url,
@@ -434,6 +497,8 @@ if __name__ == "__main__":
default_stream_bitrate=default_stream_bitrate,
min_torrent_speed=min_torrent_speed,
stream_bitrate_headroom=stream_bitrate_headroom,
webhook_port=webhook_port,
webhook_bind=webhook_bind,
)
monitor.run()