#!/usr/bin/env python3
"""Mirror active Jellyfin playback sessions as Grafana annotations.

The script polls Jellyfin's ``/Sessions`` endpoint. When a session that is
currently playing something is seen for the first time, an annotation tagged
``jellyfin`` is posted to Grafana; when that session is no longer reported,
the annotation's end time is set. The mapping from Jellyfin session id to
Grafana annotation id is persisted in ``STATE_FILE`` so that open annotations
can still be closed after a restart.

Configuration is read from environment variables:

* ``JELLYFIN_URL``   (default ``http://127.0.0.1:8096``)
* ``GRAFANA_URL``    (default ``http://127.0.0.1:3000``)
* ``STATE_FILE``     (default ``/var/lib/jellyfin-annotations/state.json``)
* ``POLL_INTERVAL``  seconds between polls (default ``30``)
* ``CREDENTIALS_DIRECTORY``  optional directory holding ``jellyfin-api-key``
"""
import json
import os
import sys
import time
import urllib.request
from pathlib import Path

JELLYFIN_URL = os.environ.get("JELLYFIN_URL", "http://127.0.0.1:8096")
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000")
STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/jellyfin-annotations/state.json")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))


def get_api_key():
    """Return the Jellyfin API key, or exit the process if none is found.

    If ``CREDENTIALS_DIRECTORY`` is set, the key is read from
    ``<dir>/jellyfin-api-key`` (a missing file raises ``OSError``).
    Otherwise ``/run/agenix/jellyfin-api-key`` is used when it exists.

    Raises:
        SystemExit: when no key file can be located.
    """
    cred_dir = os.environ.get("CREDENTIALS_DIRECTORY")
    if cred_dir:
        return Path(cred_dir, "jellyfin-api-key").read_text().strip()
    for p in ["/run/agenix/jellyfin-api-key"]:
        if Path(p).exists():
            return Path(p).read_text().strip()
    sys.exit("ERROR: Cannot find jellyfin-api-key")


def http_json(method, url, body=None):
    """Send a JSON request and return the decoded JSON response.

    Args:
        method: HTTP method name, e.g. ``"POST"`` or ``"PATCH"``.
        url: Absolute URL to request.
        body: Optional JSON-serialisable payload; omitted when ``None``.

    Returns:
        The response body parsed as JSON.

    Raises:
        Whatever ``urllib.request.urlopen`` or ``json.loads`` raise (network
        errors, HTTP errors, a 5 second timeout, or an invalid JSON body).
    """
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method=method,
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.loads(resp.read())


