# NixOS VM test for ../services/jellyfin-annotations.py.
#
# The test boots one machine, starts two instances of a small mock HTTP server
# (one answering GET /Sessions like Jellyfin, one accepting POST/PATCH on
# /api/annotations like Grafana), runs the annotation service against them and
# inspects the JSON file the Grafana mock persists its annotations to.
{ lib, pkgs, ... }:
let
  # Mock server shared by both roles.
  # Usage: mock-server.py <jellyfin|grafana> <port> <data-file>
  mockServer = pkgs.writeText "mock-server.py" ''
    import http.server, json, os, sys
    from urllib.parse import urlparse

    MODE = sys.argv[1]
    PORT = int(sys.argv[2])
    DATA_FILE = sys.argv[3]


    def load_items():
        """Return the JSON document stored in DATA_FILE, or [] if it is missing or unparsable."""
        try:
            with open(DATA_FILE) as f:
                return json.load(f)
        except (OSError, ValueError):
            return []


    def save_items(items):
        """Replace DATA_FILE atomically.

        The test reads this file with cat while the server may be writing it;
        writing to a temp file and renaming avoids exposing a truncated file.
        """
        tmp = DATA_FILE + ".tmp"
        with open(tmp, "w") as f:
            json.dump(items, f)
        os.replace(tmp, DATA_FILE)


    class Handler(http.server.BaseHTTPRequestHandler):
        def log_message(self, fmt, *args):
            # Silence per-request logging.
            pass

        def _read_body(self):
            """Decode the JSON request body; an absent or empty body yields {}."""
            length = int(self.headers.get("Content-Length", 0))
            return json.loads(self.rfile.read(length)) if length else {}

        def _json(self, code, body):
            """Send body as a JSON response with the given status code."""
            data = json.dumps(body).encode()
            self.send_response(code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(data)))
            self.end_headers()
            self.wfile.write(data)

        def _not_found(self):
            self.send_response(404)
            self.send_header("Content-Length", "0")
            self.end_headers()

        def do_GET(self):
            # jellyfin mode: serve whatever session list the test wrote to DATA_FILE.
            if MODE == "jellyfin" and self.path.startswith("/Sessions"):
                self._json(200, load_items())
            else:
                self._not_found()

        def do_POST(self):
            # grafana mode: append the posted annotation and assign it a sequential id.
            if MODE == "grafana" and urlparse(self.path).path == "/api/annotations":
                body = self._read_body()
                annotations = load_items()
                aid = len(annotations) + 1
                body["id"] = aid
                annotations.append(body)
                save_items(annotations)
                self._json(200, {"id": aid, "message": "Annotation added"})
            else:
                self._not_found()

        def do_PATCH(self):
            # grafana mode: merge the posted fields into the annotation with the given id.
            path = urlparse(self.path).path
            if MODE == "grafana" and path.startswith("/api/annotations/"):
                try:
                    aid = int(path.rsplit("/", 1)[-1])
                except ValueError:
                    self._json(400, {"message": "Invalid annotation id"})
                    return
                body = self._read_body()
                annotations = load_items()
                for a in annotations:
                    if a.get("id") == aid:
                        a.update(body)
                save_items(annotations)
                self._json(200, {"message": "Annotation patched"})
            else:
                self._not_found()


    http.server.HTTPServer(("127.0.0.1", PORT), Handler).serve_forever()
  '';

  script = ../services/jellyfin-annotations.py;
  python = pkgs.python3;
