#!/usr/bin/env python3
"""Grafana annotation service for llama-cpp inference requests.

Monitors llama-server CPU usage via /proc.  Creates a Grafana annotation
when inference starts (CPU rises above CPU_THRESHOLD percent of one core)
and closes it (sets timeEnd) when inference ends — or when the monitored
process disappears, so annotations are never left dangling.
"""

import glob
import json
import os
import sys
import time
import urllib.request

# Configuration, all overridable from the environment.
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000")
# Optional API token; when non-empty it is sent as a Bearer Authorization
# header (Grafana's HTTP API normally requires authentication).
GRAFANA_TOKEN = os.environ.get("GRAFANA_TOKEN", "")
STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/llama-cpp-annotations/state.json")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5"))  # seconds between samples
CPU_THRESHOLD = float(os.environ.get("CPU_THRESHOLD", "50"))  # percent of one core


def find_llama_pid():
    """Return the PID of the first process named 'llama-server', or None."""
    for path in glob.glob("/proc/[0-9]*/comm"):
        try:
            with open(path) as f:
                if f.read().strip() == "llama-server":
                    return int(path.split("/")[2])  # "/proc/<pid>/comm"
        except (OSError, ValueError):
            # Process exited between glob() and open(), or unreadable entry.
            continue
    return None


def get_cpu_times(pid):
    """Return utime+stime (in clock ticks) for *pid*, or None if unreadable.

    Splits /proc/<pid>/stat on the *last* ')' so a comm value containing
    ')' cannot shift the field offsets; utime/stime are then indices 11/12
    of the remainder (fields 14/15 of the full stat line, per proc(5)).
    """
    try:
        with open(f"/proc/{pid}/stat") as f:
            fields = f.read().split(")")[-1].split()
            return int(fields[11]) + int(fields[12])
    except (OSError, IndexError, ValueError):
        return None


def http_json(method, url, body=None):
    """Send a JSON request and return the decoded JSON response.

    Raises on network errors / non-2xx responses; callers handle that.
    """
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if GRAFANA_TOKEN:
        headers["Authorization"] = f"Bearer {GRAFANA_TOKEN}"
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.loads(resp.read())


def load_state():
    """Load persisted state; an absent or corrupt file yields a fresh dict."""
    try:
        with open(STATE_FILE) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def save_state(state):
    """Atomically persist *state* as JSON (write a tmp file, then rename)."""
    state_dir = os.path.dirname(STATE_FILE)
    if state_dir:  # STATE_FILE may be a bare filename in the cwd
        os.makedirs(state_dir, exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, STATE_FILE)


def grafana_post(text, start_ms):
    """Create an annotation starting at *start_ms*; return its id or None.

    Best effort: on any failure, log to stderr and return None so the
    main loop can retry on the next CPU spike.
    """
    try:
        result = http_json(
            "POST",
            f"{GRAFANA_URL}/api/annotations",
            {"time": start_ms, "text": text, "tags": ["llama-cpp"]},
        )
        return result.get("id")
    except Exception as e:
        print(f"Error posting annotation: {e}", file=sys.stderr)
        return None


def grafana_close(grafana_id, end_ms, text=None):
    """Set timeEnd (and optionally replace the text) on an annotation."""
    try:
        body = {"timeEnd": end_ms}
        if text is not None:
            body["text"] = text
        http_json(
            "PATCH",
            f"{GRAFANA_URL}/api/annotations/{grafana_id}",
            body,
        )
    except Exception as e:
        print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)


def _close_active(state, now_ms):
    """Close the currently-open annotation, if any, and persist state."""
    if "active" not in state:
        return
    info = state.pop("active")
    duration_s = (now_ms - info["start_ms"]) / 1000
    text = f"LLM request ({duration_s:.1f}s)"
    grafana_close(info["grafana_id"], now_ms, text)
    save_state(state)


def main():
    """Poll llama-server CPU usage and mirror busy periods as annotations."""
    state = load_state()
    prev_ticks = None
    prev_time = None
    hz = os.sysconf("SC_CLK_TCK")  # kernel clock ticks per second
    while True:
        now_ms = int(time.time() * 1000)
        pid = find_llama_pid()
        if pid is None:
            # BUGFIX: if llama-server exits mid-inference, close the open
            # annotation instead of leaving it dangling forever.
            _close_active(state, now_ms)
            prev_ticks = None
            prev_time = None
            time.sleep(POLL_INTERVAL)
            continue
        ticks = get_cpu_times(pid)
        now = time.monotonic()
        if ticks is None or prev_ticks is None or prev_time is None:
            # First sample for this process (or a stat read failure):
            # two samples are needed before a CPU percentage exists.
            prev_ticks = ticks
            prev_time = now
            time.sleep(POLL_INTERVAL)
            continue
        dt = now - prev_time
        if dt <= 0:
            prev_ticks = ticks
            prev_time = now
            time.sleep(POLL_INTERVAL)
            continue
        # Percent of a single core consumed since the previous sample.
        cpu_pct = ((ticks - prev_ticks) / hz) / dt * 100
        prev_ticks = ticks
        prev_time = now
        busy = cpu_pct > CPU_THRESHOLD
        if busy and "active" not in state:
            grafana_id = grafana_post("LLM request", now_ms)
            if grafana_id is not None:
                state["active"] = {
                    "grafana_id": grafana_id,
                    "start_ms": now_ms,
                }
                save_state(state)
        elif not busy and "active" in state:
            _close_active(state, now_ms)
        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()