309 lines
10 KiB
Python
Executable File
309 lines
10 KiB
Python
Executable File
from ast import Dict
|
|
import asyncio
|
|
from enum import Enum
|
|
import json
|
|
from queue import Queue
|
|
import time
|
|
from typing import Any, Literal, Optional
|
|
from urllib.error import URLError
|
|
from urllib.request import HTTPError, Request, urlopen
|
|
from musicbrainzngs import musicbrainz
|
|
import pprint
|
|
import aiohttp
|
|
from textual.reactive import reactive
|
|
from textual.app import App, ComposeResult
|
|
from textual.widgets import Footer, Header, Label, ListView, ListItem
|
|
from textual.widget import Widget
|
|
from yt_dlp import YoutubeDL
|
|
from thefuzz import fuzz
|
|
|
|
|
|
def falsy(value: Any) -> bool:
    """Return True when *value* is falsy (None, 0, "", empty container, False).

    The original also tested ``value is False``, but that branch was
    unreachable: ``not False`` is already True, so ``or`` short-circuited
    before the identity check could ever run.
    """
    return not value
|
|
|
|
|
|
class JobType(Enum):
    """Kinds of work items that flow through the job queue."""

    # Explicit values mirror what the functional Enum API would assign.
    fetch_artists = 1
    fetch_artist_videos = 2
    find_video_sources = 3
|
|
|
|
|
|
type Job = {"type": JobType.value, "payload": Any}
|
|
|
|
type MBRelation = {}
|
|
|
|
type MBRecording = {
|
|
"length": Optional[int],
|
|
"title": Optional[str],
|
|
"id": str,
|
|
"disambiguation": Optional[str],
|
|
"first-release-date": Optional[str],
|
|
"video": Optional[bool],
|
|
"relations": Optional[list[MBRelation]],
|
|
}
|
|
|
|
|
|
class JobWidget(ListItem):
    """A single job rendered as one row of the job list.

    NOTE(review): this class was defined twice back-to-back; the second
    definition silently shadowed the first (and dropped the ``Job`` type
    hint in the process). Collapsed into a single definition.
    """

    def __init__(self, job: Job):
        super().__init__()
        # Job payload; compose() only reads the "name" key.
        self.job = job

    def compose(self) -> ComposeResult:
        yield Label(self.job["name"])
|
|
|
|
|
|
class JobList(ListView):
    """List view that re-renders itself whenever its job list changes."""

    # Reassigning `jobs` triggers a full recompose of the widget.
    jobs = reactive([], recompose=True)

    def compose(self):
        # Sole mapping point from job dicts to ListItem rows.
        yield from (JobWidget(job) for job in self.jobs)
