Files
tidal_downloader/dl.py
2026-03-11 09:43:31 -04:00

1265 lines
51 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import os
import sys
import time
import json
import glob
import stat
import shutil
import pathlib
import datetime
import subprocess
import argparse
from typing import Dict, Any, List, Optional
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
class LidarrMusicVideoAutomator:
def __init__(self) -> None:
    """Set the script identity, default settings and empty per-run state.

    The configurable values assigned here are only starting points:
    ``settings_from_conf`` reassigns them each time a ``.conf`` file is
    loaded, and the per-artist / per-video fields are overwritten while
    artists and videos are processed.
    """
    # Script identity and fixed locations
    self.script_version = "2.2"
    self.script_name = "Lidarr-MusicVideoAutomator"
    self.docker_path = "/config"
    self.arr_app = "Lidarr"

    # Log file bookkeeping. ``log_file_name`` is declared here as well so
    # that the attribute exists before ``logfile_setup`` first runs.
    self.log_file_name: str = ""
    self.log_file_path: Optional[str] = None
    self.log_file_handle = None

    # Config / state values (populated per .conf)
    self.config: Dict[str, Any] = {}
    self.arr_url: str = ""
    self.arr_api_key: str = ""
    self.lidarr_music_video_automator: str = ""
    self.lidarr_music_video_automator_interval: int = 3600
    self.lidarr_temp_path: str = ""
    self.lidarr_library_path: str = ""
    self.music_video_format: str = "mkv"
    self.require_min_quality: str = "SD"
    self.enable_live_videos: bool = True
    self.enable_visualizer_videos: bool = True
    self.enable_lyric_videos: bool = True
    self.enable_behind_scenes_videos: bool = True
    self.enable_interview_videos: bool = True
    self.enable_episode_videos: bool = True
    self.tidal_country_code: str = "US"
    self.log_folder: str = ""

    # Per-artist state
    self.lidarr_artist_count: int = 0
    self.process_count: int = 0
    self.lidarr_artist_name: str = ""
    self.lidarr_artist_musicbrainz_id: str = ""
    self.lidarr_artist_folder: str = ""
    self.lidarr_artist_genres: List[str] = []

    # Per-video state
    self.video_ids_count: int = 0
    self.video_id_process: int = 0
    self.video_title: str = ""
    self.video_artist: str = ""
    self.video_year: str = ""
    self.video_date: str = ""
    self.video_type: str = "Music Video"
    self.video_type_filename: str = "video"
    self.video_thumbnail_url: str = ""
    self.video_artists: List[Dict[str, Any]] = []
    self.video_artists_ids: List[int] = []
    self.explicit_title_tag: str = ""
    self.advisory: str = ""
    self.completed_filename_no_ext: str = ""
    self.thumbnail_file: str = ""

    # Download bookkeeping
    self.failed_download_count: int = 0
    self.video_unavailable: bool = False
    self.tidal_failure: bool = False

    # Parsed command-line arguments, if any were supplied
    self.cli_args: Optional[argparse.Namespace] = None
# ---------- helpers ----------
def log(self, msg: str) -> None:
    """Print a timestamped message and mirror it to the open log file.

    The line has the form
    ``<timestamp> :: <script_name> (v<script_version>) :: <msg>``.
    Writing to the log file is best effort: a failing or closed handle
    never interrupts the caller.
    """
    # "%F %T" are C-library shorthands that Python does not guarantee on
    # every platform; the explicit directives give the same text everywhere.
    ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"{ts} :: {self.script_name} (v{self.script_version}) :: {msg}"
    print(line, flush=True)
    if self.log_file_handle is None:
        return
    try:
        self.log_file_handle.write(line + "\n")
        self.log_file_handle.flush()
    except (OSError, ValueError):
        # OSError: disk/IO problem; ValueError: handle already closed or
        # the text cannot be encoded. Console output has already happened.
        pass
def run_cmd(
    self, args: List[str], check: bool = True, capture: bool = False
) -> subprocess.CompletedProcess:
    """Run an external command in text mode and return the completed process.

    Args:
        args: Program and arguments, passed to ``subprocess.run`` as a list.
        check: Forwarded as ``check``; when true a non-zero exit status
            raises ``subprocess.CalledProcessError``.
        capture: Forwarded as ``capture_output``.
    """
    run_options = {
        "check": check,
        "capture_output": capture,
        "text": True,
    }
    return subprocess.run(args, **run_options)
def http_get_json(
    self,
    url: str,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 60.0,
) -> Any:
    """Fetch ``url`` and return the decoded JSON body, or ``None`` on failure.

    Args:
        url: Address to request.
        headers: Optional request headers.
        timeout: Seconds to wait on the connection before giving up. A
            finite default keeps one unresponsive server from blocking
            the caller indefinitely.

    Returns:
        The parsed JSON value, or ``None`` after logging an error when the
        request fails or the body is not valid UTF-8 JSON.
    """
    try:
        req = Request(url, headers=headers or {})
        with urlopen(req, timeout=timeout) as resp:
            payload = resp.read()
        return json.loads(payload.decode("utf-8"))
    except (OSError, ValueError) as e:
        # URLError/HTTPError and socket timeouts are OSError subclasses;
        # JSONDecodeError, UnicodeDecodeError and "unknown url type" are
        # ValueError subclasses.
        # The query string may carry credentials (e.g. ``apikey=``), so it
        # is dropped from the logged address. NOTE(review): the exception
        # text itself can still echo the URL in some error paths.
        safe_url = url.split("?", 1)[0]
        self.log(f"ERROR :: HTTP/JSON error for {safe_url}: {e}")
        return None
