jellyfin-annotations: preserve state on grafana failure, add grace period

Three edge cases broke annotations on reboot or interrupted sessions:

- state.pop() ran before grafana_close(), so a failed PATCH (Grafana
  still restarting after reboot) permanently lost the grafana_id and
  left the annotation open forever in Grafana.
- a single poll with no sessions closed every active annotation, so
  Jellyfin restarts or client reconnects produced spurious close +
  duplicate-open pairs.
- timeEnd was always now_ms, so a reboot during playback wrote an
  annotation reading as if the user watched through the outage.

Fix: track last_seen_ms and missing_count in state; retain entries
until grafana_close succeeds (retry indefinitely); require
MISSING_THRESHOLD absent polls before close; clamp close_time to
last_seen_ms + (MISSING_THRESHOLD + 1) * POLL_INTERVAL.

Adds three subtests in tests/jellyfin-annotations.nix that each fail
on the old code and pass on the new.
This commit is contained in:
2026-04-22 00:35:26 -04:00
parent a228f61d34
commit ddac5e3f04
2 changed files with 286 additions and 22 deletions

View File

@@ -10,6 +10,11 @@ JELLYFIN_URL = os.environ.get("JELLYFIN_URL", "http://127.0.0.1:8096")
# Base URL of the Grafana instance that receives the annotations.
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000")
# JSON file persisting open-annotation state across restarts.
STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/jellyfin-annotations/state.json")
# Seconds between /Sessions polls.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
# Consecutive polls a session must be absent from /Sessions before we close
# its annotation. Smooths over Jellyfin restarts, client reconnects, and
# brief network hiccups that would otherwise spuriously close + reopen an
# annotation every time /Sessions returns an empty list for a single poll.
MISSING_THRESHOLD = int(os.environ.get("MISSING_THRESHOLD", "2"))
def get_api_key():
@@ -193,8 +198,77 @@ def grafana_close(grafana_id, end_ms):
f"{GRAFANA_URL}/api/annotations/{grafana_id}",
{"timeEnd": end_ms},
)
return True
except Exception as e:
print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)
return False
def reconcile(state, sessions, now_ms):
    """Fold one /Sessions snapshot into `state`, mutating it in place.

    Returns True iff `state` changed and the caller should persist it.

    Invariants (all of which the prior implementation violated):
      * an entry is only dropped after grafana_close() succeeds, so a
        failed PATCH (Grafana restarting, network blip) is retried on
        the next poll instead of orphaning the open annotation;
      * timeEnd is derived from last_seen_ms bounded by a grace window,
        never raw now_ms, so a reboot or long outage does not paint the
        annotation as if the user watched straight through it;
      * a session must be absent for MISSING_THRESHOLD consecutive
        polls before it closes, absorbing Jellyfin restarts and brief
        empty /Sessions responses without duplicating annotations.
    """
    changed = False
    active = {sess["Id"] for sess in sessions}

    # Present sessions: open a new annotation or refresh bookkeeping.
    for sess in sessions:
        sid = sess["Id"]
        rec = state.get(sid)
        if rec is not None:
            rec["last_seen_ms"] = now_ms
            rec["missing_count"] = 0
            changed = True
            continue
        label = format_label(sess)
        gid = grafana_post(label, now_ms)
        if gid is None:
            # Grafana unreachable: never persist a half-open entry;
            # the next poll simply retries the POST.
            continue
        state[sid] = {
            "grafana_id": gid,
            "label": label,
            "start_ms": now_ms,
            "last_seen_ms": now_ms,
            "missing_count": 0,
        }
        changed = True

    # Absent sessions: bump the miss counter; close only once the
    # threshold is reached. `grace_ms` caps how far timeEnd may drift
    # from last_seen, so recovery after a long outage closes near when
    # playback actually stopped, not at service-recovery time.
    grace_ms = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000
    for sid in [k for k in state if k not in active]:
        rec = state[sid]
        rec["missing_count"] = rec.get("missing_count", 0) + 1
        changed = True
        if rec["missing_count"] < MISSING_THRESHOLD:
            continue
        bound = rec.get("last_seen_ms", now_ms) + grace_ms
        if grafana_close(rec["grafana_id"], min(now_ms, bound)):
            del state[sid]
        # On failure the entry survives with its bumped missing_count;
        # the next poll retries with the same bounded close time.
    return changed
