refactor: extract Python scripts into standalone files

Move embedded Python scripts out of Nix string interpolation into
standalone files under scripts/.  Each script reads its configuration
from a JSON file passed as the first CLI argument.

Shared utilities (API key reading, API polling, health check loop)
are consolidated into common.py, eliminating three copies of
read_api_key and wait_for_api.

Implementation improvements included in the extraction:
- Remove pyarr dependency; all HTTP calls use raw requests
- Add update semantics: download clients and synced apps are now
  compared against desired state and updated on drift via PUT
- Bazarr configure_provider compares API keys and updates stale ones
- Narrow health_check_loop exception clause from bare Exception to
  (RequestException, ValueError, KeyError)
- Fix double resp.json() call in resolve_profile_id (jellyseerr)
- Replace os.system with subprocess.run for Jellyseerr restart
- Handle missing 'value' key in Servarr field API responses
This commit is contained in:
2026-04-16 16:33:18 -04:00
parent b97ed1e90c
commit b464a8cea2
4 changed files with 757 additions and 0 deletions

102
scripts/bazarr_init.py Normal file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""Declarative API initialization for Bazarr provider connections.
Idempotently configures Sonarr and Radarr providers in Bazarr via its
settings API. Detects stale API keys and updates them.
"""
import os
import requests as http
from common import (
load_config,
read_api_key_xml,
read_api_key_yaml,
wait_for_api,
)
def configure_provider(base_url, api_key, provider_type, provider_config):
"""Idempotently configure a Sonarr/Radarr provider in Bazarr.
Compares the stored API key against the current one and updates if stale.
"""
ltype = provider_type.lower()
print(f"Checking {provider_type} provider...")
resp = http.get(
f"{base_url}/api/system/settings",
headers={"X-API-KEY": api_key},
timeout=30,
)
resp.raise_for_status()
settings = resp.json()
use_flag = settings.get("general", {}).get(f"use_{ltype}", False)
existing_key = settings.get(ltype, {}).get("apikey", "")
provider_api_key = read_api_key_xml(
f"{provider_config['dataDir']}/config.xml"
)
bind_address = provider_config.get("bindAddress", "127.0.0.1")
if use_flag and existing_key == provider_api_key:
print(f"{provider_type} provider already correct, skipping")
return
action = "Updating" if use_flag else "Adding"
print(f"{action} {provider_type} provider...")
resp = http.post(
f"{base_url}/api/system/settings",
headers={"X-API-KEY": api_key},
data={
f"settings-general-use_{ltype}": "true",
f"settings-{ltype}-ip": bind_address,
f"settings-{ltype}-port": str(provider_config["port"]),
f"settings-{ltype}-apikey": provider_api_key,
f"settings-{ltype}-ssl": "false",
f"settings-{ltype}-base_url": "/",
},
timeout=30,
)
resp.raise_for_status()
print(f"{provider_type} provider configured")
def main():
cfg = load_config()
data_dir = cfg["dataDir"]
bind_address = cfg.get("bindAddress", "127.0.0.1")
port = cfg["port"]
api_timeout = cfg["apiTimeout"]
config_yaml = f"{data_dir}/config/config.yaml"
if not os.path.isfile(config_yaml):
print(f"Config file {config_yaml} not found, skipping bazarr init")
return
api_key = read_api_key_yaml(config_yaml)
base_url = f"http://{bind_address}:{port}"
wait_for_api(
base_url,
api_key,
api_timeout,
"Bazarr",
header_name="X-API-KEY",
status_path="/api/system/status",
)
providers = cfg.get("providers", {})
if providers.get("sonarr", {}).get("enable"):
configure_provider(base_url, api_key, "Sonarr", providers["sonarr"])
if providers.get("radarr", {}).get("enable"):
configure_provider(base_url, api_key, "Radarr", providers["radarr"])
print("Bazarr init complete")
if __name__ == "__main__":
main()

140
scripts/common.py Normal file
View File

@@ -0,0 +1,140 @@
"""Shared utilities for arr-init scripts."""
import json
import sys
import time
import xml.etree.ElementTree as ET
import requests as http
import yaml
def load_config():
"""Load JSON configuration from the path given as the first CLI argument."""
if len(sys.argv) < 2:
print("Usage: script <config.json>", file=sys.stderr)
sys.exit(1)
with open(sys.argv[1]) as f:
return json.load(f)
def read_api_key_xml(config_xml_path):
"""Extract <ApiKey> from a Servarr config.xml file."""
tree = ET.parse(config_xml_path)
node = tree.find("ApiKey")
if node is None or not node.text:
raise ValueError(f"Could not find ApiKey in {config_xml_path}")
return node.text
def read_api_key_yaml(config_yaml_path):
"""Extract the apikey from Bazarr's config.yaml (auth section)."""
with open(config_yaml_path) as fh:
data = yaml.safe_load(fh)
try:
return data["auth"]["apikey"]
except (KeyError, TypeError) as exc:
raise ValueError(
f"Could not find auth.apikey in {config_yaml_path}"
) from exc
def wait_for_api(
base_url,
api_key,
timeout,
name,
*,
header_name="X-Api-Key",
status_path="/system/status",
):
"""Poll a status endpoint until the API responds or timeout.
Args:
base_url: Base URL including any API version prefix.
api_key: API key for authentication.
timeout: Maximum seconds to wait.
name: Human-readable service name for log messages.
header_name: HTTP header name for the API key.
status_path: Path appended to base_url for the health probe.
"""
print(f"Waiting for {name} API (timeout: {timeout}s)...")
for i in range(1, timeout + 1):
try:
resp = http.get(
f"{base_url}{status_path}",
headers={header_name: api_key},
timeout=5,
)
if resp.ok:
print(f"{name} API is ready")
return
except (http.ConnectionError, http.Timeout):
pass
if i == timeout:
print(
f"{name} API not available after {timeout} seconds",
file=sys.stderr,
)
sys.exit(1)
time.sleep(1)
def health_check_loop(url, api_key, entity_name, svc_name, max_retries, interval):
"""POST to a testall endpoint with retry logic.
Exits the process on permanent failure so the systemd unit reflects the error.
"""
attempt = 0
while True:
healthy = True
last_error = ""
try:
resp = http.post(
url,
headers={
"X-Api-Key": api_key,
"Content-Type": "application/json",
},
timeout=30,
)
result = resp.json()
failures = [
item for item in result if not item.get("isValid", True)
]
if failures:
healthy = False
last_error = "\n".join(
f" - ID {f['id']}: "
+ ", ".join(
v["errorMessage"]
for v in f.get("validationFailures", [])
)
for f in failures
)
except (http.RequestException, ValueError, KeyError) as exc:
healthy = False
last_error = (
f"could not reach {svc_name} API for {entity_name} test: {exc}"
)
if healthy:
print(f"All {entity_name}s healthy")
return
attempt += 1
if attempt > max_retries:
print(
f"Health check FAILED after {attempt} attempts: "
f"{entity_name}(s) unreachable:",
file=sys.stderr,
)
print(last_error, file=sys.stderr)
sys.exit(1)
print(
f"{entity_name.capitalize()} health check failed "
f"(attempt {attempt}/{max_retries}), "
f"retrying in {interval}s..."
)
time.sleep(interval)