def load_conf_file(self, path: str) -> Dict[str, str]:
    """Parse a shell-style ``KEY=value`` config file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. A leading ``export `` is dropped, and one pair of matching
    single or double quotes around the value is removed.

    Args:
        path: Location of the config file.

    Returns:
        The parsed settings. The result is empty (or partial) when the file
        is missing or cannot be read; the problem is logged instead of
        raised.
    """
    cfg: Dict[str, str] = {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                if line.startswith("export "):
                    line = line[len("export ") :].strip()
                key, sep, val = line.partition("=")
                if not sep:
                    continue
                val = val.strip()
                # Require two characters so that a lone quote is kept as
                # the value rather than being stripped to nothing.
                if len(val) >= 2 and val[0] == val[-1] and val[0] in ('"', "'"):
                    val = val[1:-1]
                cfg[key.strip()] = val
    except FileNotFoundError:
        self.log(f"ERROR :: Config file {path} not found")
    except (OSError, UnicodeDecodeError) as e:
        # Directory given as path, permission problems, or non-UTF-8 bytes.
        self.log(f"ERROR :: Config file {path} could not be read: {e}")
    return cfg
def bool_from_cfg(self, key: str, default: bool = True) -> bool:
    """Read ``key`` from the loaded config as a boolean.

    Only the text ``true`` (case-insensitive, surrounding whitespace
    ignored) counts as true. ``default`` is used when the key is absent.
    """
    fallback = "true" if default else "false"
    raw_value = self.config.get(key, fallback)
    normalised = str(raw_value).strip().lower()
    return normalised == "true"
def apply_cli_overrides_to_settings(self) -> None:
    """Overwrite loaded settings with values supplied on the command line.

    Text options replace the setting only when a non-empty value was given
    and are stripped of surrounding whitespace. The interval and the
    ``enable*`` switches replace the setting whenever they are not ``None``,
    so ``0`` and ``False`` are honoured.
    """
    cli = self.cli_args
    if not cli:
        return

    # (command-line option, attribute on self) for stripped text values
    text_options = (
        ("lidarrUrl", "arr_url"),
        ("lidarrApiKey", "arr_api_key"),
        ("lidarrMusicVideoAutomator", "lidarr_music_video_automator"),
        ("lidarrMusicVideoTempDownloadPath", "lidarr_temp_path"),
        ("lidarrMusicVideoLibrary", "lidarr_library_path"),
        ("musicVideoFormat", "music_video_format"),
        ("requireMinimumVideoQaulity", "require_min_quality"),
        ("logFolder", "log_folder"),
    )
    for option, attribute in text_options:
        supplied = getattr(cli, option, None)
        if supplied:
            setattr(self, attribute, supplied.strip())

    # The country code falls back to "US" when only whitespace was given.
    country = getattr(cli, "tidalCountryCode", None)
    if country:
        self.tidal_country_code = country.strip() or "US"

    # (command-line option, attribute on self) for values copied verbatim
    verbatim_options = (
        ("lidarrMusicVideoAutomatorInterval", "lidarr_music_video_automator_interval"),
        ("enableLiveVideos", "enable_live_videos"),
        ("enableVisualizerVideos", "enable_visualizer_videos"),
        ("enableLyricVideos", "enable_lyric_videos"),
        ("enableBehindTheScenesVideos", "enable_behind_scenes_videos"),
        ("enableInterviewVideos", "enable_interview_videos"),
        ("enableEpisodeVideos", "enable_episode_videos"),
    )
    for option, attribute in verbatim_options:
        supplied = getattr(cli, option, None)
        if supplied is not None:
            setattr(self, attribute, supplied)
# ---------- script function equivalents ----------
def install_dependencies(self) -> None:
    """Install the external tools the script shells out to, if needed.

    Uses ``apk`` to decide whether ffmpeg is already installed. When
    ``apk`` is not available the step is skipped entirely. Otherwise the
    Alpine packages are added and ``tidal-dl-ng-For-DJ`` is installed or
    upgraded through pip. Installation failures are logged, not raised.
    """
    try:
        res = self.run_cmd(["apk", "--no-cache", "list"], capture=True, check=False)
    except FileNotFoundError:
        # apk not available likely not Alpine; just skip
        self.log("apk not found, skipping dependency installation...")
        return

    # ``apk list`` also prints packages that are merely available, so only
    # an ffmpeg line that is flagged as installed counts.
    listing = res.stdout or ""
    if any("ffmpeg" in row and "installed" in row for row in listing.splitlines()):
        self.log("Dependencies already installed, skipping...")
        return

    self.log("Installing script dependencies....")
    try:
        self.run_cmd(
            [
                "apk",
                "add",
                "-U",
                "--update",
                "--no-cache",
                "tidyhtml",
                "ffmpeg",
                "jq",
                "xq",
                "libstdc++",
                "mkvtoolnix",
            ],
            check=True,
        )
        self.log("done")
        # atomicparsley is pulled from the edge/testing repository.
        self.run_cmd(
            [
                "apk",
                "add",
                "atomicparsley",
                "--repository=https://dl-cdn.alpinelinux.org/alpine/edge/testing",
            ],
            check=True,
        )
    except subprocess.CalledProcessError as e:
        self.log(f"ERROR :: Dependency install failed: {e}")

    try:
        self.run_cmd(
            [
                "python3",
                "-m",
                "pip",
                "install",
                "tidal-dl-ng-For-DJ",
                "--upgrade",
                "--break-system-packages",
            ],
            check=False,
        )
    except OSError as e:
        # With check=False only a launch failure (e.g. python3 missing)
        # can surface here.
        self.log(f"WARNING :: pip install failed: {e}")
def configure_tidal_dl(self) -> None:
    """Apply the tidal-dl-ng settings and check that a login token exists.

    Sets ``self.tidal_failure`` when the ``tidal-dl-ng`` executable cannot
    be found, when the token file is missing or unreadable, or when its
    content contains ``null``.
    """
    self.log("Configuring tidal-dl-ng client")
    cfg_values = (
        ("quality_video", "1080"),
        ("format_video", "{track_title}"),
        ("path_binary_ffmpeg", "/usr/bin/ffmpeg"),
        ("download_base_path", self.lidarr_temp_path),
    )
    try:
        for option, value in cfg_values:
            self.run_cmd(["tidal-dl-ng", "cfg", option, value], check=False)
    except FileNotFoundError:
        self.log("ERROR :: tidal-dl-ng not found in PATH")
        self.tidal_failure = True
        return

    token_path = "/root/.config/tidal_dl_ng-dev/token.json"
    login_required = not os.path.isfile(token_path)
    if not login_required:
        try:
            with open(token_path, encoding="utf-8") as token_file:
                # A token file that still contains "null" is treated as
                # not authenticated.
                login_required = "null" in token_file.read()
        except Exception as e:
            self.log(f"ERROR :: Failed reading tidal token: {e}")
            self.tidal_failure = True

    if login_required:
        self.log("tidal-dl-ng requires authentication, authenticate now:")
        self.log("login manually using the following command: tidal-dl-ng login")
        self.tidal_failure = True
def verify_config(self) -> None:
    """Return only when the script is enabled; otherwise sleep forever.

    The script counts as enabled when ``lidarr_music_video_automator``
    equals ``true`` (case-insensitive). If it is not, a notice is logged
    and this call never returns.
    """
    if self.lidarr_music_video_automator.lower() == "true":
        return
    self.log(
        'Script is not enabled, enable by setting lidarrMusicVideoAutomator to "true" in /config/<filename>.conf...'
    )
    self.log("Sleeping (infinity)")
    # Deliberately blocks: wake up hourly only to go back to sleep.
    while True:
        time.sleep(3600)
def settings_from_conf(self, conf_path: str) -> None:
    """Load one ``.conf`` file and copy its values onto the instance.

    Missing keys fall back to the defaults below, an interval that is not
    an integer becomes 3600, and command-line overrides are applied last.
    """
    self.log(f"Import Script {conf_path} Settings...")
    loaded = self.load_conf_file(conf_path)
    self.config = loaded

    def text(key: str, fallback: str = "") -> str:
        # Config value with surrounding whitespace removed.
        return loaded.get(key, fallback).strip()

    # Connection and switch
    self.arr_url = text("lidarrUrl")
    self.arr_api_key = text("lidarrApiKey")
    self.lidarr_music_video_automator = text("lidarrMusicVideoAutomator")

    # Paths and output format
    self.lidarr_temp_path = text(
        "lidarrMusicVideoTempDownloadPath", "/tmp/lidarr-mv-temp"
    )
    self.lidarr_library_path = text("lidarrMusicVideoLibrary", "/music-videos")
    self.music_video_format = text("musicVideoFormat", "mkv")
    self.require_min_quality = text("requireMinimumVideoQaulity", "SD")
    self.tidal_country_code = text("tidalCountryCode", "US") or "US"
    self.log_folder = text(
        "logFolder", os.path.join(self.docker_path, "tidal-video-logs")
    )

    # Seconds between runs
    try:
        interval = int(loaded.get("lidarrMusicVideoAutomatorInterval", "3600"))
    except ValueError:
        interval = 3600
    self.lidarr_music_video_automator_interval = interval

    # Video-type switches, all enabled unless the file says otherwise
    self.enable_live_videos = self.bool_from_cfg("enableLiveVideos", True)
    self.enable_visualizer_videos = self.bool_from_cfg("enableVisualizerVideos", True)
    self.enable_lyric_videos = self.bool_from_cfg("enableLyricVideos", True)
    self.enable_behind_scenes_videos = self.bool_from_cfg(
        "enableBehindTheScenesVideos", True
    )
    self.enable_interview_videos = self.bool_from_cfg("enableInterviewVideos", True)
    self.enable_episode_videos = self.bool_from_cfg("enableEpisodeVideos", True)

    self.apply_cli_overrides_to_settings()
def logfile_setup(self) -> None:
    """Start a new log file under ``<docker_path>/logs`` and prune old ones.

    Existing logs of this script are reduced to the four most recently
    modified, then any of those older than five days are removed as well.
    The new file is created, handed to PUID/PGID where permitted, made
    world read/writable and opened for appending as ``log_file_handle``.
    """
    logs_dir = os.path.join(self.docker_path, "logs")
    os.makedirs(logs_dir, exist_ok=True)

    stamp = datetime.datetime.now().strftime("%Y_%m_%d_%I_%M_%p")
    self.log_file_name = f"{self.script_name}-{stamp}.txt"
    self.log_file_path = os.path.join(logs_dir, self.log_file_name)

    # Newest first, so everything past index 3 is surplus.
    existing = glob.glob(os.path.join(logs_dir, f"{self.script_name}-*.txt"))
    existing.sort(key=os.path.getmtime, reverse=True)
    for surplus in existing[4:]:
        try:
            os.remove(surplus)
        except OSError:
            pass

    # Age check over the full listing; entries removed above simply raise
    # OSError here and are skipped.
    max_age_seconds = 5 * 86400
    now = time.time()
    for candidate in existing:
        try:
            if (now - os.path.getmtime(candidate)) > max_age_seconds:
                os.remove(candidate)
        except OSError:
            pass

    # Create current log file
    pathlib.Path(self.log_file_path).touch()
    puid = int(os.getenv("PUID", "1000"))
    pgid = int(os.getenv("PGID", "1000"))
    try:
        os.chown(self.log_file_path, puid, pgid)
    except PermissionError:
        pass
    os.chmod(self.log_file_path, 0o666)

    # (Re)open handle
    previous = self.log_file_handle
    if previous is not None:
        try:
            previous.close()
        except Exception:
            pass
    self.log_file_handle = open(self.log_file_path, "a", encoding="utf-8")
# ---------- media helpers ----------
def thumbnail_downloader(self, timeout: float = 60.0) -> None:
    """Download the current video's thumbnail into the temp folder.

    The destination is ``<temp>/<completed_filename_no_ext>.jpg`` and is
    stored in ``self.thumbnail_file``. Nothing is fetched when that file
    already exists. Failures are logged and leave no file behind.

    Args:
        timeout: Seconds to wait on the connection, so that a stalled
            image server cannot block processing indefinitely.
    """
    self.thumbnail_file = os.path.join(
        self.lidarr_temp_path, f"{self.completed_filename_no_ext}.jpg"
    )
    if os.path.isfile(self.thumbnail_file):
        return

    self.log(
        f"{self.process_count}/{self.lidarr_artist_count} :: "
        f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} "
        f":: {self.video_artist} :: {self.video_year} :: {self.video_type} "
        f":: {self.video_title} :: Downloading Thumbnail"
    )
    try:
        with urlopen(self.video_thumbnail_url, timeout=timeout) as resp:
            with open(self.thumbnail_file, "wb") as out:
                shutil.copyfileobj(resp, out)
    except Exception as e:
        # Best effort by design: a missing thumbnail must not stop the
        # video from being processed.
        self.log(f"ERROR :: Thumbnail download failed: {e}")
        # Drop a truncated image so it is neither embedded in the video
        # nor moved into the library.
        try:
            os.remove(self.thumbnail_file)
        except OSError:
            pass
