- Implemented a new beets plugin to import and manage music videos, supporting various video formats and providing metadata from IMVDB and MusicBrainz for autotagging. - Added installation instructions and configuration options in README.md. - Created IMVDBApi client for interacting with the IMVDB API. - Defined typing for various API responses and utility functions for mapping MusicBrainz data to beets TrackInfo. - Included desktop entry and JSON info files for a video titled "[SAST] The Evolution of Search-based Software Testing". - Added utility functions for handling artist credits and related metadata. - Introduced a grabbed.txt file for tracking video sources.
1045 lines
39 KiB
Python
1045 lines
39 KiB
Python
import json
|
||
import os
|
||
import pprint
|
||
import re
|
||
from pathlib import Path
|
||
import subprocess
|
||
import xml.etree.ElementTree as ET
|
||
|
||
from beets import logging, plugins, util
|
||
import beets
|
||
from beets.autotag.hooks import TrackInfo
|
||
from beets.library import Item
|
||
from beets.metadata_plugins import SearchApiMetadataSourcePlugin
|
||
from typing import TYPE_CHECKING, Any
|
||
from fuzzywuzzy import fuzz
|
||
from urllib.parse import urljoin
|
||
from .utils.mapping import map_mb_to_trackinfo
|
||
|
||
|
||
from typing import Annotated, Any, Literal, Sequence, cast
|
||
from typing import Self
|
||
|
||
from beets.autotag.hooks import AlbumInfo
|
||
from beets.metadata_plugins import SearchFilter
|
||
|
||
from beetsplug._utils.musicbrainz import (
|
||
MusicBrainzAPIMixin,
|
||
)
|
||
from beets import config
|
||
|
||
from .IMVDBApi import IMVDBApi
|
||
# from beetsplug._utils.requests import requests
|
||
|
||
# from fuzzywuzzy import fuzz
|
||
|
||
from .types import SearchResponseType, VideoType
|
||
from httpx_auth import HeaderApiKey
|
||
|
||
|
||
if TYPE_CHECKING:
|
||
from collections.abc import Iterable, Sequence
|
||
from typing import Literal
|
||
|
||
from beets.library import Item
|
||
|
||
from ._typing import JSONDict
|
||
|
||
# Module-wide logger: all plugin messages are routed through beets' own
# "beets" logging channel so they honor the user's verbosity settings.
log = logging.getLogger("beets")


# Base URL for building canonical MusicBrainz web links (see track_url()).
# Trailing slash matters: urljoin() relies on it to append path segments.
MB_BASE_URL = "https://musicbrainz.org/"
|
||
|
||
|
||
def _artist_ids(credit: list[JSONDict]) -> list[str]:
|
||
"""
|
||
Given a list representing an ``artist-credit``,
|
||
return a list of artist IDs
|
||
"""
|
||
artist_ids: list[str] = []
|
||
for el in credit:
|
||
if isinstance(el, dict):
|
||
artist_ids.append(el["artist"]["id"])
|
||
|
||
return artist_ids
|
||
|
||
|
||
def _preferred_alias(
    aliases: list[JSONDict], languages: list[str] | None = None
) -> JSONDict | None:
    """Select the user's preferred alias from an artist's alias list.

    Walks the configured (or explicitly supplied) locales in priority
    order and returns the first primary, non-ignored alias whose locale
    matches; ``None`` when nothing qualifies.
    """
    if not aliases:
        return None

    # Aliases without a locale can never match a configured language.
    localized = [alias for alias in aliases if "locale" in alias]

    # Alias types the user asked to skip, compared case-insensitively.
    skip_types = [
        t.lower() for t in config["import"]["ignored_alias_types"].as_str_seq()
    ]

    # Fall back to the locales configured for import.
    if languages is None:
        languages = config["import"]["languages"].as_str_seq()

    # Locales are searched in priority order; within a locale, the first
    # qualifying alias (list order) wins.
    for locale in languages:
        for alias in localized:
            if (
                alias["locale"] == locale
                and alias.get("primary")
                and (alias.get("type") or "").lower() not in skip_types
            ):
                return alias

    return None
|
||
|
||
|
||
def _multi_artist_credit(
    credit: list[JSONDict], include_join_phrase: bool
) -> tuple[list[str], list[str], list[str]]:
    """Accumulate an ``artist-credit`` block into three parallel name lists.

    Returns ``(names, sort_names, credited_names)``. When
    *include_join_phrase* is true, each entry's join phrase (" feat. ",
    " & ", ...) is interleaved into all three lists after the names.
    """
    names: list[str] = []
    sort_names: list[str] = []
    credited: list[str] = []

    for entry in credit:
        alias = _preferred_alias(entry["artist"].get("aliases", ()))

        # Canonical name: a preferred alias wins over the artist's own name.
        name = alias["name"] if alias else entry["artist"]["name"]
        names.append(name)

        # Sort name, falling back to the canonical name when absent.
        if alias:
            sort_names.append(alias["sort-name"])
        elif "sort-name" in entry["artist"]:
            sort_names.append(entry["artist"]["sort-name"])
        else:
            sort_names.append(name)

        # As-credited name (e.g. a stylized spelling on this release).
        credited.append(entry["name"] if "name" in entry else name)

        if include_join_phrase and (join := entry.get("joinphrase")):
            names.append(join)
            sort_names.append(join)
            credited.append(join)

    return names, sort_names, credited