def main():
@@ -205,25 +279,11 @@ def main():
now_ms = int(time.time() * 1000)
sessions = get_active_sessions(api_key)
# sessions is None on a Jellyfin API error — skip the whole pass
# rather than risk reinterpreting a transport failure as "nothing
# is playing" and closing every open annotation.
if sessions is not None:
current_ids = {s["Id"] for s in sessions}
for s in sessions:
sid = s["Id"]
if sid not in state:
label = format_label(s)
grafana_id = grafana_post(label, now_ms)
if grafana_id is not None:
state[sid] = {
"grafana_id": grafana_id,
"label": label,
"start_ms": now_ms,
}
save_state(state)
for sid in [k for k in state if k not in current_ids]:
info = state.pop(sid)
grafana_close(info["grafana_id"], now_ms)
if reconcile(state, sessions, now_ms):
save_state(state)
time.sleep(POLL_INTERVAL)

View File

@@ -171,20 +171,224 @@ pkgs.testers.runNixOSTest {
annots = read_annotations()
assert len(annots) == 2, f"Expected 2 annotations, got: {annots}"
with subtest("State survives service restart (no duplicate annotations)"):
machine.succeed("systemctl stop annotations-svc || true")
time.sleep(1)
def start_svc(unit):
    # Launch one annotations-service instance as a transient systemd
    # unit under the given name, pointed at the mock Grafana with a
    # short poll interval so the grace-period subtests run quickly.
    #
    # Fix: the flattened diff had retained the superseded literal
    # f"systemd-run --unit=annotations-svc-2 " line alongside its
    # parameterized replacement; as adjacent f-strings they would
    # implicitly concatenate into a malformed systemd-run command.
    # Only the parameterized line belongs here.
    machine.succeed(
        f"systemd-run --unit={unit} "
        f"--setenv=JELLYFIN_URL=http://127.0.0.1:8096 "
        f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
        f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} "
        f"--setenv=STATE_FILE={STATE_FILE} "
        f"--setenv=POLL_INTERVAL=3 "
        f"--setenv=MISSING_THRESHOLD=2 "
        f"{PYTHON} {SCRIPT}"
    )
def start_playback(psid, tok, hdr):
    # Report a playback-start event to Jellyfin so a session with
    # PlaySessionId `psid` appears in /Sessions, authenticated with
    # token `tok` and X-Emby-Authorization header prefix `hdr`.
    payload = json.dumps({
        "ItemId": movie_id, "MediaSourceId": media_source_id,
        "PlaySessionId": psid, "CanSeek": True, "IsPaused": False,
    })
    machine.succeed(
        f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing' "
        f"-d '{payload}' -H 'Content-Type:application/json' "
        f"-H 'X-Emby-Authorization:{hdr}, Token={tok}'"
    )
def stop_playback(psid, tok, hdr):
    # Report a playback-stopped event for PlaySessionId `psid` so the
    # session drops out of subsequent /Sessions responses.
    payload = json.dumps({
        "ItemId": movie_id, "MediaSourceId": media_source_id,
        "PlaySessionId": psid, "PositionTicks": 50000000,
    })
    machine.succeed(
        f"curl -sf -X POST 'http://localhost:8096/Sessions/Playing/Stopped' "
        f"-d '{payload}' -H 'Content-Type:application/json' "
        f"-H 'X-Emby-Authorization:{hdr}, Token={tok}'"
    )
def wait_for_annotation_count(expected, timeout=15):
    # Poll the mock-Grafana annotations file until it holds at least
    # `expected` annotations, failing the test after `timeout` seconds.
    machine.wait_until_succeeds(
        f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)>={expected} else 1)\"",
        timeout=timeout,
    )
def wait_for_close(annot_id, timeout=25):
    # Poll until the annotation with id `annot_id` has been PATCHed
    # with a timeEnd, i.e. the service has closed it.
    machine.wait_until_succeeds(
        f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if any('timeEnd' in x and x.get('id')=={annot_id} for x in a) else 1)\"",
        timeout=timeout,
    )