def nfo_writer(self) -> None:
    """Write the ``<musicvideo>`` NFO file for the current video.

    The file is created as ``<temp>/<completed_filename_no_ext>.nfo``,
    replacing any earlier one, then reformatted with ``tidy`` when that
    tool is available and made world read/writable. All text values are
    XML-escaped so that characters such as ``&`` or ``<`` in titles and
    names cannot produce a malformed document.
    """
    # Function-scope import: escape() is not among the file's imports.
    from xml.sax.saxutils import escape

    def xml_text(value: Any) -> str:
        # Text safe to place between XML tags.
        return escape(str(value))

    self.log(
        f"{self.process_count}/{self.lidarr_artist_count} :: "
        f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} "
        f":: {self.video_artist} :: {self.video_year} :: {self.video_type} "
        f":: {self.video_title} :: Writing NFO"
    )
    nfo = os.path.join(
        self.lidarr_temp_path, f"{self.completed_filename_no_ext}.nfo"
    )
    if os.path.isfile(nfo):
        try:
            os.remove(nfo)
        except OSError:
            pass

    # Without artist genres the video type doubles as the only genre.
    genres = self.lidarr_artist_genres or [self.video_type]

    lines = [
        "<musicvideo>",
        f"\t<title>{xml_text(self.video_title)}{xml_text(self.explicit_title_tag)}</title>",
        "\t<userrating/>",
        "\t<track/>",
        "\t<studio/>",
    ]
    lines.extend(f"\t<genre>{xml_text(g)}</genre>" for g in genres)
    # original script writes videoType twice
    lines.append(f"\t<genre>{xml_text(self.video_type)}</genre>")
    lines.append("\t<premiered/>")
    lines.append(f"\t<year>{xml_text(self.video_year)}</year>")
    lines.extend(
        f"\t<artist>{xml_text(artist.get('name', ''))}</artist>"
        for artist in self.video_artists
    )
    lines.extend(
        [
            "\t<albumArtistCredits>",
            f"\t\t<artist>{xml_text(self.lidarr_artist_name)}</artist>",
            f"\t\t<musicBrainzArtistID>{xml_text(self.lidarr_artist_musicbrainz_id)}</musicBrainzArtistID>",
            "\t</albumArtistCredits>",
            f"\t<thumb>{xml_text(self.completed_filename_no_ext)}.jpg</thumb>",
            "\t<source>tidal</source>",
            "</musicvideo>",
        ]
    )
    with open(nfo, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")

    try:
        self.run_cmd(
            ["tidy", "-w", "2000", "-i", "-m", "-xml", nfo],
            check=False,
        )
    except FileNotFoundError:
        # tidy is optional; the unformatted file is still valid.
        pass
    os.chmod(nfo, 0o666)
def completed_file_mover(self) -> None:
    """Move the finished video, thumbnail and NFO into the artist folder.

    The library folder and the artist folder are created on demand. Each of
    ``<completed_filename_no_ext>.mp4/.mkv/.jpg/.nfo`` found in the temp
    folder is moved unless a file of that name is already in the artist
    folder, in which case it is reported and left in place.
    """
    puid = int(os.getenv("PUID", "1000"))
    pgid = int(os.getenv("PGID", "1000"))

    def announce(message: str) -> None:
        # Per-video progress line with the shared prefix.
        self.log(
            f"{self.process_count}/{self.lidarr_artist_count} :: "
            f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} "
            f":: {self.video_artist} :: {self.video_year} :: {self.video_type} "
            f":: {self.video_title} :: {message}"
        )

    def ensure_folder(folder: str, message: str) -> None:
        # Create the folder (announcing it) and open up its permissions.
        if os.path.isdir(folder):
            return
        announce(message)
        os.makedirs(folder, exist_ok=True)
        try:
            os.chown(folder, puid, pgid)
        except PermissionError:
            pass
        os.chmod(folder, 0o777)

    library_root = self.lidarr_library_path
    artist_dir = os.path.join(library_root, self.lidarr_artist_folder)
    ensure_folder(library_root, "Creating Library Folder")
    ensure_folder(artist_dir, f"Creating Artist Folder: {self.lidarr_artist_folder}")

    # extension -> label used when the file is already in the library
    labels = {"mp4": "Video", "mkv": "Video", "jpg": "Thumbnail", "nfo": "NFO"}
    for extension, label in labels.items():
        file_name = f"{self.completed_filename_no_ext}.{extension}"
        source = os.path.join(self.lidarr_temp_path, file_name)
        destination = os.path.join(artist_dir, file_name)
        if not os.path.isfile(source):
            continue
        if os.path.isfile(destination):
            announce(f"ERROR :: {label} Previously Imported")
            continue
        announce(f"Moving compeleted {extension.upper()} file to libary")
        shutil.move(source, destination)
        os.chmod(destination, 0o666)
