This repository has been archived on 2026-04-18. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
server-config/tests/jellyfin-test-lib.py
Simon Gardling 297264a34a
Some checks failed
Build and Deploy / deploy (push) Failing after 2m35s
tests: extract shared jellyfin test helpers and use real jellyfin in annotations test
2026-04-01 11:24:44 -04:00

91 lines
3.0 KiB
Python

import json
import shlex
from urllib.parse import urlencode
def jellyfin_api(machine, method, path, auth_header, token=None, data_file=None, data=None):
    """Issue a curl request to the local Jellyfin API and return its output.

    The request targets ``http://localhost:8096{path}`` and is executed through
    ``machine.succeed``; curl runs with ``-sf``, so an HTTP error makes the
    command fail and surfaces however ``succeed`` reports failures.

    Every argument is shell-quoted, so payloads, paths and header values may
    contain spaces, double quotes or single quotes.

    Args:
        machine: Object exposing ``succeed(cmd)``, which runs a shell command
            and returns its output.
        method: HTTP method handed to ``curl -X``.
        path: Request path (including any query string) appended to the base URL.
        auth_header: Value sent in the ``X-Emby-Authorization`` header.
        token: Optional access token, appended to the header as ``, Token=...``.
        data_file: Optional file path sent as the JSON body via ``-d @file``.
            Takes precedence over ``data``.
        data: Optional request body. A dict or list is JSON-encoded (an empty
            one is still sent, as ``{}`` / ``[]``); any other truthy value is
            sent as ``str(data)``. ``None`` and other falsy non-container
            values mean "no body".

    Returns:
        Whatever ``machine.succeed`` returns for the curl command.
    """
    header_value = auth_header + (f", Token={token}" if token else "")
    json_body = ["-H", "Content-Type:application/json"]

    args = ["curl", "-sf", "-X", str(method), f"http://localhost:8096{path}"]
    if data_file:
        args += ["-d", f"@{data_file}"] + json_body
    elif data or isinstance(data, (dict, list)):
        # Containers are always serialized so that an explicit empty JSON
        # object/array is not mistaken for "no body".
        payload = json.dumps(data) if isinstance(data, (dict, list)) else str(data)
        args += ["-d", payload] + json_body
    args += ["-H", f"X-Emby-Authorization:{header_value}"]

    # Quote each argument individually instead of wrapping it in hand-written
    # single quotes, which break as soon as a value contains a "'".
    return machine.succeed(" ".join(shlex.quote(arg) for arg in args))
def setup_jellyfin(machine, retry, auth_header, auth_payload, empty_payload):
    """Drive a fresh Jellyfin instance to a state with one scanned test movie.

    Steps, in order: wait for the service, its port and its health endpoint;
    call the startup endpoints; authenticate; generate a 5-second test video
    with ffmpeg; register its directory as a "Movies" library; then poll until
    every library reports ``Idle`` and a movie item is listed.

    Args:
        machine: Test machine handle; must provide ``wait_for_unit``,
            ``wait_for_open_port``, ``wait_until_succeeds`` and ``succeed``.
        retry: Callable invoked as ``retry(predicate, timeout=60)``. Each
            predicate accepts one positional argument (ignored here) and
            returns True once its condition holds.
        auth_header: ``X-Emby-Authorization`` header value used for all requests.
        auth_payload: Path of the file posted to ``/Users/AuthenticateByName``.
        empty_payload: Path of the file posted as the body when creating the
            library.

    Returns:
        Tuple ``(token, user_id, movie_id, media_source_id)``. The last two
        are ``None`` if ``retry`` returns without the movie lookup succeeding.
    """
    machine.wait_for_unit("jellyfin.service")
    machine.wait_for_open_port(8096)

    health_check = "curl -sf http://localhost:8096/health | grep -q Healthy"
    machine.wait_until_succeeds(health_check, timeout=60)

    startup_probe = (
        "curl -sf 'http://localhost:8096/Startup/Configuration' "
        f"-H 'X-Emby-Authorization:{auth_header}'"
    )
    machine.wait_until_succeeds(startup_probe)

    # Startup endpoints, called unauthenticated and in this exact order.
    for verb, endpoint in (("GET", "/Startup/FirstUser"), ("POST", "/Startup/Complete")):
        jellyfin_api(machine, verb, endpoint, auth_header)

    login_response = jellyfin_api(
        machine,
        "POST",
        "/Users/AuthenticateByName",
        auth_header,
        data_file=auth_payload,
    )
    session = json.loads(login_response)
    token = session["AccessToken"]
    user_id = session["User"]["Id"]

    def get_json(path):
        """GET ``path`` with the access token and decode the JSON response."""
        return json.loads(
            jellyfin_api(machine, "GET", path, auth_header, token=token)
        )

    # Create a directory under Jellyfin's state dir and put a test video in it.
    media_dir = machine.succeed("mktemp -d -p /var/lib/jellyfin").strip()
    machine.succeed(f"chmod 755 '{media_dir}'")
    movie_path = f"{media_dir}/Test Movie (2024).mkv"
    encode_cmd = (
        "ffmpeg"
        " -f lavfi -i testsrc2=duration=5"
        " -f lavfi -i sine=frequency=440:duration=5"
        f" -c:v libx264 -c:a aac '{movie_path}'"
    )
    machine.succeed(encode_cmd)

    library_params = {
        "name": "Test Library",
        "collectionType": "Movies",
        "paths": media_dir,
        "refreshLibrary": "true",
    }
    jellyfin_api(
        machine,
        "POST",
        "/Library/VirtualFolders?" + urlencode(library_params),
        auth_header,
        token=token,
        data_file=empty_payload,
    )

    def is_ready(_):
        """Return True when no virtual folder reports a non-Idle status."""
        for folder in get_json("/Library/VirtualFolders"):
            if folder.get("RefreshStatus") != "Idle":
                return False
        return True

    retry(is_ready, timeout=60)

    # Filled in by get_movie; stays None if the lookup never succeeds.
    found = {"movie_id": None, "media_source_id": None}

    def get_movie(_):
        """Record the first movie's id and media source id; True on success."""
        listing = get_json(
            f"/Users/{user_id}/Items?IncludeItemTypes=Movie&Recursive=true"
        )
        if not listing["TotalRecordCount"] > 0:
            return False
        found["movie_id"] = listing["Items"][0]["Id"]
        details = get_json(f"/Users/{user_id}/Items/{found['movie_id']}")
        found["media_source_id"] = details["MediaSources"][0]["Id"]
        return True

    retry(get_movie, timeout=60)

    return token, user_id, found["movie_id"], found["media_source_id"]