import json from urllib.parse import urlencode def jellyfin_api(machine, method, path, auth_header, token=None, data_file=None, data=None): hdr = auth_header + (f", Token={token}" if token else "") cmd = f"curl -sf -X {method} 'http://localhost:8096{path}'" if data_file: cmd += f" -d '@{data_file}' -H 'Content-Type:application/json'" elif data: payload = json.dumps(data) if isinstance(data, dict) else data cmd += f" -d '{payload}' -H 'Content-Type:application/json'" cmd += f" -H 'X-Emby-Authorization:{hdr}'" return machine.succeed(cmd) def setup_jellyfin(machine, retry, auth_header, auth_payload, empty_payload): machine.wait_for_unit("jellyfin.service") machine.wait_for_open_port(8096) machine.wait_until_succeeds( "curl -sf http://localhost:8096/health | grep -q Healthy", timeout=120 ) machine.wait_until_succeeds( f"curl -sf 'http://localhost:8096/Startup/Configuration' " f"-H 'X-Emby-Authorization:{auth_header}'" ) jellyfin_api(machine, "GET", "/Startup/FirstUser", auth_header) jellyfin_api(machine, "POST", "/Startup/Complete", auth_header) result = json.loads( jellyfin_api( machine, "POST", "/Users/AuthenticateByName", auth_header, data_file=auth_payload, ) ) token = result["AccessToken"] user_id = result["User"]["Id"] tempdir = machine.succeed("mktemp -d -p /var/lib/jellyfin").strip() machine.succeed(f"chmod 755 '{tempdir}'") machine.succeed( f"ffmpeg -f lavfi -i testsrc2=duration=5 -f lavfi -i sine=frequency=440:duration=5 " f"-c:v libx264 -c:a aac '{tempdir}/Test Movie (2024).mkv'" ) query = urlencode({ "name": "Test Library", "collectionType": "Movies", "paths": tempdir, "refreshLibrary": "true", }) jellyfin_api( machine, "POST", f"/Library/VirtualFolders?{query}", auth_header, token=token, data_file=empty_payload, ) def is_ready(_): folders = json.loads( jellyfin_api(machine, "GET", "/Library/VirtualFolders", auth_header, token=token) ) return all(f.get("RefreshStatus") == "Idle" for f in folders) retry(is_ready, timeout=60) movie_id = None media_source_id = None def get_movie(_): nonlocal movie_id, media_source_id items = json.loads( jellyfin_api( machine, "GET", f"/Users/{user_id}/Items?IncludeItemTypes=Movie&Recursive=true", auth_header, token=token, ) ) if items["TotalRecordCount"] > 0: movie_id = items["Items"][0]["Id"] info = json.loads( jellyfin_api( machine, "GET", f"/Users/{user_id}/Items/{movie_id}", auth_header, token=token, ) ) media_source_id = info["MediaSources"][0]["Id"] return True return False retry(get_movie, timeout=60) return token, user_id, movie_id, media_source_id