def download_video(self, url: str) -> None:
    """Download one video with ``tidal-dl-ng`` into an emptied temp folder.

    Outcome is reported through instance state:

    * ``video_unavailable`` is set when the tool prints "Media not found".
    * ``failed_download_count`` is reset to 0 when an ``.mp4`` file appears
      anywhere under the temp folder, and raised by exactly one for a
      failed attempt. An unavailable video is not counted as a failure.

    Args:
        url: Video address handed to ``tidal-dl-ng dl``.
    """

    def status(message: str) -> None:
        # Per-video progress line with the shared prefix.
        self.log(
            f"{self.process_count}/{self.lidarr_artist_count} :: "
            f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} "
            f":: {self.video_artist} :: {self.video_year} :: {self.video_type} "
            f":: {self.video_title} :: {message}"
        )

    self.video_unavailable = False
    self.failed_download_count = max(self.failed_download_count, 0)
    os.environ["TMPDIR"] = self.lidarr_temp_path

    # Start from an empty temp folder so leftovers of an earlier attempt
    # cannot be mistaken for this download.
    if os.path.isdir(self.lidarr_temp_path):
        for entry in pathlib.Path(self.lidarr_temp_path).glob("*"):
            if entry.is_file():
                entry.unlink()
            elif entry.is_dir():
                shutil.rmtree(entry, ignore_errors=True)
    else:
        os.makedirs(self.lidarr_temp_path, exist_ok=True)

    status("Downloading Video...")
    try:
        proc = self.run_cmd(["tidal-dl-ng", "dl", url], capture=True, check=False)
        output = (proc.stdout or "") + (proc.stderr or "")
        if "Media not found" in output:
            status("ERROR :: Media Unavailable!")
            self.video_unavailable = True
    except FileNotFoundError:
        self.log("ERROR :: tidal-dl-ng not found when downloading")

    downloaded = any(
        name.lower().endswith(".mp4")
        for _, _, names in os.walk(self.lidarr_temp_path)
        for name in names
    )
    if downloaded:
        status("Download Complete!")
        self.failed_download_count = 0
        return

    status("ERROR :: Download Failed!")
    # Count each failed attempt once, and never count a video that the
    # service reports as unavailable.
    if not self.video_unavailable:
        self.failed_download_count += 1
