This repository has been archived on 2026-04-18. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
server-config/services/grafana/llama-cpp-annotations.py
2026-04-03 00:39:42 -04:00

156 lines
4.2 KiB
Python

#!/usr/bin/env python3
"""
Grafana annotation service for llama-cpp inference requests.
Monitors llama-server CPU usage via /proc. Creates a Grafana annotation
when inference starts (CPU spikes), closes it when inference ends.
"""
import glob
import json
import os
import sys
import time
import urllib.request
# Runtime configuration. Every setting can be overridden through an
# environment variable of the same name; the literals below are the defaults.

# Base URL of the Grafana instance that receives the annotations.
GRAFANA_URL = os.getenv("GRAFANA_URL", "http://127.0.0.1:3000")
# JSON file remembering the currently open annotation across restarts.
STATE_FILE = os.getenv("STATE_FILE", "/var/lib/llama-cpp-annotations/state.json")
# Seconds to sleep between two CPU samples.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "5"))
# CPU usage (percent of one core) above which llama-server counts as busy.
CPU_THRESHOLD = float(os.getenv("CPU_THRESHOLD", "50"))
def find_llama_pid():
    """Return the PID of a running ``llama-server`` process, or ``None``.

    Scans ``/proc/<pid>/comm`` for every numeric-looking entry and returns
    the PID of the first one whose command name is exactly ``llama-server``.
    Entries that cannot be read or parsed are skipped.
    """
    wanted = "llama-server"
    for comm_path in glob.glob("/proc/[0-9]*/comm"):
        try:
            with open(comm_path) as handle:
                command = handle.read().strip()
            if command != wanted:
                continue
            # Path looks like /proc/<pid>/comm, so the PID is component 2.
            return int(comm_path.split("/")[2])
        except (OSError, ValueError):
            # The process may have exited mid-scan, or the entry is not a PID.
            continue
    return None
def get_cpu_times(pid):
    """Return the CPU time consumed by process *pid*, in clock ticks.

    Reads ``/proc/<pid>/stat`` and adds up the two values found at
    positions 11 and 12 after the parenthesised command name (user-mode and
    kernel-mode time according to proc(5)). Returns ``None`` when the file
    cannot be read or does not have the expected layout.
    """
    try:
        with open(f"/proc/{pid}/stat") as stat_file:
            raw = stat_file.read()
        # The command name may itself contain spaces or ")", so only parse
        # what follows the last closing parenthesis.
        tail = raw.rpartition(")")[2].split()
        user_ticks = int(tail[11])
        system_ticks = int(tail[12])
    except (OSError, IndexError, ValueError):
        return None
    return user_ticks + system_ticks
