This repository has been archived on 2026-04-18. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
server-config/services/grafana/jellyfin-annotations.py
2026-04-03 00:39:42 -04:00

234 lines
7.8 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
# Runtime settings; each one can be overridden through an environment
# variable of the same name.

# Base URLs of the two HTTP services this script talks to.
JELLYFIN_URL = os.getenv("JELLYFIN_URL", "http://127.0.0.1:8096")
GRAFANA_URL = os.getenv("GRAFANA_URL", "http://127.0.0.1:3000")

# JSON file that maps session ids to the annotations opened for them.
STATE_FILE = os.getenv("STATE_FILE", "/var/lib/jellyfin-annotations/state.json")

# Seconds to sleep between two polls of the session list.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))
def get_api_key():
    """Read the Jellyfin API key from disk.

    Candidate locations, tried in order:

    1. ``$CREDENTIALS_DIRECTORY/jellyfin-api-key`` (only when that
       environment variable is set and non-empty),
    2. ``/run/agenix/jellyfin-api-key``.

    Returns:
        The content of the first candidate that exists, with surrounding
        whitespace stripped.

    Raises:
        SystemExit: when none of the candidates exists.
    """
    candidates = []
    cred_dir = os.environ.get("CREDENTIALS_DIRECTORY")
    if cred_dir:
        candidates.append(Path(cred_dir, "jellyfin-api-key"))
    candidates.append(Path("/run/agenix/jellyfin-api-key"))

    for candidate in candidates:
        # Read directly instead of checking exists() first: that avoids a
        # check-then-use race, and a credentials directory that lacks this
        # particular key falls through to the next location instead of
        # aborting with a traceback.
        try:
            return candidate.read_text().strip()
        except (FileNotFoundError, NotADirectoryError):
            continue

    sys.exit("ERROR: Cannot find jellyfin-api-key")
def http_json(method, url, body=None):
    """Send an HTTP request and decode the JSON response.

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        url: Full URL to request.
        body: Optional object; when not ``None`` it is JSON-encoded and
            sent as the request body.

    Returns:
        The decoded JSON response.

    Any error raised by ``urllib`` or by JSON decoding propagates to the
    caller. The request uses a 5 second timeout.
    """
    payload = None if body is None else json.dumps(body).encode()

    request = urllib.request.Request(url, data=payload, method=method)
    request.add_header("Content-Type", "application/json")
    request.add_header("Accept", "application/json")

    with urllib.request.urlopen(request, timeout=5) as response:
        raw = response.read()
    return json.loads(raw)
