diff --git a/services/grafana/jellyfin-annotations.py b/services/grafana/jellyfin-annotations.py index edd2ba1..6647183 100644 --- a/services/grafana/jellyfin-annotations.py +++ b/services/grafana/jellyfin-annotations.py @@ -10,6 +10,11 @@ JELLYFIN_URL = os.environ.get("JELLYFIN_URL", "http://127.0.0.1:8096") GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000") STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/jellyfin-annotations/state.json") POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) +# Consecutive polls a session must be absent from /Sessions before we close +# its annotation. Smooths over Jellyfin restarts, client reconnects, and +# brief network hiccups that would otherwise spuriously close + reopen an +# annotation every time /Sessions returns an empty list for a single poll. +MISSING_THRESHOLD = int(os.environ.get("MISSING_THRESHOLD", "2")) def get_api_key(): @@ -193,8 +198,77 @@ def grafana_close(grafana_id, end_ms): f"{GRAFANA_URL}/api/annotations/{grafana_id}", {"timeEnd": end_ms}, ) + return True except Exception as e: print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr) + return False + + +def reconcile(state, sessions, now_ms): + """Fold the current /Sessions snapshot into state in place. + + Returns True iff state was mutated (caller should persist). + + Invariants this preserves, which the prior implementation violated: + - A state entry is only removed after grafana_close succeeds. A failed + PATCH (Grafana restarting, network blip) must leave the entry so we + can retry on the next poll rather than orphan the open annotation. + - Closing a session uses last_seen_ms clamped by grace window, not + now_ms. After a reboot or long outage, now_ms is wildly later than + when playback actually stopped, and using it paints the annotation + as if the user watched through the outage. + - A single missed poll does not close; a session must be absent for + MISSING_THRESHOLD consecutive polls. Absorbs Jellyfin restarts and + brief /Sessions empties without duplicating annotations. + """ + dirty = False + current_ids = {s["Id"] for s in sessions} + + # Active sessions: create new entries or refresh existing ones. + for s in sessions: + sid = s["Id"] + entry = state.get(sid) + if entry is None: + label = format_label(s) + grafana_id = grafana_post(label, now_ms) + if grafana_id is None: + # Grafana unreachable; retry on next poll. Do not persist a + # half-open entry. + continue + state[sid] = { + "grafana_id": grafana_id, + "label": label, + "start_ms": now_ms, + "last_seen_ms": now_ms, + "missing_count": 0, + } + dirty = True + else: + entry["last_seen_ms"] = now_ms + entry["missing_count"] = 0 + dirty = True + + # Absent sessions: increment miss counter; close only after threshold. + # `grace_ms` caps how far timeEnd drifts from last_seen, so a reboot or + # long outage closes the annotation near when playback actually stopped + # rather than at service-recovery time. + grace_ms = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000 + for sid in list(state.keys()): + if sid in current_ids: + continue + entry = state[sid] + entry["missing_count"] = entry.get("missing_count", 0) + 1 + dirty = True + if entry["missing_count"] < MISSING_THRESHOLD: + continue + last_seen_ms = entry.get("last_seen_ms", now_ms) + close_time = min(now_ms, last_seen_ms + grace_ms) + if grafana_close(entry["grafana_id"], close_time): + del state[sid] + # On failure the entry stays with a bumped missing_count; next poll + # will retry with the same bounded close_time. + + return dirty def main(): @@ -205,25 +279,11 @@ def main(): now_ms = int(time.time() * 1000) sessions = get_active_sessions(api_key) + # sessions is None on a Jellyfin API error — skip the whole pass + # rather than risk reinterpreting a transport failure as "nothing + # is playing" and closing every open annotation. if sessions is not None: - current_ids = {s["Id"] for s in sessions} - - for s in sessions: - sid = s["Id"] - if sid not in state: - label = format_label(s) - grafana_id = grafana_post(label, now_ms) - if grafana_id is not None: - state[sid] = { - "grafana_id": grafana_id, - "label": label, - "start_ms": now_ms, - } - save_state(state) - - for sid in [k for k in state if k not in current_ids]: - info = state.pop(sid) - grafana_close(info["grafana_id"], now_ms) + if reconcile(state, sessions, now_ms): save_state(state) time.sleep(POLL_INTERVAL) diff --git a/tests/jellyfin-annotations.nix b/tests/jellyfin-annotations.nix index d6c4cea..2c8c1ee 100644 --- a/tests/jellyfin-annotations.nix +++ b/tests/jellyfin-annotations.nix @@ -171,20 +171,224 @@ pkgs.testers.runNixOSTest { annots = read_annotations() assert len(annots) == 2, f"Expected 2 annotations, got: {annots}" - with subtest("State survives service restart (no duplicate annotations)"): - machine.succeed("systemctl stop annotations-svc || true") - time.sleep(1) + def start_svc(unit): machine.succeed( - f"systemd-run --unit=annotations-svc-2 " + f"systemd-run --unit={unit} " f"--setenv=JELLYFIN_URL=http://127.0.0.1:8096 " f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} " f"--setenv=STATE_FILE={STATE_FILE} " f"--setenv=POLL_INTERVAL=3 " + f"--setenv=MISSING_THRESHOLD=2 " f"{PYTHON} {SCRIPT}" ) + + def start_playback(psid, tok, hdr): + payload = json.dumps({ + "ItemId": movie_id, "MediaSourceId": media_source_id, + "PlaySessionId": psid, "CanSeek": True, "IsPaused": False, + }) + machine.succeed( + f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing' " + f"-d '{payload}' -H 'Content-Type:application/json' " + f"-H 'X-Emby-Authorization:{hdr}, Token={tok}'" + ) + + def stop_playback(psid, tok, hdr): + payload = json.dumps({ + "ItemId": movie_id, "MediaSourceId": media_source_id, + "PlaySessionId": psid, "PositionTicks": 50000000, + }) + machine.succeed( + f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing/Stopped' " + f"-d '{payload}' -H 'Content-Type:application/json' " + f"-H 'X-Emby-Authorization:{hdr}, Token={tok}'" + ) + + def wait_for_annotation_count(expected, timeout=15): + machine.wait_until_succeeds( + f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)>={expected} else 1)\"", + timeout=timeout, + ) + + def wait_for_close(annot_id, timeout=25): + machine.wait_until_succeeds( + f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if any('timeEnd' in x and x.get('id')=={annot_id} for x in a) else 1)\"", + timeout=timeout, + ) + + with subtest("State survives service restart (no duplicate annotations)"): + machine.succeed("systemctl stop annotations-svc || true") + time.sleep(1) + start_svc("annotations-svc-2") time.sleep(6) annots = read_annotations() assert len(annots) == 2, f"Restart should not create duplicates, got: {annots}" + + # ------------------------------------------------------------------------ + # Edge cases: reboot / interrupted session scenarios + # ------------------------------------------------------------------------ + # Reset for reliability subtests: close active playbacks, stop units, + # wipe state and annotations file to get a clean slate. + with subtest("Reset for edge-case subtests"): + stop_playback("test-play-multi-1", token, auth_header) + stop_playback("test-play-multi-2", token2, auth_header2) + machine.succeed("systemctl stop annotations-svc-2 || true") + machine.succeed("systemctl stop mock-grafana || true") + machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}") + machine.succeed( + f"systemd-run --unit=mock-grafana-rel {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" + ) + machine.wait_for_open_port(GRAFANA_PORT) + start_svc("annotations-svc-rel") + time.sleep(2) + + with subtest("Grafana unreachable during close does not orphan annotation (bug #1)"): + start_playback("test-grafana-down", token, auth_header) + wait_for_annotation_count(1) + annots = read_annotations() + assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" + grafana_id = annots[0]["id"] + + # Simulate Grafana outage concurrent with session end. + # Production scenario: both services restart after reboot; Grafana is + # slower to come up. Old code popped the state entry before attempting + # close, so a failed PATCH permanently leaked the annotation. + machine.succeed("systemctl stop mock-grafana-rel") + stop_playback("test-grafana-down", token, auth_header) + + # Wait long enough for grace period + multiple failed close attempts + # (POLL_INTERVAL=3, MISSING_THRESHOLD=2 -> threshold hit at ~6s). + time.sleep(15) + + # With the fix: state entry must be preserved with grafana_id intact, + # because grafana_close failed. Without the fix: state is empty. + state_data = json.loads(machine.succeed(f"cat {STATE_FILE}")) + assert state_data, ( + f"Bug #1: state entry lost after failed grafana_close; annotation " + f"{grafana_id} orphaned in Grafana forever. state={state_data}" + ) + retained_ids = [v["grafana_id"] for v in state_data.values()] + assert grafana_id in retained_ids, ( + f"Bug #1: state does not retain grafana_id {grafana_id} for retry; got {retained_ids}" + ) + + # Restart Grafana. Next poll should retry and succeed. + machine.succeed( + f"systemd-run --unit=mock-grafana-rel-2 {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" + ) + machine.wait_for_open_port(GRAFANA_PORT) + wait_for_close(grafana_id) + annots = read_annotations() + closed = [a for a in annots if a.get("id") == grafana_id] + assert closed and "timeEnd" in closed[0], f"Annotation {grafana_id} should be closed after Grafana recovery: {annots}" + # State entry must now be gone (successful close → cleanup). + state_after = json.loads(machine.succeed(f"cat {STATE_FILE}")) + assert not any(v["grafana_id"] == grafana_id for v in state_after.values()), ( + f"State entry should be removed after successful close: {state_after}" + ) + + # Reset for grace-period subtest. + with subtest("Reset for grace-period subtest"): + machine.succeed("systemctl stop annotations-svc-rel || true") + machine.succeed("systemctl stop mock-grafana-rel-2 || true") + machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}") + machine.succeed( + f"systemd-run --unit=mock-grafana-grace {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" + ) + machine.wait_for_open_port(GRAFANA_PORT) + start_svc("annotations-svc-grace") + time.sleep(2) + + with subtest("Transient session absence within grace window does not close annotation (bug #2)"): + start_playback("test-flap", token, auth_header) + wait_for_annotation_count(1) + annots = read_annotations() + initial_id = annots[0]["id"] + + # Production scenario: Jellyfin restart, brief network hiccup, or + # client reconnect causes /Sessions to return empty for one poll. + # Old code: any missed poll immediately closes; re-appearance opens a + # duplicate annotation. Fix: grace period absorbs a single absent poll. + stop_playback("test-flap", token, auth_header) + # Sleep long enough for at least one poll to see absence (missing_count=1), + # but less than MISSING_THRESHOLD*POLL_INTERVAL (=6s) so close is NOT yet triggered. + time.sleep(4) + start_playback("test-flap", token, auth_header) + # Let several polls run; session should re-stabilize, missing_count reset. + time.sleep(9) + + annots = read_annotations() + assert len(annots) == 1, ( + f"Bug #2: transient absence created a duplicate annotation. " + f"Grace period should have held. got: {annots}" + ) + assert "timeEnd" not in annots[0], ( + f"Bug #2: annotation closed during flap; should still be open. got: {annots}" + ) + assert annots[0]["id"] == initial_id, ( + f"Bug #2: annotation id changed across flap; same session should keep id. got: {annots}" + ) + + # Reset for outage subtest. + with subtest("Reset for outage subtest"): + stop_playback("test-flap", token, auth_header) + machine.succeed("systemctl stop annotations-svc-grace || true") + machine.succeed("systemctl stop mock-grafana-grace || true") + machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}") + machine.succeed( + f"systemd-run --unit=mock-grafana-out {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" + ) + machine.wait_for_open_port(GRAFANA_PORT) + start_svc("annotations-svc-out-1") + time.sleep(2) + + with subtest("Long outage closes annotation near last_seen_ms, not service-recovery time (bug #3)"): + start_playback("test-outage", token, auth_header) + wait_for_annotation_count(1) + annots = read_annotations() + outage_id = annots[0]["id"] + + # Let it run for a couple of polls so last_seen_ms is strictly > start_ms. + time.sleep(8) + state_before = json.loads(machine.succeed(f"cat {STATE_FILE}")) + assert state_before, "Expected state entry before outage" + # Only the fixed code records last_seen_ms. Old code has no such field. + last_seen_before = max( + (v.get("last_seen_ms") for v in state_before.values() if "last_seen_ms" in v), + default=None, + ) + assert last_seen_before is not None, ( + f"Bug #3: state schema missing last_seen_ms; cannot bound timeEnd. state={state_before}" + ) + + # Simulate a long outage: stop the annotations service, end playback, + # and wait well past what old code would interpret as real-time now. + machine.succeed("systemctl stop annotations-svc-out-1") + stop_playback("test-outage", token, auth_header) + OUTAGE_SEC = 30 + time.sleep(OUTAGE_SEC) + before_restart_ms = int(time.time() * 1000) + + # Bring annotations service back (simulating post-reboot recovery). + start_svc("annotations-svc-out-2") + wait_for_close(outage_id) + + annots = read_annotations() + closed = [a for a in annots if a.get("id") == outage_id] + assert closed, f"Expected outage_id {outage_id} in annotations: {annots}" + time_end = closed[0]["timeEnd"] + gap_ms = time_end - last_seen_before + # grace_bound = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000 = 9000ms. + # Allow slack for poll timing: assert close time is well below what + # now_ms-based closing would produce (OUTAGE_SEC * 1000 = 30000ms). + assert gap_ms < 15000, ( + f"Bug #3: timeEnd {time_end} is {gap_ms}ms past last_seen {last_seen_before}; " + f"expected grace-bounded close (< 15000ms). Old code uses now_ms." + ) + assert time_end < before_restart_ms, ( + f"Bug #3: timeEnd {time_end} is at-or-after service-restart time {before_restart_ms}; " + f"should be clamped to last_seen + grace." + ) ''; }