#!/usr/bin/env python3
"""Mirror llama.cpp slot activity into Grafana annotations.

Polls the llama.cpp server's ``/slots`` endpoint every ``POLL_INTERVAL``
seconds. When a slot starts processing, an annotation is opened in Grafana;
when it stops, the annotation is given a ``timeEnd`` and, when the slot data
provides one, a decoded-token count. Open annotations are persisted to
``STATE_FILE`` so they can still be closed after this script restarts.
"""

import json
import os
import sys
import time
import urllib.request

LLAMA_CPP_URL = os.environ.get("LLAMA_CPP_URL", "http://127.0.0.1:6688")
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://127.0.0.1:3000")
STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/llama-cpp-annotations/state.json")
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "5"))

# Seconds to wait on any single HTTP request before giving up.
HTTP_TIMEOUT = 5


def http_json(method, url, body=None):
    """Send an HTTP request and return the JSON-decoded response body.

    Args:
        method: HTTP verb, e.g. ``"GET"``, ``"POST"`` or ``"PATCH"``.
        url: Absolute URL to request.
        body: Optional JSON-serialisable payload; omitted when ``None``.

    Raises:
        Whatever ``urllib.request.urlopen`` or ``json.loads`` raise
        (connection errors, HTTP error statuses, invalid JSON).
    """
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method=method,
    )
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return json.loads(resp.read())


def get_slots():
    """Fetch the slot list from the llama.cpp server.

    Returns:
        The decoded list of slots, or ``None`` when the request fails or the
        response is not a JSON array (the problem is logged to stderr).
    """
    try:
        slots = http_json("GET", f"{LLAMA_CPP_URL}/slots")
    except Exception as e:
        print(f"Error fetching slots: {e}", file=sys.stderr)
        return None
    if not isinstance(slots, list):
        # Anything but a list cannot be iterated as slots; treat it like a
        # failed poll so the caller leaves existing annotations untouched.
        print(
            f"Error fetching slots: unexpected response type {type(slots).__name__}",
            file=sys.stderr,
        )
        return None
    return slots


def load_state(path=None):
    """Load the persisted slot -> annotation mapping.

    Args:
        path: File to read; defaults to ``STATE_FILE``.

    Returns:
        The stored dict, or ``{}`` when the file is missing, unreadable as
        JSON, or does not contain a JSON object.
    """
    path = STATE_FILE if path is None else path
    try:
        with open(path, encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        return {}
    return state if isinstance(state, dict) else {}


def save_state(state, path=None):
    """Atomically write *state* as JSON.

    The data is written to ``<path>.tmp`` first and then moved into place
    with ``os.replace`` so a crash never leaves a half-written state file.

    Args:
        state: JSON-serialisable mapping to persist.
        path: Destination file; defaults to ``STATE_FILE``.
    """
    path = STATE_FILE if path is None else path
    directory = os.path.dirname(path)
    if directory:
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        os.makedirs(directory, exist_ok=True)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f)
    os.replace(tmp, path)


def grafana_post(text, start_ms):
    """Create a Grafana annotation tagged ``llama-cpp``.

    Args:
        text: Annotation text.
        start_ms: Annotation start time in epoch milliseconds.

    Returns:
        The ``id`` field of Grafana's response, or ``None`` when the request
        fails (the error is logged to stderr).
    """
    try:
        result = http_json(
            "POST",
            f"{GRAFANA_URL}/api/annotations",
            {"time": start_ms, "text": text, "tags": ["llama-cpp"]},
        )
        return result.get("id")
    except Exception as e:
        print(f"Error posting annotation: {e}", file=sys.stderr)
        return None


def grafana_close(grafana_id, end_ms, text=None):
    """Set the end time (and optionally new text) of an existing annotation.

    Failures are logged to stderr and otherwise ignored (best effort).

    Args:
        grafana_id: Id of the annotation to patch.
        end_ms: Annotation end time in epoch milliseconds.
        text: Replacement annotation text; left unchanged when ``None``.
    """
    try:
        body = {"timeEnd": end_ms}
        if text is not None:
            body["text"] = text
        http_json(
            "PATCH",
            f"{GRAFANA_URL}/api/annotations/{grafana_id}",
            body,
        )
    except Exception as e:
        print(f"Error closing annotation {grafana_id}: {e}", file=sys.stderr)


def slot_token_count(slot):
    """Return the slot's ``next_token.n_decoded`` value, or ``None``.

    ``None`` is returned whenever the field is missing or not an integer, so
    unexpected slot payloads never raise.
    """
    next_token = slot.get("next_token")
    if isinstance(next_token, list):
        # NOTE(review): some llama.cpp builds appear to wrap this object in a
        # one-element list -- confirm against the server version in use.
        next_token = next_token[0] if next_token else None
    if not isinstance(next_token, dict):
        return None
    n_decoded = next_token.get("n_decoded")
    if isinstance(n_decoded, bool) or not isinstance(n_decoded, int):
        return None
    return n_decoded


def annotation_text(slot_id, n_decoded=None):
    """Build the annotation text, appending the token count when positive."""
    text = f"LLM request (slot {slot_id})"
    if n_decoded is not None and n_decoded > 0:
        text += f" — {n_decoded} tokens"
    return text


def sync_annotations(state, slots, now_ms):
    """Reconcile open annotations with one ``/slots`` snapshot.

    Opens an annotation for every processing slot not yet tracked in *state*
    and closes the annotation of every tracked slot that is no longer
    processing. *state* is mutated in place and persisted after each change.

    Args:
        state: Mapping of slot id (as ``str``) to
            ``{"grafana_id": ..., "start_ms": ...}``.
        slots: Slot list as returned by ``get_slots``.
        now_ms: Timestamp of this poll in epoch milliseconds.
    """
    slots_by_id = {}
    processing_ids = set()
    for slot in slots:
        if not isinstance(slot, dict) or "id" not in slot:
            # Skip malformed entries instead of crashing the daemon.
            continue
        slot_id = str(slot["id"])
        # Keep the first entry per id for the token-count lookup below.
        slots_by_id.setdefault(slot_id, slot)
        if not slot.get("is_processing", False):
            continue
        processing_ids.add(slot_id)
        if slot_id in state:
            continue
        grafana_id = grafana_post(annotation_text(slot_id), now_ms)
        if grafana_id is not None:
            # Only track the slot once Grafana accepted the annotation; a
            # failed post is retried on the next poll.
            state[slot_id] = {"grafana_id": grafana_id, "start_ms": now_ms}
            save_state(state)

    # Close annotations for slots that stopped processing (or disappeared).
    for slot_id in [k for k in state if k not in processing_ids]:
        info = state.pop(slot_id)
        slot = slots_by_id.get(slot_id)
        n_decoded = slot_token_count(slot) if slot is not None else None
        grafana_id = info.get("grafana_id") if isinstance(info, dict) else None
        if grafana_id is not None:
            grafana_close(grafana_id, now_ms, annotation_text(slot_id, n_decoded))
        save_state(state)


def main():
    """Poll llama.cpp forever, syncing slot activity to Grafana."""
    state = load_state()

    while True:
        now_ms = int(time.time() * 1000)
        slots = get_slots()

        # A failed poll (None) leaves all open annotations untouched.
        if slots is not None:
            sync_annotations(state, slots, now_ms)

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()