llama-cpp: xmrig + grafana hooks
This commit is contained in:
@@ -1,15 +1,12 @@
|
|||||||
{
|
{
|
||||||
config,
|
|
||||||
pkgs,
|
pkgs,
|
||||||
service_configs,
|
service_configs,
|
||||||
lib,
|
|
||||||
...
|
...
|
||||||
}:
|
}:
|
||||||
{
|
{
|
||||||
systemd.services.llama-cpp-annotations = {
|
systemd.services.llama-cpp-annotations = {
|
||||||
description = "LLM request annotation service for Grafana";
|
description = "LLM request annotation service for Grafana";
|
||||||
after = [
|
after = [
|
||||||
"network.target"
|
|
||||||
"grafana.service"
|
"grafana.service"
|
||||||
"llama-cpp.service"
|
"llama-cpp.service"
|
||||||
];
|
];
|
||||||
@@ -31,10 +28,10 @@
|
|||||||
MemoryDenyWriteExecute = true;
|
MemoryDenyWriteExecute = true;
|
||||||
};
|
};
|
||||||
environment = {
|
environment = {
|
||||||
LLAMA_CPP_URL = "http://127.0.0.1:${toString service_configs.ports.private.llama_cpp.port}";
|
|
||||||
GRAFANA_URL = "http://127.0.0.1:${toString service_configs.ports.private.grafana.port}";
|
GRAFANA_URL = "http://127.0.0.1:${toString service_configs.ports.private.grafana.port}";
|
||||||
STATE_FILE = "/var/lib/llama-cpp-annotations/state.json";
|
STATE_FILE = "/var/lib/llama-cpp-annotations/state.json";
|
||||||
POLL_INTERVAL = "5";
|
POLL_INTERVAL = "5";
|
||||||
|
CPU_THRESHOLD = "50";
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,14 +1,42 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Grafana annotation service for llama-cpp inference requests.
|
||||||
|
|
||||||
|
Monitors llama-server CPU usage via /proc. Creates a Grafana annotation
|
||||||
|
when inference starts (CPU spikes), closes it when inference ends.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import glob
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
|
||||||
LLAMA_CPP_URL = os.environ.get("LLAMA_CPP_URL", "http://127.0.0.1:6688")
|
|
||||||
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000")
|
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000")
|
||||||
STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/llama-cpp-annotations/state.json")
|
STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/llama-cpp-annotations/state.json")
|
||||||
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5"))
|
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5"))
|
||||||
|
CPU_THRESHOLD = float(os.environ.get("CPU_THRESHOLD", "50"))
|
||||||
|
|
||||||
|
|
||||||
|
def find_llama_pid():
|
||||||
|
for path in glob.glob("/proc/[0-9]*/comm"):
|
||||||
|
try:
|
||||||
|
with open(path) as f:
|
||||||
|
if f.read().strip() == "llama-server":
|
||||||
|
return int(path.split("/")[2])
|
||||||
|
except (OSError, ValueError):
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_cpu_times(pid):
|
||||||
|
try:
|
||||||
|
with open(f"/proc/{pid}/stat") as f:
|
||||||
|
fields = f.read().split(")")[-1].split()
|
||||||
|
return int(fields[11]) + int(fields[12])
|
||||||
|
except (OSError, IndexError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def http_json(method, url, body=None):
|
def http_json(method, url, body=None):
|
||||||
@@ -23,19 +51,6 @@ def http_json(method, url, body=None):
|
|||||||
return json.loads(resp.read())
|
return json.loads(resp.read())
|
||||||
|
|
||||||
|
|
||||||
def get_slots():
|
|
||||||
try:
|
|
||||||
req = urllib.request.Request(
|
|
||||||
f"{LLAMA_CPP_URL}/slots",
|
|
||||||
headers={"Accept": "application/json"},
|
|
||||||
)
|
|
||||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
||||||
return json.loads(resp.read())
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error fetching slots: {e}", file=sys.stderr)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def load_state():
|
def load_state():
|
||||||
try:
|
try:
|
||||||
with open(STATE_FILE) as f:
|
with open(STATE_FILE) as f:
|
||||||
@@ -81,45 +96,58 @@ def grafana_close(grafana_id, end_ms, text=None):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
state = load_state()
|
state = load_state()
|
||||||
|
prev_ticks = None
|
||||||
|
prev_time = None
|
||||||
|
hz = os.sysconf("SC_CLK_TCK")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
now_ms = int(time.time() * 1000)
|
now_ms = int(time.time() * 1000)
|
||||||
slots = get_slots()
|
pid = find_llama_pid()
|
||||||
|
|
||||||
if slots is not None:
|
if pid is None:
|
||||||
# Track which slots are currently processing
|
prev_ticks = None
|
||||||
processing_ids = set()
|
prev_time = None
|
||||||
for slot in slots:
|
time.sleep(POLL_INTERVAL)
|
||||||
slot_id = str(slot["id"])
|
continue
|
||||||
is_processing = slot.get("is_processing", False)
|
|
||||||
|
|
||||||
if is_processing:
|
ticks = get_cpu_times(pid)
|
||||||
processing_ids.add(slot_id)
|
now = time.monotonic()
|
||||||
if slot_id not in state:
|
|
||||||
text = f"LLM request (slot {slot['id']})"
|
|
||||||
grafana_id = grafana_post(text, now_ms)
|
|
||||||
if grafana_id is not None:
|
|
||||||
state[slot_id] = {
|
|
||||||
"grafana_id": grafana_id,
|
|
||||||
"start_ms": now_ms,
|
|
||||||
}
|
|
||||||
save_state(state)
|
|
||||||
|
|
||||||
# Close annotations for slots that stopped processing
|
if ticks is None or prev_ticks is None or prev_time is None:
|
||||||
for slot_id in [k for k in state if k not in processing_ids]:
|
prev_ticks = ticks
|
||||||
info = state.pop(slot_id)
|
prev_time = now
|
||||||
# Try to get token count from the slot data
|
time.sleep(POLL_INTERVAL)
|
||||||
n_decoded = None
|
continue
|
||||||
for slot in slots:
|
|
||||||
if str(slot["id"]) == slot_id:
|
dt = now - prev_time
|
||||||
n_decoded = slot.get("next_token", {}).get("n_decoded")
|
if dt <= 0:
|
||||||
break
|
prev_ticks = ticks
|
||||||
text = f"LLM request (slot {slot_id})"
|
prev_time = now
|
||||||
if n_decoded is not None and n_decoded > 0:
|
time.sleep(POLL_INTERVAL)
|
||||||
text += f" — {n_decoded} tokens"
|
continue
|
||||||
grafana_close(info["grafana_id"], now_ms, text)
|
|
||||||
|
cpu_pct = ((ticks - prev_ticks) / hz) / dt * 100
|
||||||
|
prev_ticks = ticks
|
||||||
|
prev_time = now
|
||||||
|
|
||||||
|
busy = cpu_pct > CPU_THRESHOLD
|
||||||
|
|
||||||
|
if busy and "active" not in state:
|
||||||
|
grafana_id = grafana_post("LLM request", now_ms)
|
||||||
|
if grafana_id is not None:
|
||||||
|
state["active"] = {
|
||||||
|
"grafana_id": grafana_id,
|
||||||
|
"start_ms": now_ms,
|
||||||
|
}
|
||||||
save_state(state)
|
save_state(state)
|
||||||
|
|
||||||
|
elif not busy and "active" in state:
|
||||||
|
info = state.pop("active")
|
||||||
|
duration_s = (now_ms - info["start_ms"]) / 1000
|
||||||
|
text = f"LLM request ({duration_s:.1f}s)"
|
||||||
|
grafana_close(info["grafana_id"], now_ms, text)
|
||||||
|
save_state(state)
|
||||||
|
|
||||||
time.sleep(POLL_INTERVAL)
|
time.sleep(POLL_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,11 @@
|
|||||||
{
|
{
|
||||||
pkgs,
|
pkgs,
|
||||||
service_configs,
|
|
||||||
...
|
...
|
||||||
}:
|
}:
|
||||||
{
|
{
|
||||||
systemd.services.llama-cpp-xmrig-pause = {
|
systemd.services.llama-cpp-xmrig-pause = {
|
||||||
description = "Pause xmrig while llama-cpp is processing requests";
|
description = "Pause xmrig while llama-cpp is processing requests";
|
||||||
after = [
|
after = [
|
||||||
"network.target"
|
|
||||||
"llama-cpp.service"
|
"llama-cpp.service"
|
||||||
"xmrig.service"
|
"xmrig.service"
|
||||||
];
|
];
|
||||||
@@ -16,20 +14,20 @@
|
|||||||
ExecStart = "${pkgs.python3}/bin/python3 ${./llama-cpp-xmrig-pause.py}";
|
ExecStart = "${pkgs.python3}/bin/python3 ${./llama-cpp-xmrig-pause.py}";
|
||||||
Restart = "always";
|
Restart = "always";
|
||||||
RestartSec = "10s";
|
RestartSec = "10s";
|
||||||
|
# Needs /proc access (default) and AF_UNIX for systemctl
|
||||||
NoNewPrivileges = true;
|
NoNewPrivileges = true;
|
||||||
ProtectHome = true;
|
ProtectHome = true;
|
||||||
ProtectSystem = "strict";
|
ProtectSystem = "strict";
|
||||||
PrivateTmp = true;
|
PrivateTmp = true;
|
||||||
RestrictAddressFamilies = [
|
RestrictAddressFamilies = [
|
||||||
"AF_INET"
|
"AF_UNIX" # systemctl talks to systemd over D-Bus unix socket
|
||||||
"AF_INET6"
|
|
||||||
];
|
];
|
||||||
MemoryDenyWriteExecute = true;
|
MemoryDenyWriteExecute = true;
|
||||||
};
|
};
|
||||||
environment = {
|
environment = {
|
||||||
LLAMA_CPP_URL = "http://127.0.0.1:${toString service_configs.ports.private.llama_cpp.port}";
|
|
||||||
POLL_INTERVAL = "3";
|
POLL_INTERVAL = "3";
|
||||||
GRACE_PERIOD = "10";
|
GRACE_PERIOD = "10";
|
||||||
|
CPU_THRESHOLD = "50";
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,42 +2,51 @@
|
|||||||
"""
|
"""
|
||||||
Pause xmrig while llama-cpp is processing inference requests.
|
Pause xmrig while llama-cpp is processing inference requests.
|
||||||
|
|
||||||
Polls llama-cpp /slots endpoint. When any slot is busy, stops xmrig.
|
Checks if the llama-server process is actively using CPU by reading
|
||||||
When all slots are idle for GRACE_PERIOD seconds, restarts xmrig.
|
/proc/<pid>/stat. When CPU usage exceeds the threshold, stops xmrig.
|
||||||
If llama-cpp is unreachable, does nothing (leaves xmrig in its current state).
|
When CPU drops below threshold for GRACE_PERIOD seconds, restarts xmrig.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import glob
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import urllib.request
|
|
||||||
|
|
||||||
LLAMA_CPP_URL = os.environ["LLAMA_CPP_URL"].rstrip("/")
|
|
||||||
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "3"))
|
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "3"))
|
||||||
GRACE_PERIOD = float(os.environ.get("GRACE_PERIOD", "10"))
|
GRACE_PERIOD = float(os.environ.get("GRACE_PERIOD", "10"))
|
||||||
|
# CPU percentage (per-core) above which llama-server is considered busy.
|
||||||
|
# Idle llama-server uses ~0% CPU; active inference saturates multiple cores.
|
||||||
|
CPU_THRESHOLD = float(os.environ.get("CPU_THRESHOLD", "50"))
|
||||||
|
|
||||||
|
|
||||||
def log(msg):
|
def log(msg):
|
||||||
print(f"[llama-cpp-xmrig-pause] {msg}", file=sys.stderr, flush=True)
|
print(f"[llama-cpp-xmrig-pause] {msg}", file=sys.stderr, flush=True)
|
||||||
|
|
||||||
|
|
||||||
def get_slots():
|
def find_llama_pid():
|
||||||
"""Fetch /slots from llama-cpp. Returns list of slot dicts, or None on error."""
|
"""Find the PID of the llama-server process."""
|
||||||
req = urllib.request.Request(f"{LLAMA_CPP_URL}/slots")
|
for path in glob.glob("/proc/[0-9]*/comm"):
|
||||||
|
try:
|
||||||
|
with open(path) as f:
|
||||||
|
if f.read().strip() == "llama-server":
|
||||||
|
return int(path.split("/")[2])
|
||||||
|
except (OSError, ValueError):
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_cpu_times(pid):
|
||||||
|
"""Read utime + stime from /proc/<pid>/stat. Returns total ticks or None."""
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
with open(f"/proc/{pid}/stat") as f:
|
||||||
return json.loads(resp.read())
|
fields = f.read().split(")")[-1].split()
|
||||||
except (urllib.error.URLError, OSError, json.JSONDecodeError, ValueError) as exc:
|
# fields[11] = utime, fields[12] = stime (0-indexed after ')')
|
||||||
log(f"Cannot reach llama-cpp: {exc}")
|
return int(fields[11]) + int(fields[12])
|
||||||
|
except (OSError, IndexError, ValueError):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def any_slot_busy(slots):
|
|
||||||
return any(s.get("is_processing", False) for s in slots)
|
|
||||||
|
|
||||||
|
|
||||||
def systemctl(action, unit):
|
def systemctl(action, unit):
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
["systemctl", action, unit],
|
["systemctl", action, unit],
|
||||||
@@ -51,35 +60,58 @@ def systemctl(action, unit):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
xmrig_paused = False
|
xmrig_paused = False
|
||||||
idle_since = None # monotonic timestamp when slots first went idle
|
idle_since = None
|
||||||
|
prev_ticks = None
|
||||||
|
prev_time = None
|
||||||
|
hz = os.sysconf("SC_CLK_TCK")
|
||||||
|
|
||||||
log(f"Starting: url={LLAMA_CPP_URL} poll={POLL_INTERVAL}s grace={GRACE_PERIOD}s")
|
log(f"Starting: poll={POLL_INTERVAL}s grace={GRACE_PERIOD}s threshold={CPU_THRESHOLD}%")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
slots = get_slots()
|
pid = find_llama_pid()
|
||||||
|
if pid is None:
|
||||||
if slots is None:
|
# llama-server not running
|
||||||
# llama-cpp unreachable — leave xmrig alone, reset idle timer
|
|
||||||
idle_since = None
|
idle_since = None
|
||||||
|
prev_ticks = None
|
||||||
|
prev_time = None
|
||||||
time.sleep(POLL_INTERVAL)
|
time.sleep(POLL_INTERVAL)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
busy = any_slot_busy(slots)
|
ticks = get_cpu_times(pid)
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
if ticks is None or prev_ticks is None or prev_time is None:
|
||||||
|
prev_ticks = ticks
|
||||||
|
prev_time = now
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
continue
|
||||||
|
|
||||||
|
dt = now - prev_time
|
||||||
|
if dt <= 0:
|
||||||
|
prev_ticks = ticks
|
||||||
|
prev_time = now
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# CPU% = (delta_ticks / hz) / delta_seconds * 100
|
||||||
|
cpu_pct = ((ticks - prev_ticks) / hz) / dt * 100
|
||||||
|
prev_ticks = ticks
|
||||||
|
prev_time = now
|
||||||
|
|
||||||
|
busy = cpu_pct > CPU_THRESHOLD
|
||||||
|
|
||||||
if busy:
|
if busy:
|
||||||
idle_since = None
|
idle_since = None
|
||||||
if not xmrig_paused:
|
if not xmrig_paused:
|
||||||
log("Slot busy — stopping xmrig")
|
log(f"llama-server busy ({cpu_pct:.0f}% CPU) — stopping xmrig")
|
||||||
if systemctl("stop", "xmrig"):
|
if systemctl("stop", "xmrig"):
|
||||||
xmrig_paused = True
|
xmrig_paused = True
|
||||||
else:
|
else:
|
||||||
# All slots idle
|
|
||||||
if xmrig_paused:
|
if xmrig_paused:
|
||||||
now = time.monotonic()
|
|
||||||
if idle_since is None:
|
if idle_since is None:
|
||||||
idle_since = now
|
idle_since = now
|
||||||
elif now - idle_since >= GRACE_PERIOD:
|
elif now - idle_since >= GRACE_PERIOD:
|
||||||
log("Slots idle past grace period — starting xmrig")
|
log(f"llama-server idle ({cpu_pct:.0f}% CPU) past grace period — starting xmrig")
|
||||||
if systemctl("start", "xmrig"):
|
if systemctl("start", "xmrig"):
|
||||||
xmrig_paused = False
|
xmrig_paused = False
|
||||||
idle_since = None
|
idle_since = None
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
{
|
{
|
||||||
lib,
|
|
||||||
pkgs,
|
pkgs,
|
||||||
...
|
...
|
||||||
}:
|
}:
|
||||||
@@ -8,49 +7,7 @@ let
|
|||||||
script = ../services/llama-cpp-annotations.py;
|
script = ../services/llama-cpp-annotations.py;
|
||||||
python = pkgs.python3;
|
python = pkgs.python3;
|
||||||
|
|
||||||
mockLlamaCpp = pkgs.writeText "mock-llama-cpp-server.py" ''
|
mockLlamaProcess = ./mock-llama-server-proc.py;
|
||||||
import http.server, json, sys, os
|
|
||||||
|
|
||||||
PORT = int(sys.argv[1])
|
|
||||||
STATE_FILE = sys.argv[2]
|
|
||||||
|
|
||||||
if not os.path.exists(STATE_FILE):
|
|
||||||
with open(STATE_FILE, "w") as f:
|
|
||||||
json.dump([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 0}}], f)
|
|
||||||
|
|
||||||
class Handler(http.server.BaseHTTPRequestHandler):
|
|
||||||
def log_message(self, fmt, *args):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _json(self, code, body):
|
|
||||||
data = json.dumps(body).encode()
|
|
||||||
self.send_response(code)
|
|
||||||
self.send_header("Content-Type", "application/json")
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(data)
|
|
||||||
|
|
||||||
def do_GET(self):
|
|
||||||
if self.path == "/slots":
|
|
||||||
with open(STATE_FILE) as f:
|
|
||||||
slots = json.load(f)
|
|
||||||
self._json(200, slots)
|
|
||||||
else:
|
|
||||||
self.send_response(404)
|
|
||||||
self.end_headers()
|
|
||||||
|
|
||||||
def do_POST(self):
|
|
||||||
if self.path == "/test/set-slots":
|
|
||||||
length = int(self.headers.get("Content-Length", 0))
|
|
||||||
body = json.loads(self.rfile.read(length)) if length else []
|
|
||||||
with open(STATE_FILE, "w") as f:
|
|
||||||
json.dump(body, f)
|
|
||||||
self._json(200, {"ok": True})
|
|
||||||
else:
|
|
||||||
self.send_response(404)
|
|
||||||
self.end_headers()
|
|
||||||
|
|
||||||
http.server.HTTPServer(("127.0.0.1", PORT), Handler).serve_forever()
|
|
||||||
'';
|
|
||||||
in
|
in
|
||||||
pkgs.testers.runNixOSTest {
|
pkgs.testers.runNixOSTest {
|
||||||
name = "llama-cpp-annotations";
|
name = "llama-cpp-annotations";
|
||||||
@@ -61,6 +18,7 @@ pkgs.testers.runNixOSTest {
|
|||||||
environment.systemPackages = [
|
environment.systemPackages = [
|
||||||
pkgs.python3
|
pkgs.python3
|
||||||
pkgs.curl
|
pkgs.curl
|
||||||
|
pkgs.procps
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -69,25 +27,23 @@ pkgs.testers.runNixOSTest {
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
GRAFANA_PORT = 13000
|
GRAFANA_PORT = 13000
|
||||||
LLAMA_PORT = 16688
|
|
||||||
ANNOTS_FILE = "/tmp/annotations.json"
|
ANNOTS_FILE = "/tmp/annotations.json"
|
||||||
SLOTS_FILE = "/tmp/llama-slots.json"
|
LLAMA_STATE = "/tmp/llama-state.txt"
|
||||||
STATE_FILE = "/tmp/llama-annot-state.json"
|
STATE_FILE = "/tmp/llama-annot-state.json"
|
||||||
PYTHON = "${python}/bin/python3"
|
PYTHON = "${python}/bin/python3"
|
||||||
MOCK_GRAFANA = "${mockGrafana}"
|
MOCK_GRAFANA = "${mockGrafana}"
|
||||||
MOCK_LLAMA = "${mockLlamaCpp}"
|
MOCK_LLAMA = "${mockLlamaProcess}"
|
||||||
SCRIPT = "${script}"
|
SCRIPT = "${script}"
|
||||||
|
|
||||||
def read_annotations():
|
def read_annotations():
|
||||||
out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'")
|
out = machine.succeed(f"cat {ANNOTS_FILE} 2>/dev/null || echo '[]'")
|
||||||
return json.loads(out.strip())
|
return json.loads(out.strip())
|
||||||
|
|
||||||
def set_slots(slots):
|
def set_busy():
|
||||||
machine.succeed(
|
machine.succeed(f"echo busy > {LLAMA_STATE}")
|
||||||
f"curl -sf -X POST http://127.0.0.1:{LLAMA_PORT}/test/set-slots "
|
|
||||||
f"-H 'Content-Type: application/json' "
|
def set_idle():
|
||||||
f"-d '{json.dumps(slots)}'"
|
machine.succeed(f"echo idle > {LLAMA_STATE}")
|
||||||
)
|
|
||||||
|
|
||||||
start_all()
|
start_all()
|
||||||
machine.wait_for_unit("multi-user.target")
|
machine.wait_for_unit("multi-user.target")
|
||||||
@@ -98,10 +54,7 @@ pkgs.testers.runNixOSTest {
|
|||||||
f"systemd-run --unit=mock-grafana {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
|
f"systemd-run --unit=mock-grafana {PYTHON} {MOCK_GRAFANA} {GRAFANA_PORT} {ANNOTS_FILE}"
|
||||||
)
|
)
|
||||||
machine.succeed(
|
machine.succeed(
|
||||||
f"echo '[{{\"id\": 0, \"is_processing\": false, \"next_token\": {{\"n_decoded\": 0}}}}]' > {SLOTS_FILE}"
|
f"systemd-run --unit=mock-llama {PYTHON} {MOCK_LLAMA} {LLAMA_STATE}"
|
||||||
)
|
|
||||||
machine.succeed(
|
|
||||||
f"systemd-run --unit=mock-llama {PYTHON} {MOCK_LLAMA} {LLAMA_PORT} {SLOTS_FILE}"
|
|
||||||
)
|
)
|
||||||
machine.wait_until_succeeds(
|
machine.wait_until_succeeds(
|
||||||
f"curl -sf http://127.0.0.1:{GRAFANA_PORT}/api/annotations -X POST "
|
f"curl -sf http://127.0.0.1:{GRAFANA_PORT}/api/annotations -X POST "
|
||||||
@@ -109,7 +62,7 @@ pkgs.testers.runNixOSTest {
|
|||||||
timeout=10,
|
timeout=10,
|
||||||
)
|
)
|
||||||
machine.wait_until_succeeds(
|
machine.wait_until_succeeds(
|
||||||
f"curl -sf http://127.0.0.1:{LLAMA_PORT}/slots | grep -q is_processing",
|
"pgrep -x llama-server",
|
||||||
timeout=10,
|
timeout=10,
|
||||||
)
|
)
|
||||||
machine.succeed(f"echo '[]' > {ANNOTS_FILE}")
|
machine.succeed(f"echo '[]' > {ANNOTS_FILE}")
|
||||||
@@ -117,62 +70,62 @@ pkgs.testers.runNixOSTest {
|
|||||||
with subtest("Start annotation service"):
|
with subtest("Start annotation service"):
|
||||||
machine.succeed(
|
machine.succeed(
|
||||||
f"systemd-run --unit=llama-annot "
|
f"systemd-run --unit=llama-annot "
|
||||||
f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT} "
|
|
||||||
f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
|
f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
|
||||||
f"--setenv=STATE_FILE={STATE_FILE} "
|
f"--setenv=STATE_FILE={STATE_FILE} "
|
||||||
f"--setenv=POLL_INTERVAL=2 "
|
f"--setenv=POLL_INTERVAL=2 "
|
||||||
|
f"--setenv=CPU_THRESHOLD=10 "
|
||||||
f"{PYTHON} {SCRIPT}"
|
f"{PYTHON} {SCRIPT}"
|
||||||
)
|
)
|
||||||
time.sleep(3)
|
time.sleep(5)
|
||||||
|
|
||||||
with subtest("No annotations when slots are idle"):
|
with subtest("No annotations when idle"):
|
||||||
annots = read_annotations()
|
annots = read_annotations()
|
||||||
assert annots == [], f"Expected no annotations, got: {annots}"
|
assert annots == [], f"Expected no annotations, got: {annots}"
|
||||||
|
|
||||||
with subtest("Annotation created when slot starts processing"):
|
with subtest("Annotation created when llama-server becomes busy"):
|
||||||
set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}])
|
set_busy()
|
||||||
machine.wait_until_succeeds(
|
machine.wait_until_succeeds(
|
||||||
f"cat {ANNOTS_FILE} | {PYTHON} -c "
|
f"cat {ANNOTS_FILE} | {PYTHON} -c "
|
||||||
f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\"",
|
f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a else 1)\"",
|
||||||
timeout=15,
|
timeout=20,
|
||||||
)
|
)
|
||||||
annots = read_annotations()
|
annots = read_annotations()
|
||||||
assert len(annots) == 1, f"Expected 1 annotation, got: {annots}"
|
assert len(annots) == 1, f"Expected 1 annotation, got: {annots}"
|
||||||
assert "llama-cpp" in annots[0].get("tags", []), f"Missing tag: {annots[0]}"
|
assert "llama-cpp" in annots[0].get("tags", []), f"Missing tag: {annots[0]}"
|
||||||
assert "slot 0" in annots[0]["text"], f"Missing slot info: {annots[0]['text']}"
|
assert "LLM request" in annots[0]["text"], f"Missing text: {annots[0]['text']}"
|
||||||
assert "timeEnd" not in annots[0], f"timeEnd should not be set: {annots[0]}"
|
assert "timeEnd" not in annots[0], f"timeEnd should not be set: {annots[0]}"
|
||||||
|
|
||||||
with subtest("Annotation closed when slot stops processing"):
|
with subtest("Annotation closed when llama-server becomes idle"):
|
||||||
set_slots([{"id": 0, "is_processing": False, "next_token": {"n_decoded": 42}}])
|
set_idle()
|
||||||
machine.wait_until_succeeds(
|
machine.wait_until_succeeds(
|
||||||
f"cat {ANNOTS_FILE} | {PYTHON} -c "
|
f"cat {ANNOTS_FILE} | {PYTHON} -c "
|
||||||
f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\"",
|
f"\"import sys,json; a=json.load(sys.stdin); exit(0 if a and 'timeEnd' in a[0] else 1)\"",
|
||||||
timeout=15,
|
timeout=20,
|
||||||
)
|
)
|
||||||
annots = read_annotations()
|
annots = read_annotations()
|
||||||
assert len(annots) == 1, f"Expected 1, got: {annots}"
|
assert len(annots) == 1, f"Expected 1, got: {annots}"
|
||||||
assert "timeEnd" in annots[0], f"timeEnd missing: {annots[0]}"
|
assert "timeEnd" in annots[0], f"timeEnd missing: {annots[0]}"
|
||||||
assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time"
|
assert annots[0]["timeEnd"] > annots[0]["time"], "timeEnd should be after time"
|
||||||
assert "42 tokens" in annots[0].get("text", ""), f"Token count missing: {annots[0]}"
|
assert "s)" in annots[0].get("text", ""), f"Duration missing: {annots[0]}"
|
||||||
|
|
||||||
with subtest("State survives restart"):
|
with subtest("State survives restart"):
|
||||||
set_slots([{"id": 0, "is_processing": True, "next_token": {"n_decoded": 0}}])
|
set_busy()
|
||||||
machine.wait_until_succeeds(
|
machine.wait_until_succeeds(
|
||||||
f"cat {ANNOTS_FILE} | {PYTHON} -c "
|
f"cat {ANNOTS_FILE} | {PYTHON} -c "
|
||||||
f"\"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\"",
|
f"\"import sys,json; a=json.load(sys.stdin); exit(0 if len(a)==2 else 1)\"",
|
||||||
timeout=15,
|
timeout=20,
|
||||||
)
|
)
|
||||||
machine.succeed("systemctl stop llama-annot || true")
|
machine.succeed("systemctl stop llama-annot || true")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
machine.succeed(
|
machine.succeed(
|
||||||
f"systemd-run --unit=llama-annot-2 "
|
f"systemd-run --unit=llama-annot-2 "
|
||||||
f"--setenv=LLAMA_CPP_URL=http://127.0.0.1:{LLAMA_PORT} "
|
|
||||||
f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
|
f"--setenv=GRAFANA_URL=http://127.0.0.1:{GRAFANA_PORT} "
|
||||||
f"--setenv=STATE_FILE={STATE_FILE} "
|
f"--setenv=STATE_FILE={STATE_FILE} "
|
||||||
f"--setenv=POLL_INTERVAL=2 "
|
f"--setenv=POLL_INTERVAL=2 "
|
||||||
|
f"--setenv=CPU_THRESHOLD=10 "
|
||||||
f"{PYTHON} {SCRIPT}"
|
f"{PYTHON} {SCRIPT}"
|
||||||
)
|
)
|
||||||
time.sleep(4)
|
time.sleep(6)
|
||||||
annots = read_annotations()
|
annots = read_annotations()
|
||||||
assert len(annots) == 2, f"Restart should not duplicate, got: {annots}"
|
assert len(annots) == 2, f"Restart should not duplicate, got: {annots}"
|
||||||
'';
|
'';
|
||||||
|
|||||||
162
tests/llama-cpp-xmrig-pause.nix
Normal file
162
tests/llama-cpp-xmrig-pause.nix
Normal file
@@ -0,0 +1,162 @@
|
|||||||
|
{
|
||||||
|
pkgs,
|
||||||
|
...
|
||||||
|
}:
|
||||||
|
let
|
||||||
|
script = ../services/llama-cpp-xmrig-pause.py;
|
||||||
|
python = pkgs.python3;
|
||||||
|
|
||||||
|
# SmolLM-135M Q2_K: 85MB, modern GGUFv3, generates ~30 tok/s on one CPU
|
||||||
|
# thread — slow enough that a 200-token request keeps the process busy for
|
||||||
|
# several seconds, fast enough that tests don't crawl.
|
||||||
|
tinyModel = pkgs.fetchurl {
|
||||||
|
url = "https://huggingface.co/QuantFactory/SmolLM-135M-GGUF/resolve/main/SmolLM-135M.Q2_K.gguf";
|
||||||
|
hash = "sha256-DX46drPNJILNba21xfY2tyE0/yPWgOhz43gJdeSYKh4=";
|
||||||
|
};
|
||||||
|
in
|
||||||
|
pkgs.testers.runNixOSTest {
|
||||||
|
name = "llama-cpp-xmrig-pause";
|
||||||
|
|
||||||
|
nodes.machine =
|
||||||
|
{ pkgs, ... }:
|
||||||
|
{
|
||||||
|
environment.systemPackages = [
|
||||||
|
pkgs.python3
|
||||||
|
pkgs.procps
|
||||||
|
pkgs.curl
|
||||||
|
pkgs.llama-cpp
|
||||||
|
];
|
||||||
|
|
||||||
|
# Mock xmrig as a simple sleep process that can be stopped/started.
|
||||||
|
systemd.services.xmrig = {
|
||||||
|
description = "Mock xmrig miner";
|
||||||
|
serviceConfig = {
|
||||||
|
ExecStart = "${pkgs.coreutils}/bin/sleep infinity";
|
||||||
|
Type = "simple";
|
||||||
|
};
|
||||||
|
wantedBy = [ "multi-user.target" ];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
testScript = ''
|
||||||
|
import time
|
||||||
|
|
||||||
|
PORT = 18088
|
||||||
|
MODEL = "${tinyModel}"
|
||||||
|
PYTHON = "${python}/bin/python3"
|
||||||
|
SCRIPT = "${script}"
|
||||||
|
|
||||||
|
# Tuned for test speed while remaining realistic.
|
||||||
|
# POLL_INTERVAL=1 keeps detection latency low.
|
||||||
|
# GRACE_PERIOD=5 is long enough to verify "stays stopped" but short enough
|
||||||
|
# that the full test completes in ~2 minutes.
|
||||||
|
# CPU_THRESHOLD=10 is low because the VM has limited cores and the model
|
||||||
|
# is small — but any active inference still saturates a core.
|
||||||
|
POLL_INTERVAL = "1"
|
||||||
|
GRACE_PERIOD = "5"
|
||||||
|
CPU_THRESHOLD = "10"
|
||||||
|
|
||||||
|
infer_counter = 0
|
||||||
|
|
||||||
|
def send_completion(n_predict=200):
|
||||||
|
"""Fire a completion request in the background via a transient systemd unit."""
|
||||||
|
global infer_counter
|
||||||
|
infer_counter += 1
|
||||||
|
name = f"infer-{infer_counter}"
|
||||||
|
machine.succeed(
|
||||||
|
f"systemd-run --unit={name} --property=Type=exec "
|
||||||
|
f"curl -sf -X POST http://127.0.0.1:{PORT}/completion "
|
||||||
|
f"-H 'Content-Type: application/json' "
|
||||||
|
f"-d '{{\"prompt\": \"Once upon a time in a land far away there lived\", \"n_predict\": {n_predict}}}'"
|
||||||
|
)
|
||||||
|
return name
|
||||||
|
|
||||||
|
def wait_inference_done(unit_name, timeout=60):
|
||||||
|
"""Wait for a background inference request to finish."""
|
||||||
|
machine.wait_until_fails(
|
||||||
|
f"systemctl is-active {unit_name}",
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
start_all()
|
||||||
|
machine.wait_for_unit("multi-user.target")
|
||||||
|
machine.wait_for_unit("xmrig.service")
|
||||||
|
|
||||||
|
with subtest("Start llama-server"):
|
||||||
|
machine.succeed(
|
||||||
|
f"systemd-run --unit=llama-server "
|
||||||
|
# Single inference thread to maximise per-core CPU%, which is
|
||||||
|
# what the monitor measures. Keeps token generation slow enough
|
||||||
|
# (~30 tok/s) that a 200-token request sustains load for seconds.
|
||||||
|
f"llama-server --model {MODEL} --port {PORT} --ctx-size 512 -t 1 -np 1"
|
||||||
|
)
|
||||||
|
machine.wait_until_succeeds(
|
||||||
|
f"curl -sf http://127.0.0.1:{PORT}/health",
|
||||||
|
timeout=30,
|
||||||
|
)
|
||||||
|
machine.succeed("pgrep -x llama-server")
|
||||||
|
|
||||||
|
with subtest("Start pause monitor"):
|
||||||
|
machine.succeed(
|
||||||
|
f"systemd-run --unit=llama-xmrig-pause "
|
||||||
|
f"--setenv=POLL_INTERVAL={POLL_INTERVAL} "
|
||||||
|
f"--setenv=GRACE_PERIOD={GRACE_PERIOD} "
|
||||||
|
f"--setenv=CPU_THRESHOLD={CPU_THRESHOLD} "
|
||||||
|
f"{PYTHON} {SCRIPT}"
|
||||||
|
)
|
||||||
|
# The monitor needs two consecutive polls to compute a CPU delta.
|
||||||
|
# Wait for baseline to stabilise.
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
with subtest("xmrig stays running while llama-server is idle"):
|
||||||
|
machine.succeed("systemctl is-active xmrig")
|
||||||
|
|
||||||
|
with subtest("xmrig stopped during prompt processing"):
|
||||||
|
unit = send_completion(n_predict=200)
|
||||||
|
machine.wait_until_fails("systemctl is-active xmrig", timeout=20)
|
||||||
|
|
||||||
|
with subtest("xmrig remains stopped during grace period after inference ends"):
|
||||||
|
wait_inference_done(unit)
|
||||||
|
# Inference just finished. The monitor will need 1-2 polls to detect
|
||||||
|
# idle, then the grace period starts. Checking 2s after completion
|
||||||
|
# is well within the 5s grace window.
|
||||||
|
time.sleep(2)
|
||||||
|
machine.fail("systemctl is-active xmrig")
|
||||||
|
|
||||||
|
with subtest("xmrig resumes after grace period expires"):
|
||||||
|
# Already idle since previous subtest. Grace period (5s) plus
|
||||||
|
# detection delay (~2 polls) means xmrig should restart within ~8s.
|
||||||
|
machine.wait_until_succeeds("systemctl is-active xmrig", timeout=15)
|
||||||
|
|
||||||
|
with subtest("Sequential prompts do not cause xmrig flapping"):
|
||||||
|
# First prompt — stop xmrig
|
||||||
|
unit1 = send_completion(n_predict=200)
|
||||||
|
machine.wait_until_fails("systemctl is-active xmrig", timeout=20)
|
||||||
|
wait_inference_done(unit1)
|
||||||
|
|
||||||
|
# Brief idle gap — shorter than grace period
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Second prompt arrives before grace period expires, resetting it
|
||||||
|
unit2 = send_completion(n_predict=200)
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
# xmrig must still be stopped
|
||||||
|
machine.fail("systemctl is-active xmrig")
|
||||||
|
|
||||||
|
wait_inference_done(unit2)
|
||||||
|
machine.wait_until_succeeds("systemctl is-active xmrig", timeout=15)
|
||||||
|
|
||||||
|
with subtest("xmrig stays stopped during sustained inference"):
|
||||||
|
unit = send_completion(n_predict=500)
|
||||||
|
machine.wait_until_fails("systemctl is-active xmrig", timeout=20)
|
||||||
|
|
||||||
|
# Stay busy longer than the grace period to prove continuous
|
||||||
|
# activity keeps xmrig stopped indefinitely.
|
||||||
|
time.sleep(8)
|
||||||
|
machine.fail("systemctl is-active xmrig")
|
||||||
|
|
||||||
|
wait_inference_done(unit)
|
||||||
|
machine.wait_until_succeeds("systemctl is-active xmrig", timeout=15)
|
||||||
|
'';
|
||||||
|
}
|
||||||
42
tests/mock-llama-server-proc.py
Normal file
42
tests/mock-llama-server-proc.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Mock llama-server process for NixOS VM tests.
|
||||||
|
|
||||||
|
Sets /proc/self/comm to "llama-server" via prctl so that monitoring scripts
|
||||||
|
(llama-cpp-annotations, llama-cpp-xmrig-pause) can discover this process
|
||||||
|
the same way they discover the real one.
|
||||||
|
|
||||||
|
Usage: python3 mock-llama-server-proc.py <state-file>
|
||||||
|
|
||||||
|
The state file controls behavior:
|
||||||
|
"busy" -> burn CPU in a tight loop (simulates prompt processing / inference)
|
||||||
|
"idle" -> sleep (simulates waiting for requests)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import ctypes
|
||||||
|
import ctypes.util
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
STATE_FILE = sys.argv[1]
|
||||||
|
|
||||||
|
# PR_SET_NAME = 15, sets /proc/self/comm
|
||||||
|
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
|
||||||
|
libc.prctl(15, b"llama-server", 0, 0, 0)
|
||||||
|
|
||||||
|
with open(STATE_FILE, "w") as f:
|
||||||
|
f.write("idle")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
with open(STATE_FILE) as f:
|
||||||
|
state = f.read().strip()
|
||||||
|
except Exception:
|
||||||
|
state = "idle"
|
||||||
|
|
||||||
|
if state == "busy":
|
||||||
|
end = time.monotonic() + 0.1
|
||||||
|
while time.monotonic() < end:
|
||||||
|
_ = sum(range(10000))
|
||||||
|
else:
|
||||||
|
time.sleep(0.5)
|
||||||
@@ -28,9 +28,9 @@ in
|
|||||||
# zfs scrub annotations test
|
# zfs scrub annotations test
|
||||||
zfsScrubAnnotationsTest = handleTest ./zfs-scrub-annotations.nix;
|
zfsScrubAnnotationsTest = handleTest ./zfs-scrub-annotations.nix;
|
||||||
|
|
||||||
# llama-cpp annotation service test
|
# llama-cpp tests
|
||||||
llamaCppAnnotationsTest = handleTest ./llama-cpp-annotations.nix;
|
llamaCppAnnotationsTest = handleTest ./llama-cpp-annotations.nix;
|
||||||
|
llamaCppXmrigPauseTest = handleTest ./llama-cpp-xmrig-pause.nix;
|
||||||
# ntfy alerts test
|
# ntfy alerts test
|
||||||
ntfyAlertsTest = handleTest ./ntfy-alerts.nix;
|
ntfyAlertsTest = handleTest ./ntfy-alerts.nix;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user