#!/usr/bin/env python3 import json import os import sys import time import urllib.request from pathlib import Path JELLYFIN_URL = os.environ.get("JELLYFIN_URL", "http://127.0.0.1:8096") GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000") STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/jellyfin-annotations/state.json") POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) # Consecutive polls a session must be absent from /Sessions before we close # its annotation. Smooths over Jellyfin restarts, client reconnects, and # brief network hiccups that would otherwise spuriously close + reopen an # annotation every time /Sessions returns an empty list for a single poll. MISSING_THRESHOLD = int(os.environ.get("MISSING_THRESHOLD", "2")) def get_api_key(): cred_dir = os.environ.get("CREDENTIALS_DIRECTORY") if cred_dir: return Path(cred_dir, "jellyfin-api-key").read_text().strip() for p in ["/run/agenix/jellyfin-api-key"]: if Path(p).exists(): return Path(p).read_text().strip() sys.exit("ERROR: Cannot find jellyfin-api-key") def http_json(method, url, body=None): data = json.dumps(body).encode() if body is not None else None req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json", "Accept": "application/json"}, method=method, ) with urllib.request.urlopen(req, timeout=5) as resp: return json.loads(resp.read()) def get_active_sessions(api_key): try: req = urllib.request.Request( f"{JELLYFIN_URL}/Sessions?api_key={api_key}", headers={"Accept": "application/json"}, ) with urllib.request.urlopen(req, timeout=5) as resp: sessions = json.loads(resp.read()) return [s for s in sessions if s.get("NowPlayingItem")] except Exception as e: print(f"Error fetching sessions: {e}", file=sys.stderr) return None def _codec(name): if not name: return "" aliases = {"h264": "H.264", "h265": "H.265", "hevc": "H.265", "av1": "AV1", "vp9": "VP9", "vp8": "VP8", "mpeg4": "MPEG-4", "mpeg2video": "MPEG-2", "aac": "AAC", "ac3": "AC3", "eac3": "EAC3", "dts": "DTS", "truehd": "TrueHD", "mp3": "MP3", "opus": "Opus", "flac": "FLAC", "vorbis": "Vorbis"} return aliases.get(name.lower(), name.upper()) def _res(width, height): if not height: return "" common = {2160: "4K", 1440: "1440p", 1080: "1080p", 720: "720p", 480: "480p", 360: "360p"} return common.get(height, f"{height}p") def _channels(n): labels = {1: "Mono", 2: "Stereo", 6: "5.1", 7: "6.1", 8: "7.1"} return labels.get(n, f"{n}ch") if n else "" def format_label(session): user = session.get("UserName", "Unknown") item = session.get("NowPlayingItem", {}) or {} transcode = session.get("TranscodingInfo") or {} play_state = session.get("PlayState") or {} client = session.get("Client", "") device = session.get("DeviceName", "") name = item.get("Name", "Unknown") series = item.get("SeriesName", "") season = item.get("ParentIndexNumber") episode = item.get("IndexNumber") media_type = item.get("Type", "") if series and season and episode: title = f"{series} S{season:02d}E{episode:02d} \u2013 {name}" elif series: title = f"{series} \u2013 {name}" elif media_type == "Movie": title = f"{name} (movie)" else: title = name play_method = play_state.get("PlayMethod", "") if play_method == "DirectPlay": method = "Direct Play" elif play_method == "DirectStream": method = "Direct Stream" elif play_method == "Transcode" or transcode: method = "Transcode" else: method = "Direct Play" media_streams = item.get("MediaStreams") or [] video_streams = [s for s in media_streams if s.get("Type") == "Video"] audio_streams = [s for s in media_streams if s.get("Type") == "Audio"] default_audio = next((s for s in audio_streams if s.get("IsDefault")), None) audio_stream = default_audio or (audio_streams[0] if audio_streams else {}) video_stream = video_streams[0] if video_streams else {} src_vcodec = _codec(video_stream.get("Codec", "")) src_res = _res(video_stream.get("Width") or item.get("Width"), video_stream.get("Height") or item.get("Height")) src_acodec = _codec(audio_stream.get("Codec", "")) src_channels = _channels(audio_stream.get("Channels")) is_video_direct = transcode.get("IsVideoDirect", True) is_audio_direct = transcode.get("IsAudioDirect", True) if transcode and not is_video_direct: dst_vcodec = _codec(transcode.get("VideoCodec", "")) dst_res = _res(transcode.get("Width"), transcode.get("Height")) or src_res if src_vcodec and dst_vcodec and src_vcodec != dst_vcodec: video_part = f"{src_vcodec}\u2192{dst_vcodec} {dst_res}".strip() else: video_part = f"{dst_vcodec or src_vcodec} {dst_res}".strip() else: video_part = f"{src_vcodec} {src_res}".strip() if transcode and not is_audio_direct: dst_acodec = _codec(transcode.get("AudioCodec", "")) dst_channels = _channels(transcode.get("AudioChannels")) or src_channels if src_acodec and dst_acodec and src_acodec != dst_acodec: audio_part = f"{src_acodec}\u2192{dst_acodec} {dst_channels}".strip() else: audio_part = f"{dst_acodec or src_acodec} {dst_channels}".strip() else: audio_part = f"{src_acodec} {src_channels}".strip() bitrate = transcode.get("Bitrate") or item.get("Bitrate") bitrate_part = f"{bitrate / 1_000_000:.1f} Mbps" if bitrate else "" reasons = transcode.get("TranscodeReasons") or [] reason_part = f"[{', '.join(reasons)}]" if reasons else "" stream_parts = [p for p in [method, video_part, audio_part, bitrate_part, reason_part] if p] client_str = " \u00b7 ".join(filter(None, [client, device])) lines = [f"{user}: {title}", " | ".join(stream_parts)] if client_str: lines.append(client_str) return "\n".join(lines) def load_state(): try: with open(STATE_FILE) as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} def save_state(state): os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) tmp = STATE_FILE + ".tmp" with open(tmp, "w") as f: json.dump(state, f) os.replace(tmp, STATE_FILE) def grafana_post(label, start_ms): try: result = http_json( "POST", f"{GRAFANA_URL}/api/annotations", {"time": start_ms, "text": label, "tags": ["jellyfin"]}, ) return result.get("id") except Exception as e: print(f"Error posting annotation: {e}", file=sys.stderr) return None def grafana_close(grafana_id, end_ms): try: http_json( "PATCH", f"{GRAFANA_URL}/api/annotations/{grafana_id}", {"timeEnd": end_ms}, ) return True except Exception as e: print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr) return False def reconcile(state, sessions, now_ms): """Fold the current /Sessions snapshot into state in place. Returns True iff state was mutated (caller should persist). Invariants this preserves, which the prior implementation violated: - A state entry is only removed after grafana_close succeeds. A failed PATCH (Grafana restarting, network blip) must leave the entry so we can retry on the next poll rather than orphan the open annotation. - Closing a session uses last_seen_ms clamped by grace window, not now_ms. After a reboot or long outage, now_ms is wildly later than when playback actually stopped, and using it paints the annotation as if the user watched through the outage. - A single missed poll does not close; a session must be absent for MISSING_THRESHOLD consecutive polls. Absorbs Jellyfin restarts and brief /Sessions empties without duplicating annotations. """ dirty = False current_ids = {s["Id"] for s in sessions} # Active sessions: create new entries or refresh existing ones. for s in sessions: sid = s["Id"] entry = state.get(sid) if entry is None: label = format_label(s) grafana_id = grafana_post(label, now_ms) if grafana_id is None: # Grafana unreachable; retry on next poll. Do not persist a # half-open entry. continue state[sid] = { "grafana_id": grafana_id, "label": label, "start_ms": now_ms, "last_seen_ms": now_ms, "missing_count": 0, } dirty = True else: entry["last_seen_ms"] = now_ms entry["missing_count"] = 0 dirty = True # Absent sessions: increment miss counter; close only after threshold. # `grace_ms` caps how far timeEnd drifts from last_seen, so a reboot or # long outage closes the annotation near when playback actually stopped # rather than at service-recovery time. grace_ms = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000 for sid in list(state.keys()): if sid in current_ids: continue entry = state[sid] entry["missing_count"] = entry.get("missing_count", 0) + 1 dirty = True if entry["missing_count"] < MISSING_THRESHOLD: continue last_seen_ms = entry.get("last_seen_ms", now_ms) close_time = min(now_ms, last_seen_ms + grace_ms) if grafana_close(entry["grafana_id"], close_time): del state[sid] # On failure the entry stays with a bumped missing_count; next poll # will retry with the same bounded close_time. return dirty def main(): api_key = get_api_key() state = load_state() while True: now_ms = int(time.time() * 1000) sessions = get_active_sessions(api_key) # sessions is None on a Jellyfin API error — skip the whole pass # rather than risk reinterpreting a transport failure as "nothing # is playing" and closing every open annotation. if sessions is not None: if reconcile(state, sessions, now_ms): save_state(state) time.sleep(POLL_INTERVAL) if __name__ == "__main__": main()