def tag_mp4(self) -> None:
    """Tag every downloaded ``.mp4`` with AtomicParsley and import it.

    For each ``.mp4`` under the temp folder the file is renamed to
    ``temp.mp4``, the thumbnail is fetched, and AtomicParsley writes the
    tagged result as ``<name>-<video_type_filename>.mp4``. The NFO is then
    written and the files are moved to the library.

    If tagging cannot be done (tool missing or failing), the untagged
    download is kept under the final name instead of being left behind as
    ``temp.mp4``, so the video still reaches the library.
    """
    for root, _, files in os.walk(self.lidarr_temp_path):
        for fname in files:
            if not fname.lower().endswith(".mp4"):
                continue
            fpath = os.path.join(root, fname)
            file_name_no_ext = os.path.splitext(fname)[0]
            self.completed_filename_no_ext = (
                f"{file_name_no_ext}-{self.video_type_filename}"
            )
            temp_mp4 = os.path.join(self.lidarr_temp_path, "temp.mp4")
            out_mp4 = os.path.join(
                self.lidarr_temp_path,
                f"{self.completed_filename_no_ext}.mp4",
            )
            shutil.move(fpath, temp_mp4)
            self.thumbnail_downloader()
            genre = " / ".join(self.lidarr_artist_genres or [])

            command = [
                "AtomicParsley",
                temp_mp4,
                "--title",
                f"{self.video_title}{self.explicit_title_tag}",
                "--year",
                self.video_year,
                "--artist",
                self.lidarr_artist_name,
                "--albumArtist",
                self.lidarr_artist_name,
                "--genre",
                genre,
                "--advisory",
                self.advisory,
            ]
            # Only reference artwork that exists; a missing thumbnail
            # should not make the tagging step fail.
            if os.path.isfile(self.thumbnail_file):
                command += ["--artwork", self.thumbnail_file]
            command += ["-o", out_mp4]

            tagged = False
            try:
                self.run_cmd(command, check=True)
                tagged = os.path.isfile(out_mp4)
            except FileNotFoundError:
                self.log("ERROR :: AtomicParsley not found")
            except subprocess.CalledProcessError as e:
                self.log(f"ERROR :: AtomicParsley failed: {e}")

            if tagged:
                try:
                    os.remove(temp_mp4)
                except OSError:
                    pass
            else:
                # Discard any partial output and fall back to the
                # untagged download.
                try:
                    os.remove(out_mp4)
                except OSError:
                    pass
                self.log("WARNING :: Tagging failed, importing untagged video")
                shutil.move(temp_mp4, out_mp4)

            self.nfo_writer()
            self.completed_file_mover()