in
pkgs.testers.runNixOSTest {
  name = "jellyfin-annotations";

  nodes.machine =
    { pkgs, ... }:
    {
      # python3 is invoked by name from the polling commands in the test script.
      environment.systemPackages = [ pkgs.python3 ];
    };

  testScript = ''
    import json
    import shlex

    JELLYFIN_PORT = 18096
    GRAFANA_PORT = 13000
    SESSIONS_FILE = "/tmp/sessions.json"
    ANNOTS_FILE = "/tmp/annotations.json"
    STATE_FILE = "/tmp/annotations-state.json"
    CREDS_DIR = "/tmp/test-creds"
    PYTHON = "${python}/bin/python3"
    MOCK = "${mockServer}"
    SCRIPT = "${script}"


    def read_annotations():
        """Return the annotations currently stored by the Grafana mock."""
        out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'")
        return json.loads(out.strip())


    def reset_annotations():
        """Empty the Grafana mock's annotation store."""
        machine.succeed(f"echo '[]' > {ANNOTS_FILE}")


    def write_sessions(sessions):
        """Make the Jellyfin mock report `sessions` (a list of session dicts)."""
        # shlex.quote keeps the JSON intact whatever characters it contains.
        machine.succeed(f"echo {shlex.quote(json.dumps(sessions))} > {SESSIONS_FILE}")


    def wait_for_annotations(condition):
        """Poll until `condition`, a Python expression over the annotation list `a`, is truthy."""
        machine.wait_until_succeeds(
            f"cat {ANNOTS_FILE} | python3 -c \"import sys,json; a=json.load(sys.stdin); exit(0 if {condition} else 1)\"",
            timeout=15,
        )


    def start_annotation_service(unit):
        """Run the annotation script as transient unit `unit`, pointed at both mocks."""
        machine.succeed(
            f"systemd-run --unit={unit} "
            f"--setenv=JELLYFIN_URL=http://127.0.0.1:{JELLYFIN_PORT} "
            f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
            f"--setenv=CREDENTIALS_DIRECTORY={CREDS_DIR} "
            f"--setenv=STATE_FILE={STATE_FILE} "
            f"--setenv=POLL_INTERVAL=3 "
            f"{PYTHON} {SCRIPT}"
        )


    start_all()
    machine.wait_for_unit("multi-user.target")

    with subtest("Setup mock credentials and data files"):
        machine.succeed(f"mkdir -p {CREDS_DIR} && echo 'fake-api-key' > {CREDS_DIR}/jellyfin-api-key")
        write_sessions([])
        reset_annotations()

    with subtest("Start mock Jellyfin and Grafana servers"):
        machine.succeed(
            f"systemd-run --unit=mock-jellyfin {PYTHON} {MOCK} jellyfin {JELLYFIN_PORT} {SESSIONS_FILE}"
        )
        machine.succeed(
            f"systemd-run --unit=mock-grafana {PYTHON} {MOCK} grafana {GRAFANA_PORT} {ANNOTS_FILE}"
        )
        machine.wait_until_succeeds(
            f"curl -sf http://127.0.0.1:{JELLYFIN_PORT}/Sessions", timeout=10
        )
        machine.wait_until_succeeds(
            f"curl -sf -X POST http://127.0.0.1:{GRAFANA_PORT}/api/annotations "
            f"-H 'Content-Type: application/json' -d '{{\"text\":\"ping\",\"tags\":[]}}' | grep -q id",
            timeout=10,
        )
        # The readiness probe above stored a "ping" annotation; discard it.
        reset_annotations()

    with subtest("Start annotation service pointing at mock servers"):
        start_annotation_service("annotations-svc")
        # machine.sleep waits in guest time, which is the clock the service polls on.
        machine.sleep(2)

    with subtest("No annotations pushed when no streams active"):
        # Longer than POLL_INTERVAL (3s), so at least one poll has happened.
        machine.sleep(4)
        annots = read_annotations()
        assert annots == [], f"Expected no annotations, got: {annots}"

    with subtest("Annotation created when stream starts"):
        write_sessions([{
            "Id": "sess-1",
            "UserName": "alice",
            "Client": "Infuse",
            "DeviceName": "iPhone",
            "PlayState": {"PlayMethod": "Transcode"},
            "NowPlayingItem": {
                "Name": "Inception",
                "Type": "Movie",
                "Bitrate": 20000000,
                "MediaStreams": [
                    {"Type": "Video", "Codec": "h264", "Width": 1920, "Height": 1080},
                    {"Type": "Audio", "Codec": "dts", "Channels": 6, "IsDefault": True},
                ],
            },
            "TranscodingInfo": {
                "IsVideoDirect": True,
                "IsAudioDirect": False,
                "VideoCodec": "h264",
                "AudioCodec": "aac",
                "AudioChannels": 2,
                "Bitrate": 8000000,
                "TranscodeReasons": ["AudioCodecNotSupported"],
            },
        }])
        wait_for_annotations("a")
        annots = read_annotations()
        assert len(annots) == 1, f"Expected 1 annotation, got: {annots}"
        text = annots[0]["text"]
        assert "alice: Inception (movie)" in text, f"Missing title in: {text}"
        assert "Transcode" in text, f"Missing method in: {text}"
        assert "H.264" in text, f"Missing video codec in: {text}"
        assert "DTS" in text and "AAC" in text, f"Missing audio codec in: {text}"
        assert "8.0 Mbps" in text, f"Missing bitrate in: {text}"
        assert "AudioCodecNotSupported" in text, f"Missing transcode reason in: {text}"
        assert "Infuse" in text and "iPhone" in text, f"Missing client in: {text}"
        assert "jellyfin" in annots[0].get("tags", []), f"Missing jellyfin tag: {annots[0]}"
        assert "timeEnd" not in annots[0], f"timeEnd should not be set yet: {annots[0]}"

    with subtest("Annotation closed when stream ends"):
        write_sessions([])
        wait_for_annotations("a and 'timeEnd' in a[0]")
        annots = read_annotations()
        assert len(annots) == 1, f"Expected 1 annotation, got: {annots}"
        assert "timeEnd" in annots[0], f"timeEnd should be set: {annots[0]}"
        assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time"

    with subtest("Multiple concurrent streams each get their own annotation"):
        reset_annotations()
        write_sessions([
            {
                "Id": "sess-2",
                "UserName": "bob",
                "NowPlayingItem": {
                    "Name": "Breaking Bad",
                    "SeriesName": "Breaking Bad",
                    "ParentIndexNumber": 1,
                    "IndexNumber": 1,
                },
            },
            {
                "Id": "sess-3",
                "UserName": "carol",
                "NowPlayingItem": {"Name": "Inception", "Type": "Movie"},
            },
        ])
        wait_for_annotations("len(a)==2")
        annots = read_annotations()
        assert len(annots) == 2, f"Expected 2 annotations, got: {annots}"
        texts = sorted(a["text"] for a in annots)
        assert any("Breaking Bad" in t and "S01E01" in t for t in texts), f"Missing Bob's annotation: {texts}"
        assert any("carol" in t and "Inception" in t for t in texts), f"Missing Carol's annotation: {texts}"

    with subtest("State survives service restart (no duplicate annotations)"):
        machine.succeed("systemctl stop annotations-svc || true")
        machine.sleep(1)
        # A fresh unit name is used for the second instance; it reuses STATE_FILE.
        start_annotation_service("annotations-svc-2")
        # Two poll intervals, so the restarted service has seen both live sessions.
        machine.sleep(6)
        annots = read_annotations()
        assert len(annots) == 2, f"Restart should not create duplicates, got: {annots}"
  '';
}