{ lib, pkgs, ... }: let mockServer = pkgs.writeText "mock-server.py" '' import http.server, json, os, sys from urllib.parse import urlparse MODE = sys.argv[1] PORT = int(sys.argv[2]) DATA_FILE = sys.argv[3] class Handler(http.server.BaseHTTPRequestHandler): def log_message(self, fmt, *args): pass def _read_body(self): length = int(self.headers.get("Content-Length", 0)) return json.loads(self.rfile.read(length)) if length else {} def _json(self, code, body): data = json.dumps(body).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(data) def do_GET(self): if MODE == "jellyfin" and self.path.startswith("/Sessions"): try: with open(DATA_FILE) as f: sessions = json.load(f) except Exception: sessions = [] self._json(200, sessions) else: self.send_response(404) self.end_headers() def do_POST(self): if MODE == "grafana" and self.path == "/api/annotations": body = self._read_body() try: with open(DATA_FILE) as f: annotations = json.load(f) except Exception: annotations = [] aid = len(annotations) + 1 body["id"] = aid annotations.append(body) with open(DATA_FILE, "w") as f: json.dump(annotations, f) self._json(200, {"id": aid, "message": "Annotation added"}) else: self.send_response(404) self.end_headers() def do_PATCH(self): if MODE == "grafana" and self.path.startswith("/api/annotations/"): aid = int(self.path.rsplit("/", 1)[-1]) body = self._read_body() try: with open(DATA_FILE) as f: annotations = json.load(f) except Exception: annotations = [] for a in annotations: if a["id"] == aid: a.update(body) with open(DATA_FILE, "w") as f: json.dump(annotations, f) self._json(200, {"message": "Annotation patched"}) else: self.send_response(404) self.end_headers() http.server.HTTPServer(("127.0.0.1", PORT), Handler).serve_forever() ''; script = ../services/jellyfin-annotations.py; python = pkgs.python3; in pkgs.testers.runNixOSTest { name = "jellyfin-annotations"; nodes.machine = { pkgs, ... }: { environment.systemPackages = [ pkgs.python3 ]; }; testScript = '' import json import time JELLYFIN_PORT = 18096 GRAFANA_PORT = 13000 SESSIONS_FILE = "/tmp/sessions.json" ANNOTS_FILE = "/tmp/annotations.json" STATE_FILE = "/tmp/annotations-state.json" CREDS_DIR = "/tmp/test-creds" PYTHON = "${python}/bin/python3" MOCK = "${mockServer}" SCRIPT = "${script}" def read_annotations(): out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'") return json.loads(out.strip()) start_all() machine.wait_for_unit("multi-user.target") with subtest("Setup mock credentials and data files"): machine.succeed(f"mkdir -p {CREDS_DIR} && echo 'fake-api-key' > {CREDS_DIR}/jellyfin-api-key") machine.succeed(f"echo '[]' > {SESSIONS_FILE}") machine.succeed(f"echo '[]' > {ANNOTS_FILE}") with subtest("Start mock Jellyfin and Grafana servers"): machine.succeed( f"systemd-run --unit=mock-jellyfin {PYTHON} {MOCK} jellyfin {JELLYFIN_PORT} {SESSIONS_FILE}" ) machine.succeed( f"systemd-run --unit=mock-grafana {PYTHON} {MOCK} grafana {GRAFANA_PORT} {ANNOTS_FILE}" ) machine.wait_until_succeeds( f"curl -sf http://127.0.0.1:{JELLYFIN_PORT}/Sessions", timeout=10 ) machine.wait_until_succeeds( f"curl -sf -X POST http://127.0.0.1:{GRAFANA_PORT}/api/annotations " f"-H 'Content-Type: application/json' -d '{{\"text\":\"ping\",\"tags\":[]}}' | grep -q id", timeout=10, ) machine.succeed(f"echo '[]' > {ANNOTS_FILE}") with subtest("Start annotation service pointing at mock servers"): machine.succeed( f"systemd-run --unit=annotations-svc " f"--setenv=JELLYFIN_URL=http://127.0.0.1:{JELLYFIN_PORT} " f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} " f"--setenv=STATE_FILE={STATE_FILE} " f"--setenv=POLL_INTERVAL=3 " f"{PYTHON} {SCRIPT}" ) time.sleep(2) with subtest("No annotations pushed when no streams active"): time.sleep(4) annots = read_annotations() assert annots == [], f"Expected no annotations, got: {annots}" with subtest("Annotation created when stream starts"): machine.succeed( f"""echo '[{{"Id":"sess-1","UserName":"alice","NowPlayingItem":{{"Name":"Inception","Type":"Movie"}}}}]' > {SESSIONS_FILE}""" ) machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\"", timeout=15, ) annots = read_annotations() assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" assert annots[0]["text"] == "alice: Inception (movie)", f"Unexpected label: {annots[0]}" assert "jellyfin" in annots[0].get("tags", []), f"Missing jellyfin tag: {annots[0]}" assert "timeEnd" not in annots[0], f"timeEnd should not be set yet: {annots[0]}" with subtest("Annotation closed when stream ends"): machine.succeed(f"echo '[]' > {SESSIONS_FILE}") machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\"", timeout=15, ) annots = read_annotations() assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" assert "timeEnd" in annots[0], f"timeEnd should be set: {annots[0]}" assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time" with subtest("Multiple concurrent streams each get their own annotation"): machine.succeed(f"echo '[]' > {ANNOTS_FILE}") machine.succeed( f"""echo '[ {{"Id":"sess-2","UserName":"bob","NowPlayingItem":{{"Name":"Breaking Bad","SeriesName":"Breaking Bad","ParentIndexNumber":1,"IndexNumber":1}}}}, {{"Id":"sess-3","UserName":"carol","NowPlayingItem":{{"Name":"Inception","Type":"Movie"}}}} ]' > {SESSIONS_FILE}""" ) machine.wait_until_succeeds( f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\"", timeout=15, ) annots = read_annotations() assert len(annots) == 2, f"Expected 2 annotations, got: {annots}" labels = sorted(a["text"] for a in annots) assert labels[0] == "bob: Breaking Bad S01E01 - Breaking Bad", f"Unexpected: {labels[0]}" assert labels[1] == "carol: Inception (movie)", f"Unexpected: {labels[1]}" with subtest("State survives service restart (no duplicate annotations)"): machine.succeed("systemctl stop annotations-svc || true") time.sleep(1) machine.succeed( f"systemd-run --unit=annotations-svc-2 " f"--setenv=JELLYFIN_URL=http://127.0.0.1:{JELLYFIN_PORT} " f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} " f"--setenv=STATE_FILE={STATE_FILE} " f"--setenv=POLL_INTERVAL=3 " f"{PYTHON} {SCRIPT}" ) time.sleep(6) annots = read_annotations() assert len(annots) == 2, f"Restart should not create duplicates, got: {annots}" ''; }