#!/usr/bin/env python3 """Monitor Jellyfin log files for client playback/transcoding failures. Tails Jellyfin's rotating log files, matches [ERR] lines that indicate a client-facing failure (playback error, transcode crash, stream abort), deduplicates within a window, and pushes a ntfy notification. Environment JELLYFIN_LOG_DIR path to Jellyfin log directory (required) NTFY_SERVER_URL ntfy server base URL (required) NTFY_TOPIC ntfy topic name (required) NTFY_TOKEN_FILE optional path to file containing ntfy auth bearer token HOSTNAME server hostname for notification title (default: "muffin") POLL_INTERVAL seconds between log scans (default: 15) DEDUP_WINDOW seconds before re-alerting same signature (default: 300) """ import glob import hashlib import logging import os import re import signal import sys import time import urllib.request import urllib.error from pathlib import Path logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Patterns # --------------------------------------------------------------------------- # Jellyfin log line prefix: [2024-01-01 12:00:00.000 +00:00] [ERR] [123] _LOG_PREFIX_RE = re.compile( r"^\[[\d\-]{10} [\d:.]{12} [+-]\d{2}:\d{2}\] \[ERR\] \[\d+\] " ) # Sources that indicate a client-facing failure. _CLIENT_FAILURE_SOURCES = { # Transcoding engine crashes / errors "MediaBrowser.MediaEncoding.Transcoding.TranscodeManager", "MediaBrowser.MediaEncoding.Encoder.EncodingManager", # Playback / session errors "Emby.Server.Implementations.Session.SessionManager", # HTTP exceptions on media endpoints "Jellyfin.Server.Middleware.ExceptionMiddleware", # Streaming / live TV "MediaBrowser.Api.Playback.MediaInfoService", "MediaBrowser.Api.Playback.Progressive.ProgressiveStreamWriter", "MediaBrowser.Api.Playback.Hls.DynamicHlsService", # Direct play / stream "MediaBrowser.Controller.MediaEncoding.EncodingHelper", # DLNA / remote control (rare but client-facing) "Emby.Server.Implementations.HttpServer.HttpListenerHost", } # Additional message-level patterns for lines whose source is not in # _CLIENT_FAILURE_SOURCES but whose message text indicates a client problem. _CLIENT_FAILURE_PATTERNS = [ re.compile(p, re.IGNORECASE) for p in [ r"error processing request.*?(?:/Videos/|/Items/|/Audio/)", r"ffmpeg.*?(?:error|exited with code [1-9]|crashed|killed)", r"playback\s*error", r"transcode.*?(?:fail|error|abort)", r"stream.*?(?:fail|error|abort|closed)", r"client.*?(?:disconnect|error|timeout)", ] ] # Items to scrub from log lines before generating a dedup signature. _SIGNATURE_SCRUB_RE = re.compile( r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}" # UUID r"|\b[0-9a-fA-F]{32,}\b" # long hex hashes r"|\b\d{4,}\b" # ids / durations / sizes ≥ 4 digits r"|0x[0-9a-fA-F]+" # hex addresses ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _read_token(token_file: str | None) -> str | None: if not token_file or not os.path.isfile(token_file): return None try: return Path(token_file).read_text().strip() except OSError: return None def _send_ntfy( server_url: str, topic: str, title: str, message: str, token: str | None, priority: str = "high", tags: str = "warning", ) -> bool: """POST a ntfy notification. Returns True on success.""" url = f"{server_url.rstrip('/')}/{topic}" data = message.encode("utf-8") headers = { "Title": title, "Priority": priority, "Tags": tags, "Content-Type": "text/plain", } if token: headers["Authorization"] = f"Bearer {token}" req = urllib.request.Request(url, data=data, headers=headers, method="POST") try: with urllib.request.urlopen(req, timeout=15) as resp: return 200 <= resp.status < 300 except urllib.error.HTTPError as exc: logger.warning("ntfy POST returned HTTP %s: %s", exc.code, exc.reason) return False except urllib.error.URLError as exc: logger.warning("ntfy POST failed: %s", exc.reason) return False def _error_signature(line: str) -> str: """Return a stable hash for a Jellyfin error log line. Strips the timestamp prefix and normalises UUIDs, hex hashes, and large integers so that the same logical error from different sessions or items collapses to the same signature. """ # Strip timestamp / level / thread prefix so we keep : body = _LOG_PREFIX_RE.sub("", line, count=1) if not body: body = line # Collapse repeated whitespace normalised = _SIGNATURE_SCRUB_RE.sub("", body) normalised = re.sub(r"\s+", " ", normalised).strip() # Keep the source prefix (up to first ':') as part of the signature return hashlib.sha256(normalised.encode()).hexdigest() def _is_client_failure(line: str) -> bool: """Check whether a Jellyfin [ERR] log line indicates a client failure.""" if not _LOG_PREFIX_RE.match(line): return False # Strip prefix for matching body = _LOG_PREFIX_RE.sub("", line, count=1) if not body: return False # Check source (the part before ': ') if ": " in body: source = body.split(": ", 1)[0] if source in _CLIENT_FAILURE_SOURCES: return True # Fall back to message-level patterns for pat in _CLIENT_FAILURE_PATTERNS: if pat.search(body): return True return False def _scan_log_file(path: str, seen_positions: dict[str, int]) -> list[str]: """Read new lines from *path* since *seen_positions[path]*. Updates *seen_positions* in place. Handles truncation (log rotation) by resetting the cursor to 0 when the file shrinks. """ hits: list[str] = [] try: st = os.stat(path) inode_key = f"{st.st_ino}:{st.st_dev}" prev_offset = seen_positions.get(inode_key, 0) if st.st_size < prev_offset: # File was truncated (rotation): start from the beginning. prev_offset = 0 if st.st_size == prev_offset: seen_positions[inode_key] = prev_offset return hits with open(path, "r", errors="replace") as fh: fh.seek(prev_offset) for raw in fh: line = raw.rstrip("\n\r") if _is_client_failure(line): hits.append(line) seen_positions[inode_key] = fh.tell() except FileNotFoundError: seen_positions.pop(inode_key, None) except OSError as exc: logger.debug("Cannot read %s: %s", path, exc) return hits # --------------------------------------------------------------------------- # Main loop # --------------------------------------------------------------------------- def main() -> None: log_dir = os.environ.get("JELLYFIN_LOG_DIR") ntfy_url = os.environ.get("NTFY_SERVER_URL") ntfy_topic = os.environ.get("NTFY_TOPIC") ntfy_token_file = os.environ.get("NTFY_TOKEN_FILE") hostname = os.environ.get("HOSTNAME", "muffin") poll_interval = int(os.environ.get("POLL_INTERVAL", "15")) dedup_window = int(os.environ.get("DEDUP_WINDOW", "300")) if not log_dir: logger.fatal("JELLYFIN_LOG_DIR is required") sys.exit(1) if not ntfy_url: logger.fatal("NTFY_SERVER_URL is required") sys.exit(1) if not ntfy_topic: logger.fatal("NTFY_TOPIC is required") sys.exit(1) running = True def _handle_signal(signum: int, _frame: object) -> None: nonlocal running logger.info("Received signal %s, shutting down", signum) running = False signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) ntfy_token = _read_token(ntfy_token_file) # Dedup state: signature → last-alerted timestamp seen_signatures: dict[str, float] = {} # File read cursors: "{inode}:{dev}" → byte offset file_positions: dict[str, int] = {} logger.info( "Starting Jellyfin failure alert monitor (log_dir=%s, poll=%ss, dedup=%ss)", log_dir, poll_interval, dedup_window, ) while running: try: now = time.time() # Expire old dedup entries expired = [s for s, ts in seen_signatures.items() if now - ts > dedup_window] for s in expired: del seen_signatures[s] # Scan all log files log_pattern = os.path.join(log_dir, "log_*.log") for path in sorted(glob.glob(log_pattern)): hits = _scan_log_file(path, file_positions) for line in hits: sig = _error_signature(line) if sig in seen_signatures: logger.debug("Suppressed duplicate: %s", line[:120]) continue seen_signatures[sig] = now # Build a clean title: source + short summary body = _LOG_PREFIX_RE.sub("", line, count=1) title = f"[{hostname}] Jellyfin client failure" if ": " in body: source, msg = body.split(": ", 1) title = f"[{hostname}] Jellyfin: {source.split('.')[-1]}" body = msg # Truncate body for readability if len(body) > 500: body = body[:497] + "..." logger.warning("Alerting: %s", body[:120]) _send_ntfy( ntfy_url, ntfy_topic, title, body, ntfy_token, ) # Clean up stale file-position entries for rotated-out files current_inodes = set() for path in glob.glob(log_pattern): try: st = os.stat(path) current_inodes.add(f"{st.st_ino}:{st.st_dev}") except OSError: pass stale = [k for k in file_positions if k not in current_inodes] for k in stale: del file_positions[k] except Exception: logger.exception("Unhandled error in main loop") # Sleep in small increments so we can react to SIGTERM promptly. deadline = time.time() + poll_interval while running and time.time() < deadline: time.sleep(min(1, deadline - time.time())) logger.info("Jellyfin failure alert monitor stopped") if __name__ == "__main__": main()