from ast import Dict import asyncio from enum import Enum import json from queue import Queue import time from typing import Any, Callable, Literal, Optional, TypedDict from urllib.error import URLError from urllib.request import HTTPError, Request, urlopen from musicbrainzngs import musicbrainz from musicbrainzngs.types import Artist import pprint import aiohttp from textual.reactive import reactive from textual.app import App, ComposeResult from textual.widgets import Footer, Header, Label, ListView, ListItem from textual.widget import Widget from yt_dlp import YoutubeDL from thefuzz import fuzz from rich.pretty import pprint from rich.progress import Progress, TimeElapsedColumn from rich.status import Status from memorize import Memorize testRun = True def falsy(value: Any) -> bool: return not value or value is False JobType = Enum( "JobType", ["fetch_artists", "fetch_artist_videos", "find_video_sources", "download_video"], ) class Job(TypedDict): type: JobType payload: Any type MBRelation = {} class MBRecordingInfo(TypedDict): length: Optional[int] title: Optional[str] id: str disambiguation: Optional[str] first_release_date: Optional[str] video: Optional[bool] relations: Optional[list[MBRelation]] VideoSource = Enum("VideoSource", ["youtube", "vimeo", "tidal", "unknown"]) class UrlSource(TypedDict): url: str type: str class Video(TypedDict): id: str title: Optional[str] artist: Optional[str] year: Optional[str] source_urls: Optional[list[UrlSource]] class ArtistItemInfo(TypedDict): id: str name: str class VideoItemInfo(TypedDict): id: str title: Optional[str] artist: Optional[str] year: Optional[str] class Item(TypedDict): artist: ArtistItemInfo video_sources: VideoItemInfo class MVDownloader: def __init__(self): self.temp_path = "/mnt/user/data/downloads/temp" self.download_path = "/mnt/user/data/downloads/downloads" self.lidarr_api_key = "36fb27b01480452b8e5d01a0a0ce9979" self.lidarr_url = "http://10.0.0.101:8686" self.musicbrainzapi = musicbrainz self.artists = {} self.videos: dict[str, Video] = {} self.fetch_log = {} self.queue:Queue[Job] = Queue() self.semaphore = asyncio.Semaphore(10) self.session: aiohttp.ClientSession | None = None self.data: dict[str, Item] = {} async def http_get_json( self, url: str, headers: Optional[dict[str, str]] = None ) -> Any: async with self.semaphore: try: async with self.session.get(url, headers=headers) as resp: return await resp.json() except (URLError, HTTPError, json.JSONDecodeError) as e: self.log(f"ERROR :: HTTP/JSON error for {url}: {e}") return None def add_fetch_artist_videos_job(self, artist: Artist): # self.progress.add_task(f"Fetching videos for {name}") self.queue.put( { "type": JobType.fetch_artist_videos, "payload": artist, } ) # @Memorize async def fetch_artists(self, lidarr_url: str, lidarr_api_key: str): base = lidarr_url.rstrip("/") url = f"{base}/api/v1/artist?apikey={lidarr_api_key}" artists = await self.http_get_json(url) for artist in artists: self.add_fetch_artist_videos_job(artist) def filter_videos_factory(self, payload: dict[str, str]) -> Callable[..., Literal['Artist or title mismatch'] | None]: def filter_videos(info_dict, incomplete: bool): extra_points = { "music video": 50, "lyric video": -25, } if info_dict.get("title"): yt_title = info_dict["title"] if payload["video"]["title"].lower() not in yt_title.lower(): return "Artist or title mismatch" score = 0 test = f"{payload['video']['title']} - {payload['video']['artist-credit-phrase']}" score += fuzz.token_sort_ratio(yt_title, test) for keyword, points in extra_points.items(): if keyword.lower() in yt_title.lower(): score += points if ( info_dict.get("uploader").lower() == payload["video"]["artist-credit-phrase"].lower() ): score += 10 return return None return filter_videos async def download_yt_video(self, url: str): ydl_opts: dict[str, Any] = { "noplaylist": True, "quiet": True, "no_warnings": True, "extract-audio": False, "audio-format": "best", "skip_download": testRun, } with YoutubeDL(ydl_opts) as ydl: ydl.download([payload["url"]]) async def download_video(self, payload: dict[str, str]): if payload.get("source_urls") is None: self.log(f"No source URLs for {payload['title']} by {payload['artist']}") return for source in payload["source_urls"]: if source["target"].includes("youtube.com"): await self.download_yt_video(source["url"]) async def find_video_sources(self, payload: dict[str, str]): filter_videos = self.filter_videos_factory(payload) ydl_opts = { "t": "sleep", "noplaylist": True, "quiet": True, "no_warnings": True, "extract_flat": True, "extract_audio": False, "audio_format": "best", "default_search": "", "skip_download": testRun, "match_filter": filter_videos, } artist = payload["video"]["artist-credit-phrase"] title = payload["video"]["title"] if artist == "20SIX Hundred": return search_query = f"{artist} - {title}" with YoutubeDL(ydl_opts) as ydl: output = ydl.download( [f"https://youtube.com/results?search_query={search_query}"] ) # print(output) # id = payload['id'] # video = payload['video'] # print(video) async def fetch_mb_relations(self, id: str): return self.musicbrainzapi.get_recording_by_id(id, includes=["url-rels"]) async def search_mb_recording(self, id: str): return self.musicbrainzapi.search_recordings(strict=True, arid=id, video=True) async def check_mb_relations(self, id: str): pass async def parse_mb_relation(self, relation: MBRelation): pprint(["parsing relation: ", relation]) # self.queue.put( # "type": JobType.download_video, "payload": {"url": relation["url"]["resource"]}} # ) return_value = {"url": relation["target"], "type": "unknown"} if "youtube.com" in relation.get("url", {}).get("resource", ""): return_value["type"] = "youtube" return return_value async def fetch_artist_videos(self, payload: dict[str, str]): self.musicbrainzapi.set_useragent("MVDownloader", "1.0.0") id = payload["id"] # await self.check_mb_relations(id) recordings = await self.search_mb_recording(id) potential_videos = {} for recording in recordings["recording-list"]: info = await self.fetch_mb_relations(recording["id"]) relations = info.get("recording").get("url-relation-list", []) if relations == []: pprint("no relations") return else: for relation in relations: pprint(["relation: ", relation]) parsed_relation = await self.parse_mb_relation(relation) potential_videos[parsed_relation["type"]] = parsed_relation["url"] await self.find_video_sources({"id": id, "video": recording}) self.queue.put( { "type": JobType.find_video_sources, "payload": {"id": id, "video": recording}, } ) async def __aenter__(self): self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_value, traceback): await self.session.close() async def main(self): await self.fetch_artists(self.lidarr_url, self.lidarr_api_key) async def run_forever(self): await self.fetch_artists(self.lidarr_url, self.lidarr_api_key) while True: task = self.queue.get() # print(task) match task["type"]: case JobType.download_video: await self.download_video(task["payload"]) case JobType.fetch_artists: await self.fetch_artists(self.lidarr_url, self.lidarr_api_key) case JobType.fetch_artist_videos: await self.fetch_artist_videos(task["payload"]) case JobType.find_video_sources: await self.find_video_sources(task["payload"]) case _: raise ValueError(f"Unknown job type: {task.type}") await asyncio.sleep(3) async def main(): async with MVDownloader() as downloader: await downloader.run_forever() if __name__ == "__main__": # app = MVDownloaderTUI() # app.run() asyncio.run(main())