#!/usr/bin/env python3 import os import sys import time import json import glob import stat import shutil import pathlib import datetime import subprocess import argparse from typing import Dict, Any, List, Optional from urllib.request import urlopen, Request from urllib.error import URLError, HTTPError class LidarrMusicVideoAutomator: def __init__(self) -> None: self.script_version = "2.2" self.script_name = "Lidarr-MusicVideoAutomator" self.docker_path = "/config" self.arr_app = "Lidarr" self.log_file_path: Optional[str] = None self.log_file_handle = None # Config / state values (populated per .conf) self.config: Dict[str, Any] = {} self.arr_url: str = "" self.arr_api_key: str = "" self.lidarr_music_video_automator: str = "" self.lidarr_music_video_automator_interval: int = 3600 self.lidarr_temp_path: str = "" self.lidarr_library_path: str = "" self.music_video_format: str = "mkv" self.require_min_quality: str = "SD" self.enable_live_videos: bool = True self.enable_visualizer_videos: bool = True self.enable_lyric_videos: bool = True self.enable_behind_scenes_videos: bool = True self.enable_interview_videos: bool = True self.enable_episode_videos: bool = True self.tidal_country_code: str = "US" self.log_folder: str = "" # Per-artist / per-video state self.lidarr_artist_count: int = 0 self.process_count: int = 0 self.lidarr_artist_name: str = "" self.lidarr_artist_musicbrainz_id: str = "" self.lidarr_artist_folder: str = "" self.lidarr_artist_genres: List[str] = [] self.video_ids_count: int = 0 self.video_id_process: int = 0 self.video_title: str = "" self.video_artist: str = "" self.video_year: str = "" self.video_date: str = "" self.video_type: str = "Music Video" self.video_type_filename: str = "video" self.video_thumbnail_url: str = "" self.video_artists: List[Dict[str, Any]] = [] self.video_artists_ids: List[int] = [] self.explicit_title_tag: str = "" self.advisory: str = "" self.completed_filename_no_ext: str = "" self.thumbnail_file: str = "" self.failed_download_count: int = 0 self.video_unavailable: bool = False self.tidal_failure: bool = False self.cli_args: Optional[argparse.Namespace] = None # ---------- helpers ---------- def log(self, msg: str) -> None: ts = datetime.datetime.now().strftime("%F %T") line = f"{ts} :: {self.script_name} (v{self.script_version}) :: {msg}" print(line, flush=True) if self.log_file_handle is not None: try: self.log_file_handle.write(line + "\n") self.log_file_handle.flush() except Exception: pass def run_cmd( self, args: List[str], check: bool = True, capture: bool = False ) -> subprocess.CompletedProcess: return subprocess.run( args, check=check, capture_output=capture, text=True, ) def http_get_json(self, url: str, headers: Optional[Dict[str, str]] = None) -> Any: req = Request(url, headers=headers or {}) try: with urlopen(req, timeout=None) as resp: data = resp.read() return json.loads(data.decode("utf-8")) except (URLError, HTTPError, json.JSONDecodeError) as e: self.log(f"ERROR :: HTTP/JSON error for {url}: {e}") return None def load_conf_file(self, path: str) -> Dict[str, str]: cfg: Dict[str, str] = {} try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if line.startswith("export "): line = line[len("export ") :].strip() if "=" not in line: continue key, val = line.split("=", 1) key = key.strip() val = val.strip() if (val.startswith('"') and val.endswith('"')) or ( val.startswith("'") and val.endswith("'") ): val = val[1:-1] cfg[key] = val except FileNotFoundError: self.log(f"ERROR :: Config file {path} not found") return cfg def bool_from_cfg(self, key: str, default: bool = True) -> bool: val = str(self.config.get(key, "true" if default else "false")).strip().lower() return val == "true" def apply_cli_overrides_to_settings(self) -> None: args = self.cli_args if not args: return if getattr(args, "lidarrUrl", None): self.arr_url = args.lidarrUrl.strip() if getattr(args, "lidarrApiKey", None): self.arr_api_key = args.lidarrApiKey.strip() if getattr(args, "lidarrMusicVideoAutomator", None): self.lidarr_music_video_automator = args.lidarrMusicVideoAutomator.strip() if getattr(args, "lidarrMusicVideoTempDownloadPath", None): self.lidarr_temp_path = args.lidarrMusicVideoTempDownloadPath.strip() if getattr(args, "lidarrMusicVideoLibrary", None): self.lidarr_library_path = args.lidarrMusicVideoLibrary.strip() if getattr(args, "musicVideoFormat", None): self.music_video_format = args.musicVideoFormat.strip() if getattr(args, "requireMinimumVideoQaulity", None): self.require_min_quality = args.requireMinimumVideoQaulity.strip() if getattr(args, "tidalCountryCode", None): self.tidal_country_code = args.tidalCountryCode.strip() or "US" if getattr(args, "logFolder", None): self.log_folder = args.logFolder.strip() if getattr(args, "lidarrMusicVideoAutomatorInterval", None) is not None: self.lidarr_music_video_automator_interval = ( args.lidarrMusicVideoAutomatorInterval ) if getattr(args, "enableLiveVideos", None) is not None: self.enable_live_videos = args.enableLiveVideos if getattr(args, "enableVisualizerVideos", None) is not None: self.enable_visualizer_videos = args.enableVisualizerVideos if getattr(args, "enableLyricVideos", None) is not None: self.enable_lyric_videos = args.enableLyricVideos if getattr(args, "enableBehindTheScenesVideos", None) is not None: self.enable_behind_scenes_videos = args.enableBehindTheScenesVideos if getattr(args, "enableInterviewVideos", None) is not None: self.enable_interview_videos = args.enableInterviewVideos if getattr(args, "enableEpisodeVideos", None) is not None: self.enable_episode_videos = args.enableEpisodeVideos # ---------- script function equivalents ---------- def install_dependencies(self) -> None: try: res = self.run_cmd(["apk", "--no-cache", "list"], capture=True, check=False) if "ffmpeg" in (res.stdout or ""): self.log("Dependencies already installed, skipping...") return except FileNotFoundError: # apk not available – likely not Alpine; just skip self.log("apk not found, skipping dependency installation...") return self.log("Installing script dependencies....") try: self.run_cmd( [ "apk", "add", "-U", "--update", "--no-cache", "tidyhtml", "ffmpeg", "jq", "xq", "libstdc++", "mkvtoolnix", ], check=True, ) self.log("done") self.run_cmd( [ "apk", "add", "atomicparsley", "--repository=https://dl-cdn.alpinelinux.org/alpine/edge/testing", ], check=True, ) except subprocess.CalledProcessError as e: self.log(f"ERROR :: Dependency install failed: {e}") try: self.run_cmd( [ "python3", "-m", "pip", "install", "tidal-dl-ng-For-DJ", "--upgrade", "--break-system-packages", ], check=False, ) except Exception as e: self.log(f"WARNING :: pip install failed: {e}") def configure_tidal_dl(self) -> None: self.log("Configuring tidal-dl-ng client") temp_path = self.lidarr_temp_path try: self.run_cmd(["tidal-dl-ng", "cfg", "quality_video", "1080"], check=False) self.run_cmd( ["tidal-dl-ng", "cfg", "format_video", "{track_title}"], check=False ) self.run_cmd( ["tidal-dl-ng", "cfg", "path_binary_ffmpeg", "/usr/bin/ffmpeg"], check=False, ) self.run_cmd( ["tidal-dl-ng", "cfg", "download_base_path", temp_path], check=False ) except FileNotFoundError: self.log("ERROR :: tidal-dl-ng not found in PATH") self.tidal_failure = True return token_path = "/root/.config/tidal_dl_ng-dev/token.json" if os.path.isfile(token_path): try: with open(token_path, "r", encoding="utf-8") as f: content = f.read() if "null" in content: self.log("tidal-dl-ng requires authentication, authenticate now:") self.log( "login manually using the following command: tidal-dl-ng login" ) self.tidal_failure = True except Exception as e: self.log(f"ERROR :: Failed reading tidal token: {e}") self.tidal_failure = True else: self.log("tidal-dl-ng requires authentication, authenticate now:") self.log("login manually using the following command: tidal-dl-ng login") self.tidal_failure = True def verify_config(self) -> None: if self.lidarr_music_video_automator.lower() != "true": self.log( 'Script is not enabled, enable by setting lidarrMusicVideoAutomator to "true" in /config/.conf...' ) self.log("Sleeping (infinity)") while True: time.sleep(3600) def settings_from_conf(self, conf_path: str) -> None: self.log(f"Import Script {conf_path} Settings...") self.config = self.load_conf_file(conf_path) self.arr_url = self.config.get("lidarrUrl", "").strip() self.arr_api_key = self.config.get("lidarrApiKey", "").strip() self.lidarr_music_video_automator = self.config.get( "lidarrMusicVideoAutomator", "" ).strip() self.lidarr_temp_path = self.config.get( "lidarrMusicVideoTempDownloadPath", "/tmp/lidarr-mv-temp" ).strip() self.lidarr_library_path = self.config.get( "lidarrMusicVideoLibrary", "/music-videos" ).strip() self.music_video_format = self.config.get("musicVideoFormat", "mkv").strip() self.require_min_quality = self.config.get( "requireMinimumVideoQaulity", "SD" ).strip() self.tidal_country_code = ( self.config.get("tidalCountryCode", "US").strip() or "US" ) self.log_folder = self.config.get( "logFolder", os.path.join(self.docker_path, "tidal-video-logs") ).strip() interval_str = self.config.get("lidarrMusicVideoAutomatorInterval", "3600") try: self.lidarr_music_video_automator_interval = int(interval_str) except ValueError: self.lidarr_music_video_automator_interval = 3600 self.enable_live_videos = self.bool_from_cfg("enableLiveVideos", True) self.enable_visualizer_videos = self.bool_from_cfg( "enableVisualizerVideos", True ) self.enable_lyric_videos = self.bool_from_cfg("enableLyricVideos", True) self.enable_behind_scenes_videos = self.bool_from_cfg( "enableBehindTheScenesVideos", True ) self.enable_interview_videos = self.bool_from_cfg("enableInterviewVideos", True) self.enable_episode_videos = self.bool_from_cfg("enableEpisodeVideos", True) self.apply_cli_overrides_to_settings() def logfile_setup(self) -> None: logs_dir = os.path.join(self.docker_path, "logs") os.makedirs(logs_dir, exist_ok=True) self.log_file_name = ( f"{self.script_name}-" f"{datetime.datetime.now().strftime('%Y_%m_%d_%I_%M_%p')}.txt" ) self.log_file_path = os.path.join(logs_dir, self.log_file_name) # Keep last 4 (current + 3 previous) and also delete >5 days old pattern = os.path.join(logs_dir, f"{self.script_name}-*.txt") files = sorted( glob.glob(pattern), key=lambda p: os.path.getmtime(p), reverse=True ) if len(files) > 4: for old in files[4:]: try: os.remove(old) except OSError: pass now = time.time() for f in files: try: mtime = os.path.getmtime(f) if (now - mtime) > 5 * 86400: os.remove(f) # do not break; keep scanning all except OSError: pass # Create current log file pathlib.Path(self.log_file_path).touch() puid = int(os.getenv("PUID", "1000")) pgid = int(os.getenv("PGID", "1000")) try: os.chown(self.log_file_path, puid, pgid) except PermissionError: pass os.chmod(self.log_file_path, 0o666) # (Re)open handle if self.log_file_handle is not None: try: self.log_file_handle.close() except Exception: pass self.log_file_handle = open(self.log_file_path, "a", encoding="utf-8") # ---------- media helpers ---------- def thumbnail_downloader(self) -> None: self.thumbnail_file = os.path.join( self.lidarr_temp_path, f"{self.completed_filename_no_ext}.jpg" ) if not os.path.isfile(self.thumbnail_file): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Downloading Thumbnail" ) try: with ( urlopen(self.video_thumbnail_url, timeout=None) as resp, open(self.thumbnail_file, "wb") as out, ): shutil.copyfileobj(resp, out) except Exception as e: self.log(f"ERROR :: Thumbnail download failed: {e}") def nfo_writer(self) -> None: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Writing NFO" ) nfo = os.path.join( self.lidarr_temp_path, f"{self.completed_filename_no_ext}.nfo" ) if os.path.isfile(nfo): try: os.remove(nfo) except OSError: pass genres = self.lidarr_artist_genres or [self.video_type] with open(nfo, "w", encoding="utf-8") as f: f.write("\n") f.write(f"\t{self.video_title}{self.explicit_title_tag}\n") f.write("\t\n") f.write("\t\n") f.write("\t\n") for g in genres: f.write(f"\t{g}\n") # original script writes videoType twice f.write(f"\t{self.video_type}\n") f.write("\t\n") f.write(f"\t{self.video_year}\n") for artist in self.video_artists: name = artist.get("name", "") f.write(f"\t{name}\n") f.write("\t\n") f.write(f"\t\t{self.lidarr_artist_name}\n") f.write( f"\t\t{self.lidarr_artist_musicbrainz_id}\n" ) f.write("\t\n") f.write(f"\t{self.completed_filename_no_ext}.jpg\n") f.write("\ttidal\n") f.write("\n") try: self.run_cmd( ["tidy", "-w", "2000", "-i", "-m", "-xml", nfo], check=False, ) except FileNotFoundError: pass os.chmod(nfo, 0o666) def completed_file_mover(self) -> None: puid = int(os.getenv("PUID", "1000")) pgid = int(os.getenv("PGID", "1000")) lib_dir = self.lidarr_library_path artist_dir = os.path.join(lib_dir, self.lidarr_artist_folder) if not os.path.isdir(lib_dir): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Creating Library Folder" ) os.makedirs(lib_dir, exist_ok=True) try: os.chown(lib_dir, puid, pgid) except PermissionError: pass os.chmod(lib_dir, 0o777) if not os.path.isdir(artist_dir): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Creating Artist Folder: {self.lidarr_artist_folder}" ) os.makedirs(artist_dir, exist_ok=True) try: os.chown(artist_dir, puid, pgid) except PermissionError: pass os.chmod(artist_dir, 0o777) for ext in ("mp4", "mkv", "jpg", "nfo"): src = os.path.join( self.lidarr_temp_path, f"{self.completed_filename_no_ext}.{ext}" ) dst = os.path.join(artist_dir, f"{self.completed_filename_no_ext}.{ext}") if os.path.isfile(src): if not os.path.isfile(dst): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Moving compeleted {ext.upper()} file to libary" ) shutil.move(src, dst) os.chmod(dst, 0o666) else: msg_type = { "mp4": "Video", "mkv": "Video", "jpg": "Thumbnail", "nfo": "NFO", }[ext] self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: {msg_type} Previously Imported" ) def download_video(self, url: str) -> None: self.video_unavailable = False self.failed_download_count = max(self.failed_download_count, 0) os.environ["TMPDIR"] = self.lidarr_temp_path if os.path.isdir(self.lidarr_temp_path): for p in pathlib.Path(self.lidarr_temp_path).glob("*"): if p.is_file(): p.unlink() elif p.is_dir(): shutil.rmtree(p, ignore_errors=True) else: os.makedirs(self.lidarr_temp_path, exist_ok=True) self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Downloading Video..." ) try: proc = self.run_cmd(["tidal-dl-ng", "dl", url], capture=True, check=False) output = (proc.stdout or "") + (proc.stderr or "") if "Media not found" in output: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: Media Unavailable!" ) self.failed_download_count = max(self.failed_download_count - 1, 0) self.video_unavailable = True except FileNotFoundError: self.log("ERROR :: tidal-dl-ng not found when downloading") self.failed_download_count += 1 any_mp4 = False for root, _, files in os.walk(self.lidarr_temp_path): if any(f.lower().endswith(".mp4") for f in files): any_mp4 = True break if any_mp4: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Download Complete!" ) self.failed_download_count = 0 else: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: Download Failed!" ) self.failed_download_count += 1 def tag_mp4(self) -> None: for root, _, files in os.walk(self.lidarr_temp_path): for fname in files: if not fname.lower().endswith(".mp4"): continue fpath = os.path.join(root, fname) file_name_no_ext = os.path.splitext(fname)[0] self.completed_filename_no_ext = ( f"{file_name_no_ext}-{self.video_type_filename}" ) temp_mp4 = os.path.join(self.lidarr_temp_path, "temp.mp4") shutil.move(fpath, temp_mp4) self.thumbnail_downloader() if self.lidarr_artist_genres: out = " / ".join(self.lidarr_artist_genres) genre = out else: genre = "" try: self.run_cmd( [ "AtomicParsley", temp_mp4, "--title", f"{self.video_title}{self.explicit_title_tag}", "--year", self.video_year, "--artist", self.lidarr_artist_name, "--albumArtist", self.lidarr_artist_name, "--genre", genre, "--advisory", self.advisory, "--artwork", self.thumbnail_file, "-o", os.path.join( self.lidarr_temp_path, f"{self.completed_filename_no_ext}.mp4", ), ], check=True, ) except FileNotFoundError: self.log("ERROR :: AtomicParsley not found") except subprocess.CalledProcessError as e: self.log(f"ERROR :: AtomicParsley failed: {e}") out_mp4 = os.path.join( self.lidarr_temp_path, f"{self.completed_filename_no_ext}.mp4", ) if os.path.isfile(out_mp4): try: os.remove(temp_mp4) except OSError: pass self.nfo_writer() self.completed_file_mover() def remux_to_mkv(self) -> None: for root, _, files in os.walk(self.lidarr_temp_path): for fname in files: if not fname.lower().endswith(".mp4"): continue fpath = os.path.join(root, fname) file_name_no_ext = os.path.splitext(fname)[0] self.completed_filename_no_ext = ( f"{file_name_no_ext}-{self.video_type_filename}" ) target = os.path.join( self.lidarr_library_path, self.lidarr_artist_folder, f"{self.completed_filename_no_ext}.mkv", ) if os.path.isfile(target): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Alreday in library, performing cleanup" ) os.remove(fpath) continue self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Detecting video quality..." ) video_quality = "SD" try: proc = self.run_cmd( ["mkvmerge", "-J", fpath], capture=True, check=True ) data = json.loads(proc.stdout) dims = "" for t in data.get("tracks", []): if t.get("type") == "video": dims = t.get("properties", {}).get("pixel_dimensions", "") break if "1920x" in dims: video_quality = "FHD" elif "1280x" in dims: video_quality = "HD" else: video_quality = "SD" except ( FileNotFoundError, subprocess.CalledProcessError, json.JSONDecodeError, ): pass self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Video quality is: {video_quality}" ) # enforce minimum quality if self.require_min_quality == "FHD" and video_quality in ("HD", "SD"): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: Video does not meet required minimum quality: {self.require_min_quality}" ) os.remove(fpath) continue elif self.require_min_quality == "HD" and video_quality == "SD": self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: Video does not meet required minimum quality: {self.require_min_quality}" ) os.remove(fpath) continue self.thumbnail_downloader() if self.lidarr_artist_genres: genre = " / ".join(self.lidarr_artist_genres) else: genre = "" self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Remuxing file to MKV and Tagging" ) out_mkv = os.path.join( self.lidarr_temp_path, f"{self.completed_filename_no_ext}.mkv", ) try: self.run_cmd( [ "ffmpeg", "-y", "-i", fpath, "-c", "copy", "-metadata", f"TITLE={self.video_title}{self.explicit_title_tag}", "-metadata", f"DATE_RELEASE={self.video_date}", "-metadata", f"DATE={self.video_date}", "-metadata", f"YEAR={self.video_year}", "-metadata", f"GENRE={genre}", "-metadata", f"ARTIST={self.lidarr_artist_name}", "-metadata", f"ARTISTS={self.lidarr_artist_name}", "-metadata", f"ALBUMARTIST={self.lidarr_artist_name}", "-metadata", "ENCODED_BY=tidal", "-attach", self.thumbnail_file, "-metadata:s:t", "mimetype=image/jpeg", out_mkv, ], check=True, ) os.chmod(out_mkv, 0o666) except FileNotFoundError: self.log("ERROR :: ffmpeg not found") except subprocess.CalledProcessError as e: self.log(f"ERROR :: ffmpeg failed: {e}") if os.path.isfile(out_mkv): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Removing source file for remuxing..." ) os.remove(fpath) self.nfo_writer() self.completed_file_mover() # ---------- tidal + Lidarr orchestration ---------- def tidal_process(self, tidal_artist_id: int) -> None: url = ( f"https://api.tidal.com/v1/artists/{tidal_artist_id}/videos" f"?countryCode={self.tidal_country_code}&offset=0&limit=1000" ) headers = {"x-tidal-token": "CzET4vdadNUFQ5JU"} videos_data = self.http_get_json(url, headers=headers) if not videos_data: return items = videos_data.get("items", []) items = sorted(items, key=lambda x: x.get("releaseDate", ""), reverse=True) self.video_ids_count = len(items) self.video_id_process = 0 for item in items: self.video_id_process += 1 self.video_title = item.get("title", "") self.video_artist = item.get("artist", {}).get("name", "") video_main_artist_id = item.get("artist", {}).get("id") video_explicit = bool(item.get("explicit", False)) release_date = str(item.get("releaseDate", ""))[:10] self.video_date = release_date self.video_year = release_date[:4] image_id = item.get("imageId", "") image_id_fix = image_id.replace("-", "/") self.video_thumbnail_url = ( f"https://resources.tidal.com/images/{image_id_fix}/750x500.jpg" ) self.video_artists = item.get("artists", []) or [] self.video_artists_ids = [ a.get("id") for a in self.video_artists if "id" in a ] # Detect video type self.video_type = "Music Video" self.video_type_filename = "video" title_lower = self.video_title.lower() if "visualizer" in title_lower: self.video_type = "Visualizer" if not self.enable_live_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Visualizer Video detected, skipping..." ) continue elif "visualiser" in title_lower: self.video_type = "Visualiser" if not self.enable_visualizer_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Visualizer Video detected, skipping..." ) continue elif "video" in title_lower and "lyric" in title_lower: self.video_type = "Lyric" self.video_type_filename = "lyrics" if not self.enable_lyric_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Lyric Video detected, skipping..." ) continue elif "live" in title_lower: self.video_type = "Live" self.video_type_filename = "live" if not self.enable_live_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Live Video detected, skipping..." ) continue elif "behind the scenes" in title_lower or "making of" in title_lower: self.video_type = "Behind the Scenes" self.video_type_filename = "behindthescenes" if not self.enable_behind_scenes_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Behind The Scenes Video detected, skipping..." ) continue elif "intreview" in title_lower: self.video_type = "Interview" self.video_type_filename = "interview" if not self.enable_interview_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Interview Video detected, skipping..." ) continue elif "episode" in title_lower: self.video_type = "Episode" self.video_type_filename = "behindthescenes" if not self.enable_episode_videos: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Episode Video detected, skipping..." ) continue # skip if previously downloaded marker = os.path.join(self.log_folder, f"video-{item.get('id')}") if os.path.isfile(marker): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Previously downloaded, skipping..." ) continue # main artist mismatch check if tidal_artist_id != video_main_artist_id: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: Video Main Artist ID ({video_main_artist_id}) " f"does not match requested Artist ID ({tidal_artist_id}), skippping..." ) continue if video_explicit: self.explicit_title_tag = " 🅴" self.advisory = "explicit" else: self.explicit_title_tag = "" self.advisory = "clean" self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Processing..." ) self.download_video(f"https://tidal.com/video/{item.get('id')}") if self.failed_download_count >= 3: self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: ERROR :: Too many failed download attemps, exiting..." ) sys.exit(1) # post-download processing any_mp4 = False for root, _, files in os.walk(self.lidarr_temp_path): if any(f.lower().endswith(".mp4") for f in files): any_mp4 = True break if any_mp4: if self.music_video_format.lower() == "mkv": self.remux_to_mkv() else: self.tag_mp4() elif not self.video_unavailable: continue # log folder / marker file if not os.path.isdir(self.log_folder): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Creating log folder: {self.log_folder}" ) os.makedirs(self.log_folder, exist_ok=True) puid = int(os.getenv("PUID", "1000")) pgid = int(os.getenv("PGID", "1000")) try: os.chown(self.log_folder, puid, pgid) except PermissionError: pass os.chmod(self.log_folder, 0o777) if not os.path.isfile(marker): self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: {self.video_id_process}/{self.video_ids_count} " f":: {self.video_artist} :: {self.video_year} :: {self.video_type} " f":: {self.video_title} :: Writing log file: {marker}" ) pathlib.Path(marker).touch() os.chmod(marker, 0o666) def process_lidarr_artists(self) -> None: if not self.arr_url: self.log(f"ERROR :: Skipping {self.arr_app}, missing URL...") return if not self.arr_api_key: self.log(f"ERROR :: Skipping {self.arr_app}, missing API Key...") return base = self.arr_url.rstrip("/") url = f"{base}/api/v1/artist?apikey={self.arr_api_key}" artists = self.http_get_json(url) if not isinstance(artists, list): self.log("ERROR :: Could not fetch Lidarr artists") return self.lidarr_artist_count = len(artists) self.process_count = 0 for artist in artists: self.process_count += 1 lidarr_artist_id = artist.get("id") url_artist = ( f"{base}/api/v1/artist/{lidarr_artist_id}?apikey={self.arr_api_key}" ) artist_data = self.http_get_json(url_artist) if not artist_data: continue self.lidarr_artist_name = artist_data.get("artistName", "") self.lidarr_artist_musicbrainz_id = artist_data.get("foreignArtistId", "") lidarr_artist_path = artist_data.get("path", "") folder = os.path.basename(lidarr_artist_path).split("(")[0] self.lidarr_artist_folder = folder.strip() links = artist_data.get("links", []) or [] tidal_urls = [ l.get("url", "") for l in links if l.get("name", "").lower() == "tidal" and "url" in l ] tidal_artist_ids: List[int] = [] for u in tidal_urls: for part in u.split("/"): if part.isdigit(): tid = int(part) if tid not in tidal_artist_ids: tidal_artist_ids.append(tid) genres = artist_data.get("genres", []) or [] self.lidarr_artist_genres = [g for g in genres if isinstance(g, str)] self.log( f"{self.process_count}/{self.lidarr_artist_count} :: " f"{self.lidarr_artist_name} :: Processing..." ) for tid in tidal_artist_ids: self.tidal_process(tid) # ---------- main loop ---------- def run_forever(self) -> None: while True: start_loop = time.time() self.logfile_setup() self.log("Starting...") self.install_dependencies() conf_files = sorted( str(p) for p in pathlib.Path(self.docker_path).glob("*.conf") if p.is_file() ) if not conf_files: self.log("ERROR :: No config files found, exiting...") sys.exit(1) for conf_path in conf_files: start_conf = time.time() self.log(f'Processing "{conf_path}" config file') self.settings_from_conf(conf_path) self.verify_config() self.configure_tidal_dl() if self.tidal_failure: sys.exit(1) self.process_lidarr_artists() duration = int(time.time() - start_conf) days = duration // 86400 hours = (duration % 86400) // 3600 mins = (duration % 3600) // 60 secs = duration % 60 duration_output = f"{days}d:{hours}h:{mins}m:{secs}s" self.log(f"Script Completed in {duration_output}!") self.log(f"Sleeping {self.lidarr_music_video_automator_interval}...") time.sleep(self.lidarr_music_video_automator_interval) def main() -> None: parser = argparse.ArgumentParser( description="Lidarr Music Video Automator (TIDAL)." ) # Core paths / service settings parser.add_argument( "--dockerPath", help="Base config path (default: /config).", default=None, ) parser.add_argument( "--lidarrUrl", help="Override Lidarr URL (lidarrUrl).", ) parser.add_argument( "--lidarrApiKey", help="Override Lidarr API key (lidarrApiKey).", ) parser.add_argument( "--lidarrMusicVideoAutomator", choices=["true", "false"], help="Override lidarrMusicVideoAutomator setting.", ) parser.add_argument( "--lidarrMusicVideoAutomatorInterval", type=int, help="Override lidarrMusicVideoAutomatorInterval (seconds).", ) parser.add_argument( "--lidarrMusicVideoTempDownloadPath", help="Override lidarrMusicVideoTempDownloadPath.", ) parser.add_argument( "--lidarrMusicVideoLibrary", help="Override lidarrMusicVideoLibrary.", ) parser.add_argument( "--musicVideoFormat", choices=["mkv", "mp4"], help="Override musicVideoFormat.", ) parser.add_argument( "--requireMinimumVideoQaulity", choices=["SD", "HD", "FHD"], help="Override requireMinimumVideoQaulity.", ) parser.add_argument( "--tidalCountryCode", help="Override tidalCountryCode (e.g. US, GB).", ) parser.add_argument( "--logFolder", help="Override logFolder for per-video markers.", ) # Boolean toggles parser.add_argument( "--enableLiveVideos", dest="enableLiveVideos", action="store_true", default=None, help="Enable Live videos (overrides config).", ) parser.add_argument( "--disableLiveVideos", dest="enableLiveVideos", action="store_false", help="Disable Live videos (overrides config).", ) parser.add_argument( "--enableVisualizerVideos", dest="enableVisualizerVideos", action="store_true", default=None, help="Enable Visualizer videos (overrides config).", ) parser.add_argument( "--disableVisualizerVideos", dest="enableVisualizerVideos", action="store_false", help="Disable Visualizer videos (overrides config).", ) parser.add_argument( "--enableLyricVideos", dest="enableLyricVideos", action="store_true", default=None, help="Enable Lyric videos (overrides config).", ) parser.add_argument( "--disableLyricVideos", dest="enableLyricVideos", action="store_false", help="Disable Lyric videos (overrides config).", ) parser.add_argument( "--enableBehindTheScenesVideos", dest="enableBehindTheScenesVideos", action="store_true", default=None, help="Enable Behind the Scenes videos (overrides config).", ) parser.add_argument( "--disableBehindTheScenesVideos", dest="enableBehindTheScenesVideos", action="store_false", help="Disable Behind the Scenes videos (overrides config).", ) parser.add_argument( "--enableInterviewVideos", dest="enableInterviewVideos", action="store_true", default=None, help="Enable Interview videos (overrides config).", ) parser.add_argument( "--disableInterviewVideos", dest="enableInterviewVideos", action="store_false", help="Disable Interview videos (overrides config).", ) parser.add_argument( "--enableEpisodeVideos", dest="enableEpisodeVideos", action="store_true", default=None, help="Enable Episode videos (overrides config).", ) parser.add_argument( "--disableEpisodeVideos", dest="enableEpisodeVideos", action="store_false", help="Disable Episode videos (overrides config).", ) args = parser.parse_args() automator = LidarrMusicVideoAutomator() automator.cli_args = args if args.dockerPath: automator.docker_path = args.dockerPath automator.run_forever() if __name__ == "__main__": main()