From 9baeaa5c23bd1e4fa3c1cff038e87a92716d666c Mon Sep 17 00:00:00 2001 From: Simon Gardling Date: Thu, 2 Apr 2026 17:43:49 -0400 Subject: [PATCH] llama-cpp: add grafana annotations for inference requests Poll /slots endpoint, create annotations when slots start processing, close with token count when complete. Includes NixOS VM test with mock llama-cpp and grafana servers. Dashboard annotation entry added. --- configuration.nix | 1 + services/llama-cpp-annotations.nix | 40 +++++++ services/llama-cpp-annotations.py | 127 ++++++++++++++++++++ services/monitoring.nix | 12 ++ tests/llama-cpp-annotations.nix | 179 +++++++++++++++++++++++++++++ tests/tests.nix | 3 + 6 files changed, 362 insertions(+) create mode 100644 services/llama-cpp-annotations.nix create mode 100644 services/llama-cpp-annotations.py create mode 100644 tests/llama-cpp-annotations.nix diff --git a/configuration.nix b/configuration.nix index 676c9ef..de8f0b3 100644 --- a/configuration.nix +++ b/configuration.nix @@ -48,6 +48,7 @@ ./services/soulseek.nix ./services/llama-cpp.nix + ./services/llama-cpp-annotations.nix ./services/ups.nix ./services/monitoring.nix diff --git a/services/llama-cpp-annotations.nix b/services/llama-cpp-annotations.nix new file mode 100644 index 0000000..94f5221 --- /dev/null +++ b/services/llama-cpp-annotations.nix @@ -0,0 +1,40 @@ +{ + config, + pkgs, + service_configs, + lib, + ... +}: +{ + systemd.services.llama-cpp-annotations = { + description = "LLM request annotation service for Grafana"; + after = [ + "network.target" + "grafana.service" + "llama-cpp.service" + ]; + wantedBy = [ "multi-user.target" ]; + serviceConfig = { + ExecStart = "${pkgs.python3}/bin/python3 ${./llama-cpp-annotations.py}"; + Restart = "always"; + RestartSec = "10s"; + DynamicUser = true; + StateDirectory = "llama-cpp-annotations"; + NoNewPrivileges = true; + ProtectSystem = "strict"; + ProtectHome = true; + PrivateTmp = true; + RestrictAddressFamilies = [ + "AF_INET" + "AF_INET6" + ]; + MemoryDenyWriteExecute = true; + }; + environment = { + LLAMA_CPP_URL = "http://127.0.0.1:${toString service_configs.ports.private.llama_cpp.port}"; + GRAFANA_URL = "http://127.0.0.1:${toString service_configs.ports.private.grafana.port}"; + STATE_FILE = "/var/lib/llama-cpp-annotations/state.json"; + POLL_INTERVAL = "5"; + }; + }; +} diff --git a/services/llama-cpp-annotations.py b/services/llama-cpp-annotations.py new file mode 100644 index 0000000..d8fac69 --- /dev/null +++ b/services/llama-cpp-annotations.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +import json +import os +import sys +import time +import urllib.request + +LLAMA_CPP_URL = os.environ.get("LLAMA_CPP_URL", "http://127.0.0.1:6688") +GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000") +STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/llama-cpp-annotations/state.json") +POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5")) + + +def http_json(method, url, body=None): + data = json.dumps(body).encode() if body is not None else None + req = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json", "Accept": "application/json"}, + method=method, + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return json.loads(resp.read()) + + +def get_slots(): + try: + req = urllib.request.Request( + f"{LLAMA_CPP_URL}/slots", + headers={"Accept": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return json.loads(resp.read()) + except Exception as e: + print(f"Error fetching slots: {e}", file=sys.stderr) + return None + + +def load_state(): + try: + with open(STATE_FILE) as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return {} + + +def save_state(state): + os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) + tmp = STATE_FILE + ".tmp" + with open(tmp, "w") as f: + json.dump(state, f) + os.replace(tmp, STATE_FILE) + + +def grafana_post(text, start_ms): + try: + result = http_json( + "POST", + f"{GRAFANA_URL}/api/annotations", + {"time": start_ms, "text": text, "tags": ["llama-cpp"]}, + ) + return result.get("id") + except Exception as e: + print(f"Error posting annotation: {e}", file=sys.stderr) + return None + + +def grafana_close(grafana_id, end_ms, text=None): + try: + body = {"timeEnd": end_ms} + if text is not None: + body["text"] = text + http_json( + "PATCH", + f"{GRAFANA_URL}/api/annotations/{grafana_id}", + body, + ) + except Exception as e: + print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr) + + +def main(): + state = load_state() + + while True: + now_ms = int(time.time() * 1000) + slots = get_slots() + + if slots is not None: + # Track which slots are currently processing + processing_ids = set() + for slot in slots: + slot_id = str(slot["id"]) + is_processing = slot.get("is_processing", False) + + if is_processing: + processing_ids.add(slot_id) + if slot_id not in state: + text = f"LLM request (slot {slot['id']})" + grafana_id = grafana_post(text, now_ms) + if grafana_id is not None: + state[slot_id] = { + "grafana_id": grafana_id, + "start_ms": now_ms, + } + save_state(state) + + # Close annotations for slots that stopped processing + for slot_id in [k for k in state if k not in processing_ids]: + info = state.pop(slot_id) + # Try to get token count from the slot data + n_decoded = None + for slot in slots: + if str(slot["id"]) == slot_id: + n_decoded = slot.get("next_token", {}).get("n_decoded") + break + text = f"LLM request (slot {slot_id})" + if n_decoded is not None and n_decoded > 0: + text += f" — {n_decoded} tokens" + grafana_close(info["grafana_id"], now_ms, text) + save_state(state) + + time.sleep(POLL_INTERVAL) + + +if __name__ == "__main__": + main() diff --git a/services/monitoring.nix b/services/monitoring.nix index f23f7c7..5c4c6ab 100644 --- a/services/monitoring.nix +++ b/services/monitoring.nix @@ -120,6 +120,18 @@ let type = "tags"; tags = [ "zfs-scrub" ]; } + { + name = "LLM Requests"; + datasource = { + type = "grafana"; + uid = "-- Grafana --"; + }; + enable = true; + iconColor = "purple"; + showIn = 0; + type = "tags"; + tags = [ "llama-cpp" ]; + } ]; panels = [ diff --git a/tests/llama-cpp-annotations.nix b/tests/llama-cpp-annotations.nix new file mode 100644 index 0000000..4d7eb2d --- /dev/null +++ b/tests/llama-cpp-annotations.nix @@ -0,0 +1,179 @@ +{ + lib, + pkgs, + ... +}: +let + mockGrafana = ./mock-grafana-server.py; + script = ../services/llama-cpp-annotations.py; + python = pkgs.python3; + + mockLlamaCpp = pkgs.writeText "mock-llama-cpp-server.py" '' + import http.server, json, sys, os + + PORT = int(sys.argv[1]) + STATE_FILE = sys.argv[2] + + if not os.path.exists(STATE_FILE): + with open(STATE_FILE, "w") as f: + json.dump([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 0}}], f) + + class Handler(http.server.BaseHTTPRequestHandler): + def log_message(self, fmt, *args): + pass + + def _json(self, code, body): + data = json.dumps(body).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(data) + + def do_GET(self): + if self.path == "/slots": + with open(STATE_FILE) as f: + slots = json.load(f) + self._json(200, slots) + else: + self.send_response(404) + self.end_headers() + + def do_POST(self): + if self.path == "/test/set-slots": + length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(length)) if length else [] + with open(STATE_FILE, "w") as f: + json.dump(body, f) + self._json(200, {"ok": True}) + else: + self.send_response(404) + self.end_headers() + + http.server.HTTPServer(("127.0.0.1", PORT), Handler).serve_forever() + ''; +in +pkgs.testers.runNixOSTest { + name = "llama-cpp-annotations"; + + nodes.machine = + { pkgs, ... }: + { + environment.systemPackages = [ + pkgs.python3 + pkgs.curl + ]; + }; + + testScript = '' + import json + import time + + GRAFANA_PORT = 13000 + LLAMA_PORT = 16688 + ANNOTS_FILE = "/tmp/annotations.json" + SLOTS_FILE = "/tmp/llama-slots.json" + STATE_FILE = "/tmp/llama-annot-state.json" + PYTHON = "${python}/bin/python3" + MOCK_GRAFANA = "${mockGrafana}" + MOCK_LLAMA = "${mockLlamaCpp}" + SCRIPT = "${script}" + + def read_annotations(): + out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'") + return json.loads(out.strip()) + + def set_slots(slots): + machine.succeed( + f"curl -sf -X POST http://127.0.0.1:{LLAMA_PORT}/test/set-slots " + f"-H 'Content-Type: application/json' " + f"-d '{json.dumps(slots)}'" + ) + + start_all() + machine.wait_for_unit("multi-user.target") + + with subtest("Start mock services"): + machine.succeed(f"echo '[]' > {ANNOTS_FILE}") + machine.succeed( + f"systemd-run --unit=mock-grafana {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}" + ) + machine.succeed( + f"echo '[{{\"id\": 0, \"is_processing\": false, \"next_token\": {{\"n_decoded\": 0}}}}]' > {SLOTS_FILE}" + ) + machine.succeed( + f"systemd-run --unit=mock-llama {PYTHON} {MOCK_LLAMA} {LLAMA_PORT} {SLOTS_FILE}" + ) + machine.wait_until_succeeds( + f"curl -sf http://127.0.0.1:{GRAFANA_PORT}/api/annotations -X POST " + f"-H 'Content-Type: application/json' -d '{{\"text\":\"ping\",\"tags\":[]}}' | grep -q id", + timeout=10, + ) + machine.wait_until_succeeds( + f"curl -sf http://127.0.0.1:{LLAMA_PORT}/slots | grep -q is_processing", + timeout=10, + ) + machine.succeed(f"echo '[]' > {ANNOTS_FILE}") + + with subtest("Start annotation service"): + machine.succeed( + f"systemd-run --unit=llama-annot " + f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT} " + f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " + f"--setenv=STATE_FILE={STATE_FILE} " + f"--setenv=POLL_INTERVAL=2 " + f"{PYTHON} {SCRIPT}" + ) + time.sleep(3) + + with subtest("No annotations when slots are idle"): + annots = read_annotations() + assert annots == [], f"Expected no annotations, got: {annots}" + + with subtest("Annotation created when slot starts processing"): + set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}]) + machine.wait_until_succeeds( + f"cat {ANNOTS_FILE} | {PYTHON} -c " + f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\"", + timeout=15, + ) + annots = read_annotations() + assert len(annots) == 1, f"Expected 1 annotation, got: {annots}" + assert "llama-cpp" in annots[0].get("tags", []), f"Missing tag: {annots[0]}" + assert "slot 0" in annots[0]["text"], f"Missing slot info: {annots[0]['text']}" + assert "timeEnd" not in annots[0], f"timeEnd should not be set: {annots[0]}" + + with subtest("Annotation closed when slot stops processing"): + set_slots([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 42}}]) + machine.wait_until_succeeds( + f"cat {ANNOTS_FILE} | {PYTHON} -c " + f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\"", + timeout=15, + ) + annots = read_annotations() + assert len(annots) == 1, f"Expected 1, got: {annots}" + assert "timeEnd" in annots[0], f"timeEnd missing: {annots[0]}" + assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time" + assert "42 tokens" in annots[0].get("text", ""), f"Token count missing: {annots[0]}" + + with subtest("State survives restart"): + set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}]) + machine.wait_until_succeeds( + f"cat {ANNOTS_FILE} | {PYTHON} -c " + f"\"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\"", + timeout=15, + ) + machine.succeed("systemctl stop llama-annot || true") + time.sleep(1) + machine.succeed( + f"systemd-run --unit=llama-annot-2 " + f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT} " + f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} " + f"--setenv=STATE_FILE={STATE_FILE} " + f"--setenv=POLL_INTERVAL=2 " + f"{PYTHON} {SCRIPT}" + ) + time.sleep(4) + annots = read_annotations() + assert len(annots) == 2, f"Restart should not duplicate, got: {annots}" + ''; +} diff --git a/tests/tests.nix b/tests/tests.nix index 9762b9c..21e5b08 100644 --- a/tests/tests.nix +++ b/tests/tests.nix @@ -28,6 +28,9 @@ in # zfs scrub annotations test zfsScrubAnnotationsTest = handleTest ./zfs-scrub-annotations.nix; + # llama-cpp annotation service test + llamaCppAnnotationsTest = handleTest ./llama-cpp-annotations.nix; + # ntfy alerts test ntfyAlertsTest = handleTest ./ntfy-alerts.nix;