with subtest("State survives service restart (no duplicate annotations)"):
    # Stop and relaunch the service; persisted state must prevent it
    # from re-opening annotations for sessions it already tracks.
    machine.succeed("systemctl stop annotations-svc || true")
    time.sleep(1)
    start_svc("annotations-svc-2")
    time.sleep(6)
    annots = read_annotations()
    assert len(annots) == 2, f"Restart should not create duplicates, got: {annots}"
# ------------------------------------------------------------------------
# Edge cases: reboot / interrupted session scenarios
# ------------------------------------------------------------------------
# Reset for reliability subtests: close active playbacks, stop units,
# wipe state and annotations file to get a clean slate.
with subtest("Reset for edge-case subtests"):
    stop_playback("test-play-multi-1", token, auth_header)
    stop_playback("test-play-multi-2", token2, auth_header2)
    machine.succeed("systemctl stop annotations-svc-2 || true")
    machine.succeed("systemctl stop mock-grafana || true")
    machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}")
    # Fresh mock Grafana + fresh service instance for the subtests below.
    machine.succeed(
        f"systemd-run --unit=mock-grafana-rel {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
    )
    machine.wait_for_open_port(GRAFANA_PORT)
    start_svc("annotations-svc-rel")
    time.sleep(2)
with subtest("Grafana unreachable during close does not orphan annotation (bug #1)"):
    start_playback("test-grafana-down", token, auth_header)
    wait_for_annotation_count(1)
    annots = read_annotations()
    assert len(annots) == 1, f"Expected 1 annotation, got: {annots}"
    grafana_id = annots[0]["id"]
    # Simulate Grafana outage concurrent with session end.
    # Production scenario: both services restart after reboot; Grafana is
    # slower to come up. Old code popped the state entry before attempting
    # close, so a failed PATCH permanently leaked the annotation.
    machine.succeed("systemctl stop mock-grafana-rel")
    stop_playback("test-grafana-down", token, auth_header)
    # Wait long enough for grace period + multiple failed close attempts
    # (POLL_INTERVAL=3, MISSING_THRESHOLD=2 -> threshold hit at ~6s).
    time.sleep(15)
    # With the fix: state entry must be preserved with grafana_id intact,
    # because grafana_close failed. Without the fix: state is empty.
    state_data = json.loads(machine.succeed(f"cat {STATE_FILE}"))
    assert state_data, (
        f"Bug #1: state entry lost after failed grafana_close; annotation "
        f"{grafana_id} orphaned in Grafana forever. state={state_data}"
    )
    retained_ids = [v["grafana_id"] for v in state_data.values()]
    assert grafana_id in retained_ids, (
        f"Bug #1: state does not retain grafana_id {grafana_id} for retry; got {retained_ids}"
    )
    # Restart Grafana. Next poll should retry and succeed.
    machine.succeed(
        f"systemd-run --unit=mock-grafana-rel-2 {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
    )
    machine.wait_for_open_port(GRAFANA_PORT)
    wait_for_close(grafana_id)
    annots = read_annotations()
    closed = [a for a in annots if a.get("id") == grafana_id]
    assert closed and "timeEnd" in closed[0], f"Annotation {grafana_id} should be closed after Grafana recovery: {annots}"
    # State entry must now be gone (successful close cleanup).
    state_after = json.loads(machine.succeed(f"cat {STATE_FILE}"))
    assert not any(v["grafana_id"] == grafana_id for v in state_after.values()), (
        f"State entry should be removed after successful close: {state_after}"
    )
# Reset for grace-period subtest.
with subtest("Reset for grace-period subtest"):
    machine.succeed("systemctl stop annotations-svc-rel || true")
    machine.succeed("systemctl stop mock-grafana-rel-2 || true")
    machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}")
    # Fresh mock Grafana + fresh service instance for the flap subtest.
    machine.succeed(
        f"systemd-run --unit=mock-grafana-grace {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
    )
    machine.wait_for_open_port(GRAFANA_PORT)
    start_svc("annotations-svc-grace")
    time.sleep(2)