def remux_to_mkv(self) -> None:
    """Remux every downloaded ``.mp4`` into a tagged MKV and import it.

    For each ``.mp4`` under the temp folder:

    1. Skip (and delete) it when the MKV is already in the artist folder.
    2. Read the video track's pixel dimensions with ``mkvmerge -J`` and
       classify them as FHD (``1920x``), HD (``1280x``) or SD.
    3. Delete it when it is below ``require_min_quality``.
    4. Fetch the thumbnail and remux with ffmpeg, attaching the thumbnail
       only when it actually exists.
    5. On success remove the source, write the NFO and move the files to
       the library. On failure nothing is imported, so no NFO/thumbnail
       ends up in the library without its video.
    """

    def status(message: str) -> None:
        # Per-video progress line with the shared prefix.
        self.log(
            f"{self.process_count}/{self.lidarr_artist_count} :: "
            f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} "
            f":: {self.video_artist} :: {self.video_year} :: {self.video_type} "
            f":: {self.video_title} :: {message}"
        )

    quality_rank = {"SD": 0, "HD": 1, "FHD": 2}

    for root, _, files in os.walk(self.lidarr_temp_path):
        for fname in files:
            if not fname.lower().endswith(".mp4"):
                continue
            fpath = os.path.join(root, fname)
            file_name_no_ext = os.path.splitext(fname)[0]
            self.completed_filename_no_ext = (
                f"{file_name_no_ext}-{self.video_type_filename}"
            )
            target = os.path.join(
                self.lidarr_library_path,
                self.lidarr_artist_folder,
                f"{self.completed_filename_no_ext}.mkv",
            )
            if os.path.isfile(target):
                status("Alreday in library, performing cleanup")
                os.remove(fpath)
                continue

            status("Detecting video quality...")
            # Stays "SD" when the probe cannot be run or parsed.
            video_quality = "SD"
            try:
                proc = self.run_cmd(
                    ["mkvmerge", "-J", fpath], capture=True, check=True
                )
                data = json.loads(proc.stdout)
                dims = ""
                for track in data.get("tracks", []):
                    if track.get("type") == "video":
                        dims = track.get("properties", {}).get(
                            "pixel_dimensions", ""
                        )
                        break
                if "1920x" in dims:
                    video_quality = "FHD"
                elif "1280x" in dims:
                    video_quality = "HD"
            except (
                FileNotFoundError,
                subprocess.CalledProcessError,
                json.JSONDecodeError,
            ):
                pass
            status(f"Video quality is: {video_quality}")

            # enforce minimum quality (an unrecognised requirement
            # enforces nothing)
            required = self.require_min_quality
            if quality_rank[video_quality] < quality_rank.get(required, 0):
                status(
                    f"ERROR :: Video does not meet required minimum quality: {required}"
                )
                os.remove(fpath)
                continue

            self.thumbnail_downloader()
            genre = " / ".join(self.lidarr_artist_genres or [])
            status("Remuxing file to MKV and Tagging")
            out_mkv = os.path.join(
                self.lidarr_temp_path,
                f"{self.completed_filename_no_ext}.mkv",
            )

            command = ["ffmpeg", "-y", "-i", fpath, "-c", "copy"]
            for tag in (
                f"TITLE={self.video_title}{self.explicit_title_tag}",
                f"DATE_RELEASE={self.video_date}",
                f"DATE={self.video_date}",
                f"YEAR={self.video_year}",
                f"GENRE={genre}",
                f"ARTIST={self.lidarr_artist_name}",
                f"ARTISTS={self.lidarr_artist_name}",
                f"ALBUMARTIST={self.lidarr_artist_name}",
                "ENCODED_BY=tidal",
            ):
                command += ["-metadata", tag]
            # Attaching a file that does not exist makes ffmpeg fail, so
            # the cover is only attached when the download succeeded.
            if os.path.isfile(self.thumbnail_file):
                command += [
                    "-attach",
                    self.thumbnail_file,
                    "-metadata:s:t",
                    "mimetype=image/jpeg",
                ]
            command.append(out_mkv)

            remuxed = False
            try:
                self.run_cmd(command, check=True)
                os.chmod(out_mkv, 0o666)
                remuxed = True
            except FileNotFoundError:
                self.log("ERROR :: ffmpeg not found")
            except subprocess.CalledProcessError as e:
                self.log(f"ERROR :: ffmpeg failed: {e}")

            if not remuxed:
                # Remove a partial MKV and import nothing for this video.
                try:
                    os.remove(out_mkv)
                except OSError:
                    pass
                continue

            status("Removing source file for remuxing...")
            os.remove(fpath)
            self.nfo_writer()
            self.completed_file_mover()