def http_json(method, url, body=None):
    """Send a JSON request and return the decoded JSON response.

    *body*, when not ``None``, is serialised to JSON and sent as the request
    payload. The request uses a 5 second timeout. Errors raised by
    ``urlopen`` or by JSON decoding propagate to the caller.
    """
    payload = None
    if body is not None:
        payload = json.dumps(body).encode()

    request = urllib.request.Request(
        url,
        data=payload,
        method=method,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        raw = response.read()
    return json.loads(raw)
def load_state():
    """Load the persisted monitor state from ``STATE_FILE``.

    Returns the decoded JSON object as a dict. Returns an empty dict when the
    file does not exist, cannot be decoded, or holds valid JSON that is not
    an object (for example ``null`` or a list), because callers index the
    result as a mapping.
    """
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError as well as undecodable bytes.
        return {}
    if not isinstance(state, dict):
        return {}
    return state
def save_state(state):
    """Persist *state* as JSON to ``STATE_FILE``.

    The parent directory is created when missing. The data is first written
    to ``STATE_FILE + ".tmp"`` and then moved over the target with
    ``os.replace`` so readers never observe a partially written file.
    """
    directory = os.path.dirname(STATE_FILE)
    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError; the current directory needs no creation anyway.
    if directory:
        os.makedirs(directory, exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, STATE_FILE)
def grafana_post(text, start_ms):
    """Create a Grafana annotation and return its id.

    Posts an annotation tagged ``llama-cpp`` with the given *text* and start
    time *start_ms* to ``GRAFANA_URL``. This is best-effort: any failure is
    reported on stderr and ``None`` is returned instead of raising.
    """
    try:
        response = http_json(
            "POST",
            f"{GRAFANA_URL}/api/annotations",
            {"time": start_ms, "text": text, "tags": ["llama-cpp"]},
        )
        annotation_id = response.get("id")
    except Exception as err:
        print(f"Error posting annotation: {err}", file=sys.stderr)
        annotation_id = None
    return annotation_id
def grafana_close(grafana_id, end_ms, text=None):
    """Set the end time of an existing Grafana annotation.

    Patches annotation *grafana_id* with ``timeEnd`` = *end_ms* and, when
    *text* is given, a replacement text. This is best-effort: any failure is
    reported on stderr and otherwise ignored.
    """
    try:
        if text is None:
            patch = {"timeEnd": end_ms}
        else:
            patch = {"timeEnd": end_ms, "text": text}
        http_json(
            "PATCH",
            f"{GRAFANA_URL}/api/annotations/{grafana_id}",
            patch,
        )
    except Exception as err:
        print(f"Error closing annotation {grafana_id}: {err}", file=sys.stderr)
def _close_active(state, end_ms):
    """Close the open annotation tracked in *state*, if any, and persist it.

    Removes the ``"active"`` entry from *state*, patches the corresponding
    Grafana annotation with *end_ms* and a text containing the measured
    duration, then saves the state. Does nothing when no annotation is open.
    """
    info = state.pop("active", None)
    if info is None:
        return
    try:
        grafana_id = info["grafana_id"]
        duration_s = (end_ms - info["start_ms"]) / 1000
    except (KeyError, TypeError):
        # A hand-edited or outdated state file must not crash the monitor;
        # drop the unusable entry so the next request starts cleanly.
        print(f"Discarding malformed active annotation state: {info!r}", file=sys.stderr)
    else:
        grafana_close(grafana_id, end_ms, f"LLM request ({duration_s:.1f}s)")
    save_state(state)


def main():
    """Poll llama-server CPU usage forever and mirror busy periods to Grafana.

    Every ``POLL_INTERVAL`` seconds the CPU time of the llama-server process
    is sampled and converted to a percentage of one core over the elapsed
    interval. When usage rises above ``CPU_THRESHOLD`` an annotation is
    created and remembered in the persisted state; when it falls back, the
    annotation is closed. The annotation is also closed when the process
    disappears or is replaced by one with a different PID. Never returns.
    """
    state = load_state()
    hz = os.sysconf("SC_CLK_TCK")  # clock ticks per second
    prev_pid = None
    prev_ticks = None
    prev_time = None

    while True:
        now_ms = int(time.time() * 1000)
        pid = find_llama_pid()

        replaced = pid is not None and prev_pid is not None and pid != prev_pid
        if pid is None or replaced:
            # The process that was serving the request is gone, so the
            # request cannot still be running. Tick counts of a different
            # process are not comparable, so the baseline is dropped too.
            _close_active(state, now_ms)
            prev_ticks = None
            prev_time = None
        prev_pid = pid
        if pid is None:
            time.sleep(POLL_INTERVAL)
            continue

        ticks = get_cpu_times(pid)
        now = time.monotonic()
        base_ticks, base_time = prev_ticks, prev_time
        prev_ticks, prev_time = ticks, now

        # A usage figure needs two valid samples separated by positive time.
        have_baseline = (
            ticks is not None and base_ticks is not None and base_time is not None
        )
        if not have_baseline or now <= base_time:
            time.sleep(POLL_INTERVAL)
            continue

        dt = now - base_time
        cpu_pct = ((ticks - base_ticks) / hz) / dt * 100
        busy = cpu_pct > CPU_THRESHOLD

        if busy and "active" not in state:
            grafana_id = grafana_post("LLM request", now_ms)
            # Only track the annotation when Grafana accepted it; otherwise
            # the next busy sample retries the post.
            if grafana_id is not None:
                state["active"] = {
                    "grafana_id": grafana_id,
                    "start_ms": now_ms,
                }
                save_state(state)
        elif not busy and "active" in state:
            _close_active(state, now_ms)

        time.sleep(POLL_INTERVAL)
# Script entry point: start the polling loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()