def get_active_sessions(api_key):
    """Return the Jellyfin sessions that are currently playing an item.

    Args:
        api_key: Jellyfin API key used to authenticate the request.

    Returns:
        A list of session dicts that have a truthy ``NowPlayingItem``, or
        ``None`` when the request failed (the error is logged to stderr).
        ``None`` lets the caller distinguish "fetch failed" from "nothing is
        playing", so open annotations are not closed on a transient error.
    """
    # NOTE(review): the key is sent as a query parameter and may therefore
    # end up in access logs; consider header-based auth after confirming
    # what the deployed Jellyfin version accepts.
    try:
        req = urllib.request.Request(
            f"{JELLYFIN_URL}/Sessions?api_key={api_key}",
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            sessions = json.loads(resp.read())
        return [s for s in sessions if s.get("NowPlayingItem")]
    except Exception as e:
        # Best-effort polling: report the failure and let the loop retry.
        print(f"Error fetching sessions: {e}", file=sys.stderr)
        return None


def _codec(name):
    """Return a display name for a codec identifier ("" when *name* is falsy).

    Known identifiers are mapped to a fixed label; anything else is
    upper-cased.
    """
    if not name:
        return ""
    aliases = {
        "h264": "H.264",
        "h265": "H.265",
        "hevc": "H.265",
        "av1": "AV1",
        "vp9": "VP9",
        "vp8": "VP8",
        "mpeg4": "MPEG-4",
        "mpeg2video": "MPEG-2",
        "aac": "AAC",
        "ac3": "AC3",
        "eac3": "EAC3",
        "dts": "DTS",
        "truehd": "TrueHD",
        "mp3": "MP3",
        "opus": "Opus",
        "flac": "FLAC",
        "vorbis": "Vorbis",
    }
    return aliases.get(name.lower(), name.upper())


def _res(width, height):
    """Return a resolution label derived from *height* ("" when falsy).

    *width* is accepted for call-site symmetry but is not used.
    """
    if not height:
        return ""
    common = {
        2160: "4K",
        1440: "1440p",
        1080: "1080p",
        720: "720p",
        480: "480p",
        360: "360p",
    }
    return common.get(height, f"{height}p")


def _channels(n):
    """Return a label for an audio channel count ("" when *n* is falsy)."""
    labels = {1: "Mono", 2: "Stereo", 6: "5.1", 7: "6.1", 8: "7.1"}
    return labels.get(n, f"{n}ch") if n else ""


def _format_title(item):
    """Build the title shown for the item being played."""
    name = item.get("Name", "Unknown")
    series = item.get("SeriesName", "")
    season = item.get("ParentIndexNumber")
    episode = item.get("IndexNumber")
    # Compare against None rather than truthiness: season 0 and episode 0
    # are valid numbers and must still produce an SxxEyy tag.
    if series and season is not None and episode is not None:
        return f"{series} S{season:02d}E{episode:02d} \u2013 {name}"
    if series:
        return f"{series} \u2013 {name}"
    if item.get("Type", "") == "Movie":
        return f"{name} (movie)"
    return name


def _play_method_label(play_state, transcode):
    """Return "Direct Play", "Direct Stream" or "Transcode" for a session."""
    play_method = play_state.get("PlayMethod", "")
    if play_method == "DirectPlay":
        return "Direct Play"
    if play_method == "DirectStream":
        return "Direct Stream"
    # Transcoding info without an explicit play method still means transcode.
    if play_method == "Transcode" or transcode:
        return "Transcode"
    return "Direct Play"


def _stream_part(src_codec, dst_codec, dst_detail):
    """Describe a transcoded stream as "SRC→DST detail" or "CODEC detail"."""
    if src_codec and dst_codec and src_codec != dst_codec:
        return f"{src_codec}\u2192{dst_codec} {dst_detail}".strip()
    return f"{dst_codec or src_codec} {dst_detail}".strip()


def format_label(session):
    """Render a Jellyfin session dict as multi-line annotation text.

    Args:
        session: One session dict as returned by Jellyfin's ``/Sessions``.
            The keys read are ``UserName``, ``NowPlayingItem``,
            ``TranscodingInfo``, ``PlayState``, ``Client`` and ``DeviceName``;
            all of them are optional.

    Returns:
        A string of two or three lines:

        1. ``"<user>: <title>"``
        2. play method, video, audio, bitrate and transcode reasons joined
           with ``" | "`` (empty parts are skipped)
        3. ``"<client> · <device>"`` (omitted when both are empty)
    """
    user = session.get("UserName", "Unknown")
    item = session.get("NowPlayingItem", {}) or {}
    transcode = session.get("TranscodingInfo") or {}
    play_state = session.get("PlayState") or {}
    client = session.get("Client", "")
    device = session.get("DeviceName", "")

    title = _format_title(item)
    method = _play_method_label(play_state, transcode)

    # Pick the first video stream and the default (else first) audio stream.
    media_streams = item.get("MediaStreams") or []
    video_streams = [s for s in media_streams if s.get("Type") == "Video"]
    audio_streams = [s for s in media_streams if s.get("Type") == "Audio"]
    default_audio = next((s for s in audio_streams if s.get("IsDefault")), None)
    audio_stream = default_audio or (audio_streams[0] if audio_streams else {})
    video_stream = video_streams[0] if video_streams else {}

    src_vcodec = _codec(video_stream.get("Codec", ""))
    src_res = _res(
        video_stream.get("Width") or item.get("Width"),
        video_stream.get("Height") or item.get("Height"),
    )
    src_acodec = _codec(audio_stream.get("Codec", ""))
    src_channels = _channels(audio_stream.get("Channels"))

    # A stream counts as passed through unless the transcode info says not.
    is_video_direct = transcode.get("IsVideoDirect", True)
    is_audio_direct = transcode.get("IsAudioDirect", True)

    if transcode and not is_video_direct:
        dst_vcodec = _codec(transcode.get("VideoCodec", ""))
        dst_res = _res(transcode.get("Width"), transcode.get("Height")) or src_res
        video_part = _stream_part(src_vcodec, dst_vcodec, dst_res)
    else:
        video_part = f"{src_vcodec} {src_res}".strip()

    if transcode and not is_audio_direct:
        dst_acodec = _codec(transcode.get("AudioCodec", ""))
        dst_channels = _channels(transcode.get("AudioChannels")) or src_channels
        audio_part = _stream_part(src_acodec, dst_acodec, dst_channels)
    else:
        audio_part = f"{src_acodec} {src_channels}".strip()

    bitrate = transcode.get("Bitrate") or item.get("Bitrate")
    bitrate_part = f"{bitrate / 1_000_000:.1f} Mbps" if bitrate else ""

    reasons = transcode.get("TranscodeReasons") or []
    if isinstance(reasons, str):
        # A bare string would otherwise be joined character by character.
        reasons = [r.strip() for r in reasons.split(",") if r.strip()]
    reason_part = f"[{', '.join(reasons)}]" if reasons else ""

    stream_parts = [
        p for p in [method, video_part, audio_part, bitrate_part, reason_part] if p
    ]
    client_str = " \u00b7 ".join(filter(None, [client, device]))

    lines = [f"{user}: {title}", " | ".join(stream_parts)]
    if client_str:
        lines.append(client_str)
    return "\n".join(lines)


def load_state():
    """Load the persisted session state from ``STATE_FILE``.

    Returns:
        The stored dict, or ``{}`` when the file is missing, is not valid
        JSON, or does not contain a JSON object.
    """
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    # main() indexes the state by session id, so anything but a dict is
    # treated the same way as a corrupt file.
    return state if isinstance(state, dict) else {}


def save_state(state):
    """Write *state* to ``STATE_FILE`` via a temporary file and rename.

    The parent directory is created if needed. The data is first written to
    ``STATE_FILE + ".tmp"`` and then moved into place with ``os.replace`` so
    a partially written file never takes the place of the previous state.
    """
    state_dir = os.path.dirname(STATE_FILE)
    # dirname is "" for a bare filename, and os.makedirs("") raises.
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f)
    os.replace(tmp, STATE_FILE)


