Three edge cases broke annotations on reboot or interrupted sessions:

- state.pop() ran before grafana_close(), so a failed PATCH (Grafana still restarting after reboot) permanently lost the grafana_id and left the annotation open forever in Grafana.
- A single poll with no sessions closed every active annotation, so Jellyfin restarts or client reconnects produced spurious close + duplicate-open pairs.
- timeEnd was always now_ms, so a reboot during playback wrote an annotation reading as if the user watched through the outage.

Fix: track last_seen_ms and missing_count in state; retain entries until grafana_close succeeds (retry indefinitely); require MISSING_THRESHOLD absent polls before close; clamp close_time to last_seen_ms + (MISSING_THRESHOLD + 1) * POLL_INTERVAL.

Adds three subtests in tests/jellyfin-annotations.nix that each fail on the old code and pass on the new.
294 lines
10 KiB
Python
294 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# Runtime configuration. Every value can be overridden through an environment
# variable of the same name; the literals below are the defaults.
JELLYFIN_URL = os.getenv("JELLYFIN_URL", "http://127.0.0.1:8096")
GRAFANA_URL = os.getenv("GRAFANA_URL", "http://127.0.0.1:3000")
STATE_FILE = os.getenv("STATE_FILE", "/var/lib/jellyfin-annotations/state.json")

# Seconds to sleep between two polls of Jellyfin's /Sessions endpoint.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "30"))

# Number of consecutive polls in which a session has to be missing from
# /Sessions before its annotation is closed. This rides out Jellyfin restarts,
# client reconnects and short network hiccups, any of which can make a single
# poll come back empty and would otherwise close and immediately reopen the
# annotation.
MISSING_THRESHOLD = int(os.getenv("MISSING_THRESHOLD", "2"))
|
|
|
|
|
|
def get_api_key():
    """Return the Jellyfin API key, exiting the process if none can be found.

    When ``CREDENTIALS_DIRECTORY`` is set, the key is read from the file
    ``jellyfin-api-key`` inside that directory. Otherwise the agenix runtime
    path is tried. Surrounding whitespace is stripped from the file content.
    """
    credentials_dir = os.environ.get("CREDENTIALS_DIRECTORY")
    if credentials_dir:
        key_file = Path(credentials_dir, "jellyfin-api-key")
        return key_file.read_text().strip()

    fallback_locations = ("/run/agenix/jellyfin-api-key",)
    for candidate in map(Path, fallback_locations):
        if candidate.exists():
            return candidate.read_text().strip()

    sys.exit("ERROR: Cannot find jellyfin-api-key")
|
|
|
|
|
|
def http_json(method, url, body=None):
    """Send an HTTP request and return the response body decoded as JSON.

    ``body``, when not None, is serialised to JSON and sent as the request
    payload. The request times out after 5 seconds. Errors raised by urllib
    or by JSON decoding propagate to the caller.
    """
    payload = None
    if body is not None:
        payload = json.dumps(body).encode()

    json_headers = {"Content-Type": "application/json", "Accept": "application/json"}
    request = urllib.request.Request(url, data=payload, headers=json_headers, method=method)

    with urllib.request.urlopen(request, timeout=5) as response:
        raw = response.read()
    return json.loads(raw)
