{ lib, pkgs, ... }: let jfLib = import ./jellyfin-test-lib.nix { inherit pkgs lib; }; mockGrafana = ./mock-grafana-server.py; script = ../services/grafana/jellyfin-annotations.py; python = pkgs.python3; in pkgs.testers.runNixOSTest { name = "jellyfin-annotations"; nodes.machine = { pkgs, ... }: { imports = [ jfLib.jellyfinTestConfig ]; environment.systemPackages = [ pkgs.python3 ]; }; testScript = '' import json import time import importlib.util _spec = importlib.util.spec_from_file_location("jf_helpers", "${jfLib.helpers}") assert _spec and _spec.loader _jf = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(_jf) setup_jellyfin = _jf.setup_jellyfin jellyfin_api = _jf.jellyfin_api GRAFANA_PORT = 13000 ANNOTS_FILE = "/tmp/annotations.json" STATE_FILE = "/tmp/annotations-state.json" CREDS_DIR = "/tmp/test-creds" PYTHON = "${python}/bin/python3" MOCK_GRAFANA = "${mockGrafana}" SCRIPT = "${script}" auth_header = 'MediaBrowser Client="Infuse", DeviceId="test-dev-1", Device="iPhone", Version="1.0"' auth_header2 = 'MediaBrowser Client="Jellyfin Web", DeviceId="test-dev-2", Device="Chrome", Version="1.0"' def read_annotations(): out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'") return json.loads(out.strip()) start_all() token, user_id, movie_id, media_source_id = setup_jellyfin( machine, retry, auth_header, "${jfLib.payloads.auth}", "${jfLib.payloads.empty}", ) with subtest("Setup mock Grafana and credentials"): machine.succeed(f"mkdir -p {CREDS_DIR}") machine.succeed(f"echo '{token}' > {CREDS_DIR}/jellyfin-api-key") machine.succeed(f"echo '[]' > {ANNOTS_FILE}") machine.succeed( f"systemd-run --unit=mock-grafana {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" ) machine.wait_until_succeeds( f"curl -sf -X POST http://127.0.0.1:{GRAFANA_PORT}/api/annotations " f"-H 'Content-Type: application/json' -d '{{\"text\":\"ping\",\"tags\":[]}}' | grep -q id", timeout=10, ) machine.succeed(f"echo '[]' > {ANNOTS_FILE}") with subtest("Start annotation service"): machine.succeed( f"systemd-run --unit=annotations-svc " f"--setenv=JELLYFIN_URL=http://127.0.0.1:8096 " f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} " f"--setenv=STATE_FILE={STATE_FILE} " f"--setenv=POLL_INTERVAL=3 " f"{PYTHON} {SCRIPT}" ) time.sleep(2) with subtest("No annotations when no streams active"): time.sleep(4) annots = read_annotations() assert annots == [], f"Expected no annotations, got: {annots}" with subtest("Annotation created when playback starts"): playback_start = json.dumps({ "ItemId": movie_id, "MediaSourceId": media_source_id, "PlaySessionId": "test-play-1", "CanSeek": True, "IsPaused": False, }) machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing' " f"-d '{playback_start}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{auth_header}, Token={token}'" ) machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\"", timeout=15, ) annots = read_annotations() assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" text = annots[0]["text"] assert "jellyfin" in annots[0].get("tags", []), f"Missing jellyfin tag: {annots[0]}" assert "Test Movie" in text, f"Missing title in: {text}" assert "Infuse" in text, f"Missing client in: {text}" assert "iPhone" in text, f"Missing device in: {text}" assert "timeEnd" not in annots[0], f"timeEnd should not be set yet: {annots[0]}" with subtest("Annotation closed when playback stops"): playback_stop = json.dumps({ "ItemId": movie_id, "MediaSourceId": media_source_id, "PlaySessionId": "test-play-1", "PositionTicks": 50000000, }) machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing/Stopped' " f"-d '{playback_stop}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{auth_header}, Token={token}'" ) machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\"", timeout=15, ) annots = read_annotations() assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" assert "timeEnd" in annots[0], f"timeEnd should be set: {annots[0]}" assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time" with subtest("Multiple concurrent streams each get their own annotation"): machine.succeed(f"echo '[]' > {ANNOTS_FILE}") auth_result2 = json.loads(machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Users/AuthenticateByName' " f"-d '@${jfLib.payloads.auth}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{auth_header2}'" )) token2 = auth_result2["AccessToken"] playback1 = json.dumps({ "ItemId": movie_id, "MediaSourceId": media_source_id, "PlaySessionId": "test-play-multi-1", "CanSeek": True, "IsPaused": False, }) machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing' " f"-d '{playback1}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{auth_header}, Token={token}'" ) playback2 = json.dumps({ "ItemId": movie_id, "MediaSourceId": media_source_id, "PlaySessionId": "test-play-multi-2", "CanSeek": True, "IsPaused": False, }) machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing' " f"-d '{playback2}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{auth_header2}, Token={token2}'" ) machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\"", timeout=15, ) annots = read_annotations() assert len(annots) == 2, f"Expected 2 annotations, got: {annots}" def start_svc(unit): machine.succeed( f"systemd-run --unit={unit} " f"--setenv=JELLYFIN_URL=http://127.0.0.1:8096 " f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} " f"--setenv=STATE_FILE={STATE_FILE} " f"--setenv=POLL_INTERVAL=3 " f"--setenv=MISSING_THRESHOLD=2 " f"{PYTHON} {SCRIPT}" ) def start_playback(psid, tok, hdr): payload = json.dumps({ "ItemId": movie_id, "MediaSourceId": media_source_id, "PlaySessionId": psid, "CanSeek": True, "IsPaused": False, }) machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing' " f"-d '{payload}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{hdr}, Token={tok}'" ) def stop_playback(psid, tok, hdr): payload = json.dumps({ "ItemId": movie_id, "MediaSourceId": media_source_id, "PlaySessionId": psid, "PositionTicks": 50000000, }) machine.succeed( f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing/Stopped' " f"-d '{payload}' -H 'Content-Type:application/json' " f"-H 'X-Emby-Authorization:{hdr}, Token={tok}'" ) def wait_for_annotation_count(expected, timeout=15): machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)>={expected} else 1)\"", timeout=timeout, ) def wait_for_close(annot_id, timeout=25): machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if any('timeEnd' in x and x.get('id')=={annot_id} for x in a) else 1)\"", timeout=timeout, ) with subtest("State survives service restart (no duplicate annotations)"): machine.succeed("systemctl stop annotations-svc || true") time.sleep(1) start_svc("annotations-svc-2") time.sleep(6) annots = read_annotations() assert len(annots) == 2, f"Restart should not create duplicates, got: {annots}" # ------------------------------------------------------------------------ # Edge cases: reboot / interrupted session scenarios # ------------------------------------------------------------------------ # Reset for reliability subtests: close active playbacks, stop units, # wipe state and annotations file to get a clean slate. with subtest("Reset for edge-case subtests"): stop_playback("test-play-multi-1", token, auth_header) stop_playback("test-play-multi-2", token2, auth_header2) machine.succeed("systemctl stop annotations-svc-2 || true") machine.succeed("systemctl stop mock-grafana || true") machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}") machine.succeed( f"systemd-run --unit=mock-grafana-rel {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" ) machine.wait_for_open_port(GRAFANA_PORT) start_svc("annotations-svc-rel") time.sleep(2) with subtest("Grafana unreachable during close does not orphan annotation (bug #1)"): start_playback("test-grafana-down", token, auth_header) wait_for_annotation_count(1) annots = read_annotations() assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" grafana_id = annots[0]["id"] # Simulate Grafana outage concurrent with session end. # Production scenario: both services restart after reboot; Grafana is # slower to come up. Old code popped the state entry before attempting # close, so a failed PATCH permanently leaked the annotation. machine.succeed("systemctl stop mock-grafana-rel") stop_playback("test-grafana-down", token, auth_header) # Wait long enough for grace period + multiple failed close attempts # (POLL_INTERVAL=3, MISSING_THRESHOLD=2 -> threshold hit at ~6s). time.sleep(15) # With the fix: state entry must be preserved with grafana_id intact, # because grafana_close failed. Without the fix: state is empty. state_data = json.loads(machine.succeed(f"cat {STATE_FILE}")) assert state_data, ( f"Bug #1: state entry lost after failed grafana_close; annotation " f"{grafana_id} orphaned in Grafana forever. state={state_data}" ) retained_ids = [v["grafana_id"] for v in state_data.values()] assert grafana_id in retained_ids, ( f"Bug #1: state does not retain grafana_id {grafana_id} for retry; got {retained_ids}" ) # Restart Grafana. Next poll should retry and succeed. machine.succeed( f"systemd-run --unit=mock-grafana-rel-2 {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" ) machine.wait_for_open_port(GRAFANA_PORT) wait_for_close(grafana_id) annots = read_annotations() closed = [a for a in annots if a.get("id") == grafana_id] assert closed and "timeEnd" in closed[0], f"Annotation {grafana_id} should be closed after Grafana recovery: {annots}" # State entry must now be gone (successful close → cleanup). state_after = json.loads(machine.succeed(f"cat {STATE_FILE}")) assert not any(v["grafana_id"] == grafana_id for v in state_after.values()), ( f"State entry should be removed after successful close: {state_after}" ) # Reset for grace-period subtest. with subtest("Reset for grace-period subtest"): machine.succeed("systemctl stop annotations-svc-rel || true") machine.succeed("systemctl stop mock-grafana-rel-2 || true") machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}") machine.succeed( f"systemd-run --unit=mock-grafana-grace {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" ) machine.wait_for_open_port(GRAFANA_PORT) start_svc("annotations-svc-grace") time.sleep(2) with subtest("Transient session absence within grace window does not close annotation (bug #2)"): start_playback("test-flap", token, auth_header) wait_for_annotation_count(1) annots = read_annotations() initial_id = annots[0]["id"] # Production scenario: Jellyfin restart, brief network hiccup, or # client reconnect causes /Sessions to return empty for one poll. # Old code: any missed poll immediately closes; re-appearance opens a # duplicate annotation. Fix: grace period absorbs a single absent poll. stop_playback("test-flap", token, auth_header) # Sleep long enough for at least one poll to see absence (missing_count=1), # but less than MISSING_THRESHOLD*POLL_INTERVAL (=6s) so close is NOT yet triggered. time.sleep(4) start_playback("test-flap", token, auth_header) # Let several polls run; session should re-stabilize, missing_count reset. time.sleep(9) annots = read_annotations() assert len(annots) == 1, ( f"Bug #2: transient absence created a duplicate annotation. " f"Grace period should have held. got: {annots}" ) assert "timeEnd" not in annots[0], ( f"Bug #2: annotation closed during flap; should still be open. got: {annots}" ) assert annots[0]["id"] == initial_id, ( f"Bug #2: annotation id changed across flap; same session should keep id. got: {annots}" ) # Reset for outage subtest. with subtest("Reset for outage subtest"): stop_playback("test-flap", token, auth_header) machine.succeed("systemctl stop annotations-svc-grace || true") machine.succeed("systemctl stop mock-grafana-grace || true") machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}") machine.succeed( f"systemd-run --unit=mock-grafana-out {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" ) machine.wait_for_open_port(GRAFANA_PORT) start_svc("annotations-svc-out-1") time.sleep(2) with subtest("Long outage closes annotation near last_seen_ms, not service-recovery time (bug #3)"): start_playback("test-outage", token, auth_header) wait_for_annotation_count(1) annots = read_annotations() outage_id = annots[0]["id"] # Let it run for a couple of polls so last_seen_ms is strictly > start_ms. time.sleep(8) state_before = json.loads(machine.succeed(f"cat {STATE_FILE}")) assert state_before, "Expected state entry before outage" # Only the fixed code records last_seen_ms. Old code has no such field. last_seen_before = max( (v.get("last_seen_ms") for v in state_before.values() if "last_seen_ms" in v), default=None, ) assert last_seen_before is not None, ( f"Bug #3: state schema missing last_seen_ms; cannot bound timeEnd. state={state_before}" ) # Simulate a long outage: stop the annotations service, end playback, # and wait well past what old code would interpret as real-time now. machine.succeed("systemctl stop annotations-svc-out-1") stop_playback("test-outage", token, auth_header) OUTAGE_SEC = 30 time.sleep(OUTAGE_SEC) before_restart_ms = int(time.time() * 1000) # Bring annotations service back (simulating post-reboot recovery). start_svc("annotations-svc-out-2") wait_for_close(outage_id) annots = read_annotations() closed = [a for a in annots if a.get("id") == outage_id] assert closed, f"Expected outage_id {outage_id} in annotations: {annots}" time_end = closed[0]["timeEnd"] gap_ms = time_end - last_seen_before # grace_bound = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000 = 9000ms. # Allow slack for poll timing: assert close time is well below what # now_ms-based closing would produce (OUTAGE_SEC * 1000 = 30000ms). assert gap_ms < 15000, ( f"Bug #3: timeEnd {time_end} is {gap_ms}ms past last_seen {last_seen_before}; " f"expected grace-bounded close (< 15000ms). Old code uses now_ms." ) assert time_end < before_restart_ms, ( f"Bug #3: timeEnd {time_end} is at-or-after service-restart time {before_restart_ms}; " f"should be clamped to last_seen + grace." ) ''; }