Poll /slots endpoint, create annotations when slots start processing, close with token count when complete. Includes NixOS VM test with mock llama-cpp and grafana servers. Dashboard annotation entry added.
180 lines
6.4 KiB
Nix
180 lines
6.4 KiB
Nix
{
|
|
lib,
|
|
pkgs,
|
|
...
|
|
}:
|
|
let
|
|
mockGrafana = ./mock-grafana-server.py;
|
|
script = ../services/llama-cpp-annotations.py;
|
|
python = pkgs.python3;
|
|
|
|
mockLlamaCpp = pkgs.writeText "mock-llama-cpp-server.py" ''
|
|
import http.server, json, sys, os
|
|
|
|
# Command line: <port> <state-file>.
PORT = int(sys.argv[1])
STATE_FILE = sys.argv[2]

# Seed the state file with a single idle slot, but only when nothing exists
# at that path yet; an existing file is left untouched.
if not os.path.exists(STATE_FILE):
    idle_slot = {"id": 0, "is_processing": False, "next_token": {"n_decoded": 0}}
    with open(STATE_FILE, "w") as state_fh:
        state_fh.write(json.dumps([idle_slot]))
|
|
|
|
class Handler(http.server.BaseHTTPRequestHandler):
    """Mock llama-cpp HTTP endpoint backed by the JSON document in STATE_FILE.

    GET /slots returns the stored document; POST /test/set-slots replaces it
    with the request body. Every other path is answered with a bare 404.
    """

    def log_message(self, fmt, *args):
        """Do nothing, so requests are not logged."""
        return None

    def _json(self, code, body):
        """Send *body* serialized as JSON with HTTP status *code*."""
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        """Serve the current contents of STATE_FILE on /slots."""
        if self.path != "/slots":
            self.send_response(404)
            self.end_headers()
            return
        with open(STATE_FILE) as state_fh:
            stored = json.load(state_fh)
        self._json(200, stored)

    def do_POST(self):
        """Overwrite STATE_FILE with the posted JSON on /test/set-slots."""
        if self.path != "/test/set-slots":
            self.send_response(404)
            self.end_headers()
            return
        size = int(self.headers.get("Content-Length", 0))
        # A missing or zero Content-Length is stored as an empty list.
        if size:
            posted = json.loads(self.rfile.read(size))
        else:
            posted = []
        with open(STATE_FILE, "w") as state_fh:
            json.dump(posted, state_fh)
        self._json(200, {"ok": True})
|
|
|
|
# Listen on loopback only and handle requests until the process is stopped.
mock_server = http.server.HTTPServer(("127.0.0.1", PORT), Handler)
mock_server.serve_forever()
|
|
'';
|
|
in
|
|
pkgs.testers.runNixOSTest {
|
|
name = "llama-cpp-annotations";
|
|
|
|
nodes.machine =
|
|
{ pkgs, ... }:
|
|
{
|
|
environment.systemPackages = [
|
|
pkgs.python3
|
|
pkgs.curl
|
|
];
|
|
};
|
|
|
|
testScript = ''
|
|
import json
|
|
import time
|
|
|
|
# Loopback ports the mock Grafana and mock llama-cpp servers are started on.
GRAFANA_PORT = 13000
LLAMA_PORT = 16688
# File handed to the mock Grafana server; the test reads annotations from it.
ANNOTS_FILE = "/tmp/annotations.json"
# File handed to the mock llama-cpp server as its slot state.
SLOTS_FILE = "/tmp/llama-slots.json"
# Passed to the annotation service through the STATE_FILE environment variable.
STATE_FILE = "/tmp/llama-annot-state.json"
# The values below are filled in by Nix string interpolation.
PYTHON = "${python}/bin/python3"
MOCK_GRAFANA = "${mockGrafana}"
MOCK_LLAMA = "${mockLlamaCpp}"
SCRIPT = "${script}"
|
|
|
|
def read_annotations():
    """Return the JSON content of ANNOTS_FILE as read from the test machine.

    When `cat` fails, the shell fallback prints an empty JSON list instead,
    so the result is then an empty list.
    """
    command = f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'"
    raw = machine.succeed(command)
    return json.loads(raw.strip())
|
|
|
|
def build_set_slots_command(slots, port):
    """Return the curl command that POSTs *slots* as JSON to /test/set-slots.

    The JSON payload is shell-quoted, so slot data containing an apostrophe
    (or any other shell metacharacter) reaches curl as one intact argument.
    Previously the payload was spliced between hard-coded single quotes and
    an apostrophe in it would have broken the command line.
    """
    import shlex  # local import: the surrounding script does not import it

    payload = shlex.quote(json.dumps(slots))
    return (
        f"curl -sf -X POST http://127.0.0.1:{port}/test/set-slots "
        "-H 'Content-Type: application/json' "
        f"-d {payload}"
    )


def set_slots(slots):
    """Replace the mock llama-cpp server slot state with *slots*.

    Runs curl on the test machine against the mock server listening on
    LLAMA_PORT; the call fails the test if curl exits non-zero.
    """
    machine.succeed(build_set_slots_command(slots, LLAMA_PORT))
|
|
|
|
# Boot the test machine and wait for multi-user.target before running any subtest.
start_all()
machine.wait_for_unit("multi-user.target")
|
|
|
|
with subtest("Start mock services"):
    # Begin with an empty annotation file, then launch the mock Grafana server on it.
    machine.succeed(f"echo '[]' > {ANNOTS_FILE}")
    grafana_unit_cmd = f"systemd-run --unit=mock-grafana {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
    machine.succeed(grafana_unit_cmd)

    # Write one idle slot to the slots file, then launch the mock llama-cpp server on it.
    seed_slots_cmd = f"echo '[{{\"id\": 0, \"is_processing\": false, \"next_token\": {{\"n_decoded\": 0}}}}]' > {SLOTS_FILE}"
    machine.succeed(seed_slots_cmd)
    llama_unit_cmd = f"systemd-run --unit=mock-llama {PYTHON} {MOCK_LLAMA} {LLAMA_PORT} {SLOTS_FILE}"
    machine.succeed(llama_unit_cmd)

    # Readiness probes: both servers must answer within 10 seconds.
    grafana_probe = (
        f"curl -sf http://127.0.0.1:{GRAFANA_PORT}/api/annotations -X POST "
        f"-H 'Content-Type: application/json' -d '{{\"text\":\"ping\",\"tags\":[]}}' | grep -q id"
    )
    machine.wait_until_succeeds(grafana_probe, timeout=10)
    llama_probe = f"curl -sf http://127.0.0.1:{LLAMA_PORT}/slots | grep -q is_processing"
    machine.wait_until_succeeds(llama_probe, timeout=10)

    # Reset the annotation file after the POST probe above
    # (presumably the mock recorded the ping; confirm against the mock server).
    machine.succeed(f"echo '[]' > {ANNOTS_FILE}")
|
|
|
|
with subtest("Start annotation service"):
    # Run the script under test as a transient unit, pointed at both mocks
    # through environment variables.
    annot_unit_cmd = " ".join(
        [
            "systemd-run --unit=llama-annot",
            f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT}",
            f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT}",
            f"--setenv=STATE_FILE={STATE_FILE}",
            "--setenv=POLL_INTERVAL=2",
            f"{PYTHON} {SCRIPT}",
        ]
    )
    machine.succeed(annot_unit_cmd)
    # Pause longer than the 2-second POLL_INTERVAL before the next subtest.
    time.sleep(3)