|
|
|
|
|
|
def get_active_sessions(api_key):
    """Return the Jellyfin sessions that are currently playing something.

    Fetches ``/Sessions`` and keeps only the entries with a truthy
    ``NowPlayingItem``. Any failure is reported on stderr and signalled by
    returning None, so callers can tell "request failed" apart from
    "nothing is playing" (an empty list).
    """
    try:
        endpoint = f"{JELLYFIN_URL}/Sessions?api_key={api_key}"
        request = urllib.request.Request(endpoint, headers={"Accept": "application/json"})
        with urllib.request.urlopen(request, timeout=5) as response:
            all_sessions = json.loads(response.read())

        playing = []
        for session in all_sessions:
            if session.get("NowPlayingItem"):
                playing.append(session)
        return playing
    except Exception as e:
        print(f"Error fetching sessions: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def _codec(name):
|
|
if not name:
|
|
return ""
|
|
aliases = {"h264": "H.264", "h265": "H.265", "hevc": "H.265", "av1": "AV1",
|
|
"vp9": "VP9", "vp8": "VP8", "mpeg4": "MPEG-4", "mpeg2video": "MPEG-2",
|
|
"aac": "AAC", "ac3": "AC3", "eac3": "EAC3", "dts": "DTS",
|
|
"truehd": "TrueHD", "mp3": "MP3", "opus": "Opus", "flac": "FLAC",
|
|
"vorbis": "Vorbis"}
|
|
return aliases.get(name.lower(), name.upper())
|
|
|
|
|
|
def _res(width, height):
|
|
if not height:
|
|
return ""
|
|
common = {2160: "4K", 1440: "1440p", 1080: "1080p", 720: "720p",
|
|
480: "480p", 360: "360p"}
|
|
return common.get(height, f"{height}p")
|
|
|
|
|
|
def _channels(n):
|
|
labels = {1: "Mono", 2: "Stereo", 6: "5.1", 7: "6.1", 8: "7.1"}
|
|
return labels.get(n, f"{n}ch") if n else ""
|
|
|
|
|
|
def _format_title(item):
    """Build the title line fragment for a NowPlayingItem dict."""
    name = item.get("Name") or "Unknown"
    series = item.get("SeriesName") or ""
    season = item.get("ParentIndexNumber")
    episode = item.get("IndexNumber")

    # Compare against None rather than relying on truthiness: season 0
    # (specials) and episode 0 are valid numbers and must keep their SxxExx tag.
    if series and season is not None and episode is not None:
        return f"{series} S{season:02d}E{episode:02d} \u2013 {name}"
    if series:
        return f"{series} \u2013 {name}"
    if item.get("Type", "") == "Movie":
        return f"{name} (movie)"
    return name


def _stream_part(src_codec, src_detail, transcoded, dst_codec, dst_detail):
    """Render one "codec detail" fragment for a video or audio stream.

    For a stream that is not transcoded only the source side is shown. For a
    transcoded stream the target detail is shown (falling back to the source
    detail), and the codec is rendered as "src -> dst" when both are known
    and differ.
    """
    if not transcoded:
        return f"{src_codec} {src_detail}".strip()
    detail = dst_detail or src_detail
    if src_codec and dst_codec and src_codec != dst_codec:
        return f"{src_codec}\u2192{dst_codec} {detail}".strip()
    return f"{dst_codec or src_codec} {detail}".strip()


def format_label(session):
    """Return the multi-line annotation text for one Jellyfin session dict.

    Layout:
        <user>: <title>
        <play method> | <video> | <audio> | <bitrate> | [<transcode reasons>]
        <client> · <device>            (only when either is known)

    Empty fragments of the second line are omitted. Missing or null fields in
    the session are tolerated and replaced by neutral defaults.
    """
    # "or" fallbacks also cover keys that are present but null, which would
    # otherwise be rendered as the literal text "None".
    user = session.get("UserName") or "Unknown"
    item = session.get("NowPlayingItem") or {}
    transcode = session.get("TranscodingInfo") or {}
    play_state = session.get("PlayState") or {}
    client = session.get("Client", "")
    device = session.get("DeviceName", "")

    title = _format_title(item)

    play_method = play_state.get("PlayMethod", "")
    if play_method == "DirectPlay":
        method = "Direct Play"
    elif play_method == "DirectStream":
        method = "Direct Stream"
    elif play_method == "Transcode" or transcode:
        # No explicit play method but transcoding info present counts as a
        # transcode as well.
        method = "Transcode"
    else:
        method = "Direct Play"

    media_streams = item.get("MediaStreams") or []
    video_streams = [s for s in media_streams if s.get("Type") == "Video"]
    audio_streams = [s for s in media_streams if s.get("Type") == "Audio"]
    video_stream = video_streams[0] if video_streams else {}
    # Prefer the stream flagged as default; otherwise take the first audio one.
    default_audio = next((s for s in audio_streams if s.get("IsDefault")), None)
    audio_stream = default_audio or (audio_streams[0] if audio_streams else {})

    src_vcodec = _codec(video_stream.get("Codec", ""))
    src_res = _res(video_stream.get("Width") or item.get("Width"),
                   video_stream.get("Height") or item.get("Height"))
    src_acodec = _codec(audio_stream.get("Codec", ""))
    src_channels = _channels(audio_stream.get("Channels"))

    # A stream counts as transcoded only when transcoding info exists and
    # explicitly reports that stream as not direct.
    video_transcoded = bool(transcode) and not transcode.get("IsVideoDirect", True)
    audio_transcoded = bool(transcode) and not transcode.get("IsAudioDirect", True)

    video_part = _stream_part(
        src_vcodec,
        src_res,
        video_transcoded,
        _codec(transcode.get("VideoCodec", "")),
        _res(transcode.get("Width"), transcode.get("Height")),
    )
    audio_part = _stream_part(
        src_acodec,
        src_channels,
        audio_transcoded,
        _codec(transcode.get("AudioCodec", "")),
        _channels(transcode.get("AudioChannels")),
    )

    bitrate = transcode.get("Bitrate") or item.get("Bitrate")
    bitrate_part = f"{bitrate / 1_000_000:.1f} Mbps" if bitrate else ""

    reasons = transcode.get("TranscodeReasons") or []
    reason_part = f"[{', '.join(reasons)}]" if reasons else ""

    stream_parts = [p for p in [method, video_part, audio_part, bitrate_part, reason_part] if p]
    client_str = " \u00b7 ".join(filter(None, [client, device]))

    lines = [f"{user}: {title}", " | ".join(stream_parts)]
    if client_str:
        lines.append(client_str)

    return "\n".join(lines)
|
|
|
|
|
|
def load_state():
    """Return the persisted state read from STATE_FILE.

    A missing file or a file that does not contain valid JSON yields an empty
    dict, so the caller starts with no tracked sessions.
    """
    try:
        raw = Path(STATE_FILE).read_text()
        return json.loads(raw)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
|
|
|
|
|
|
def save_state(state):
    """Atomically persist ``state`` as JSON to STATE_FILE.

    The data is written to a sibling ``.tmp`` file, flushed to disk, and then
    renamed over the real file, so readers never observe a partially written
    state file. OSError from any step propagates to the caller.
    """
    # dirname is "" when STATE_FILE is a bare filename; os.makedirs("") raises,
    # so only create the directory when there is one to create.
    state_dir = os.path.dirname(STATE_FILE)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)

    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
        # Push the content to stable storage before the rename. Without this a
        # crash or power loss shortly after os.replace can leave an empty or
        # truncated state file, which would be read back as "no open
        # annotations" and orphan them in Grafana.
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, STATE_FILE)
|
|
|
|
|
|
def grafana_post(label, start_ms):
    """Create a Grafana annotation tagged "jellyfin" and return its id.

    ``start_ms`` is sent as the annotation's ``time`` and ``label`` as its
    text. On any failure the error is printed to stderr and None is returned.
    """
    annotation = {"time": start_ms, "text": label, "tags": ["jellyfin"]}
    try:
        response = http_json("POST", f"{GRAFANA_URL}/api/annotations", annotation)
        return response.get("id")
    except Exception as e:
        print(f"Error posting annotation: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def grafana_close(grafana_id, end_ms):
    """Set ``timeEnd`` on a Grafana annotation; return True once it is settled.

    Returns True when the PATCH succeeds, and also when Grafana answers
    404: the annotation no longer exists, so there is nothing left to close
    and retrying can never succeed. Returns False on every other failure
    (connection refused, timeout, other HTTP status) so the caller can keep
    its state entry and retry later.
    """
    # Local import: makes the exception class available without relying on
    # urllib.request having loaded the urllib.error submodule.
    from urllib.error import HTTPError

    try:
        http_json(
            "PATCH",
            f"{GRAFANA_URL}/api/annotations/{grafana_id}",
            {"timeEnd": end_ms},
        )
        return True
    except HTTPError as e:
        if e.code == 404:
            # A missing annotation is a permanent condition. Reporting
            # failure here would make the caller retry (and log) on every
            # poll forever and never drop the state entry.
            print(
                f"Annotation {grafana_id} not found in Grafana; treating as closed",
                file=sys.stderr,
            )
            return True
        print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)
        return False
    except Exception as e:
        print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)
        return False