# ---------- tidal + Lidarr orchestration ----------
def tidal_process(self, tidal_artist_id: int) -> None:
    """Download and import every wanted video of one TIDAL artist.

    The artist's video list is fetched (newest release first) and each
    entry is classified by keywords in its title. A video is skipped when
    its type is disabled, when a ``video-<id>`` marker file already exists
    in the log folder, or when its main artist is not the requested one.
    Otherwise it is downloaded, remuxed or tagged according to
    ``music_video_format``, and a marker file is written. Videos reported
    as unavailable also get a marker; other failed downloads do not.

    Args:
        tidal_artist_id: TIDAL id of the artist whose videos are processed.

    Raises:
        SystemExit: After three failed download attempts.
    """

    def status(message: str) -> None:
        # Per-video progress line with the shared prefix.
        self.log(
            f"{self.process_count}/{self.lidarr_artist_count} :: "
            f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} "
            f":: {self.video_artist} :: {self.video_year} :: {self.video_type} "
            f":: {self.video_title} :: {message}"
        )

    # Title rules, first match wins:
    # (matcher, video type, file-name suffix or None to keep "video",
    #  enable attribute, name used in the skip message)
    type_rules = (
        (lambda t: "visualizer" in t, "Visualizer", None,
         "enable_visualizer_videos", "Visualizer"),
        (lambda t: "visualiser" in t, "Visualiser", None,
         "enable_visualizer_videos", "Visualizer"),
        (lambda t: "video" in t and "lyric" in t, "Lyric", "lyrics",
         "enable_lyric_videos", "Lyric"),
        (lambda t: "live" in t, "Live", "live",
         "enable_live_videos", "Live"),
        (lambda t: "behind the scenes" in t or "making of" in t,
         "Behind the Scenes", "behindthescenes",
         "enable_behind_scenes_videos", "Behind The Scenes"),
        (lambda t: "interview" in t, "Interview", "interview",
         "enable_interview_videos", "Interview"),
        (lambda t: "episode" in t, "Episode", "behindthescenes",
         "enable_episode_videos", "Episode"),
    )

    url = (
        f"https://api.tidal.com/v1/artists/{tidal_artist_id}/videos"
        f"?countryCode={self.tidal_country_code}&offset=0&limit=1000"
    )
    headers = {"x-tidal-token": "CzET4vdadNUFQ5JU"}
    videos_data = self.http_get_json(url, headers=headers)
    if not videos_data:
        return

    # ``or`` fallbacks guard against fields that are present but null.
    items = videos_data.get("items") or []
    items = sorted(items, key=lambda x: x.get("releaseDate") or "", reverse=True)
    self.video_ids_count = len(items)
    self.video_id_process = 0

    for item in items:
        self.video_id_process += 1
        video_id = item.get("id")
        main_artist = item.get("artist") or {}
        self.video_title = item.get("title") or ""
        self.video_artist = main_artist.get("name", "")
        video_main_artist_id = main_artist.get("id")
        video_explicit = bool(item.get("explicit", False))
        release_date = str(item.get("releaseDate") or "")[:10]
        self.video_date = release_date
        self.video_year = release_date[:4]
        image_id_fix = (item.get("imageId") or "").replace("-", "/")
        self.video_thumbnail_url = (
            f"https://resources.tidal.com/images/{image_id_fix}/750x500.jpg"
        )
        self.video_artists = item.get("artists", []) or []
        self.video_artists_ids = [
            a.get("id") for a in self.video_artists if "id" in a
        ]

        # Detect video type
        self.video_type = "Music Video"
        self.video_type_filename = "video"
        title_lower = self.video_title.lower()
        skip_label = None
        for matches, video_type, suffix, enabled_attr, label in type_rules:
            if not matches(title_lower):
                continue
            self.video_type = video_type
            if suffix is not None:
                self.video_type_filename = suffix
            if not getattr(self, enabled_attr):
                skip_label = label
            break
        if skip_label is not None:
            status(f"{skip_label} Video detected, skipping...")
            continue

        # skip if previously downloaded
        marker = os.path.join(self.log_folder, f"video-{video_id}")
        if os.path.isfile(marker):
            status("Previously downloaded, skipping...")
            continue

        # main artist mismatch check
        if tidal_artist_id != video_main_artist_id:
            status(
                f"ERROR :: Video Main Artist ID ({video_main_artist_id}) "
                f"does not match requested Artist ID ({tidal_artist_id}), skippping..."
            )
            continue

        if video_explicit:
            self.explicit_title_tag = " 🅴"
            self.advisory = "explicit"
        else:
            self.explicit_title_tag = ""
            self.advisory = "clean"

        status("Processing...")
        self.download_video(f"https://tidal.com/video/{video_id}")
        if self.failed_download_count >= 3:
            status("ERROR :: Too many failed download attemps, exiting...")
            sys.exit(1)

        # post-download processing
        any_mp4 = any(
            name.lower().endswith(".mp4")
            for _, _, names in os.walk(self.lidarr_temp_path)
            for name in names
        )
        if any_mp4:
            if self.music_video_format.lower() == "mkv":
                self.remux_to_mkv()
            else:
                self.tag_mp4()
        elif not self.video_unavailable:
            # Plain failure: no marker, so the video is retried next run.
            continue

        # log folder / marker file
        if not os.path.isdir(self.log_folder):
            status(f"Creating log folder: {self.log_folder}")
            os.makedirs(self.log_folder, exist_ok=True)
            puid = int(os.getenv("PUID", "1000"))
            pgid = int(os.getenv("PGID", "1000"))
            try:
                os.chown(self.log_folder, puid, pgid)
            except PermissionError:
                pass
            os.chmod(self.log_folder, 0o777)
        if not os.path.isfile(marker):
            status(f"Writing log file: {marker}")
            pathlib.Path(marker).touch()
            os.chmod(marker, 0o666)
def process_lidarr_artists(self) -> None:
    """Walk all Lidarr artists and process the TIDAL ids linked to each.

    The artist list and each artist's details are read from the Lidarr
    API. For every artist the name, MusicBrainz id, library folder name
    and genres are stored on the instance, then ``tidal_process`` is called
    once per distinct numeric id found in the artist's "tidal" links.

    The API key is sent in the ``X-Api-Key`` header instead of the query
    string, so it does not become part of URLs that may be logged.
    """
    if not self.arr_url:
        self.log(f"ERROR :: Skipping {self.arr_app}, missing URL...")
        return
    if not self.arr_api_key:
        self.log(f"ERROR :: Skipping {self.arr_app}, missing API Key...")
        return

    base = self.arr_url.rstrip("/")
    auth_headers = {"X-Api-Key": self.arr_api_key}
    artists = self.http_get_json(f"{base}/api/v1/artist", headers=auth_headers)
    if not isinstance(artists, list):
        self.log("ERROR :: Could not fetch Lidarr artists")
        return

    self.lidarr_artist_count = len(artists)
    self.process_count = 0
    for artist in artists:
        self.process_count += 1
        lidarr_artist_id = artist.get("id")
        artist_data = self.http_get_json(
            f"{base}/api/v1/artist/{lidarr_artist_id}", headers=auth_headers
        )
        if not artist_data:
            continue

        self.lidarr_artist_name = artist_data.get("artistName", "")
        self.lidarr_artist_musicbrainz_id = artist_data.get("foreignArtistId", "")

        # Folder name is the last path component up to the first "(".
        # A trailing separator is ignored, and a name that starts with
        # "(" is kept whole rather than becoming empty.
        raw_path = (artist_data.get("path") or "").rstrip("/\\")
        base_name = os.path.basename(raw_path)
        self.lidarr_artist_folder = (
            base_name.split("(")[0].strip() or base_name.strip()
        )

        # Collect each numeric path segment of the "tidal" links once,
        # in first-seen order.
        tidal_artist_ids: List[int] = []
        for link in artist_data.get("links") or []:
            if (link.get("name") or "").lower() != "tidal":
                continue
            for part in (link.get("url") or "").split("/"):
                if not (part.isascii() and part.isdigit()):
                    continue
                tid = int(part)
                if tid not in tidal_artist_ids:
                    tidal_artist_ids.append(tid)

        genres = artist_data.get("genres", []) or []
        self.lidarr_artist_genres = [g for g in genres if isinstance(g, str)]

        self.log(
            f"{self.process_count}/{self.lidarr_artist_count} :: "
            f"{self.lidarr_artist_name} :: Processing..."
        )
        for tid in tidal_artist_ids:
            self.tidal_process(tid)