|
|
|
|
|
|
# class MVDownloaderTUI(App):
|
|
# def compose(self):
|
|
# yield Header()
|
|
# job_list = JobList(id="job-queue")
|
|
# yield job_list
|
|
# yield Footer()
|
|
|
|
|
|
# class MVDownloaderTUI(App):
|
|
# BINDINGS = [
|
|
# ("a", "add_item", "Add a new job")
|
|
# ]
|
|
|
|
# jobs = reactive([])
|
|
|
|
# def compose(self) -> ComposeResult:
|
|
# yield Header()
|
|
# yield Label("Job Queue")
|
|
# job_list = JobList(id="job-queue")
|
|
# yield job_list
|
|
# yield Footer()
|
|
|
|
# def watch_jobs(self, jobs):
|
|
# list_view = self.query_one("#job-queue", ListView)
|
|
# list_view.clear()
|
|
# for job in jobs:
|
|
# list_view.append(JobWidget(job))
|
|
|
|
# def _action_add_item(self) -> None:
|
|
# job_list = self.query_one("#job-queue", JobList)
|
|
# id = len(job_list.jobs)
|
|
# job_list.jobs = [*job_list.jobs, {"id": id, "name": "Test Job"}]
|
|
|
|
type UrlSource = {"url": str, "type": str}
|
|
|
|
type Video = {
|
|
"id": str,
|
|
"title": Optional[str],
|
|
"artist": Optional[str],
|
|
"year": Optional[str],
|
|
"source_urls": Optional[list[UrlSource]],
|
|
}
|
|
|
|
|
|
class MVDownloader:
|
|
def __init__(self):
|
|
self.temp_path = "/mnt/user/data/downloads/temp"
|
|
self.download_path = "/mnt/user/data/downloads/downloads"
|
|
self.lidarr_api_key = "36fb27b01480452b8e5d01a0a0ce9979"
|
|
self.lidarr_url = "http://10.0.0.101:8686"
|
|
self.musicbrainzapi = musicbrainz
|
|
self.artists = {}
|
|
self.videos: dict[str, Video] = {}
|
|
self.fetch_log = {}
|
|
self.queue = Queue[Job]()
|
|
self.semaphore = asyncio.Semaphore(10)
|
|
self.session: aiohttp.ClientSession | None = None
|
|
|
|
# async def http_get_json(self, url: str, headers: Optional[Dict[str, str]] = None) -> Any:
|
|
# req = Request(url, headers=headers or {})
|
|
# try:
|
|
# with urlopen(req, timeout=None) as resp:
|
|
# data = resp.read()
|
|
# return json.loads(data.decode("utf-8"))
|
|
# except (URLError, HTTPError, json.JSONDecodeError) as e:
|
|
# self.log(f"ERROR :: HTTP/JSON error for {url}: {e}")
|
|
# return None
|
|
|
|
async def http_get_json(
|
|
self, url: str, headers: Optional[Dict[str, str]] = None
|
|
) -> Any:
|
|
async with self.semaphore:
|
|
try:
|
|
async with self.session.get(url, headers=headers) as resp:
|
|
return await resp.json()
|
|
except (URLError, HTTPError, json.JSONDecodeError) as e:
|
|
self.log(f"ERROR :: HTTP/JSON error for {url}: {e}")
|
|
return None
|
|
|
|
async def fetch_artists(self):
|
|
base = self.lidarr_url.rstrip("/")
|
|
url = f"{base}/api/v1/artist?apikey={self.lidarr_api_key}"
|
|
artists = await self.http_get_json(url)
|
|
for artist in artists:
|
|
id = artist["foreignArtistId"]
|
|
if falsy(self.fetch_log.get("lidarr_artists")):
|
|
self.fetch_log["lidarr_artists"] = {}
|
|
if falsy(self.fetch_log["lidarr_artists"].get(id)):
|
|
self.queue.put(
|
|
{
|
|
"type": JobType.fetch_artist_videos,
|
|
"payload": {"id": id, "name": artist["artistName"]},
|
|
}
|
|
)
|
|
if falsy(self.artists.get(id)):
|
|
self.artists[id] = {}
|
|
self.artists[id]["name"] = artist["artistName"]
|
|
# print(results)
|
|
# await self.fetch_artist_videos(artists[0]["id"])
|
|
|
|
def filter_videos_factory(self, payload: dict[str, str]):
|
|
def filter_videos(info_dict, incomplete: bool):
|
|
extra_points = {
|
|
"music video": 50,
|
|
"lyric video": -25,
|
|
}
|
|
# print(payload)
|
|
# if info_dict['playlist']:
|
|
# return "It's a playlist"
|
|
if info_dict.get("title"):
|
|
yt_title = info_dict["title"]
|
|
if payload["video"]["title"].lower() not in yt_title.lower():
|
|
return "Artist or title mismatch"
|
|
score = 0
|
|
# print(info_dict)
|
|
test = f"{payload['video']['title']} - {payload['video']['artist-credit-phrase']}"
|
|
# print(f"YT Title: {yt_title}")
|
|
# print(f"Test: {test}")
|
|
score += fuzz.token_sort_ratio(yt_title, test)
|
|
for keyword, points in extra_points.items():
|
|
if keyword.lower() in yt_title.lower():
|
|
score += points
|
|
if (
|
|
info_dict.get("uploader").lower()
|
|
== payload["video"]["artist-credit-phrase"].lower()
|
|
):
|
|
score += 10
|
|
# print(f"Score: {score}")
|
|
return
|
|
return None
|
|
|
|
return filter_videos
|
|
|
|
async def find_video_sources(self, payload: dict[str, str]):
|
|
filter_videos = self.filter_videos_factory(payload)
|
|
ydl_opts = {
|
|
"t": "sleep",
|
|
"noplaylist": True,
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
"extract_flat": True,
|
|
"extract_audio": False,
|
|
"audio_format": "best",
|
|
"default_search": "",
|
|
"skip_download": True,
|
|
"match_filter": filter_videos,
|
|
}
|
|
# print(payload)
|
|
artist = payload["video"]["artist-credit-phrase"]
|
|
title = payload["video"]["title"]
|
|
if artist == "20SIX Hundred":
|
|
return
|
|
|
|
search_query = f"{artist} - {title}"
|
|
with YoutubeDL(ydl_opts) as ydl:
|
|
output = ydl.download(
|
|
[f"https://youtube.com/results?search_query={search_query}"]
|
|
)
|
|
# print(output)
|
|
# id = payload['id']
|
|
# video = payload['video']
|
|
# print(video)
|
|
|
|
async def fetch_mb_relations(self, id: str):
|
|
info = self.musicbrainzapi.get_recording_by_id(id, includes=["url-rels"])
|
|
|
|
async def fetch_artist_videos(self, payload: dict[str, str]):
|
|
self.musicbrainzapi.set_useragent("MVDownloader", "1.0.0")
|
|
id = payload["id"]
|
|
|
|
if self.fetch_log.get("video_list") is None:
|
|
self.fetch_log["video_list"] = {}
|
|
if falsy(self.fetch_log["video_list"].get(id)):
|
|
recordings = self.musicbrainzapi.search_recordings(
|
|
strict=True, arid=id, video=True
|
|
)
|
|
for recording in recordings["recording-list"]:
|
|
info = self.musicbrainzapi.get_recording_by_id(
|
|
recording["id"], includes=["url-rels"]
|
|
)
|
|
print("inside for recording")
|
|
if info.get("relations"):
|
|
print("inside info.get('relations')")
|
|
for relation in info["relations"]:
|
|
if relation["target-type"] == "url":
|
|
print("relation: ", relation)
|
|
# print("info: ", info)
|
|
await self.find_video_sources({"id": id, "video": recording})
|
|
self.queue.put(
|
|
{
|
|
"type": JobType.find_video_sources,
|
|
"payload": {"id": id, "video": recording},
|
|
}
|
|
)
|
|
|
|
# for video in videos:
|
|
# await self.find_video_sources({"id": id, "video": video})
|
|
# self.videos.append({"id": id, "videos": videos})
|
|
# self.videos.put({"type": JobType.find_video_sources, "payload": {"id": id, "videos": videos}})
|
|
# self.fetch_log["video_list"][id] = True
|
|
|
|
async def __aenter__(self):
|
|
self.session = aiohttp.ClientSession()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
|
await self.session.close()
|
|
|
|
async def main(self):
|
|
await self.fetch_artists()
|
|
|
|
async def run_forever(self):
|
|
await self.fetch_artists()
|
|
while True:
|
|
task = self.queue.get()
|
|
print(task)
|
|
match task["type"]:
|
|
case JobType.fetch_artists:
|
|
await self.fetch_artists()
|
|
case JobType.fetch_artist_videos:
|
|
await self.fetch_artist_videos(task["payload"])
|
|
case JobType.find_video_sources:
|
|
await self.find_video_sources(task["payload"])
|
|
case _:
|
|
raise ValueError(f"Unknown job type: {task.type}")
|
|
await asyncio.sleep(3)
|
|
|
|
|
|
async def main():
    """Program entry point: run the downloader loop until interrupted."""
    downloader = MVDownloader()
    # The context manager owns the aiohttp session lifecycle.
    async with downloader:
        await downloader.run_forever()
|
|
|
|
|
|
if __name__ == "__main__":
    # The Textual TUI entry point is disabled for now; run the headless
    # downloader loop instead.
    # app = MVDownloaderTUI()

    # app.run()

    asyncio.run(main())
|