|
|
|
|
|
|
def reconcile(state, sessions, now_ms):
    """Merge the current /Sessions snapshot into ``state``, mutating it.

    Returns True when ``state`` was changed and should be persisted.

    Guarantees:
    - An entry is dropped from ``state`` only after grafana_close reports
      success. If the PATCH fails (Grafana restarting, network blip) the
      entry is kept so the close is retried on the next poll instead of
      leaving the annotation open forever.
    - The close timestamp is last_seen_ms plus a bounded grace window, capped
      at now_ms. After a reboot or long outage now_ms can be far later than
      the moment playback stopped; using it directly would make the
      annotation look as if playback continued through the outage.
    - One missed poll does not close anything. A session has to be absent for
      MISSING_THRESHOLD consecutive polls, which absorbs Jellyfin restarts
      and briefly empty /Sessions responses without duplicating annotations.
    """
    live_ids = {session["Id"] for session in sessions}
    changed = False

    # Pass 1: sessions that are playing right now.
    for session in sessions:
        session_id = session["Id"]
        known = state.get(session_id)
        if known is not None:
            # Already tracked: record that it was just seen and reset misses.
            known.update(last_seen_ms=now_ms, missing_count=0)
            changed = True
            continue

        text = format_label(session)
        annotation_id = grafana_post(text, now_ms)
        if annotation_id is None:
            # Grafana could not be reached. Store nothing, so the next poll
            # attempts the creation again rather than tracking a half-open
            # entry.
            continue
        state[session_id] = dict(
            grafana_id=annotation_id,
            label=text,
            start_ms=now_ms,
            last_seen_ms=now_ms,
            missing_count=0,
        )
        changed = True

    # Pass 2: tracked sessions missing from this snapshot. Count the miss and
    # close only once the threshold is reached. grace_ms bounds how far the
    # recorded end may lie past last_seen_ms, so a reboot or long outage ends
    # the annotation close to when playback really stopped instead of at
    # service-recovery time.
    grace_ms = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000
    absent_ids = [sid for sid in state if sid not in live_ids]
    for sid in absent_ids:
        record = state[sid]
        misses = record.get("missing_count", 0) + 1
        record["missing_count"] = misses
        changed = True
        if misses < MISSING_THRESHOLD:
            continue

        end_ms = min(now_ms, record.get("last_seen_ms", now_ms) + grace_ms)
        if grafana_close(record["grafana_id"], end_ms):
            del state[sid]
        # If the close failed the entry stays, with its miss counter already
        # bumped, and the next poll retries using the same bounded end time.

    return changed
|
|
|
|
|
|
def main():
    """Run the poll loop forever: fetch sessions, reconcile, persist, sleep."""
    api_key = get_api_key()
    state = load_state()

    while True:
        polled_at_ms = int(time.time() * 1000)
        snapshot = get_active_sessions(api_key)

        # A None snapshot means the Jellyfin request failed. The pass is
        # skipped entirely in that case: treating a transport failure as
        # "nothing is playing" would close every open annotation.
        if snapshot is not None and reconcile(state, snapshot, polled_at_ms):
            save_state(state)

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()
|