|
|
|
|
with subtest("No annotations when slots are idle"):
    # The only slot is still idle, so the annotation file must remain empty.
    idle_annots = read_annotations()
    assert idle_annots == [], f"Expected no annotations, got: {idle_annots}"
|
|
|
|
with subtest("Annotation created when slot starts processing"):
    set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}])

    # Poll until the annotation file holds a non-empty JSON value.
    non_empty_check = "\"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\""
    machine.wait_until_succeeds(
        f"cat {ANNOTS_FILE} | {PYTHON} -c " + non_empty_check,
        timeout=15,
    )

    created = read_annotations()
    assert len(created) == 1, f"Expected 1 annotation, got: {created}"
    opened = created[0]
    assert "llama-cpp" in opened.get("tags", []), f"Missing tag: {opened}"
    assert "slot 0" in opened["text"], f"Missing slot info: {opened['text']}"
    # The slot is still processing, so the annotation must not have an end time yet.
    assert "timeEnd" not in opened, f"timeEnd should not be set: {opened}"
|
|
|
|
with subtest("Annotation closed when slot stops processing"):
    set_slots([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 42}}])

    # Poll until the first stored annotation carries a timeEnd field.
    closed_check = "\"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\""
    machine.wait_until_succeeds(
        f"cat {ANNOTS_FILE} | {PYTHON} -c " + closed_check,
        timeout=15,
    )

    after_close = read_annotations()
    assert len(after_close) == 1, f"Expected 1, got: {after_close}"
    closed = after_close[0]
    assert "timeEnd" in closed, f"timeEnd missing: {closed}"
    assert closed["timeEnd"] > closed["time"], "timeEnd should be after time"
    # n_decoded was set to 42 above and must show up in the annotation text.
    assert "42 tokens" in closed.get("text", ""), f"Token count missing: {closed}"
|
|
|
|
with subtest("State survives restart"):
    # Put the slot back into processing and wait for the second annotation.
    set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}])
    two_entries_check = "\"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\""
    machine.wait_until_succeeds(
        f"cat {ANNOTS_FILE} | {PYTHON} -c " + two_entries_check,
        timeout=15,
    )

    # Stop the first service instance; a failing stop is tolerated.
    machine.succeed("systemctl stop llama-annot || true")
    time.sleep(1)

    # Start a second instance under a new unit name, reusing the same STATE_FILE.
    restart_cmd = " ".join(
        [
            "systemd-run --unit=llama-annot-2",
            f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT}",
            f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT}",
            f"--setenv=STATE_FILE={STATE_FILE}",
            "--setenv=POLL_INTERVAL=2",
            f"{PYTHON} {SCRIPT}",
        ]
    )
    machine.succeed(restart_cmd)
    time.sleep(4)

    # The slot was already processing before the restart, so no third
    # annotation may appear.
    after_restart = read_annotations()
    assert len(after_restart) == 2, f"Restart should not duplicate, got: {after_restart}"
|
|
'';
|
|
}
|