tests: move fail2ban tests into subdirectory

This commit is contained in:
2026-04-20 17:25:45 -04:00
parent 9ddef4bd54
commit b99a039ab0
8 changed files with 23 additions and 23 deletions

124
tests/fail2ban/caddy.nix Normal file
View File

@@ -0,0 +1,124 @@
# NixOS VM test: the caddy-auth fail2ban jail bans IPs that repeatedly send
# WRONG basic-auth credentials, but does NOT ban plain unauthenticated
# requests (the normal browser 401 challenge flow).
{
  config,
  lib,
  pkgs,
  ...
}:
pkgs.testers.runNixOSTest {
  name = "fail2ban-caddy";
  nodes = {
    server =
      {
        config,
        pkgs,
        lib,
        ...
      }:
      {
        imports = [
          ../../modules/server-security.nix
        ];
        # Set up Caddy with basic auth (minimal config, no production stuff)
        # Using bcrypt hash generated with: caddy hash-password --plaintext testpass
        services.caddy = {
          enable = true;
          virtualHosts.":80".extraConfig = ''
            log {
              output file /var/log/caddy/access-server.log
              format json
            }
            basic_auth {
              testuser $2a$14$XqaQlGTdmofswciqrLlMz.rv0/jiGQq8aU.fP6mh6gCGiLf6Cl3.a
            }
            respond "Authenticated!" 200
          '';
        };
        # Add the fail2ban jail for caddy-auth (same as in services/caddy.nix)
        services.fail2ban.jails.caddy-auth = {
          enabled = true;
          settings = {
            backend = "auto";
            port = "http,https";
            # Glob so every caddy access log (per-vhost) is tailed.
            logpath = "/var/log/caddy/access-*.log";
            maxretry = 3; # Lower for testing
          };
          filter.Definition = {
            # Only match 401s where an Authorization header was actually sent
            # (Caddy logs the header value as "REDACTED" when present).
            failregex = ''^.*"remote_ip":"<HOST>".*"Authorization":\["REDACTED"\].*"status":401.*$'';
            ignoreregex = "";
            # Caddy's JSON log timestamps are fractional epoch seconds.
            datepattern = ''"ts":{Epoch}\.'';
          };
        };
        # Create log directory and initial log file so fail2ban can start
        systemd.tmpfiles.rules = [
          "d /var/log/caddy 755 caddy caddy"
          "f /var/log/caddy/access-server.log 644 caddy caddy"
        ];
        networking.firewall.allowedTCPPorts = [ 80 ];
      };
    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };
  testScript = ''
    import time
    import re
    start_all()
    server.wait_for_unit("caddy.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(80)
    time.sleep(2)
    with subtest("Verify caddy-auth jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "caddy-auth" in status, f"caddy-auth jail not found in: {status}"
    with subtest("Verify correct password works"):
        # Use -4 to force IPv4 for consistency
        result = client.succeed("curl -4 -s -u testuser:testpass http://server/")
        print(f"Curl result: {result}")
        assert "Authenticated" in result, f"Auth should succeed: {result}"
    with subtest("Unauthenticated requests (browser probes) should not trigger ban"):
        # Simulate browser probe requests - no Authorization header sent
        # This is the normal HTTP Basic Auth challenge-response flow:
        # browser sends request without credentials, gets 401, then resends with credentials
        for i in range(5):
            client.execute("curl -4 -s http://server/ || true")
            time.sleep(0.5)
        time.sleep(3)
        status = server.succeed("fail2ban-client status caddy-auth")
        print(f"caddy-auth jail status after unauthenticated requests: {status}")
        match = re.search(r"Currently banned:\s*(\d+)", status)
        banned = int(match.group(1)) if match else 0
        assert banned == 0, f"Unauthenticated 401s should NOT trigger ban, but {banned} IPs were banned: {status}"
    with subtest("Generate failed basic auth attempts (wrong password)"):
        # Use -4 to force IPv4 for consistent IP tracking
        # These send an Authorization header with wrong credentials
        for i in range(4):
            client.execute("curl -4 -s -u testuser:wrongpass http://server/ || true")
            time.sleep(1)
    with subtest("Verify IP is banned after wrong password attempts"):
        time.sleep(5)
        status = server.succeed("fail2ban-client status caddy-auth")
        print(f"caddy-auth jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"
    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 http://server/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}

122
tests/fail2ban/gitea.nix Normal file
View File

@@ -0,0 +1,122 @@
# NixOS VM test: the gitea fail2ban jail bans an IP after repeated failed
# web logins against the real production gitea module.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  # Reuse the production service configs, overriding paths/ports for the VM.
  baseServiceConfigs = import ../../hosts/muffin/service-configs.nix;
  testServiceConfigs = lib.recursiveUpdate baseServiceConfigs {
    zpool_ssds = "";
    gitea = {
      dir = "/var/lib/gitea";
      domain = "git.test.local";
    };
    ports.private.gitea = {
      port = 3000;
      proto = "tcp";
    };
  };
  # Stub out the ZFS-mount/permission helpers the production module expects,
  # since the test VM has no zpools.
  testLib = lib.extend (
    final: prev: {
      serviceMountWithZpool =
        serviceName: zpool: dirs:
        { ... }:
        { };
      serviceFilePerms = serviceName: tmpfilesRules: { ... }: { };
    }
  );
  # The production gitea module, instantiated with the test lib/configs.
  giteaModule =
    { config, pkgs, ... }:
    {
      imports = [
        (import ../../services/gitea/gitea.nix {
          inherit config pkgs;
          lib = testLib;
          service_configs = testServiceConfigs;
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-gitea";
  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          ../../modules/server-security.nix
          giteaModule
        ];
        # Enable postgres for gitea
        services.postgresql.enable = true;
        # Disable ZFS mount dependency
        systemd.services."gitea-mounts".enable = lib.mkForce false;
        systemd.services.gitea = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ "postgresql.service" ];
          requires = lib.mkForce [ ];
        };
        # Override for faster testing and correct port
        services.fail2ban.jails.gitea.settings = {
          maxretry = lib.mkForce 3;
          # In test, we connect directly to Gitea port, not via Caddy
          port = lib.mkForce "3000";
        };
        networking.firewall.allowedTCPPorts = [ 3000 ];
      };
    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };
  testScript = ''
    import time
    import re
    start_all()
    server.wait_for_unit("postgresql.service")
    server.wait_for_unit("gitea.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(3000)
    time.sleep(3)
    with subtest("Verify gitea jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "gitea" in status, f"gitea jail not found in: {status}"
    with subtest("Generate failed login attempts"):
        # Use -4 to force IPv4 for consistent IP tracking
        for i in range(4):
            client.execute(
                "curl -4 -s -X POST http://server:3000/user/login -d 'user_name=baduser&password=badpass' || true"
            )
            time.sleep(0.5)
    with subtest("Verify IP is banned"):
        time.sleep(3)
        status = server.succeed("fail2ban-client status gitea")
        print(f"gitea jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"
    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 http://server:3000/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}

133
tests/fail2ban/immich.nix Normal file
View File

@@ -0,0 +1,133 @@
# NixOS VM test: the immich fail2ban jail bans an IP after repeated failed
# API logins against the real production immich module.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  # Reuse the production service configs, overriding paths/ports for the VM.
  baseServiceConfigs = import ../../hosts/muffin/service-configs.nix;
  testServiceConfigs = lib.recursiveUpdate baseServiceConfigs {
    zpool_ssds = "";
    https.domain = "test.local";
    ports.private.immich = {
      port = 2283;
      proto = "tcp";
    };
    immich.dir = "/var/lib/immich";
  };
  # Stub out the ZFS-mount/permission helpers the production module expects,
  # since the test VM has no zpools.
  testLib = lib.extend (
    final: prev: {
      serviceMountWithZpool =
        serviceName: zpool: dirs:
        { ... }:
        { };
      serviceFilePerms = serviceName: tmpfilesRules: { ... }: { };
    }
  );
  # The production immich module, instantiated with the test lib/configs.
  immichModule =
    { config, pkgs, ... }:
    {
      imports = [
        (import ../../services/immich.nix {
          inherit config pkgs;
          lib = testLib;
          service_configs = testServiceConfigs;
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-immich";
  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          ../../modules/server-security.nix
          immichModule
        ];
        # Immich needs postgres
        services.postgresql.enable = true;
        # Let immich create its own DB for testing
        services.immich.database.createDB = lib.mkForce true;
        # Disable ZFS mount dependencies
        systemd.services."immich-server-mounts".enable = lib.mkForce false;
        systemd.services."immich-machine-learning-mounts".enable = lib.mkForce false;
        systemd.services.immich-server = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ "postgresql.service" ];
          requires = lib.mkForce [ ];
        };
        systemd.services.immich-machine-learning = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };
        # Override for faster testing and correct port
        services.fail2ban.jails.immich.settings = {
          maxretry = lib.mkForce 3;
          # In test, we connect directly to Immich port, not via Caddy
          port = lib.mkForce "2283";
        };
        networking.firewall.allowedTCPPorts = [ 2283 ];
        # Immich needs more resources
        virtualisation.diskSize = 4 * 1024;
        virtualisation.memorySize = 4 * 1024; # 4GB RAM for Immich
      };
    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };
  testScript = ''
    import time
    import re
    start_all()
    server.wait_for_unit("postgresql.service")
    server.wait_for_unit("immich-server.service", timeout=120)
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(2283, timeout=60)
    time.sleep(3)
    with subtest("Verify immich jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "immich" in status, f"immich jail not found in: {status}"
    with subtest("Generate failed login attempts"):
        # Use -4 to force IPv4 for consistent IP tracking
        for i in range(4):
            client.execute(
                "curl -4 -s -X POST http://server:2283/api/auth/login -H 'Content-Type: application/json' -d '{\"email\":\"bad@user.com\",\"password\":\"badpass\"}' || true"
            )
            time.sleep(0.5)
    with subtest("Verify IP is banned"):
        time.sleep(3)
        status = server.succeed("fail2ban-client status immich")
        print(f"immich jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"
    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 http://server:2283/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}

145
tests/fail2ban/jellyfin.nix Normal file
View File

@@ -0,0 +1,145 @@
# NixOS VM test: the jellyfin fail2ban jail bans an IP after repeated failed
# authentication attempts against the real production jellyfin module.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  # Reuse the production service configs, overriding paths for the VM.
  baseServiceConfigs = import ../../hosts/muffin/service-configs.nix;
  testServiceConfigs = lib.recursiveUpdate baseServiceConfigs {
    zpool_ssds = "";
    https.domain = "test.local";
    jellyfin = {
      dataDir = "/var/lib/jellyfin";
      cacheDir = "/var/cache/jellyfin";
    };
  };
  # Stub out the ZFS-mount/permission/package helpers the production module
  # expects, since the test VM has no zpools.
  testLib = lib.extend (
    final: prev: {
      serviceMountWithZpool =
        serviceName: zpool: dirs:
        { ... }:
        { };
      serviceFilePerms = serviceName: tmpfilesRules: { ... }: { };
      optimizePackage = pkg: pkg; # No-op for testing
    }
  );
  # The production jellyfin module, instantiated with the test lib/configs.
  jellyfinModule =
    { config, pkgs, ... }:
    {
      imports = [
        (import ../../services/jellyfin/jellyfin.nix {
          inherit config pkgs;
          lib = testLib;
          service_configs = testServiceConfigs;
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-jellyfin";
  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          ../../modules/server-security.nix
          jellyfinModule
        ];
        # needed for testing
        services.jellyfin.openFirewall = true;
        # Create the media group
        users.groups.media = { };
        # Disable ZFS mount dependency
        systemd.services."jellyfin-mounts".enable = lib.mkForce false;
        systemd.services.jellyfin = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };
        # Override for faster testing and correct port
        services.fail2ban.jails.jellyfin.settings = {
          maxretry = lib.mkForce 3;
          # In test, we connect directly to Jellyfin port, not via Caddy
          port = lib.mkForce "8096";
        };
        # Create log directory and placeholder log file for fail2ban
        # Jellyfin logs to files, not systemd journal
        systemd.tmpfiles.rules = [
          "d /var/lib/jellyfin/log 0755 jellyfin jellyfin"
          "f /var/lib/jellyfin/log/log_placeholder.log 0644 jellyfin jellyfin"
        ];
        # Make fail2ban start after Jellyfin
        systemd.services.fail2ban = {
          wants = [ "jellyfin.service" ];
          after = [ "jellyfin.service" ];
        };
        # Give jellyfin more disk space and memory
        virtualisation.diskSize = 3 * 1024;
        virtualisation.memorySize = 2 * 1024;
      };
    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };
  testScript = ''
    import time
    import re
    start_all()
    server.wait_for_unit("jellyfin.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(8096)
    server.wait_until_succeeds("curl -sf http://localhost:8096/health | grep -q Healthy", timeout=120)
    time.sleep(2)
    # Wait for Jellyfin to create real log files and reload fail2ban
    server.wait_until_succeeds("ls /var/lib/jellyfin/log/log_2*.log", timeout=30)
    server.succeed("fail2ban-client reload jellyfin")
    with subtest("Verify jellyfin jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "jellyfin" in status, f"jellyfin jail not found in: {status}"
    with subtest("Generate failed login attempts"):
        # Use -4 to force IPv4 for consistent IP tracking
        for i in range(4):
            client.execute("""
                curl -4 -s -X POST http://server:8096/Users/authenticatebyname \
                -H 'Content-Type: application/json' \
                -H 'X-Emby-Authorization: MediaBrowser Client="test", Device="test", DeviceId="test", Version="1.0"' \
                -d '{"Username":"baduser","Pw":"badpass"}' || true
            """)
            time.sleep(0.5)
    with subtest("Verify IP is banned"):
        time.sleep(3)
        status = server.succeed("fail2ban-client status jellyfin")
        print(f"jellyfin jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"
    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 http://server:8096/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}

View File

@@ -0,0 +1,103 @@
# NixOS VM test: the llama-cpp fail2ban jail bans an IP after repeated
# 401 responses logged by the Caddy reverse proxy.
{
  config,
  lib,
  pkgs,
  ...
}:
pkgs.testers.runNixOSTest {
  name = "fail2ban-llama-cpp";
  nodes = {
    server =
      {
        config,
        pkgs,
        lib,
        ...
      }:
      {
        imports = [
          ../../modules/server-security.nix
        ];
        # Minimal Caddy that stands in for the llama-cpp reverse_proxy.
        # Every request returns 401, mimicking llama.cpp's api-key middleware
        # on an invalid key. We only care that Caddy writes the 401 with the
        # real client IP to the same access log the production jail tails.
        services.caddy = {
          enable = true;
          virtualHosts.":80".extraConfig = ''
            log {
              output file /var/log/caddy/access-llama-cpp.log
              format json
            }
            respond "Invalid API Key" 401
          '';
        };
        # Jail definition mirrors services/llama-cpp.nix. ignoreip omitted
        # so the test VM subnet isn't exempted; maxretry lowered for speed.
        services.fail2ban.jails.llama-cpp = {
          enabled = true;
          settings = {
            backend = "auto";
            port = "http,https";
            logpath = "/var/log/caddy/access-llama-cpp.log";
            maxretry = 3;
          };
          filter.Definition = {
            # Any 401 from this vhost counts as a failure for this jail.
            failregex = ''^.*"remote_ip":"<HOST>".*"status":401.*$'';
            ignoreregex = "";
            # Caddy's JSON log timestamps are fractional epoch seconds.
            datepattern = ''"ts":{Epoch}\.'';
          };
        };
        # Pre-create the log path so fail2ban can start before first request.
        systemd.tmpfiles.rules = [
          "d /var/log/caddy 755 caddy caddy"
          "f /var/log/caddy/access-llama-cpp.log 644 caddy caddy"
        ];
        networking.firewall.allowedTCPPorts = [ 80 ];
      };
    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };
  testScript = ''
    import time
    import re
    start_all()
    server.wait_for_unit("caddy.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(80)
    time.sleep(2)
    with subtest("Verify llama-cpp jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "llama-cpp" in status, f"llama-cpp jail not found in: {status}"
    with subtest("Generate failed API key attempts"):
        # Force IPv4 for consistent IP tracking across the NAT fabric.
        for i in range(4):
            client.execute(
                "curl -4 -s -H 'Authorization: Bearer badkey' http://server/v1/chat/completions || true"
            )
            time.sleep(1)
    with subtest("Verify IP is banned"):
        time.sleep(5)
        status = server.succeed("fail2ban-client status llama-cpp")
        print(f"llama-cpp jail status: {status}")
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, (
            f"Expected at least 1 banned IP, got: {status}"
        )
    with subtest("Verify banned client cannot connect"):
        exit_code = client.execute("curl -4 -s --max-time 3 http://server/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}

99
tests/fail2ban/ssh.nix Normal file
View File

@@ -0,0 +1,99 @@
# NixOS VM test: fail2ban's sshd jail bans an IP after repeated failed
# password logins through the production SSH module.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  securityModule = import ../../modules/server-security.nix;
  # Wrap the production SSH module so a test username can be injected.
  sshModule =
    {
      config,
      lib,
      pkgs,
      ...
    }:
    {
      imports = [
        (import ../../services/ssh.nix {
          inherit config lib pkgs;
          username = "testuser";
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-ssh";
  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          securityModule
          sshModule
        ];
        # Override for testing - enable password auth
        services.openssh.settings.PasswordAuthentication = lib.mkForce true;
        users.users.testuser = {
          isNormalUser = true;
          password = "correctpassword";
        };
        networking.firewall.allowedTCPPorts = [ 22 ];
      };
    client = {
      environment.systemPackages = with pkgs; [
        sshpass
        openssh
        # netcat is required by the final port probe. Without it, `nc`
        # exits 127 (command not found) and the `exit_code != 0` ban check
        # would pass even when no ban is in place.
        netcat
      ];
    };
  };
  testScript = ''
    import re
    import time
    start_all()
    server.wait_for_unit("sshd.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(22)
    time.sleep(2)
    with subtest("Verify sshd jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "sshd" in status, f"sshd jail not found in: {status}"
    with subtest("Generate failed SSH login attempts"):
        # Use -4 to force IPv4, timeout and NumberOfPasswordPrompts=1 to ensure quick failure
        # maxRetry is 3 in our config, so 4 attempts should trigger a ban
        for i in range(4):
            client.execute(
                "timeout 5 sshpass -p 'wrongpassword' ssh -4 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ConnectTimeout=3 -o NumberOfPasswordPrompts=1 testuser@server echo test 2>/dev/null || true"
            )
            time.sleep(1)
    with subtest("Verify IP is banned"):
        # Wait for fail2ban to process the logs and apply the ban
        time.sleep(5)
        status = server.succeed("fail2ban-client status sshd")
        print(f"sshd jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"
    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("timeout 3 nc -4 -z -w 2 server 22")[0]
        assert exit_code != 0, "Connection should be blocked for banned IP"
  '';
}

View File

@@ -0,0 +1,130 @@
# NixOS VM test: the vaultwarden fail2ban jail bans an IP after repeated
# failed logins against the real production vaultwarden (bitwarden) module.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  # Reuse the production service configs, overriding paths for the VM.
  baseServiceConfigs = import ../../hosts/muffin/service-configs.nix;
  testServiceConfigs = lib.recursiveUpdate baseServiceConfigs {
    zpool_ssds = "";
    https.domain = "test.local";
  };
  # Stub out the ZFS-mount/permission helpers the production module expects,
  # since the test VM has no zpools.
  testLib = lib.extend (
    final: prev: {
      serviceMountWithZpool =
        serviceName: zpool: dirs:
        { ... }:
        { };
      serviceFilePerms = serviceName: tmpfilesRules: { ... }: { };
    }
  );
  # The production vaultwarden module, instantiated with the test lib/configs.
  vaultwardenModule =
    { config, pkgs, ... }:
    {
      imports = [
        (import ../../services/bitwarden.nix {
          inherit config pkgs;
          lib = testLib;
          service_configs = testServiceConfigs;
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-vaultwarden";
  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          ../../modules/server-security.nix
          vaultwardenModule
        ];
        # Disable ZFS mount dependencies
        systemd.services."vaultwarden-mounts".enable = lib.mkForce false;
        systemd.services."backup-vaultwarden-mounts".enable = lib.mkForce false;
        systemd.services.vaultwarden = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };
        systemd.services.backup-vaultwarden = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };
        # Override Vaultwarden settings for testing
        # - Listen on all interfaces (not just localhost)
        # - Enable logging at info level to capture failed login attempts
        services.vaultwarden.config = {
          ROCKET_ADDRESS = lib.mkForce "0.0.0.0";
          ROCKET_LOG = lib.mkForce "info";
        };
        # Override for faster testing and correct port
        services.fail2ban.jails.vaultwarden.settings = {
          maxretry = lib.mkForce 3;
          # In test, we connect directly to Vaultwarden port, not via Caddy
          port = lib.mkForce "8222";
        };
        networking.firewall.allowedTCPPorts = [ 8222 ];
      };
    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };
  testScript = ''
    import time
    import re
    start_all()
    server.wait_for_unit("vaultwarden.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(8222)
    time.sleep(2)
    with subtest("Verify vaultwarden jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "vaultwarden" in status, f"vaultwarden jail not found in: {status}"
    with subtest("Generate failed login attempts"):
        # Use -4 to force IPv4 for consistent IP tracking
        for i in range(4):
            client.execute("""
                curl -4 -s -X POST 'http://server:8222/identity/connect/token' \
                -H 'Content-Type: application/x-www-form-urlencoded' \
                -H 'Bitwarden-Client-Name: web' \
                -H 'Bitwarden-Client-Version: 2024.1.0' \
                -d 'grant_type=password&username=bad@user.com&password=badpass&scope=api+offline_access&client_id=web&deviceType=10&deviceIdentifier=test&deviceName=test' \
                || true
            """)
            time.sleep(0.5)
    with subtest("Verify IP is banned"):
        time.sleep(3)
        status = server.succeed("fail2ban-client status vaultwarden")
        print(f"vaultwarden jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"
    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 http://server:8222/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}