def get_active_sessions(api_key):
    """Fetch the Jellyfin sessions that are currently playing something.

    Args:
        api_key: Jellyfin API key used to authenticate the request.

    Returns:
        A list of session dicts that have a truthy ``NowPlayingItem``, or
        ``None`` when the request or decoding failed (the error is printed
        to stderr). Callers use ``None`` to tell "no data" apart from
        "nobody is playing".
    """
    try:
        req = urllib.request.Request(
            f"{JELLYFIN_URL}/Sessions",
            headers={
                "Accept": "application/json",
                # Send the key in a header rather than as an ``api_key``
                # query parameter: a key in the URL ends up in access logs
                # and would also need URL-escaping.
                "Authorization": f'MediaBrowser Token="{api_key}"',
            },
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            sessions = json.loads(resp.read())
        return [s for s in sessions if s.get("NowPlayingItem")]
    except Exception as e:
        # Best-effort poller: report and let the caller skip this round.
        print(f"Error fetching sessions: {e}", file=sys.stderr)
        return None
def _codec(name):
if not name:
return ""
aliases = {"h264": "H.264", "h265": "H.265", "hevc": "H.265", "av1": "AV1",
"vp9": "VP9", "vp8": "VP8", "mpeg4": "MPEG-4", "mpeg2video": "MPEG-2",
"aac": "AAC", "ac3": "AC3", "eac3": "EAC3", "dts": "DTS",
"truehd": "TrueHD", "mp3": "MP3", "opus": "Opus", "flac": "FLAC",
"vorbis": "Vorbis"}
return aliases.get(name.lower(), name.upper())
def _res(width, height):
if not height:
return ""
common = {2160: "4K", 1440: "1440p", 1080: "1080p", 720: "720p",
480: "480p", 360: "360p"}
return common.get(height, f"{height}p")
def _channels(n):
labels = {1: "Mono", 2: "Stereo", 6: "5.1", 7: "6.1", 8: "7.1"}
return labels.get(n, f"{n}ch") if n else ""
def _format_title(item):
    """Build the title line fragment for a ``NowPlayingItem`` dict."""
    name = item.get("Name") or "Unknown"
    series = item.get("SeriesName", "")
    season = item.get("ParentIndexNumber")
    episode = item.get("IndexNumber")

    # Compare against None rather than truthiness: 0 is a legitimate
    # season/episode number and must not drop the SxxEyy tag.
    if series and season is not None and episode is not None:
        return f"{series} S{season:02d}E{episode:02d} \u2013 {name}"
    if series:
        return f"{series} \u2013 {name}"
    if item.get("Type", "") == "Movie":
        return f"{name} (movie)"
    return name


def _stream_part(src_codec, src_detail, dst_codec, dst_detail, converted):
    """Describe one stream (video or audio) as ``"codec detail"``.

    When *converted* is true the destination values are shown, with a
    ``source→destination`` codec pair if the codec actually changed; the
    source detail is used when no destination detail is available.
    """
    if not converted:
        return f"{src_codec} {src_detail}".strip()

    detail = dst_detail or src_detail
    if src_codec and dst_codec and src_codec != dst_codec:
        return f"{src_codec}\u2192{dst_codec} {detail}".strip()
    return f"{dst_codec or src_codec} {detail}".strip()


def format_label(session):
    """Build the annotation text for one Jellyfin session.

    Args:
        session: A session dict as returned by the ``/Sessions`` endpoint.
            Missing or null sub-objects are tolerated.

    Returns:
        A newline-separated string:

        1. ``"<user>: <title>"``
        2. play method, video, audio, bitrate and transcode reasons joined
           by ``" | "`` (empty parts are omitted)
        3. ``"<client> · <device>"`` (only when at least one is known)
    """
    # ``or`` instead of a .get() default: the key may be present with a
    # null value, which would otherwise be rendered as the text "None".
    user = session.get("UserName") or "Unknown"
    item = session.get("NowPlayingItem") or {}
    transcode = session.get("TranscodingInfo") or {}
    play_state = session.get("PlayState") or {}
    client = session.get("Client", "")
    device = session.get("DeviceName", "")

    title = _format_title(item)

    play_method = play_state.get("PlayMethod", "")
    if play_method == "DirectPlay":
        method = "Direct Play"
    elif play_method == "DirectStream":
        method = "Direct Stream"
    elif play_method == "Transcode" or transcode:
        # No explicit method but transcoding info present: it is a transcode.
        method = "Transcode"
    else:
        method = "Direct Play"

    # Source streams: first video stream, and the default audio stream
    # (falling back to the first audio stream).
    media_streams = item.get("MediaStreams") or []
    video_stream = next(
        (s for s in media_streams if s.get("Type") == "Video"), {}
    )
    audio_streams = [s for s in media_streams if s.get("Type") == "Audio"]
    default_audio = next((s for s in audio_streams if s.get("IsDefault")), None)
    audio_stream = default_audio or (audio_streams[0] if audio_streams else {})

    video_part = _stream_part(
        _codec(video_stream.get("Codec", "")),
        _res(video_stream.get("Width") or item.get("Width"),
             video_stream.get("Height") or item.get("Height")),
        _codec(transcode.get("VideoCodec", "")),
        _res(transcode.get("Width"), transcode.get("Height")),
        bool(transcode) and not transcode.get("IsVideoDirect", True),
    )
    audio_part = _stream_part(
        _codec(audio_stream.get("Codec", "")),
        _channels(audio_stream.get("Channels")),
        _codec(transcode.get("AudioCodec", "")),
        _channels(transcode.get("AudioChannels")),
        bool(transcode) and not transcode.get("IsAudioDirect", True),
    )

    bitrate = transcode.get("Bitrate") or item.get("Bitrate")
    bitrate_part = f"{bitrate / 1_000_000:.1f} Mbps" if bitrate else ""

    reasons = transcode.get("TranscodeReasons") or []
    reason_part = f"[{', '.join(reasons)}]" if reasons else ""

    stream_parts = [
        p for p in (method, video_part, audio_part, bitrate_part, reason_part)
        if p
    ]
    client_str = " \u00b7 ".join(filter(None, [client, device]))

    lines = [f"{user}: {title}", " | ".join(stream_parts)]
    if client_str:
        lines.append(client_str)
    return "\n".join(lines)
def load_state():
    """Load the persisted state from ``STATE_FILE``.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist or does not contain valid JSON.
    """
    try:
        raw = Path(STATE_FILE).read_text()
    except FileNotFoundError:
        return {}

    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {}
def save_state(state):
    """Persist *state* as JSON to ``STATE_FILE``.

    The data is written to ``STATE_FILE + ".tmp"`` first and then moved
    over the real file with ``os.replace``, so a reader never sees a
    partially written file.

    Args:
        state: JSON-serialisable object to store.
    """
    state_dir = os.path.dirname(STATE_FILE)
    # dirname is "" when STATE_FILE is a bare filename; os.makedirs("")
    # would raise FileNotFoundError, and there is nothing to create anyway.
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)

    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, STATE_FILE)