|
||
|
||
|
||
def track_url(trackid: str) -> str:
    """Return the canonical MusicBrainz web URL for a recording MBID."""
    return urljoin(MB_BASE_URL, f"recording/{trackid}")
|
||
|
||
|
||
def _flatten_artist_credit(credit: list[JSONDict]) -> tuple[str, str, str]:
    """Flatten an ``artist-credit`` block into three joined strings.

    Returns ``(canonical, sort, credited)`` with join phrases included,
    mirroring how MusicBrainz renders a multi-artist credit.
    """
    names, sort_names, credited = _multi_artist_credit(
        credit, include_join_phrase=True
    )
    return "".join(names), "".join(sort_names), "".join(credited)
|
||
|
||
|
||
class BeetsMusicVideos(  # type: ignore[type-var]
    MusicBrainzAPIMixin, SearchApiMetadataSourcePlugin[VideoType]
):
    """Plugin that lets beets import "non-audio" files (e.g. music videos)
    by treating selected extensions as supported and creating library
    `Item`s for them without going through `MediaFile`.

    Candidate metadata comes from two sources: the IMVDB search API and
    MusicBrainz video recordings (via ``MusicBrainzAPIMixin``).
    """

    def __init__(self) -> None:  # type: ignore
        super().__init__()  # type: ignore

        # Default set of extensions to treat as importable media, in
        # addition to the formats supported by `mediafile`.
        self.config.add({
            "extensions": [
                ".mp4",
                ".m4v",
                ".mkv",
                ".avi",
                ".webm",
            ],
            # IMVDB API key; empty by default, supplied via plugin config.
            "imvdb_api_key": "",
        })

        self.imvdb_api_key = self.config["imvdb_api_key"].get(str)

        # Install the importer monkey-patches once at plugin load time so
        # video files become library Items during import.
        self._patch_import_task_factory()

        self.imvdb_api = IMVDBApi(self.imvdb_api_key)
|
||
|
||
def _normalized_extensions(self) -> set[str]:
|
||
"""Return the configured extensions as a normalized set.
|
||
|
||
All values are lowercased and guaranteed to start with a leading
|
||
dot, e.g. ``.mp4``.
|
||
"""
|
||
exts: set[str] = set()
|
||
for raw in self.config["extensions"].as_str_seq():
|
||
raw = raw.strip().lower()
|
||
if not raw:
|
||
continue
|
||
if not raw.startswith("."):
|
||
raw = f".{raw}"
|
||
exts.add(raw)
|
||
return exts
|
||
|
||
    # Separators that commonly split "Artist - Title" in filenames. All
    # three dash variants (en dash, em dash, hyphen) live in one character
    # class, so no matching order applies; the surrounding \s+ keeps
    # hyphenated words ("twenty-one") from being split.
    _ARTIST_TITLE_SEP = re.compile(r"\s+[–—\-]\s+", re.UNICODE)

    # Common parenthetical suffixes in video filenames, e.g.
    # "(Official Music Video)" or "[video clip]"; stripped before API
    # searches so "Song (Official Video)" matches plain "Song".
    _VIDEO_TITLE_SUFFIX = re.compile(
        r"\s*[(\[]\s*(?:official\s+)?(?:music\s+)?video(?:\s+clip)?[)\]]\s*$",
        re.IGNORECASE,
    )
    # Short "(MV)" / "[mv]" suffix, handled separately from the longer form.
    _VIDEO_MV_SUFFIX = re.compile(r"\s*[(\[]\s*mv\s*[)\]]\s*$", re.IGNORECASE)