# ---------- main loop ----------
def run_forever(self) -> None:
    """Run the automation in an endless pass/sleep cycle.

    Each pass sets up the log file, installs dependencies, then handles
    every ``*.conf`` file found directly in ``self.docker_path`` in sorted
    order: load settings, verify them, configure the TIDAL downloader,
    process the Lidarr artists and log how long that config took. After the
    last config the method sleeps for
    ``self.lidarr_music_video_automator_interval`` seconds and starts over.

    Exits the process with status 1 when no config file exists or when
    ``self.tidal_failure`` is set after configuring the downloader.
    """
    while True:
        self.logfile_setup()
        self.log("Starting...")
        self.install_dependencies()

        config_dir = pathlib.Path(self.docker_path)
        conf_files = sorted(
            str(entry) for entry in config_dir.glob("*.conf") if entry.is_file()
        )
        if not conf_files:
            self.log("ERROR :: No config files found, exiting...")
            sys.exit(1)

        for conf_path in conf_files:
            started_at = time.time()
            self.log(f'Processing "{conf_path}" config file')
            self.settings_from_conf(conf_path)
            self.verify_config()
            self.configure_tidal_dl()
            if self.tidal_failure:
                sys.exit(1)
            self.process_lidarr_artists()

            # Break the elapsed whole seconds into d/h/m/s for the summary.
            elapsed = int(time.time() - started_at)
            days, remainder = divmod(elapsed, 86400)
            hours, remainder = divmod(remainder, 3600)
            mins, secs = divmod(remainder, 60)
            self.log(f"Script Completed in {days}d:{hours}h:{mins}m:{secs}s!")

        interval = self.lidarr_music_video_automator_interval
        self.log(f"Sleeping {interval}...")
        time.sleep(interval)
def main() -> None:
    """Parse command-line overrides and start the automator loop.

    Every option is optional. Value options are left as ``None`` when not
    given. Each video-type toggle is an ``--enableX`` / ``--disableX`` pair
    sharing one destination, so it ends up ``True``, ``False`` or ``None``
    (neither flag given). The parsed namespace is attached to the automator
    as ``cli_args``; ``--dockerPath`` additionally replaces its
    ``docker_path`` before ``run_forever`` is called.
    """
    parser = argparse.ArgumentParser(
        description="Lidarr Music Video Automator (TIDAL)."
    )

    # Core paths / service settings: (flag, add_argument keyword arguments).
    value_options = (
        ("--dockerPath", {"help": "Base config path (default: /config).", "default": None}),
        ("--lidarrUrl", {"help": "Override Lidarr URL (lidarrUrl)."}),
        ("--lidarrApiKey", {"help": "Override Lidarr API key (lidarrApiKey)."}),
        (
            "--lidarrMusicVideoAutomator",
            {
                "choices": ["true", "false"],
                "help": "Override lidarrMusicVideoAutomator setting.",
            },
        ),
        (
            "--lidarrMusicVideoAutomatorInterval",
            {
                "type": int,
                "help": "Override lidarrMusicVideoAutomatorInterval (seconds).",
            },
        ),
        (
            "--lidarrMusicVideoTempDownloadPath",
            {"help": "Override lidarrMusicVideoTempDownloadPath."},
        ),
        ("--lidarrMusicVideoLibrary", {"help": "Override lidarrMusicVideoLibrary."}),
        (
            "--musicVideoFormat",
            {"choices": ["mkv", "mp4"], "help": "Override musicVideoFormat."},
        ),
        (
            # The misspelling is part of the existing option name.
            "--requireMinimumVideoQaulity",
            {
                "choices": ["SD", "HD", "FHD"],
                "help": "Override requireMinimumVideoQaulity.",
            },
        ),
        ("--tidalCountryCode", {"help": "Override tidalCountryCode (e.g. US, GB)."}),
        ("--logFolder", {"help": "Override logFolder for per-video markers."}),
    )
    for flag, options in value_options:
        parser.add_argument(flag, **options)

    # Boolean toggles: (option-name suffix, label used in the help text).
    video_toggles = (
        ("LiveVideos", "Live"),
        ("VisualizerVideos", "Visualizer"),
        ("LyricVideos", "Lyric"),
        ("BehindTheScenesVideos", "Behind the Scenes"),
        ("InterviewVideos", "Interview"),
        ("EpisodeVideos", "Episode"),
    )
    for suffix, label in video_toggles:
        destination = f"enable{suffix}"
        # The enable flag is registered first so its default of None is the
        # value left in the namespace when neither flag is passed.
        parser.add_argument(
            f"--enable{suffix}",
            dest=destination,
            action="store_true",
            default=None,
            help=f"Enable {label} videos (overrides config).",
        )
        parser.add_argument(
            f"--disable{suffix}",
            dest=destination,
            action="store_false",
            help=f"Disable {label} videos (overrides config).",
        )

    args = parser.parse_args()

    automator = LidarrMusicVideoAutomator()
    automator.cli_args = args
    if args.dockerPath:
        automator.docker_path = args.dockerPath
    automator.run_forever()


# Run the automator only when this file is executed as a script.
if __name__ == "__main__":
    main()