{ config, lib, pkgs, ... }: pkgs.testers.runNixOSTest { name = "fail2ban-llama-cpp"; nodes = { server = { config, pkgs, lib, ... }: { imports = [ ../../modules/server-security.nix ]; # Minimal Caddy that stands in for the llama-cpp reverse_proxy. # Every request returns 401, mimicking llama.cpp's api-key middleware # on an invalid key. We only care that Caddy writes the 401 with the # real client IP to the same access log the production jail tails. services.caddy = { enable = true; virtualHosts.":80".extraConfig = '' log { output file /var/log/caddy/access-llama-cpp.log format json } respond "Invalid API Key" 401 ''; }; # Jail definition mirrors services/llama-cpp.nix. ignoreip omitted # so the test VM subnet isn't exempted; maxretry lowered for speed. services.fail2ban.jails.llama-cpp = { enabled = true; settings = { backend = "auto"; port = "http,https"; logpath = "/var/log/caddy/access-llama-cpp.log"; maxretry = 3; }; filter.Definition = { failregex = ''^.*"remote_ip":"".*"status":401.*$''; ignoreregex = ""; datepattern = ''"ts":{Epoch}\.''; }; }; systemd.tmpfiles.rules = [ "d /var/log/caddy 755 caddy caddy" "f /var/log/caddy/access-llama-cpp.log 644 caddy caddy" ]; networking.firewall.allowedTCPPorts = [ 80 ]; }; client = { environment.systemPackages = [ pkgs.curl ]; }; }; testScript = '' import time import re start_all() server.wait_for_unit("caddy.service") server.wait_for_unit("fail2ban.service") server.wait_for_open_port(80) time.sleep(2) with subtest("Verify llama-cpp jail is active"): status = server.succeed("fail2ban-client status") assert "llama-cpp" in status, f"llama-cpp jail not found in: {status}" with subtest("Generate failed API key attempts"): # Force IPv4 for consistent IP tracking across the NAT fabric. for i in range(4): client.execute( "curl -4 -s -H 'Authorization: Bearer badkey' http://server/v1/chat/completions || true" ) time.sleep(1) with subtest("Verify IP is banned"): time.sleep(5) status = server.succeed("fail2ban-client status llama-cpp") print(f"llama-cpp jail status: {status}") match = re.search(r"Currently banned:\s*(\d+)", status) assert match and int(match.group(1)) >= 1, ( f"Expected at least 1 banned IP, got: {status}" ) with subtest("Verify banned client cannot connect"): exit_code = client.execute("curl -4 -s --max-time 3 http://server/ 2>&1")[0] assert exit_code != 0, "Connection should be blocked" ''; }