|
||
|
||
def _parse_artist_title_for_search(
|
||
self, item: Item, artist: str, title: str
|
||
) -> tuple[str, str]:
|
||
"""Derive search artist and title from item/filename.
|
||
|
||
Handles common "Artist - Title" patterns that appear in video filenames
|
||
so you don't need to type them during import. Prefers the path stem
|
||
when available so we parse the actual filename even if beets didn't
|
||
pass it as title.
|
||
"""
|
||
a = (artist or "").strip()
|
||
t = (title or "").strip()
|
||
|
||
# Prefer the item's title if the caller didn't give us one explicitly.
|
||
if not t and item:
|
||
t = str(getattr(item, "title", "") or "").strip()
|
||
|
||
# Prefer the path stem so we always parse the real filename. Beets
|
||
# passes item.artist and item.title (we set item.title = stem when
|
||
# creating the item), but using the path here guarantees we have the
|
||
# filename to split.
|
||
path_stem_str: str | None = None
|
||
if item and getattr(item, "path", None):
|
||
try:
|
||
raw_stem = Path(util.syspath(item.path)).stem
|
||
path_stem_str = str(raw_stem).strip() or None
|
||
except Exception:
|
||
path_stem_str = None
|
||
|
||
if path_stem_str and (not t or path_stem_str == t):
|
||
t = path_stem_str
|
||
elif not t and path_stem_str:
|
||
t = path_stem_str
|
||
|
||
log.debug(
|
||
"beets_music_videos: item_candidates received artist=%r title=%r path_stem=%r",
|
||
artist,
|
||
title,
|
||
path_stem_str,
|
||
)
|
||
|
||
if not t:
|
||
return a, t
|
||
|
||
# If the title looks like "Artist - Title", split it.
|
||
m = self._ARTIST_TITLE_SEP.search(t)
|
||
if m:
|
||
parts = self._ARTIST_TITLE_SEP.split(t, 1)
|
||
if len(parts) == 2:
|
||
artist_from_title, title_from_title = (
|
||
parts[0].strip(),
|
||
parts[1].strip(),
|
||
)
|
||
|
||
# If there is no artist yet, trust the filename.
|
||
if not a and artist_from_title:
|
||
a, t = artist_from_title, title_from_title
|
||
log.debug(
|
||
"beets_music_videos: split filename -> artist=%r title=%r",
|
||
a,
|
||
t,
|
||
)
|
||
# If there *is* an artist and it matches the prefix (very common
|
||
# when both tags and filename agree), keep the existing artist
|
||
# but still use the suffix as the track title.
|
||
elif (
|
||
a
|
||
and artist_from_title
|
||
and artist_from_title.lower().startswith(a.lower())
|
||
):
|
||
t = title_from_title
|
||
|
||
return (str(a or ""), str(t or ""))
|
||
|
||
def _normalize_video_search_query(self, s: str) -> str:
|
||
"""Normalize a string for video API search: strip common video suffixes
|
||
and extra whitespace so \"Song (Official Video)\" matches \"Song\".
|
||
"""
|
||
if not s:
|
||
return ""
|
||
s = s.strip()
|
||
s = self._VIDEO_TITLE_SUFFIX.sub("", s)
|
||
s = self._VIDEO_MV_SUFFIX.sub("", s)
|
||
s = re.sub(r"\s+", " ", s).strip()
|
||
return s
|
||
|
||
def _read_nfo_metadata(
|
||
self,
|
||
path_str: str,
|
||
) -> tuple[dict[str, Any], str | None]:
|
||
"""Read a Kodi-style .nfo file that shares the basename with the video.
|
||
|
||
For a path like /path/to/Foo-Bar.mp4 we look for /path/to/Foo-Bar.nfo.
|
||
If the file exists and is well‑formed XML, extract a small set of
|
||
common tags and return them as a flat dict.
|
||
"""
|
||
nfo_path = Path(path_str).with_suffix(".nfo")
|
||
if not nfo_path.is_file():
|
||
return {}, None
|
||
|
||
try:
|
||
text = nfo_path.read_text(encoding="utf-8", errors="ignore")
|
||
except Exception as exc:
|
||
log.debug(
|
||
"beets_music_videos: failed to read NFO %r: %s",
|
||
str(nfo_path),
|
||
exc,
|
||
)
|
||
return {}, None
|
||
|
||
stripped = text.lstrip()
|
||
if not stripped.startswith("<"):
|
||
# Not XML; ignore for now.
|
||
return {}, str(nfo_path)
|
||
|
||
try:
|
||
root = ET.fromstring(text)
|
||
except Exception as exc:
|
||
log.debug(
|
||
"beets_music_videos: failed to parse NFO XML %r: %s",
|
||
str(nfo_path),
|
||
exc,
|
||
)
|
||
return {}, str(nfo_path)
|
||
|
||
data: dict[str, Any] = {}
|
||
|
||
def first_text(*tags: str) -> str | None:
|
||
for tag in tags:
|
||
el = root.find(tag)
|
||
if el is not None and el.text:
|
||
value = el.text.strip()
|
||
if value:
|
||
return value
|
||
return None
|
||
|
||
# Title
|
||
title = first_text("title")
|
||
if title:
|
||
data["title"] = title
|
||
|
||
# Artists
|
||
artists: list[str] = []
|
||
for el in root.findall("artist"):
|
||
if el.text:
|
||
name = el.text.strip()
|
||
if name:
|
||
artists.append(name)
|
||
if not artists:
|
||
fallback_artist = first_text("artist", "albumartist")
|
||
if fallback_artist:
|
||
artists.append(fallback_artist)
|
||
if artists:
|
||
data["artist"] = artists[0]
|
||
if len(artists) > 1:
|
||
data["artists"] = artists
|
||
|
||
# Album / albumartist
|
||
album = first_text("album", "showtitle")
|
||
if album:
|
||
data["album"] = album
|
||
|
||
albumartist = first_text("albumartist")
|
||
if albumartist:
|
||
data["albumartist"] = albumartist
|
||
elif artists:
|
||
data["albumartist"] = artists[0]
|
||
|
||
# Year: accept YYYY or YYYY-MM-DD, etc.
|
||
year_raw = first_text("year", "releasedate", "premiered", "aired")
|
||
if year_raw:
|
||
m = re.search(r"(\\d{4})", year_raw)
|
||
if m:
|
||
try:
|
||
data["year"] = int(m.group(1))
|
||
except ValueError:
|
||
pass
|
||
|
||
# Track number
|
||
track_raw = first_text("track", "episode")
|
||
if track_raw:
|
||
try:
|
||
data["track"] = int(track_raw)
|
||
except ValueError:
|
||
pass
|
||
|
||
# Genre: potentially multiple <genre> tags.
|
||
genres: list[str] = []
|
||
for el in root.findall("genre"):
|
||
if el.text:
|
||
g = el.text.strip()
|
||
if g:
|
||
genres.append(g)
|
||
if genres:
|
||
data["genre"] = " / ".join(genres)
|
||
|
||
# Plot/description -> comments field, but only if present.
|
||
plot = first_text("plot", "review", "outline")
|
||
if plot:
|
||
data["comments"] = plot
|
||
|
||
return data, str(nfo_path)
|
||
|
||
def _apply_nfo_metadata_to_item(self, item: Item, path_str: str) -> None:
|
||
"""Apply metadata from a same‑basename .nfo file to an Item."""
|
||
nfo_data, nfo_path_str = self._read_nfo_metadata(path_str)
|
||
if not nfo_data:
|
||
return
|
||
|
||
log.debug(
|
||
"beets_music_videos: applying NFO metadata from %r: %s",
|
||
nfo_path_str,
|
||
", ".join(sorted(nfo_data.keys())),
|
||
)
|
||
|
||
# String fields.
|
||
for key in ("title", "artist", "album", "albumartist", "genre", "comments"):
|
||
value = nfo_data.get(key)
|
||
if isinstance(value, str) and value:
|
||
item[key] = value
|
||
|
||
# Artists list (flexible field; may not be in every template).
|
||
artists_value = nfo_data.get("artists")
|
||
if isinstance(artists_value, list) and artists_value:
|
||
item["artists"] = artists_value
|
||
|
||
# Numeric core fields.
|
||
year_val = nfo_data.get("year")
|
||
if isinstance(year_val, int):
|
||
item.year = year_val
|
||
|
||
track_val = nfo_data.get("track")
|
||
if isinstance(track_val, int):
|
||
item.track = track_val
|
||
|
||
def _probe_video_resolution(
|
||
self,
|
||
path_str: str,
|
||
) -> tuple[int | None, int | None]:
|
||
"""Use ffprobe to detect video width/height for a given file path.
|
||
|
||
Returns (width, height) in pixels, or (None, None) if probing fails.
|
||
"""
|
||
try:
|
||
# ffprobe should be available in PATH; if not, just skip.
|
||
proc = subprocess.run(
|
||
[
|
||
"ffprobe",
|
||
"-v",
|
||
"error",
|
||
"-select_streams",
|
||
"v:0",
|
||
"-show_entries",
|
||
"stream=width,height",
|
||
"-of",
|
||
"json",
|
||
path_str,
|
||
],
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
except FileNotFoundError:
|
||
# ffprobe not installed; nothing we can do.
|
||
log.debug(
|
||
"beets_music_videos: ffprobe not found; cannot probe resolution for %r",
|
||
path_str,
|
||
)
|
||
return None, None
|
||
except subprocess.CalledProcessError as exc:
|
||
log.debug(
|
||
"beets_music_videos: ffprobe failed for %r: %s",
|
||
path_str,
|
||
exc.stderr.strip() if exc.stderr else exc,
|
||
)
|
||
return None, None
|
||
except Exception as exc:
|
||
log.debug(
|
||
"beets_music_videos: unexpected error running ffprobe for %r: %s",
|
||
path_str,
|
||
exc,
|
||
)
|
||
return None, None
|
||
|
||
try:
|
||
data = json.loads(proc.stdout or "{}")
|
||
except Exception as exc:
|
||
log.debug(
|
||
"beets_music_videos: failed to parse ffprobe JSON for %r: %s",
|
||
path_str,
|
||
exc,
|
||
)
|
||
return None, None
|
||
|
||
streams_any = data.get("streams")
|
||
if not isinstance(streams_any, list) or not streams_any:
|
||
return None, None
|
||
|
||
stream0: dict[str, Any] = cast(dict[str, Any], streams_any[0])
|
||
width_val = stream0.get("width")
|
||
height_val = stream0.get("height")
|
||
|
||
width: int | None = None
|
||
height: int | None = None
|
||
|
||
if isinstance(width_val, int):
|
||
width = width_val
|
||
elif isinstance(width_val, str) and width_val.isdigit():
|
||
width = int(width_val)
|
||
|
||
if isinstance(height_val, int):
|
||
height = height_val
|
||
elif isinstance(height_val, str) and height_val.isdigit():
|
||
height = int(height_val)
|
||
|
||
return width, height
|
||
|
||
def _mediafile_supports_format(self, ext: str) -> bool:
|
||
"""Check if MediaFile supports writing metadata to the given extension.
|
||
|
||
MediaFile supports MP4 format (including .mp4 and .m4v extensions).
|
||
Other video formats like .webm, .mkv, .avi are not supported.
|
||
"""
|
||
ext = ext.lower()
|
||
# MediaFile supports MP4 format, which includes .mp4 and .m4v files
|
||
return ext in (".mp4", ".m4v")
|
||
|
||
    def _patch_import_task_factory(self) -> None:
        """Monkey-patch `ImportTaskFactory.read_item` so that paths with
        one of our configured extensions are turned into `Item`s even if
        `mediafile` does not support them. Also patch
        `ImportTask.manipulate_files` to skip writing tags for music
        videos in formats MediaFile cannot write, to avoid corruption.

        Idempotent: a class-level flag guards against double-patching when
        the plugin is instantiated more than once.
        """
        # Local import to avoid importing importer machinery unless needed.
        from beets.importer.tasks import (  # type: ignore[attr-defined]
            Action,
            ImportTask,
            ImportTaskFactory,
        )

        # Only patch once, even if the plugin is re-instantiated.
        if getattr(ImportTaskFactory, "_beets_music_videos_patched", False):
            return

        original_read_item = ImportTaskFactory.read_item
        # Captured by the closures below so the patched functions can reach
        # plugin config and helpers without being bound methods.
        plugin = self

        def read_item_with_videos(
            self_: "ImportTaskFactory", path: util.PathBytes
        ) -> Item | None:  # type: ignore[override]
            # Determine the file extension of this path.
            str_path = util.syspath(path)
            ext = os.path.splitext(str_path)[1].lower()

            if ext in plugin._normalized_extensions():
                # Create an Item without going through MediaFile.
                # We deliberately avoid Item.from_path() / Item.read()
                # because those rely on MediaFile and would reject
                # unsupported formats.
                item = Item(album_id=None)

                # Store a normalized bytes path, as beets expects.
                item.path = util.normpath(path)

                # Initialize mtime from the actual file.
                try:
                    item.mtime = item.current_mtime()
                except OSError:
                    # If we cannot stat the file for some reason, fall
                    # back to 0; the file is still importable.
                    item.mtime = 0

                # Default the title to the filename stem; used as a
                # fallback when we can't read embedded metadata.
                filename = Path(str_path).stem
                if not item.get("title"):  # type: ignore[no-any-return]
                    item["title"] = filename

                # Probe video resolution with ffprobe so it is available
                # as flexible metadata for queries and path formats.
                width, height = plugin._probe_video_resolution(str_path)
                if width is not None:
                    item["video_width"] = width
                if height is not None:
                    item["video_height"] = height
                if width is not None and height is not None:
                    item["video_resolution"] = f"{width}x{height}"

                # Mark the item as a music video via a flexible attribute
                # so it can be queried, e.g. `media_type:music_video`.
                item["media_type"] = "music_video"

                # For formats where MediaFile is known to work (MP4/M4V),
                # read embedded tags so the initial autotag search can use
                # real metadata and a correct duration.
                if plugin._mediafile_supports_format(ext):
                    try:
                        from mediafile import MediaFile  # type: ignore[import-untyped]

                        mf = MediaFile(str_path)

                        # Prefer embedded tags over filename-derived values.
                        title_val = getattr(mf, "title", None)
                        if isinstance(title_val, str) and title_val:
                            item["title"] = title_val

                        artist_val = getattr(mf, "artist", None)
                        if isinstance(artist_val, str) and artist_val:
                            item["artist"] = artist_val

                        album_val = getattr(mf, "album", None)
                        if isinstance(album_val, str) and album_val:
                            item["album"] = album_val

                        albumartist_val = getattr(mf, "albumartist", None)
                        if isinstance(albumartist_val, str) and albumartist_val:
                            item["albumartist"] = albumartist_val

                        length_val = getattr(mf, "length", None)
                        if isinstance(length_val, (int, float)):
                            item.length = float(length_val)
                    except Exception as exc:
                        log.debug(
                            "beets_music_videos: failed to read tags for %r: %s",
                            str_path,
                            exc,
                        )

                # Finally, merge metadata from a same-basename .nfo file
                # (Foo-Bar.mp4 + Foo-Bar.nfo). NFO values intentionally
                # override existing ones so hand-edited metadata wins.
                plugin._apply_nfo_metadata_to_item(item, str_path)

                return item

            # For all other paths, fall back to the normal behavior,
            # which uses Item.from_path() and MediaFile.
            return original_read_item(self_, path)

        # Kept for reference/unpatching; currently unused.
        # NOTE(review): the replacement below re-implements the method
        # instead of delegating to this original — confirm it stays in
        # sync with upstream beets.
        _original_manipulate_files = ImportTask.manipulate_files

        def manipulate_files_with_video_guard(
            self_task: "ImportTask",
            session: Any,
            operation: Any = None,
            write: bool = False,
        ) -> None:
            items = cast(list[Any], self_task.imported_items())
            self_task.old_paths = [item.path for item in items]  # type: ignore[attr-defined]
            for item in items:
                if operation is not None:
                    old_path = item.path
                    # When an in-library file replaces an existing item,
                    # move it into place instead of copying/linking it.
                    if (
                        operation != util.MoveOperation.MOVE
                        and self_task.replaced_items[item]  # type: ignore[attr-defined]
                        and session.lib.directory in util.ancestry(old_path)
                    ):
                        item.move()
                        self_task.old_paths.remove(old_path)  # type: ignore[attr-defined]
                    else:
                        item.move(operation)
                # Write tags for music videos only if MediaFile supports
                # the format (MP4/M4V but not WebM, MKV, AVI, ...).
                if write and (self_task.apply or self_task.choice_flag == Action.RETAG):  # type: ignore[attr-defined]
                    is_music_video = item.get("media_type") == "music_video"
                    if is_music_video:
                        # Check if MediaFile supports this video format.
                        item_ext = os.path.splitext(util.syspath(item.path))[1].lower()
                        if plugin._mediafile_supports_format(item_ext):
                            item.try_write()
                        # Skip writing for unsupported video formats to
                        # avoid corruption.
                    else:
                        # Not a music video, write normally.
                        item.try_write()
            with session.lib.transaction():
                for item in items:
                    item.store()
            plugins.send("import_task_files", session=session, task=self_task)

        ImportTask.manipulate_files = manipulate_files_with_video_guard  # type: ignore[assignment]

        # Mark the class as patched and replace the method. These runtime
        # attributes are safe but confuse static type checkers.
        ImportTaskFactory._beets_music_videos_patched = True  # type: ignore[attr-defined]
        ImportTaskFactory._beets_music_videos_original_read_item = (  # type: ignore[attr-defined]
            original_read_item
        )
        ImportTaskFactory.read_item = read_item_with_videos  # type: ignore[assignment]
|
||
|
||
    def item_candidates_mb(
        self, item: Item, artist: str, title: str
    ) -> Iterable[beets.autotag.hooks.TrackInfo]:
        """Yield MusicBrainz recording candidates for *artist*/*title*.

        NOTE(review): `_search_api` in this file only handles the "track"
        query type and returns [] for anything else, so passing
        "recording" here means this generator currently yields nothing.
        `self.track_info` is also not defined in this file — presumably
        inherited from a base class; verify both before relying on this
        path.
        """
        # Search recordings by artist, exact title, and title-as-alias.
        criteria = {"artist": artist, "recording": title, "alias": title}

        yield from filter(
            None, map(self.track_info, self._search_api("recording", criteria))
        )
|
||
|
||
def _search_api(
|
||
self,
|
||
query_type: Literal["album", "track"],
|
||
filters: SearchFilter,
|
||
query_string: str = "",
|
||
) -> Sequence[VideoType | None]:
|
||
"""Required by SearchApiMetadataSourcePlugin. Search IMVDB and return videos."""
|
||
if query_type != "track":
|
||
return []
|
||
artist = (filters.get("artist") or "").strip()
|
||
title = (query_string or "").strip()
|
||
query = " ".join((artist, title)).strip()
|
||
if not query:
|
||
return []
|
||
try:
|
||
response = self.imvdb_api.video_search(query)
|
||
return response.get("results", [])
|
||
except Exception as e:
|
||
log.debug("imvdb video search failed: %s", e)
|
||
return []
|
||
# return self.imvdb_api.video_search(query)
|
||
|
||
def album_for_id(self, album_id: str) -> AlbumInfo | None:
|
||
"""We don't provide album metadata."""
|
||
return None
|
||
|
||
def track_for_id(self, track_id: str) -> TrackInfo | None:
|
||
"""Look up a single video by ID; not implemented yet."""
|
||
return None
|
||
|
||
    def _search_musicbrainz_videos(self, artist: str, title: str) -> list[TrackInfo]:
        """Search MusicBrainz for video recordings matching artist and title.

        Returns TrackInfo objects built via ``map_mb_to_trackinfo`` and
        enriched with dates, ISRCs, and work/artist relations, filtered to
        recordings whose credited artist fuzzily matches *artist*.
        """
        if not artist and not title:
            return []

        # Build search filters for the MusicBrainz API; "video: true"
        # restricts results to video recordings.
        filters: dict[str, str] = {"video": "true"}
        if artist:
            filters["artist"] = artist
        if title:
            filters["recording"] = title
            # Also search aliases for better matching.
            filters["alias"] = title

        try:
            recordings = self.mb_api.search(
                "recording",
                filters,
                limit=10,  # Limit results to avoid too many matches
            )
        except Exception as e:
            log.debug("musicbrainz video search failed: %s", e)
            return []

        out: list[TrackInfo] = []

        for recording in recordings:
            # recording is JSONDict from mb_api.search.
            # Skip recordings with no artist credit at all.
            artist_credit = recording.get("artist-credit", [])
            if not artist_credit:
                continue

            # Flatten artist credit to plain name / sort-name lists.
            artist_names: list[str] = []
            artist_sort_names: list[str] = []
            for credit in artist_credit:
                if isinstance(credit, dict) and "artist" in credit:
                    artist_obj = cast(dict[str, Any], credit["artist"])
                    name = artist_obj.get("name", "")
                    sort_name = artist_obj.get("sort-name", name)
                    if isinstance(name, str):
                        artist_names.append(name)
                    if isinstance(sort_name, str):
                        artist_sort_names.append(sort_name)

            if not artist_names:
                continue

            # Extract year/month/day from first-release-date if available.
            # Expected formats: YYYY, YYYY-MM, YYYY-MM-DD.
            year_int: int = 0
            month_int: int = 0
            day_int: int = 0
            original_date = recording.get("first-release-date") or ""
            if isinstance(original_date, str) and original_date:
                parts = original_date.split("-")
                if len(parts) >= 1 and parts[0].isdigit():
                    try:
                        year_int = int(parts[0])
                    except ValueError:
                        year_int = 0
                if len(parts) >= 2 and parts[1].isdigit():
                    try:
                        month_int = int(parts[1])
                    except ValueError:
                        month_int = 0
                if len(parts) >= 3 and parts[2].isdigit():
                    try:
                        day_int = int(parts[2])
                    except ValueError:
                        day_int = 0

            # Build the base TrackInfo from the shared MB mapping helper.
            track_info = map_mb_to_trackinfo(recording, index=None, medium=None, medium_index=None, medium_total=None)
            track_info.data_source = "MusicBrainz"
            # NOTE(review): a missing date yields year/month/day of 0
            # rather than None — confirm downstream consumers treat 0 as
            # "unknown".
            track_info.year = year_int
            track_info.month = month_int
            track_info.day = day_int
            # Surface the disambiguation comment in the displayed title.
            if track_info.trackdisambig:
                track_info.title = (
                    track_info.title + " - " + "[" + track_info.trackdisambig + "]"
                )

            # Extra MusicBrainz fields where available, mirroring the core
            # MusicBrainz plugin so that beets can write as many tags as
            # possible via MediaFile.

            # ISRCs, joined with ";" like the core plugin.
            if recording.get("isrcs"):
                isrcs_raw: Any = recording.get("isrcs")
                if isinstance(isrcs_raw, list):
                    isrc_strings = [str(i) for i in cast(list[Any], isrcs_raw)]
                    track_info.isrc = ";".join(isrc_strings)  # type: ignore[attr-defined]

            # Work / composers / lyricists (if work relations were included).
            lyricist_names: list[str] = []
            composer_names: list[str] = []
            composer_sort_names: list[str] = []
            work_rels_any = recording.get("work-relations", ()) or ()
            for work_rel in work_rels_any:
                if not isinstance(work_rel, dict):
                    continue
                work_rel_d: dict[str, Any] = cast(dict[str, Any], work_rel)
                # Only "performance" relations link a recording to its work.
                if work_rel_d.get("type") != "performance":
                    continue
                work_raw: Any = work_rel_d.get("work") or {}
                if not isinstance(work_raw, dict):
                    continue
                work_dict = cast(dict[str, Any], work_raw)

                work_id = work_dict.get("id")
                if isinstance(work_id, str):
                    track_info.mb_workid = work_id  # type: ignore[attr-defined]

                work_artist_rels = work_dict.get("artist-relations", ()) or ()
                for work_artist_rel in work_artist_rels:
                    if not isinstance(work_artist_rel, dict):
                        continue
                    work_artist_rel_d: dict[str, Any] = cast(
                        dict[str, Any], work_artist_rel
                    )
                    rel_type = work_artist_rel_d.get("type")
                    work_artist_block_raw: Any = work_artist_rel_d.get("artist") or {}
                    if not isinstance(work_artist_block_raw, dict):
                        continue
                    work_artist_block = cast(dict[str, Any], work_artist_block_raw)
                    name_val = work_artist_block.get("name")
                    sort_val = work_artist_block.get("sort-name")
                    if rel_type == "lyricist" and isinstance(name_val, str):
                        lyricist_names.append(name_val)
                    elif rel_type == "composer" and isinstance(name_val, str):
                        composer_names.append(name_val)
                        # NOTE(review): reconstructed nesting — the sort
                        # name is collected only for composers here; verify
                        # against the original file's indentation.
                        if isinstance(sort_val, str):
                            composer_sort_names.append(sort_val)

            if lyricist_names:
                track_info.lyricist = ", ".join(lyricist_names)  # type: ignore[attr-defined]
            if composer_names:
                track_info.composer = ", ".join(composer_names)  # type: ignore[attr-defined]
            if composer_sort_names:
                track_info.composer_sort = ", ".join(composer_sort_names)  # type: ignore[attr-defined]

            # Remixer / arranger from artist-relations (if present).
            remixer_names: list[str] = []
            arranger_names: list[str] = []
            artist_rels_any = recording.get("artist-relations", ()) or ()
            for artist_rel in artist_rels_any:
                if not isinstance(artist_rel, dict):
                    continue
                artist_rel_d: dict[str, Any] = cast(dict[str, Any], artist_rel)
                rel_type = artist_rel_d.get("type")
                artist_block_raw: Any = artist_rel_d.get("artist") or {}
                if not isinstance(artist_block_raw, dict):
                    continue
                artist_block = cast(dict[str, Any], artist_block_raw)
                name_val = artist_block.get("name")
                if not isinstance(name_val, str):
                    continue
                if rel_type == "remixer":
                    remixer_names.append(name_val)
                elif rel_type == "arranger":
                    arranger_names.append(name_val)

            if remixer_names:
                track_info.remixer = ", ".join(remixer_names)  # type: ignore[attr-defined]
            if arranger_names:
                track_info.arranger = ", ".join(arranger_names)  # type: ignore[attr-defined]

            # Add length if available (MusicBrainz reports milliseconds).
            if "length" in recording:
                track_info.length = int(recording["length"]) / 1000.0

            # Keep only recordings whose artist fuzzily matches the query.
            if fuzz.token_sort_ratio(track_info.artist, artist) > 50:
                out.append(track_info)

        return out
|
||
|
||
def _split_file_name(self, path: str) -> tuple[str]:
|
||
"""Split up all the parts of file name."""
|
||
path_str = str(path)
|
||
parts = path_str.split(" ")
|
||
return tuple[str](parts)
|
||
|
||
    def item_candidates(self, item: Item, artist: str, title: str) -> list[TrackInfo]:
        """Return track candidates for *item* from IMVDB and MusicBrainz.

        Derives a search artist/title (falling back to the filename),
        normalizes video suffixes, and tries query variants in order,
        stopping at the first variant that yields results.
        """
        # Derive artist/title from filename when missing
        # (e.g. "Artist - Song.mp4").
        search_artist, search_title = self._parse_artist_title_for_search(
            item, artist, title
        )
        search_artist = self._normalize_video_search_query(search_artist)
        search_title = self._normalize_video_search_query(search_title)

        # Build query variants for fallback when the primary query returns
        # nothing (e.g. "The Beatles" -> "Beatles").
        query_variants: list[tuple[str, str]] = [(search_artist, search_title)]
        if search_artist and re.match(r"^the\s+", search_artist, re.IGNORECASE):
            without_the = re.sub(
                r"^the\s+", "", search_artist, flags=re.IGNORECASE
            ).strip()
            if without_the:
                query_variants.append((without_the, search_title))

        # Track IDs already emitted, to dedupe across sources/variants.
        seen_ids: set[str] = set()
        out: list[TrackInfo] = []

        for q_artist, q_title in query_variants:
            q = " ".join((q_artist, q_title)).strip()
            if not q:
                continue
            imvdb_results = self._search_api(
                "track", {"artist": q_artist}, query_string=q_title
            )
            mb_results = self._search_musicbrainz_videos(q_artist, q_title)

            for result in imvdb_results:
                tid = str(result.get("id", ""))
                if tid and tid in seen_ids:
                    continue
                if tid:
                    seen_ids.add(tid)
                artists_list = result.get("artists", [])
                artist_names = [str(a.get("name", "")) for a in artists_list]
                first_artist = artist_names[0] if artist_names else ""
                # NOTE(review): this rebinds the `title` parameter inside
                # the loop; harmless today because the parameter is not
                # read afterwards, but fragile.
                title = str(result.get("song_title", ""))
                # NOTE(review): `year` is kept as a string and passed to
                # TrackInfo below — confirm TrackInfo accepts a str year.
                year = str(result.get("year", ""))

                # Fold the version name (e.g. a remix label) into the title.
                track_disambig = result.get("version_name")
                if track_disambig:
                    title = title + " - " + "[" + track_disambig + "]"

                # Drop results whose artist doesn't fuzzily match the query.
                if fuzz.token_sort_ratio(first_artist, q_artist) < 80:
                    continue
                out.append(
                    TrackInfo(
                        title=title,
                        artist=first_artist,
                        artists=artist_names,
                        track_id=tid,
                        year=year,
                        data_source="IMVDB",
                        trackdisambig=track_disambig,
                    )
                )

            for track_info in mb_results:
                tid = (track_info.track_id or "").strip()

                if tid and tid in seen_ids:
                    continue
                if tid:
                    seen_ids.add(tid)
                out.append(track_info)

            # If we got results from this variant, don't add more from later
            # variants (avoids duplicates from alternate spellings).
            if out:
                break

        return out
|