# NixOS VM test for the llama.cpp -> Grafana annotation bridge
# (../services/llama-cpp-annotations.py).
#
# Two mock HTTP services run inside the VM:
#   - mock-grafana  (./mock-grafana-server.py, not shown here): records POSTed
#     annotations into a JSON file we can inspect with `cat`.
#   - mock-llama    (generated below): serves llama.cpp's /slots endpoint from
#     a JSON state file, plus a test-only /test/set-slots hook to mutate it.
#
# NOTE(review): this file was recovered from a whitespace-mangled copy — the
# embedded Python heredocs require this newline/indent structure to be valid.
{ lib, pkgs, ... }:
let
  mockGrafana = ./mock-grafana-server.py;
  script = ../services/llama-cpp-annotations.py;
  python = pkgs.python3;

  # Minimal stand-in for llama.cpp's HTTP server. All slot state lives in
  # STATE_FILE (argv[2]) so the test driver can change it between polls via
  # POST /test/set-slots. Only the two paths the test needs are implemented;
  # everything else is 404.
  mockLlamaCpp = pkgs.writeText "mock-llama-cpp-server.py" ''
    import http.server, json, sys, os

    PORT = int(sys.argv[1])
    STATE_FILE = sys.argv[2]

    # Seed one idle slot if the state file does not exist yet.
    if not os.path.exists(STATE_FILE):
        with open(STATE_FILE, "w") as f:
            json.dump([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 0}}], f)

    class Handler(http.server.BaseHTTPRequestHandler):
        def log_message(self, fmt, *args):
            # Silence per-request logging; it would clutter the test output.
            pass

        def _json(self, code, body):
            data = json.dumps(body).encode()
            self.send_response(code)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(data)

        def do_GET(self):
            if self.path == "/slots":
                # Re-read on every request so /test/set-slots changes are
                # visible immediately.
                with open(STATE_FILE) as f:
                    slots = json.load(f)
                self._json(200, slots)
            else:
                self.send_response(404)
                self.end_headers()

        def do_POST(self):
            if self.path == "/test/set-slots":
                length = int(self.headers.get("Content-Length", 0))
                body = json.loads(self.rfile.read(length)) if length else []
                with open(STATE_FILE, "w") as f:
                    json.dump(body, f)
                self._json(200, {"ok": True})
            else:
                self.send_response(404)
                self.end_headers()

    http.server.HTTPServer(("127.0.0.1", PORT), Handler).serve_forever()
  '';
in
pkgs.testers.runNixOSTest {
  name = "llama-cpp-annotations";

  nodes.machine = { pkgs, ... }: {
    environment.systemPackages = [ pkgs.python3 pkgs.curl ];
  };

  testScript = ''
    import json
    import time

    GRAFANA_PORT = 13000
    LLAMA_PORT = 16688
    ANNOTS_FILE = "/tmp/annotations.json"
    SLOTS_FILE = "/tmp/llama-slots.json"
    STATE_FILE = "/tmp/llama-annot-state.json"
    PYTHON = "${python}/bin/python3"
    MOCK_GRAFANA = "${mockGrafana}"
    MOCK_LLAMA = "${mockLlamaCpp}"
    SCRIPT = "${script}"

    def read_annotations():
        # Missing file counts as "no annotations yet".
        out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'")
        return json.loads(out.strip())

    def set_slots(slots):
        # Push new slot state into the mock llama.cpp server.
        machine.succeed(
            f"curl -sf -X POST http://127.0.0.1:{LLAMA_PORT}/test/set-slots "
            f"-H 'Content-Type: application/json' "
            f"-d '{json.dumps(slots)}'"
        )

    start_all()
    machine.wait_for_unit("multi-user.target")

    with subtest("Start mock services"):
        machine.succeed(f"echo '[]' > {ANNOTS_FILE}")
        machine.succeed(
            f"systemd-run --unit=mock-grafana {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
        )
        machine.succeed(
            f"echo '[{{\"id\": 0, \"is_processing\": false, \"next_token\": {{\"n_decoded\": 0}}}}]' > {SLOTS_FILE}"
        )
        machine.succeed(
            f"systemd-run --unit=mock-llama {PYTHON} {MOCK_LLAMA} {LLAMA_PORT} {SLOTS_FILE}"
        )
        # Probe both mocks until they answer; the grafana probe POSTs a throwaway
        # "ping" annotation, so the annotations file is reset again afterwards.
        machine.wait_until_succeeds(
            f"curl -sf http://127.0.0.1:{GRAFANA_PORT}/api/annotations -X POST "
            f"-H 'Content-Type: application/json' -d '{{\"text\":\"ping\",\"tags\":[]}}' | grep -q id",
            timeout=10,
        )
        machine.wait_until_succeeds(
            f"curl -sf http://127.0.0.1:{LLAMA_PORT}/slots | grep -q is_processing",
            timeout=10,
        )
        machine.succeed(f"echo '[]' > {ANNOTS_FILE}")

    with subtest("Start annotation service"):
        machine.succeed(
            f"systemd-run --unit=llama-annot "
            f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT} "
            f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
            f"--setenv=STATE_FILE={STATE_FILE} "
            f"--setenv=POLL_INTERVAL=2 "
            f"{PYTHON} {SCRIPT}"
        )
        # Give the service at least one poll cycle (POLL_INTERVAL=2) to run.
        time.sleep(3)

    with subtest("No annotations when slots are idle"):
        annots = read_annotations()
        assert annots == [], f"Expected no annotations, got: {annots}"

    with subtest("Annotation created when slot starts processing"):
        set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}])
        machine.wait_until_succeeds(
            f"cat {ANNOTS_FILE} | {PYTHON} -c "
            f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\"",
            timeout=15,
        )
        annots = read_annotations()
        assert len(annots) == 1, f"Expected 1 annotation, got: {annots}"
        assert "llama-cpp" in annots[0].get("tags", []), f"Missing tag: {annots[0]}"
        assert "slot 0" in annots[0]["text"], f"Missing slot info: {annots[0]['text']}"
        # An open (in-progress) annotation must not carry timeEnd yet.
        assert "timeEnd" not in annots[0], f"timeEnd should not be set: {annots[0]}"

    with subtest("Annotation closed when slot stops processing"):
        set_slots([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 42}}])
        machine.wait_until_succeeds(
            f"cat {ANNOTS_FILE} | {PYTHON} -c "
            f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\"",
            timeout=15,
        )
        annots = read_annotations()
        assert len(annots) == 1, f"Expected 1, got: {annots}"
        assert "timeEnd" in annots[0], f"timeEnd missing: {annots[0]}"
        assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time"
        # The service is expected to record the decoded-token count on close.
        assert "42 tokens" in annots[0].get("text", ""), f"Token count missing: {annots[0]}"

    with subtest("State survives restart"):
        # Open a second annotation, then restart the service with the same
        # STATE_FILE: the in-flight annotation must not be re-created.
        set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}])
        machine.wait_until_succeeds(
            f"cat {ANNOTS_FILE} | {PYTHON} -c "
            f"\"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\"",
            timeout=15,
        )
        # `|| true`: the transient unit may already have exited on its own.
        machine.succeed("systemctl stop llama-annot || true")
        time.sleep(1)
        machine.succeed(
            f"systemd-run --unit=llama-annot-2 "
            f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT} "
            f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
            f"--setenv=STATE_FILE={STATE_FILE} "
            f"--setenv=POLL_INTERVAL=2 "
            f"{PYTHON} {SCRIPT}"
        )
        time.sleep(4)
        annots = read_annotations()
        assert len(annots) == 2, f"Restart should not duplicate, got: {annots}"
  '';
}