phase 2: promote services/, tests/, patches/, lib/, scripts/
This commit is contained in:
504
services/jellyfin/jellyfin-qbittorrent-monitor.py
Normal file
504
services/jellyfin/jellyfin-qbittorrent-monitor.py
Normal file
@@ -0,0 +1,504 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import requests
|
||||
import time
|
||||
import logging
|
||||
import sys
|
||||
import signal
|
||||
import json
|
||||
import ipaddress
|
||||
import threading
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ServiceUnavailable(Exception):
|
||||
"""Raised when a monitored service is temporarily unavailable."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class JellyfinQBittorrentMonitor:
|
||||
def __init__(
|
||||
self,
|
||||
jellyfin_url="http://localhost:8096",
|
||||
qbittorrent_url="http://localhost:8080",
|
||||
check_interval=30,
|
||||
jellyfin_api_key=None,
|
||||
streaming_start_delay=10,
|
||||
streaming_stop_delay=60,
|
||||
total_bandwidth_budget=30000000,
|
||||
service_buffer=5000000,
|
||||
default_stream_bitrate=10000000,
|
||||
min_torrent_speed=100,
|
||||
stream_bitrate_headroom=1.1,
|
||||
webhook_port=0,
|
||||
webhook_bind="127.0.0.1",
|
||||
):
|
||||
self.jellyfin_url = jellyfin_url
|
||||
self.qbittorrent_url = qbittorrent_url
|
||||
self.check_interval = check_interval
|
||||
self.jellyfin_api_key = jellyfin_api_key
|
||||
self.total_bandwidth_budget = total_bandwidth_budget
|
||||
self.service_buffer = service_buffer
|
||||
self.default_stream_bitrate = default_stream_bitrate
|
||||
self.min_torrent_speed = min_torrent_speed
|
||||
self.stream_bitrate_headroom = stream_bitrate_headroom
|
||||
self.last_streaming_state = None
|
||||
self.current_state = "unlimited"
|
||||
self.torrents_paused = False
|
||||
self.last_alt_limits = None
|
||||
self.running = True
|
||||
self.session = requests.Session() # Use session for cookies
|
||||
self.last_active_streams = []
|
||||
|
||||
# Hysteresis settings to prevent rapid switching
|
||||
self.streaming_start_delay = streaming_start_delay
|
||||
self.streaming_stop_delay = streaming_stop_delay
|
||||
self.last_state_change = 0
|
||||
|
||||
# Webhook receiver: allows Jellyfin to push events instead of waiting for the poll
|
||||
self.webhook_port = webhook_port
|
||||
self.webhook_bind = webhook_bind
|
||||
self.wake_event = threading.Event()
|
||||
self.webhook_server = None
|
||||
|
||||
# Local network ranges (RFC 1918 private networks + localhost)
|
||||
self.local_networks = [
|
||||
ipaddress.ip_network("10.0.0.0/8"),
|
||||
ipaddress.ip_network("172.16.0.0/12"),
|
||||
ipaddress.ip_network("192.168.0.0/16"),
|
||||
ipaddress.ip_network("127.0.0.0/8"),
|
||||
ipaddress.ip_network("::1/128"), # IPv6 localhost
|
||||
ipaddress.ip_network("fe80::/10"), # IPv6 link-local
|
||||
]
|
||||
|
||||
def is_local_ip(self, ip_address: str) -> bool:
|
||||
"""Check if an IP address is from a local network"""
|
||||
try:
|
||||
ip = ipaddress.ip_address(ip_address)
|
||||
return any(ip in network for network in self.local_networks)
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid IP address format: {ip_address}")
|
||||
return True # Treat invalid IPs as local for safety
|
||||
|
||||
def signal_handler(self, signum, frame):
|
||||
logger.info("Received shutdown signal, cleaning up...")
|
||||
self.running = False
|
||||
if self.webhook_server is not None:
|
||||
# shutdown() blocks until serve_forever returns; run from a thread so we don't deadlock
|
||||
threading.Thread(target=self.webhook_server.shutdown, daemon=True).start()
|
||||
self.restore_normal_limits()
|
||||
sys.exit(0)
|
||||
|
||||
def wake(self) -> None:
|
||||
"""Signal the main loop to re-evaluate state immediately."""
|
||||
self.wake_event.set()
|
||||
|
||||
def sleep_or_wake(self, seconds: float) -> None:
|
||||
"""Wait up to `seconds`, returning early if a webhook wakes the loop."""
|
||||
self.wake_event.wait(seconds)
|
||||
self.wake_event.clear()
|
||||
|
||||
def start_webhook_server(self) -> None:
|
||||
"""Start a background HTTP server that wakes the monitor on any POST."""
|
||||
if not self.webhook_port:
|
||||
return
|
||||
|
||||
monitor = self
|
||||
|
||||
class WebhookHandler(BaseHTTPRequestHandler):
|
||||
def do_POST(self): # noqa: N802
|
||||
length = int(self.headers.get("Content-Length", "0") or "0")
|
||||
body = self.rfile.read(min(length, 65536)) if length else b""
|
||||
event = "unknown"
|
||||
try:
|
||||
if body:
|
||||
event = json.loads(body).get("NotificationType", "unknown")
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
logger.info(f"Webhook received: {event}")
|
||||
self.send_response(204)
|
||||
self.end_headers()
|
||||
monitor.wake()
|
||||
|
||||
def log_message(self, format, *args):
|
||||
return # suppress default access log
|
||||
|
||||
self.webhook_server = HTTPServer(
|
||||
(self.webhook_bind, self.webhook_port), WebhookHandler
|
||||
)
|
||||
threading.Thread(
|
||||
target=self.webhook_server.serve_forever, daemon=True, name="webhook-server"
|
||||
).start()
|
||||
logger.info(
|
||||
f"Webhook receiver listening on http://{self.webhook_bind}:{self.webhook_port}"
|
||||
)
|
||||
|
||||
def check_jellyfin_sessions(self) -> list[dict]:
|
||||
headers = (
|
||||
{"X-Emby-Token": self.jellyfin_api_key} if self.jellyfin_api_key else {}
|
||||
)
|
||||
|
||||
try:
|
||||
response = requests.get(
|
||||
f"{self.jellyfin_url}/Sessions", headers=headers, timeout=10
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to check Jellyfin sessions: {e}")
|
||||
raise ServiceUnavailable(f"Jellyfin unavailable: {e}") from e
|
||||
|
||||
try:
|
||||
sessions = response.json()
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to parse Jellyfin response: {e}")
|
||||
raise ServiceUnavailable(f"Jellyfin returned invalid JSON: {e}") from e
|
||||
|
||||
active_streams = []
|
||||
for session in sessions:
|
||||
if (
|
||||
"NowPlayingItem" in session
|
||||
and not session.get("PlayState", {}).get("IsPaused", True)
|
||||
and not self.is_local_ip(session.get("RemoteEndPoint", ""))
|
||||
):
|
||||
item = session["NowPlayingItem"]
|
||||
item_type = item.get("Type", "").lower()
|
||||
if item_type in ["movie", "episode", "video"]:
|
||||
user = session.get("UserName", "Unknown")
|
||||
stream_name = f"{user}: {item.get('Name', 'Unknown')}"
|
||||
if session.get("TranscodingInfo") and session[
|
||||
"TranscodingInfo"
|
||||
].get("Bitrate"):
|
||||
bitrate = session["TranscodingInfo"]["Bitrate"]
|
||||
elif item.get("Bitrate"):
|
||||
bitrate = item["Bitrate"]
|
||||
elif item.get("MediaSources", [{}])[0].get("Bitrate"):
|
||||
bitrate = item["MediaSources"][0]["Bitrate"]
|
||||
else:
|
||||
bitrate = self.default_stream_bitrate
|
||||
|
||||
bitrate = min(int(bitrate), 100_000_000)
|
||||
# Add headroom to account for bitrate fluctuations
|
||||
bitrate = int(bitrate * self.stream_bitrate_headroom)
|
||||
active_streams.append({"name": stream_name, "bitrate_bps": bitrate})
|
||||
|
||||
return active_streams
|
||||
|
||||
def check_qbittorrent_alternate_limits(self) -> bool:
|
||||
try:
|
||||
response = self.session.get(
|
||||
f"{self.qbittorrent_url}/api/v2/transfer/speedLimitsMode", timeout=10
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return response.text.strip() == "1"
|
||||
else:
|
||||
logger.warning(
|
||||
f"SpeedLimitsMode endpoint returned HTTP {response.status_code}"
|
||||
)
|
||||
raise ServiceUnavailable(
|
||||
f"qBittorrent returned HTTP {response.status_code}"
|
||||
)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"SpeedLimitsMode endpoint failed: {e}")
|
||||
raise ServiceUnavailable(f"qBittorrent unavailable: {e}") from e
|
||||
|
||||
def use_alt_limits(self, enable: bool) -> None:
|
||||
action = "enabled" if enable else "disabled"
|
||||
try:
|
||||
current_throttle = self.check_qbittorrent_alternate_limits()
|
||||
|
||||
if current_throttle == enable:
|
||||
logger.debug(
|
||||
f"Alternate speed limits already {action}, no action needed"
|
||||
)
|
||||
return
|
||||
|
||||
response = self.session.post(
|
||||
f"{self.qbittorrent_url}/api/v2/transfer/toggleSpeedLimitsMode",
|
||||
timeout=10,
|
||||
)
|
||||
response.raise_for_status()
|
||||
new_state = self.check_qbittorrent_alternate_limits()
|
||||
if new_state == enable:
|
||||
logger.info(f"Alternate speed limits {action}")
|
||||
else:
|
||||
logger.warning(
|
||||
f"Toggle may have failed: expected {enable}, got {new_state}"
|
||||
)
|
||||
|
||||
except ServiceUnavailable:
|
||||
logger.warning(
|
||||
f"qBittorrent unavailable, cannot {action} alternate speed limits"
|
||||
)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to {action} alternate speed limits: {e}")
|
||||
|
||||
def pause_all_torrents(self) -> None:
|
||||
try:
|
||||
response = self.session.post(
|
||||
f"{self.qbittorrent_url}/api/v2/torrents/stop",
|
||||
data={"hashes": "all"},
|
||||
timeout=10,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to pause torrents: {e}")
|
||||
|
||||
def resume_all_torrents(self) -> None:
|
||||
try:
|
||||
response = self.session.post(
|
||||
f"{self.qbittorrent_url}/api/v2/torrents/start",
|
||||
data={"hashes": "all"},
|
||||
timeout=10,
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to resume torrents: {e}")
|
||||
|
||||
def set_alt_speed_limits(self, dl_kbs: float, ul_kbs: float) -> None:
|
||||
try:
|
||||
payload = {
|
||||
"alt_dl_limit": int(dl_kbs * 1024),
|
||||
"alt_up_limit": int(ul_kbs * 1024),
|
||||
}
|
||||
response = self.session.post(
|
||||
f"{self.qbittorrent_url}/api/v2/app/setPreferences",
|
||||
data={"json": json.dumps(payload)},
|
||||
timeout=10,
|
||||
)
|
||||
response.raise_for_status()
|
||||
self.last_alt_limits = (dl_kbs, ul_kbs)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to set alternate speed limits: {e}")
|
||||
|
||||
def restore_normal_limits(self) -> None:
|
||||
if self.torrents_paused:
|
||||
logger.info("Resuming all torrents before shutdown...")
|
||||
self.resume_all_torrents()
|
||||
self.torrents_paused = False
|
||||
|
||||
if self.current_state != "unlimited":
|
||||
logger.info("Restoring normal speed limits before shutdown...")
|
||||
self.use_alt_limits(False)
|
||||
self.current_state = "unlimited"
|
||||
|
||||
def sync_qbittorrent_state(self) -> None:
|
||||
try:
|
||||
if self.current_state == "unlimited":
|
||||
actual_state = self.check_qbittorrent_alternate_limits()
|
||||
if actual_state:
|
||||
logger.warning(
|
||||
"qBittorrent state mismatch detected: expected alt speed OFF, got ON. Re-syncing..."
|
||||
)
|
||||
self.use_alt_limits(False)
|
||||
elif self.current_state == "throttled":
|
||||
if self.last_alt_limits:
|
||||
self.set_alt_speed_limits(*self.last_alt_limits)
|
||||
actual_state = self.check_qbittorrent_alternate_limits()
|
||||
if not actual_state:
|
||||
logger.warning(
|
||||
"qBittorrent state mismatch detected: expected alt speed ON, got OFF. Re-syncing..."
|
||||
)
|
||||
self.use_alt_limits(True)
|
||||
elif self.current_state == "paused":
|
||||
self.pause_all_torrents()
|
||||
self.torrents_paused = True
|
||||
except ServiceUnavailable:
|
||||
pass
|
||||
|
||||
def should_change_state(self, new_streaming_state: bool) -> bool:
|
||||
"""Apply hysteresis to prevent rapid state changes"""
|
||||
now = time.time()
|
||||
|
||||
if new_streaming_state == self.last_streaming_state:
|
||||
return False
|
||||
|
||||
time_since_change = now - self.last_state_change
|
||||
|
||||
if new_streaming_state and not self.last_streaming_state:
|
||||
if time_since_change >= self.streaming_start_delay:
|
||||
self.last_state_change = now
|
||||
return True
|
||||
else:
|
||||
remaining = self.streaming_start_delay - time_since_change
|
||||
logger.info(
|
||||
f"Streaming started - waiting {remaining:.1f}s before enforcing limits"
|
||||
)
|
||||
|
||||
elif not new_streaming_state and self.last_streaming_state:
|
||||
if time_since_change >= self.streaming_stop_delay:
|
||||
self.last_state_change = now
|
||||
return True
|
||||
else:
|
||||
remaining = self.streaming_stop_delay - time_since_change
|
||||
logger.info(
|
||||
f"Streaming stopped - waiting {remaining:.1f}s before restoring unlimited mode"
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
def run(self):
|
||||
logger.info("Starting Jellyfin-qBittorrent monitor")
|
||||
logger.info(f"Jellyfin URL: {self.jellyfin_url}")
|
||||
logger.info(f"qBittorrent URL: {self.qbittorrent_url}")
|
||||
logger.info(f"Check interval: {self.check_interval}s")
|
||||
logger.info(f"Streaming start delay: {self.streaming_start_delay}s")
|
||||
logger.info(f"Streaming stop delay: {self.streaming_stop_delay}s")
|
||||
logger.info(f"Total bandwidth budget: {self.total_bandwidth_budget} bps")
|
||||
logger.info(f"Service buffer: {self.service_buffer} bps")
|
||||
logger.info(f"Default stream bitrate: {self.default_stream_bitrate} bps")
|
||||
logger.info(f"Minimum torrent speed: {self.min_torrent_speed} KB/s")
|
||||
logger.info(f"Stream bitrate headroom: {self.stream_bitrate_headroom}x")
|
||||
if self.webhook_port:
|
||||
logger.info(f"Webhook receiver: {self.webhook_bind}:{self.webhook_port}")
|
||||
|
||||
signal.signal(signal.SIGINT, self.signal_handler)
|
||||
signal.signal(signal.SIGTERM, self.signal_handler)
|
||||
|
||||
self.start_webhook_server()
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
self.sync_qbittorrent_state()
|
||||
|
||||
try:
|
||||
active_streams = self.check_jellyfin_sessions()
|
||||
except ServiceUnavailable:
|
||||
logger.warning("Jellyfin unavailable, maintaining current state")
|
||||
self.sleep_or_wake(self.check_interval)
|
||||
continue
|
||||
|
||||
streaming_active = len(active_streams) > 0
|
||||
|
||||
if active_streams:
|
||||
for stream in active_streams:
|
||||
logger.debug(
|
||||
f"Active stream: {stream['name']} ({stream['bitrate_bps']} bps)"
|
||||
)
|
||||
|
||||
if active_streams != self.last_active_streams:
|
||||
if streaming_active:
|
||||
stream_names = ", ".join(
|
||||
stream["name"] for stream in active_streams
|
||||
)
|
||||
logger.info(
|
||||
f"Active streams ({len(active_streams)}): {stream_names}"
|
||||
)
|
||||
elif len(active_streams) == 0 and self.last_streaming_state:
|
||||
logger.info("No active streaming sessions")
|
||||
|
||||
if self.should_change_state(streaming_active):
|
||||
self.last_streaming_state = streaming_active
|
||||
|
||||
streaming_state = bool(self.last_streaming_state)
|
||||
total_streaming_bps = sum(
|
||||
stream["bitrate_bps"] for stream in active_streams
|
||||
)
|
||||
remaining_bps = (
|
||||
self.total_bandwidth_budget
|
||||
- self.service_buffer
|
||||
- total_streaming_bps
|
||||
)
|
||||
remaining_kbs = max(0, remaining_bps) / 8 / 1024
|
||||
|
||||
if not streaming_state:
|
||||
desired_state = "unlimited"
|
||||
elif streaming_active:
|
||||
if remaining_kbs >= self.min_torrent_speed:
|
||||
desired_state = "throttled"
|
||||
else:
|
||||
desired_state = "paused"
|
||||
else:
|
||||
desired_state = self.current_state
|
||||
|
||||
if desired_state != self.current_state:
|
||||
if desired_state == "unlimited":
|
||||
action = "resume torrents, disable alt speed"
|
||||
elif desired_state == "throttled":
|
||||
action = (
|
||||
"set alt limits "
|
||||
f"dl={int(remaining_kbs)}KB/s ul={int(remaining_kbs)}KB/s, enable alt speed"
|
||||
)
|
||||
else:
|
||||
action = "pause torrents"
|
||||
|
||||
logger.info(
|
||||
"State change %s -> %s | streams=%d total_bps=%d remaining_bps=%d action=%s",
|
||||
self.current_state,
|
||||
desired_state,
|
||||
len(active_streams),
|
||||
total_streaming_bps,
|
||||
remaining_bps,
|
||||
action,
|
||||
)
|
||||
|
||||
if desired_state == "unlimited":
|
||||
if self.torrents_paused:
|
||||
self.resume_all_torrents()
|
||||
self.torrents_paused = False
|
||||
self.use_alt_limits(False)
|
||||
elif desired_state == "throttled":
|
||||
if self.torrents_paused:
|
||||
self.resume_all_torrents()
|
||||
self.torrents_paused = False
|
||||
self.set_alt_speed_limits(remaining_kbs, remaining_kbs)
|
||||
self.use_alt_limits(True)
|
||||
else:
|
||||
if not self.torrents_paused:
|
||||
self.pause_all_torrents()
|
||||
self.torrents_paused = True
|
||||
|
||||
self.current_state = desired_state
|
||||
self.last_active_streams = active_streams
|
||||
self.sleep_or_wake(self.check_interval)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error in monitoring loop: {e}")
|
||||
self.sleep_or_wake(self.check_interval)
|
||||
|
||||
self.restore_normal_limits()
|
||||
logger.info("Monitor stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
|
||||
# Configuration from environment variables
|
||||
jellyfin_url = os.getenv("JELLYFIN_URL", "http://localhost:8096")
|
||||
qbittorrent_url = os.getenv("QBITTORRENT_URL", "http://localhost:8080")
|
||||
check_interval = int(os.getenv("CHECK_INTERVAL", "30"))
|
||||
jellyfin_api_key = os.getenv("JELLYFIN_API_KEY")
|
||||
streaming_start_delay = int(os.getenv("STREAMING_START_DELAY", "10"))
|
||||
streaming_stop_delay = int(os.getenv("STREAMING_STOP_DELAY", "60"))
|
||||
total_bandwidth_budget = int(os.getenv("TOTAL_BANDWIDTH_BUDGET", "30000000"))
|
||||
service_buffer = int(os.getenv("SERVICE_BUFFER", "5000000"))
|
||||
default_stream_bitrate = int(os.getenv("DEFAULT_STREAM_BITRATE", "10000000"))
|
||||
min_torrent_speed = int(os.getenv("MIN_TORRENT_SPEED", "100"))
|
||||
stream_bitrate_headroom = float(os.getenv("STREAM_BITRATE_HEADROOM", "1.1"))
|
||||
webhook_port = int(os.getenv("WEBHOOK_PORT", "0"))
|
||||
webhook_bind = os.getenv("WEBHOOK_BIND", "127.0.0.1")
|
||||
|
||||
monitor = JellyfinQBittorrentMonitor(
|
||||
jellyfin_url=jellyfin_url,
|
||||
qbittorrent_url=qbittorrent_url,
|
||||
check_interval=check_interval,
|
||||
jellyfin_api_key=jellyfin_api_key,
|
||||
streaming_start_delay=streaming_start_delay,
|
||||
streaming_stop_delay=streaming_stop_delay,
|
||||
total_bandwidth_budget=total_bandwidth_budget,
|
||||
service_buffer=service_buffer,
|
||||
default_stream_bitrate=default_stream_bitrate,
|
||||
min_torrent_speed=min_torrent_speed,
|
||||
stream_bitrate_headroom=stream_bitrate_headroom,
|
||||
webhook_port=webhook_port,
|
||||
webhook_bind=webhook_bind,
|
||||
)
|
||||
|
||||
monitor.run()
|
||||
Reference in New Issue
Block a user