def grafana_post(label, start_ms):
    """Create a Grafana annotation and return its id.

    Args:
        label: Annotation text.
        start_ms: Start time in milliseconds since the epoch.

    Returns:
        The ``id`` field of Grafana's response, or ``None`` when the request
        failed (the error is logged to stderr).
    """
    try:
        result = http_json(
            "POST",
            f"{GRAFANA_URL}/api/annotations",
            {"time": start_ms, "text": label, "tags": ["jellyfin"]},
        )
        return result.get("id")
    except Exception as e:
        # Best-effort: the caller retries on the next poll.
        print(f"Error posting annotation: {e}", file=sys.stderr)
        return None


def grafana_close(grafana_id, end_ms):
    """Set the end time of an existing Grafana annotation.

    Args:
        grafana_id: Id of the annotation to update.
        end_ms: End time in milliseconds since the epoch.

    Failures are logged to stderr and otherwise ignored.
    """
    try:
        http_json(
            "PATCH",
            f"{GRAFANA_URL}/api/annotations/{grafana_id}",
            {"timeEnd": end_ms},
        )
    except Exception as e:
        print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)


def main():
    """Run the polling loop forever.

    On every iteration, sessions not yet in the state get a new annotation,
    and sessions in the state that Jellyfin no longer reports have their
    annotation closed. Nothing is changed when the session fetch failed.
    """
    api_key = get_api_key()
    state = load_state()
    while True:
        now_ms = int(time.time() * 1000)
        sessions = get_active_sessions(api_key)
        # None means the fetch failed; skip this round instead of closing
        # every open annotation.
        if sessions is not None:
            current_ids = {s["Id"] for s in sessions}

            for s in sessions:
                sid = s["Id"]
                if sid not in state:
                    label = format_label(s)
                    grafana_id = grafana_post(label, now_ms)
                    # Only remember sessions whose annotation was created, so
                    # a failed post is retried on the next poll.
                    if grafana_id is not None:
                        state[sid] = {
                            "grafana_id": grafana_id,
                            "label": label,
                            "start_ms": now_ms,
                        }
                        save_state(state)

            # Iterate over a snapshot of the keys because entries are popped.
            for sid in [k for k in state if k not in current_ids]:
                info = state.pop(sid)
                grafana_close(info["grafana_id"], now_ms)
                save_state(state)

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()