with subtest("Transient session absence within grace window does not close annotation (bug #2)"):
    start_playback("test-flap", token, auth_header)
    wait_for_annotation_count(1)
    annots = read_annotations()
    initial_id = annots[0]["id"]
    # Production scenario: Jellyfin restart, brief network hiccup, or
    # client reconnect causes /Sessions to return empty for one poll.
    # Old code: any missed poll immediately closes; re-appearance opens a
    # duplicate annotation. Fix: grace period absorbs a single absent poll.
    stop_playback("test-flap", token, auth_header)
    # Sleep long enough for at least one poll to see absence (missing_count=1),
    # but less than MISSING_THRESHOLD*POLL_INTERVAL (=6s) so close is NOT yet triggered.
    time.sleep(4)
    start_playback("test-flap", token, auth_header)
    # Let several polls run; session should re-stabilize, missing_count reset.
    time.sleep(9)
    annots = read_annotations()
    assert len(annots) == 1, (
        f"Bug #2: transient absence created a duplicate annotation. "
        f"Grace period should have held. got: {annots}"
    )
    assert "timeEnd" not in annots[0], (
        f"Bug #2: annotation closed during flap; should still be open. got: {annots}"
    )
    assert annots[0]["id"] == initial_id, (
        f"Bug #2: annotation id changed across flap; same session should keep id. got: {annots}"
    )
# Reset for outage subtest.
with subtest("Reset for outage subtest"):
    stop_playback("test-flap", token, auth_header)
    machine.succeed("systemctl stop annotations-svc-grace || true")
    machine.succeed("systemctl stop mock-grafana-grace || true")
    machine.succeed(f"rm -f {STATE_FILE}; echo '[]' > {ANNOTS_FILE}")
    # Fresh mock Grafana + fresh service instance for the outage subtest.
    machine.succeed(
        f"systemd-run --unit=mock-grafana-out {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
    )
    machine.wait_for_open_port(GRAFANA_PORT)
    start_svc("annotations-svc-out-1")
    time.sleep(2)
with subtest("Long outage closes annotation near last_seen_ms, not service-recovery time (bug #3)"):
    start_playback("test-outage", token, auth_header)
    wait_for_annotation_count(1)
    annots = read_annotations()
    outage_id = annots[0]["id"]
    # Let it run for a couple of polls so last_seen_ms is strictly > start_ms.
    time.sleep(8)
    state_before = json.loads(machine.succeed(f"cat {STATE_FILE}"))
    assert state_before, "Expected state entry before outage"
    # Only the fixed code records last_seen_ms. Old code has no such field.
    last_seen_before = max(
        (v.get("last_seen_ms") for v in state_before.values() if "last_seen_ms" in v),
        default=None,
    )
    assert last_seen_before is not None, (
        f"Bug #3: state schema missing last_seen_ms; cannot bound timeEnd. state={state_before}"
    )
    # Simulate a long outage: stop the annotations service, end playback,
    # and wait well past what old code would interpret as real-time now.
    machine.succeed("systemctl stop annotations-svc-out-1")
    stop_playback("test-outage", token, auth_header)
    OUTAGE_SEC = 30
    time.sleep(OUTAGE_SEC)
    before_restart_ms = int(time.time() * 1000)
    # Bring annotations service back (simulating post-reboot recovery).
    start_svc("annotations-svc-out-2")
    wait_for_close(outage_id)
    annots = read_annotations()
    closed = [a for a in annots if a.get("id") == outage_id]
    assert closed, f"Expected outage_id {outage_id} in annotations: {annots}"
    time_end = closed[0]["timeEnd"]
    gap_ms = time_end - last_seen_before
    # grace_bound = (MISSING_THRESHOLD + 1) * POLL_INTERVAL * 1000 = 9000ms.
    # Allow slack for poll timing: assert close time is well below what
    # now_ms-based closing would produce (OUTAGE_SEC * 1000 = 30000ms).
    assert gap_ms < 15000, (
        f"Bug #3: timeEnd {time_end} is {gap_ms}ms past last_seen {last_seen_before}; "
        f"expected grace-bounded close (< 15000ms). Old code uses now_ms."
    )
    assert time_end < before_restart_ms, (
        f"Bug #3: timeEnd {time_end} is at-or-after service-restart time {before_restart_ms}; "
        f"should be clamped to last_seen + grace."
    )
'';
}