def grafana_post(label, start_ms):
    """Create a Grafana annotation tagged ``jellyfin``.

    Args:
        label: Text of the annotation.
        start_ms: Value sent as the annotation's ``time`` field.

    Returns:
        The ``id`` field of Grafana's response, or ``None`` when the
        request failed (the error is printed to stderr).
    """
    try:
        endpoint = f"{GRAFANA_URL}/api/annotations"
        payload = {"time": start_ms, "text": label, "tags": ["jellyfin"]}
        response = http_json("POST", endpoint, payload)
        return response.get("id")
    except Exception as err:
        print(f"Error posting annotation: {err}", file=sys.stderr)
        return None
def grafana_close(grafana_id, end_ms):
    """Set the end time (``timeEnd``) of an existing Grafana annotation.

    Args:
        grafana_id: Id of the annotation, as returned by ``grafana_post``.
        end_ms: Value sent as the annotation's ``timeEnd`` field.

    Returns:
        ``True`` when no further attempt is useful: the update went
        through, or Grafana answered with an error that a retry would not
        change. ``False`` when Grafana could not be reached (connection
        refused, timeout, ...), so the caller should try again later.
        Errors are printed to stderr and never raised.
    """
    try:
        http_json(
            "PATCH",
            f"{GRAFANA_URL}/api/annotations/{grafana_id}",
            {"timeEnd": end_ms},
        )
    except Exception as e:
        print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)
        # HTTPError is also an OSError, but it means Grafana did answer;
        # only transport-level failures are worth retrying.
        unreachable = isinstance(e, OSError) and not isinstance(
            e, urllib.error.HTTPError
        )
        return not unreachable
    return True


def main():
    """Poll Jellyfin forever and mirror playback sessions as annotations.

    Each round:

    * every playing session that is not yet tracked gets a new annotation,
      and is recorded in the state file;
    * every tracked session that is no longer playing has its annotation
      closed and is removed from the state file.

    A round in which the session list could not be fetched changes
    nothing. If Grafana is unreachable when a session ends, the entry is
    kept (with the end time already recorded) and the close is retried in
    later rounds instead of being lost.
    """
    api_key = get_api_key()
    state = load_state()

    while True:
        now_ms = int(time.time() * 1000)
        sessions = get_active_sessions(api_key)

        if sessions is not None:
            current_ids = {s["Id"] for s in sessions}

            for s in sessions:
                sid = s["Id"]
                if sid in state:
                    # The session resumed while a failed close was still
                    # pending: drop the stale end time so a later close
                    # does not cut the annotation short.
                    if state[sid].pop("end_ms", None) is not None:
                        save_state(state)
                    continue

                label = format_label(s)
                grafana_id = grafana_post(label, now_ms)
                if grafana_id is not None:
                    state[sid] = {
                        "grafana_id": grafana_id,
                        "label": label,
                        "start_ms": now_ms,
                    }
                    save_state(state)

            for sid in [k for k in state if k not in current_ids]:
                info = state[sid]
                # Remember when the session was first seen gone, so a
                # retried close still records the right end time.
                end_ms = info.setdefault("end_ms", now_ms)
                if grafana_close(info["grafana_id"], end_ms):
                    del state[sid]
                save_state(state)

        time.sleep(POLL_INTERVAL)
# Start the polling loop only when the file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()