139
scripts/jellyseerr_init.py Normal file
View File

@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""Declarative quality profile initialization for Jellyseerr.
Resolves profile names to IDs by querying Radarr/Sonarr APIs, then patches
Jellyseerr's settings.json so new requests default to the correct quality
profiles.
"""
import json
import os
import subprocess
import sys
import requests as http
from common import load_config, read_api_key_xml, wait_for_api
def resolve_profile_id(base_url, api_key, profile_name, app_name):
"""Query a Servarr app for quality profiles and resolve a name to an ID."""
resp = http.get(
f"{base_url}/qualityprofile",
headers={"X-Api-Key": api_key},
timeout=30,
)
resp.raise_for_status()
profiles = resp.json()
for profile in profiles:
if profile["name"] == profile_name:
print(f"Resolved {app_name} profile '{profile_name}' -> ID {profile['id']}")
return profile["id"]
available = [p["name"] for p in profiles]
print(
f"Profile '{profile_name}' not found in {app_name}. "
f"Available: {available}",
file=sys.stderr,
)
sys.exit(1)
def main():
cfg = load_config()
settings_path = os.path.join(cfg["configDir"], "settings.json")
if not os.path.isfile(settings_path):
print(f"{settings_path} not found, skipping (Jellyseerr not yet initialized)")
return
timeout = cfg["apiTimeout"]
# Resolve Radarr profile
radarr_cfg = cfg["radarr"]
radarr_key = read_api_key_xml(f"{radarr_cfg['dataDir']}/config.xml")
radarr_bind = radarr_cfg.get("bindAddress", "127.0.0.1")
radarr_base = f"http://{radarr_bind}:{radarr_cfg['port']}/api/v3"
wait_for_api(radarr_base, radarr_key, timeout, "Radarr")
radarr_profile_id = resolve_profile_id(
radarr_base, radarr_key, radarr_cfg["profileName"], "Radarr",
)
# Resolve Sonarr profiles
sonarr_cfg = cfg["sonarr"]
sonarr_key = read_api_key_xml(f"{sonarr_cfg['dataDir']}/config.xml")
sonarr_bind = sonarr_cfg.get("bindAddress", "127.0.0.1")
sonarr_base = f"http://{sonarr_bind}:{sonarr_cfg['port']}/api/v3"
wait_for_api(sonarr_base, sonarr_key, timeout, "Sonarr")
sonarr_profile_id = resolve_profile_id(
sonarr_base, sonarr_key, sonarr_cfg["profileName"], "Sonarr",
)
sonarr_anime_profile_id = resolve_profile_id(
sonarr_base, sonarr_key, sonarr_cfg["animeProfileName"], "Sonarr (anime)",
)
# Patch settings.json
with open(settings_path) as f:
settings = json.load(f)
changed = False
for entry in settings.get("radarr", []):
if (
entry.get("activeProfileId") != radarr_profile_id
or entry.get("activeProfileName") != radarr_cfg["profileName"]
):
entry["activeProfileId"] = radarr_profile_id
entry["activeProfileName"] = radarr_cfg["profileName"]
changed = True
print(
f"Radarr '{entry.get('name', '?')}': "
f"set profile to {radarr_cfg['profileName']} (ID {radarr_profile_id})"
)
for entry in settings.get("sonarr", []):
updates = {}
if (
entry.get("activeProfileId") != sonarr_profile_id
or entry.get("activeProfileName") != sonarr_cfg["profileName"]
):
updates["activeProfileId"] = sonarr_profile_id
updates["activeProfileName"] = sonarr_cfg["profileName"]
if (
entry.get("activeAnimeProfileId") != sonarr_anime_profile_id
or entry.get("activeAnimeProfileName") != sonarr_cfg["animeProfileName"]
):
updates["activeAnimeProfileId"] = sonarr_anime_profile_id
updates["activeAnimeProfileName"] = sonarr_cfg["animeProfileName"]
if updates:
entry.update(updates)
changed = True
print(
f"Sonarr '{entry.get('name', '?')}': "
f"set profile to {sonarr_cfg['profileName']} (ID {sonarr_profile_id})"
)
if not changed:
print("Jellyseerr profiles already correct, no changes needed")
return
with open(settings_path, "w") as f:
json.dump(settings, f, indent=2)
print("Updated settings.json, restarting Jellyseerr...")
result = subprocess.run(
["systemctl", "restart", "jellyseerr.service"],
capture_output=True,
text=True,
)
if result.returncode != 0:
print(
f"Failed to restart Jellyseerr: {result.stderr.strip()}",
file=sys.stderr,
)
sys.exit(1)
print("Jellyseerr init complete")
if __name__ == "__main__":
main()

376
scripts/servarr_init.py Normal file
View File

@@ -0,0 +1,376 @@
#!/usr/bin/env python3
"""Declarative API initialization for Servarr applications.
Idempotently provisions download clients, root folders, synced applications
(Prowlarr), and naming configuration via the Servarr HTTP API. Existing
entities are updated when their configuration drifts from the declared state.
"""
import sys
import requests as http
from common import (
health_check_loop,
load_config,
read_api_key_xml,
wait_for_api,
)
# Maps Prowlarr application implementation names to Newznab parent category
# names used for automatic sync-category detection.
IMPLEMENTATION_CATEGORY_MAP = {
"Sonarr": "TV",
"Radarr": "Movies",
"Lidarr": "Audio",
"Readarr": "Books",
"Whisparr": "XXX",
}
def _fields_to_dict(fields):
"""Convert the API's [{name, value}] array into a flat dict.
Some API responses omit the 'value' key for null/unset fields.
"""
return {f["name"]: f.get("value") for f in fields}
def _dict_to_fields(d):
"""Convert a flat dict into the API's [{name, value}] array."""
return [{"name": k, "value": v} for k, v in d.items()]
def _needs_field_update(desired, current_fields):
"""Return True if any desired field value differs from the current state."""
current = _fields_to_dict(current_fields)
return any(desired.get(k) != current.get(k) for k in desired)
# -- Download clients --------------------------------------------------------
def ensure_download_clients(base_url, api_key, download_clients):
"""Idempotently provision download clients, updating on drift."""
resp = http.get(
f"{base_url}/downloadclient",
headers={"X-Api-Key": api_key},
timeout=30,
)
resp.raise_for_status()
existing_by_name = {dc["name"]: dc for dc in resp.json()}
for dc in download_clients:
dc_name = dc["name"]
desired_fields = dc["fields"]
if dc_name in existing_by_name:
current = existing_by_name[dc_name]
drift = (
current.get("implementation") != dc["implementation"]
or current.get("configContract") != dc["configContract"]
or current.get("protocol") != dc["protocol"]
or _needs_field_update(desired_fields, current.get("fields", []))
)
if not drift:
print(f"Download client '{dc_name}' already correct, skipping")
continue
print(f"Updating download client '{dc_name}'...")
payload = {**current}
payload.update(
implementation=dc["implementation"],
configContract=dc["configContract"],
protocol=dc["protocol"],
fields=_dict_to_fields(desired_fields),
)
resp = http.put(
f"{base_url}/downloadclient/{current['id']}",
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
params={"forceSave": "true"},
json=payload,
timeout=30,
)
resp.raise_for_status()
print(f"Download client '{dc_name}' updated")
else:
print(f"Adding download client '{dc_name}'...")
payload = {
"enable": True,
"protocol": dc["protocol"],
"priority": 1,
"name": dc_name,
"implementation": dc["implementation"],
"configContract": dc["configContract"],
"fields": _dict_to_fields(desired_fields),
"tags": [],
}
resp = http.post(
f"{base_url}/downloadclient",
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
params={"forceSave": "true"},
json=payload,
timeout=30,
)
resp.raise_for_status()
print(f"Download client '{dc_name}' added")
# -- Root folders ------------------------------------------------------------
def ensure_root_folders(base_url, api_key, root_folders):
"""Idempotently provision root folders (create-only)."""
resp = http.get(
f"{base_url}/rootfolder",
headers={"X-Api-Key": api_key},
timeout=30,
)
resp.raise_for_status()
existing_paths = {rf["path"] for rf in resp.json()}
for path in root_folders:
if path in existing_paths:
print(f"Root folder '{path}' already exists, skipping")
continue
print(f"Adding root folder '{path}'...")
resp = http.post(
f"{base_url}/rootfolder",
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
json={"path": path},
timeout=30,
)
resp.raise_for_status()
print(f"Root folder '{path}' added")
# -- Synced applications (Prowlarr) ------------------------------------------
def resolve_sync_categories(base_url, api_key, implementation, explicit):
"""Resolve Newznab sync categories, auto-detecting from Prowlarr if needed."""
if explicit:
return explicit
category_name = IMPLEMENTATION_CATEGORY_MAP.get(implementation)
if not category_name:
return []
print(f"Auto-detecting sync categories for {implementation}...")
resp = http.get(
f"{base_url}/indexer/categories",
headers={"X-Api-Key": api_key},
timeout=30,
)
resp.raise_for_status()
sync_cats = []
for cat in resp.json():
if cat["name"] == category_name:
sync_cats.append(cat["id"])
for sub in cat.get("subCategories", []):
sync_cats.append(sub["id"])
if not sync_cats:
print(
f"Warning: could not auto-detect categories for "
f"'{category_name}', using empty list",
file=sys.stderr,
)
return []
print(f"Resolved sync categories: {sync_cats}")
return sync_cats
def ensure_synced_apps(base_url, api_key, synced_apps):
"""Idempotently provision synced applications (Prowlarr), updating on drift."""
resp = http.get(
f"{base_url}/applications",
headers={"X-Api-Key": api_key},
timeout=30,
)
resp.raise_for_status()
existing_by_name = {app["name"]: app for app in resp.json()}
for app in synced_apps:
app_name = app["name"]
target_api_key = read_api_key_xml(app["apiKeyFrom"])
sync_categories = resolve_sync_categories(
base_url, api_key, app["implementation"], app.get("syncCategories", []),
)
desired_fields = {
"prowlarrUrl": app["prowlarrUrl"],
"baseUrl": app["baseUrl"],
"apiKey": target_api_key,
"syncCategories": sync_categories,
}
if app_name in existing_by_name:
current = existing_by_name[app_name]
drift = (
current.get("implementation") != app["implementation"]
or current.get("configContract") != app["configContract"]
or current.get("syncLevel") != app["syncLevel"]
or _needs_field_update(desired_fields, current.get("fields", []))
)
if not drift:
print(f"Synced app '{app_name}' already correct, skipping")
continue
print(f"Updating synced app '{app_name}'...")
payload = {**current}
payload.update(
implementation=app["implementation"],
configContract=app["configContract"],
syncLevel=app["syncLevel"],
fields=_dict_to_fields(desired_fields),
)
resp = http.put(
f"{base_url}/applications/{current['id']}",
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
params={"forceSave": "true"},
json=payload,
timeout=30,
)
resp.raise_for_status()
print(f"Synced app '{app_name}' updated")
else:
print(f"Adding synced app '{app_name}'...")
payload = {
"name": app_name,
"implementation": app["implementation"],
"configContract": app["configContract"],
"syncLevel": app["syncLevel"],
"fields": _dict_to_fields(desired_fields),
"tags": [],
}
resp = http.post(
f"{base_url}/applications",
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
params={"forceSave": "true"},
json=payload,
timeout=30,
)
resp.raise_for_status()
print(f"Synced app '{app_name}' added")
# -- Naming ------------------------------------------------------------------
def update_naming(base_url, api_key, naming_config):
"""Merge desired naming fields into the current config."""
if not naming_config:
return
print("Checking naming configuration...")
resp = http.get(
f"{base_url}/config/naming",
headers={"X-Api-Key": api_key},
timeout=30,
)
resp.raise_for_status()
current = resp.json()
needs_update = any(
naming_config.get(k) != current.get(k) for k in naming_config
)
if not needs_update:
print("Naming configuration already correct, skipping")
return
print("Updating naming configuration...")
merged = {**current, **naming_config}
resp = http.put(
f"{base_url}/config/naming",
headers={"X-Api-Key": api_key, "Content-Type": "application/json"},
json=merged,
timeout=30,
)
resp.raise_for_status()
print("Naming configuration updated")
# -- Health checks -----------------------------------------------------------
def run_health_checks(base_url, api_key, name, cfg):
"""Run connectivity health checks if enabled."""
if not cfg["healthChecks"]:
return
print(f"Running {name} health checks...")
max_retries = cfg["healthCheckRetries"]
interval = cfg["healthCheckInterval"]
if cfg.get("downloadClients"):
print("Testing download client connectivity...")
health_check_loop(
f"{base_url}/downloadclient/testall",
api_key,
"download client",
name,
max_retries,
interval,
)
if cfg.get("syncedApps"):
print("Testing synced application connectivity...")
health_check_loop(
f"{base_url}/applications/testall",
api_key,
"synced application",
name,
max_retries,
interval,
)
print(f"{name} health checks passed")
# -- Main --------------------------------------------------------------------
def main():
cfg = load_config()
name = cfg["name"]
data_dir = cfg["dataDir"]
bind_address = cfg.get("bindAddress", "127.0.0.1")
port = cfg["port"]
api_version = cfg["apiVersion"]
api_timeout = cfg["apiTimeout"]
import os
config_xml = f"{data_dir}/config.xml"
if not os.path.isfile(config_xml):
print(f"Config file {config_xml} not found, skipping {name} init")
return
api_key = read_api_key_xml(config_xml)
base_url = f"http://{bind_address}:{port}/api/{api_version}"
wait_for_api(base_url, api_key, api_timeout, name)
if cfg.get("downloadClients"):
ensure_download_clients(base_url, api_key, cfg["downloadClients"])
if cfg.get("rootFolders"):
ensure_root_folders(base_url, api_key, cfg["rootFolders"])
if cfg.get("syncedApps"):
ensure_synced_apps(base_url, api_key, cfg["syncedApps"])
if cfg.get("naming"):
update_naming(base_url, api_key, cfg["naming"])
run_health_checks(base_url, api_key, name, cfg)
print(f"{name} init complete")
if __name__ == "__main__":
main()