Files
David Freitag ea7d420975 Add beets_music_videos plugin for managing music videos
- Implemented a new beets plugin to import and manage music videos, supporting various video formats and providing metadata from IMVDB and MusicBrainz for autotagging.
- Added installation instructions and configuration options in README.md.
- Created IMVDBApi client for interacting with the IMVDB API.
- Defined typing for various API responses and utility functions for mapping MusicBrainz data to beets TrackInfo.
- Included desktop entry and JSON info files for a video titled "[SAST] The Evolution of Search-based Software Testing".
- Added utility functions for handling artist credits and related metadata.
- Introduced a grabbed.txt file for tracking video sources.
2026-03-18 18:51:27 -04:00

1045 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import pprint
import re
from pathlib import Path
import subprocess
import xml.etree.ElementTree as ET
from beets import logging, plugins, util
import beets
from beets.autotag.hooks import TrackInfo
from beets.library import Item
from beets.metadata_plugins import SearchApiMetadataSourcePlugin
from typing import TYPE_CHECKING, Any
from fuzzywuzzy import fuzz
from urllib.parse import urljoin
from .utils.mapping import map_mb_to_trackinfo
from typing import Annotated, Any, Literal, Sequence, cast
from typing import Self
from beets.autotag.hooks import AlbumInfo
from beets.metadata_plugins import SearchFilter
from beetsplug._utils.musicbrainz import (
MusicBrainzAPIMixin,
)
from beets import config
from .IMVDBApi import IMVDBApi
# from beetsplug._utils.requests import requests
# from fuzzywuzzy import fuzz
from .types import SearchResponseType, VideoType
from httpx_auth import HeaderApiKey
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from typing import Literal
from beets.library import Item
from ._typing import JSONDict
# Logger shared by every function in this module (the "beets" logger).
log = logging.getLogger("beets")
# Root of the MusicBrainz website; `track_url` joins recording paths onto it.
MB_BASE_URL = "https://musicbrainz.org/"
def _artist_ids(credit: list[JSONDict]) -> list[str]:
"""
Given a list representing an ``artist-credit``,
return a list of artist IDs
"""
artist_ids: list[str] = []
for el in credit:
if isinstance(el, dict):
artist_ids.append(el["artist"]["id"])
return artist_ids
def _preferred_alias(
aliases: list[JSONDict], languages: list[str] | None = None
) -> JSONDict | None:
"""Given a list of alias structures for an artist credit, select
and return the user's preferred alias or None if no matching
"""
if not aliases:
return None
# Only consider aliases that have locales set.
valid_aliases = [a for a in aliases if "locale" in a]
# Get any ignored alias types and lower case them to prevent case issues
ignored_alias_types = config["import"]["ignored_alias_types"].as_str_seq()
ignored_alias_types = [a.lower() for a in ignored_alias_types]
# Search configured locales in order.
if languages is None:
languages = config["import"]["languages"].as_str_seq()
for locale in languages:
# Find matching primary aliases for this locale that are not
# being ignored
matches = []
for alias in valid_aliases:
if (
alias["locale"] == locale
and alias.get("primary")
and (alias.get("type") or "").lower() not in ignored_alias_types
):
matches.append(alias)
# Skip to the next locale if we have no matches
if not matches:
continue
return matches[0]
return None
def _multi_artist_credit(
credit: list[JSONDict], include_join_phrase: bool
) -> tuple[list[str], list[str], list[str]]:
"""Given a list representing an ``artist-credit`` block, accumulate
data into a triple of joined artist name lists: canonical, sort, and
credit.
"""
artist_parts = []
artist_sort_parts = []
artist_credit_parts = []
for el in credit:
alias = _preferred_alias(el["artist"].get("aliases", ()))
# An artist.
if alias:
cur_artist_name = alias["name"]
else:
cur_artist_name = el["artist"]["name"]
artist_parts.append(cur_artist_name)
# Artist sort name.
if alias:
artist_sort_parts.append(alias["sort-name"])
elif "sort-name" in el["artist"]:
artist_sort_parts.append(el["artist"]["sort-name"])
else:
artist_sort_parts.append(cur_artist_name)
# Artist credit.
if "name" in el:
artist_credit_parts.append(el["name"])
else:
artist_credit_parts.append(cur_artist_name)
if include_join_phrase and (joinphrase := el.get("joinphrase")):
artist_parts.append(joinphrase)
artist_sort_parts.append(joinphrase)
artist_credit_parts.append(joinphrase)
return (
artist_parts,
artist_sort_parts,
artist_credit_parts,
)
def track_url(trackid: str) -> str:
    """Return the URL of the recording ``trackid`` under ``MB_BASE_URL``."""
    recording_path = f"recording/{trackid}"
    return urljoin(MB_BASE_URL, recording_path)
def _flatten_artist_credit(credit: "list[JSONDict]") -> tuple[str, str, str]:
    """Flatten an ``artist-credit`` block into three joined strings.

    The strings are, in order, the canonical, sort and credited artist
    names, each built by concatenating the parts (join phrases included)
    returned by ``_multi_artist_credit``.

    The ``credit`` annotation is a string because ``JSONDict`` is imported
    under ``TYPE_CHECKING`` only and does not exist at runtime.
    """
    canonical, sort, credited = _multi_artist_credit(
        credit, include_join_phrase=True
    )
    return "".join(canonical), "".join(sort), "".join(credited)
class BeetsMusicVideos( # type: ignore[type-var]
MusicBrainzAPIMixin, SearchApiMetadataSourcePlugin[VideoType]
):
"""Plugin that lets beets import "non-audio" files (e.g. music videos)
by treating selected extensions as supported and creating library
`Item`s for them without going through `MediaFile`.

Autotag candidates are collected from the IMVDB search API and from
MusicBrainz video recordings (see `item_candidates`).
"""
def __init__(self) -> None:  # type: ignore
    """Register config defaults, patch the importer and build the IMVDB client."""
    super().__init__()  # type: ignore

    # Extensions treated as importable media in addition to the formats
    # supported by `mediafile`, plus the IMVDB API key (empty by default).
    defaults = {
        "extensions": [".mp4", ".m4v", ".mkv", ".avi", ".webm"],
        "imvdb_api_key": "",
    }
    self.config.add(defaults)

    self.imvdb_api_key = self.config["imvdb_api_key"].get(str)
    self._patch_import_task_factory()
    self.imvdb_api = IMVDBApi(self.imvdb_api_key)
def _normalized_extensions(self) -> set[str]:
"""Return the configured extensions as a normalized set.
All values are lowercased and guaranteed to start with a leading
dot, e.g. ``.mp4``.
"""
exts: set[str] = set()
for raw in self.config["extensions"].as_str_seq():
raw = raw.strip().lower()
if not raw:
continue
if not raw.startswith("."):
raw = f".{raw}"
exts.add(raw)
return exts
# Separator between artist and title in "Artist - Title" style filenames:
# an en dash, an em dash or a plain hyphen with whitespace on both sides.
_ARTIST_TITLE_SEP = re.compile(r"\s+[–—\-]\s+", re.UNICODE)
# Trailing parenthesised or bracketed "video" tags such as
# "(Official Music Video)" or "[Video Clip]"; stripped for better API matching.
_VIDEO_TITLE_SUFFIX = re.compile(
r"\s*[(\[]\s*(?:official\s+)?(?:music\s+)?video(?:\s+clip)?[)\]]\s*$",
re.IGNORECASE,
)
# Trailing "(MV)" / "[MV]" tag, matched case-insensitively.
_VIDEO_MV_SUFFIX = re.compile(r"\s*[(\[]\s*mv\s*[)\]]\s*$", re.IGNORECASE)
def _parse_artist_title_for_search(
    self, item: Item, artist: str, title: str
) -> tuple[str, str]:
    """Work out the artist and title to search for.

    Starts from the stripped ``artist`` / ``title`` arguments. A missing
    title is filled from ``item.title`` and then from the filename stem of
    ``item.path``. If the resulting title contains an "Artist - Title"
    separator it is split: with no artist known, both halves are used;
    when the left half starts with the known artist (case-insensitive),
    only the title is replaced by the right half.

    Returns ``(artist, title)``; either may be an empty string.
    """
    found_artist = (artist or "").strip()
    found_title = (title or "").strip()

    # Fall back to the item's own title when the caller passed none.
    if not found_title and item:
        found_title = str(getattr(item, "title", "") or "").strip()

    # Filename stem of the item's path, or None if unavailable.
    stem: str | None = None
    if item and getattr(item, "path", None):
        try:
            stem = str(Path(util.syspath(item.path)).stem).strip() or None
        except Exception:
            stem = None

    # The stem is only needed when there is still no title at all.
    if stem and not found_title:
        found_title = stem

    log.debug(
        "beets_music_videos: item_candidates received artist=%r title=%r path_stem=%r",
        artist,
        title,
        stem,
    )

    if not found_title:
        return found_artist, found_title

    # Split on the first "Artist - Title" separator, if there is one.
    separator = self._ARTIST_TITLE_SEP.search(found_title)
    if separator is not None:
        head = found_title[: separator.start()].strip()
        tail = found_title[separator.end():].strip()

        if not found_artist and head:
            # No artist yet: trust the filename for both parts.
            found_artist, found_title = head, tail
            log.debug(
                "beets_music_videos: split filename -> artist=%r title=%r",
                found_artist,
                found_title,
            )
        elif (
            found_artist
            and head
            and head.lower().startswith(found_artist.lower())
        ):
            # Tags and filename agree on the artist: keep it, but use the
            # part after the separator as the title.
            found_title = tail

    return (str(found_artist or ""), str(found_title or ""))
def _normalize_video_search_query(self, s: str) -> str:
"""Normalize a string for video API search: strip common video suffixes
and extra whitespace so \"Song (Official Video)\" matches \"Song\".
"""
if not s:
return ""
s = s.strip()
s = self._VIDEO_TITLE_SUFFIX.sub("", s)
s = self._VIDEO_MV_SUFFIX.sub("", s)
s = re.sub(r"\s+", " ", s).strip()
return s
def _read_nfo_metadata(
self,
path_str: str,
) -> tuple[dict[str, Any], str | None]:
"""Read a Kodi-style .nfo file that shares the basename with the video.
For a path like /path/to/Foo-Bar.mp4 we look for /path/to/Foo-Bar.nfo.
If the file exists and is wellformed XML, extract a small set of
common tags and return them as a flat dict.
"""
nfo_path = Path(path_str).with_suffix(".nfo")
if not nfo_path.is_file():
return {}, None
try:
text = nfo_path.read_text(encoding="utf-8", errors="ignore")
except Exception as exc:
log.debug(
"beets_music_videos: failed to read NFO %r: %s",
str(nfo_path),
exc,
)
return {}, None
stripped = text.lstrip()
if not stripped.startswith("<"):
# Not XML; ignore for now.
return {}, str(nfo_path)
try:
root = ET.fromstring(text)
except Exception as exc:
log.debug(
"beets_music_videos: failed to parse NFO XML %r: %s",
str(nfo_path),
exc,
)
return {}, str(nfo_path)
data: dict[str, Any] = {}
def first_text(*tags: str) -> str | None:
for tag in tags:
el = root.find(tag)
if el is not None and el.text:
value = el.text.strip()
if value:
return value
return None
# Title
title = first_text("title")
if title:
data["title"] = title
# Artists
artists: list[str] = []
for el in root.findall("artist"):
if el.text:
name = el.text.strip()
if name:
artists.append(name)
if not artists:
fallback_artist = first_text("artist", "albumartist")
if fallback_artist:
artists.append(fallback_artist)
if artists:
data["artist"] = artists[0]
if len(artists) > 1:
data["artists"] = artists
# Album / albumartist
album = first_text("album", "showtitle")
if album:
data["album"] = album
albumartist = first_text("albumartist")
if albumartist:
data["albumartist"] = albumartist
elif artists:
data["albumartist"] = artists[0]
# Year: accept YYYY or YYYY-MM-DD, etc.
year_raw = first_text("year", "releasedate", "premiered", "aired")
if year_raw:
m = re.search(r"(\\d{4})", year_raw)
if m:
try:
data["year"] = int(m.group(1))
except ValueError:
pass
# Track number
track_raw = first_text("track", "episode")
if track_raw:
try:
data["track"] = int(track_raw)
except ValueError:
pass
# Genre: potentially multiple <genre> tags.
genres: list[str] = []
for el in root.findall("genre"):
if el.text:
g = el.text.strip()
if g:
genres.append(g)
if genres:
data["genre"] = " / ".join(genres)
# Plot/description -> comments field, but only if present.
plot = first_text("plot", "review", "outline")
if plot:
data["comments"] = plot
return data, str(nfo_path)
def _apply_nfo_metadata_to_item(self, item: Item, path_str: str) -> None:
"""Apply metadata from a samebasename .nfo file to an Item."""
nfo_data, nfo_path_str = self._read_nfo_metadata(path_str)
if not nfo_data:
return
log.debug(
"beets_music_videos: applying NFO metadata from %r: %s",
nfo_path_str,
", ".join(sorted(nfo_data.keys())),
)
# String fields.
for key in ("title", "artist", "album", "albumartist", "genre", "comments"):
value = nfo_data.get(key)
if isinstance(value, str) and value:
item[key] = value
# Artists list (flexible field; may not be in every template).
artists_value = nfo_data.get("artists")
if isinstance(artists_value, list) and artists_value:
item["artists"] = artists_value
# Numeric core fields.
year_val = nfo_data.get("year")
if isinstance(year_val, int):
item.year = year_val
track_val = nfo_data.get("track")
if isinstance(track_val, int):
item.track = track_val
def _probe_video_resolution(
    self,
    path_str: str,
) -> tuple[int | None, int | None]:
    """Ask ffprobe for the width and height of the first video stream.

    Returns ``(width, height)`` in pixels. Either value is None when it
    is missing from the output, and both are None when ffprobe is not
    installed, exits with an error, or prints something that is not
    valid JSON.
    """
    command = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=width,height",
        "-of",
        "json",
        path_str,
    ]

    try:
        # ffprobe should be available in PATH; if not, just skip.
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
    except FileNotFoundError:
        # ffprobe not installed; nothing we can do.
        log.debug(
            "beets_music_videos: ffprobe not found; cannot probe resolution for %r",
            path_str,
        )
        return None, None
    except subprocess.CalledProcessError as exc:
        log.debug(
            "beets_music_videos: ffprobe failed for %r: %s",
            path_str,
            exc.stderr.strip() if exc.stderr else exc,
        )
        return None, None
    except Exception as exc:
        log.debug(
            "beets_music_videos: unexpected error running ffprobe for %r: %s",
            path_str,
            exc,
        )
        return None, None

    try:
        payload = json.loads(completed.stdout or "{}")
    except Exception as exc:
        log.debug(
            "beets_music_videos: failed to parse ffprobe JSON for %r: %s",
            path_str,
            exc,
        )
        return None, None

    streams = payload.get("streams")
    if not (isinstance(streams, list) and streams):
        return None, None
    first_stream: dict[str, Any] = cast(dict[str, Any], streams[0])

    def as_pixels(raw: Any) -> int | None:
        """Accept an int or a digit string; anything else becomes None."""
        if isinstance(raw, int):
            return raw
        if isinstance(raw, str) and raw.isdigit():
            return int(raw)
        return None

    width = as_pixels(first_stream.get("width"))
    height = as_pixels(first_stream.get("height"))
    return width, height
def _mediafile_supports_format(self, ext: str) -> bool:
"""Check if MediaFile supports writing metadata to the given extension.
MediaFile supports MP4 format (including .mp4 and .m4v extensions).
Other video formats like .webm, .mkv, .avi are not supported.
"""
ext = ext.lower()
# MediaFile supports MP4 format, which includes .mp4 and .m4v files
return ext in (".mp4", ".m4v")
def _patch_import_task_factory(self) -> None:
"""Monkey-patch `ImportTaskFactory.read_item` so that paths with
one of our configured extensions are turned into `Item`s even if
`mediafile` does not support them. Also patch import tasks to
skip writing tags for music videos and to avoid deleting originals
when they are already inside the library directory.

Two class-level replacements are installed: `ImportTaskFactory.read_item`
becomes `read_item_with_videos` and `ImportTask.manipulate_files` becomes
`manipulate_files_with_video_guard`. The `_beets_music_videos_patched`
flag on `ImportTaskFactory` turns repeated calls into a no-op, and the
original `read_item` is kept on
`ImportTaskFactory._beets_music_videos_original_read_item`.
"""
# Local import to avoid importing importer machinery unless needed.
from beets.importer.tasks import ( # type: ignore[attr-defined]
Action,
ImportTask,
ImportTaskFactory,
)
# Only patch once, even if the plugin is re-instantiated.
if getattr(ImportTaskFactory, "_beets_music_videos_patched", False):
return
original_read_item = ImportTaskFactory.read_item
# The replacement functions below close over this instance. Because the
# patch is applied only once, they keep using the configuration of the
# plugin instance that installed it.
plugin = self
def read_item_with_videos(
self_: "ImportTaskFactory", path: util.PathBytes
) -> Item | None: # type: ignore[override]
# Determine the file extension of this path.
str_path = util.syspath(path)
ext = os.path.splitext(str_path)[1].lower()
if ext in plugin._normalized_extensions():
# Create an Item without going through MediaFile.
# We deliberately avoid calling Item.from_path() or
# Item.read() because those rely on MediaFile and would
# reject unsupported formats.
item = Item(album_id=None)
# Store a normalized bytes path, as beets expects.
item.path = util.normpath(path)
# Initialize mtime from the actual file.
try:
item.mtime = item.current_mtime()
except OSError:
# If we cannot stat the file for some reason, fall
# back to 0; the file is still importable.
item.mtime = 0
# Default the title to the filename stem; this is used as a
# fallback when we can't read embedded metadata.
filename = Path(str_path).stem
if not item.get("title"): # type: ignore[no-any-return]
item["title"] = filename
# Probe video resolution using ffprobe so resolution is
# available as flexible metadata on the item for queries and
# path formats.
width, height = plugin._probe_video_resolution(str_path)
if width is not None:
item["video_width"] = width
if height is not None:
item["video_height"] = height
if width is not None and height is not None:
item["video_resolution"] = f"{width}x{height}"
# Mark the item as a music video via a flexible attribute
# so it can be queried, e.g. `media_type:music_video`.
item["media_type"] = "music_video"
# For formats where MediaFile is known to work (e.g. MP4/M4V),
# read embedded tags so the initial autotag search can use
# real metadata (artist/title/album) and a correct duration.
if plugin._mediafile_supports_format(ext):
try:
from mediafile import MediaFile # type: ignore[import-untyped]
mf = MediaFile(str_path)
# Prefer embedded tags over filename-derived values.
title_val = getattr(mf, "title", None)
if isinstance(title_val, str) and title_val:
item["title"] = title_val
artist_val = getattr(mf, "artist", None)
if isinstance(artist_val, str) and artist_val:
item["artist"] = artist_val
album_val = getattr(mf, "album", None)
if isinstance(album_val, str) and album_val:
item["album"] = album_val
albumartist_val = getattr(mf, "albumartist", None)
if isinstance(albumartist_val, str) and albumartist_val:
item["albumartist"] = albumartist_val
length_val = getattr(mf, "length", None)
if isinstance(length_val, (int, float)):
item.length = float(length_val)
except Exception as exc:
# Best effort: a file MediaFile cannot read is still imported,
# just without embedded tags.
log.debug(
"beets_music_videos: failed to read tags for %r: %s",
str_path,
exc,
)
# Finally, if there is a same-basename .nfo file alongside
# the video (e.g. Foo-Bar.mp4 + Foo-Bar.nfo), merge its
# metadata into the Item. NFO values intentionally override
# existing ones so that hand-edited metadata wins.
plugin._apply_nfo_metadata_to_item(item, str_path)
return item
# For all other paths, fall back to the normal behavior,
# which uses Item.from_path() and MediaFile.
return original_read_item(self_, path)
# Patch ImportTask.manipulate_files to conditionally skip try_write for
# music videos in unsupported formats (MediaFile supports MP4/M4V but not
# WebM, MKV, AVI, etc., and writing could corrupt unsupported files).
# NOTE(review): `_original_manipulate_files` is assigned but never
# referenced afterwards; the replacement below does not call it.
_original_manipulate_files = ImportTask.manipulate_files
def manipulate_files_with_video_guard(
self_task: "ImportTask",
session: Any,
operation: Any = None,
write: bool = False,
) -> None:
items = cast(list[Any], self_task.imported_items())
# Remember the pre-move paths on the task; entries are removed again
# below for items that are moved in place inside the library.
self_task.old_paths = [item.path for item in items] # type: ignore[attr-defined]
for item in items:
if operation is not None:
old_path = item.path
# For non-move operations, a replaced item whose file already
# lives inside the library directory is moved in place.
if (
operation != util.MoveOperation.MOVE
and self_task.replaced_items[item] # type: ignore[attr-defined]
and session.lib.directory in util.ancestry(old_path)
):
item.move()
self_task.old_paths.remove(old_path) # type: ignore[attr-defined]
else:
item.move(operation)
# Write tags for music videos only if MediaFile supports the format.
# MediaFile supports MP4/M4V but not WebM, MKV, AVI, etc.
if write and (self_task.apply or self_task.choice_flag == Action.RETAG): # type: ignore[attr-defined]
is_music_video = item.get("media_type") == "music_video"
if is_music_video:
# Check if MediaFile supports this video format
item_ext = os.path.splitext(util.syspath(item.path))[1].lower()
if plugin._mediafile_supports_format(item_ext):
item.try_write()
# Skip writing for unsupported video formats to avoid corruption
else:
# Not a music video, write normally
item.try_write()
# Persist all items in a single library transaction.
with session.lib.transaction():
for item in items:
item.store()
plugins.send("import_task_files", session=session, task=self_task)
ImportTask.manipulate_files = manipulate_files_with_video_guard # type: ignore[assignment]
# Mark the class as patched and replace the method. These runtime
# attributes are safe but confuse static type checkers.
ImportTaskFactory._beets_music_videos_patched = True # type: ignore[attr-defined]
ImportTaskFactory._beets_music_videos_original_read_item = ( # type: ignore[attr-defined]
original_read_item
)
ImportTaskFactory.read_item = read_item_with_videos # type: ignore[assignment]
def item_candidates_mb(
    self, item: "Item", artist: str, title: str
) -> "Iterable[beets.autotag.hooks.TrackInfo]":
    """Yield track candidates built from a "recording" search.

    Every result of ``self._search_api("recording", ...)`` is passed
    through ``self.track_info``; falsy conversions are dropped.

    The return annotation is a string because ``Iterable`` is imported
    under ``TYPE_CHECKING`` only and does not exist at runtime.

    NOTE(review): the ``_search_api`` defined on this class returns an
    empty list for every query type other than "track", so as written
    this generator yields nothing. ``track_info`` is not defined in this
    file either -- confirm that a base class provides it.
    """
    criteria = {"artist": artist, "recording": title, "alias": title}
    for recording in self._search_api("recording", criteria):
        info = self.track_info(recording)
        if info:
            yield info
def _search_api(
self,
query_type: Literal["album", "track"],
filters: SearchFilter,
query_string: str = "",
) -> Sequence[VideoType | None]:
"""Required by SearchApiMetadataSourcePlugin. Search IMVDB and return videos."""
if query_type != "track":
return []
artist = (filters.get("artist") or "").strip()
title = (query_string or "").strip()
query = " ".join((artist, title)).strip()
if not query:
return []
try:
response = self.imvdb_api.video_search(query)
return response.get("results", [])
except Exception as e:
log.debug("imvdb video search failed: %s", e)
return []
# return self.imvdb_api.video_search(query)
def album_for_id(self, album_id: str) -> "AlbumInfo | None":
    """Always return None: this plugin provides no album metadata."""
    return None
def track_for_id(self, track_id: str) -> "TrackInfo | None":
    """Always return None: looking up a single video by ID is not implemented yet."""
    return None
def _search_musicbrainz_videos(self, artist: str, title: str) -> list[TrackInfo]:
"""Search MusicBrainz for video recordings matching artist and title."""
if not artist and not title:
return []
# Build search filters for MusicBrainz API
filters: dict[str, str] = {"video": "true"}
if artist:
filters["artist"] = artist
if title:
filters["recording"] = title
# Also search aliases for better matching
filters["alias"] = title
try:
recordings = self.mb_api.search(
"recording",
filters,
limit=10, # Limit results to avoid too many matches
)
except Exception as e:
log.debug("musicbrainz video search failed: %s", e)
return []
out: list[TrackInfo] = []
for recording in recordings:
# recording is JSONDict from mb_api.search
# Extract artist information
artist_credit = recording.get("artist-credit", [])
if not artist_credit:
continue
# Flatten artist credit to get names
artist_names: list[str] = []
artist_sort_names: list[str] = []
for credit in artist_credit:
if isinstance(credit, dict) and "artist" in credit:
artist_obj = cast(dict[str, Any], credit["artist"])
name = artist_obj.get("name", "")
sort_name = artist_obj.get("sort-name", name)
if isinstance(name, str):
artist_names.append(name)
if isinstance(sort_name, str):
artist_sort_names.append(sort_name)
if not artist_names:
continue
# Extract year/month/day from first-release-date if available.
year_int: int = 0
month_int: int = 0
day_int: int = 0
original_date = recording.get("first-release-date") or ""
if isinstance(original_date, str) and original_date:
# Expected formats: YYYY, YYYY-MM, YYYY-MM-DD
parts = original_date.split("-")
if len(parts) >= 1 and parts[0].isdigit():
try:
year_int = int(parts[0])
except ValueError:
year_int = 0
if len(parts) >= 2 and parts[1].isdigit():
try:
month_int = int(parts[1])
except ValueError:
month_int = 0
if len(parts) >= 3 and parts[2].isdigit():
try:
day_int = int(parts[2])
except ValueError:
day_int = 0
# track_info = TrackInfo(
# title=recording.get("title", ""),
# artist=artist_names[0],
# artists=artist_names,
# artist_sort=artist_sort_names[0] if artist_sort_names else "",
# artists_sort=artist_sort_names,
# track_id=recording.get("id", ""),
# year=year_int if year_int else None,
# month=month_int if month_int else None,
# day=day_int if day_int else None,
# data_source="MusicBrainz",
# data_url=f"https://musicbrainz.org/recording/{recording.get('id', '')}",
# )
track_info = map_mb_to_trackinfo(recording, index=None, medium=None, medium_index=None, medium_total=None)
track_info.data_source = "MusicBrainz"
track_info.year = year_int
track_info.month = month_int
track_info.day = day_int
if track_info.trackdisambig:
track_info.title = (
track_info.title + " - " + "[" + track_info.trackdisambig + "]"
)
# Extra MusicBrainz fields where available, mirroring the core
# MusicBrainz plugin so that beets can write as many tags as
# possible via MediaFile.
# Disambiguation comment.
# ISRCs.
if recording.get("isrcs"):
isrcs_raw: Any = recording.get("isrcs")
if isinstance(isrcs_raw, list):
isrc_strings = [str(i) for i in cast(list[Any], isrcs_raw)]
track_info.isrc = ";".join(isrc_strings) # type: ignore[attr-defined]
# Work / composers / lyricists (if work relations were included).
lyricist_names: list[str] = []
composer_names: list[str] = []
composer_sort_names: list[str] = []
work_rels_any = recording.get("work-relations", ()) or ()
for work_rel in work_rels_any:
if not isinstance(work_rel, dict):
continue
work_rel_d: dict[str, Any] = cast(dict[str, Any], work_rel)
if work_rel_d.get("type") != "performance":
continue
work_raw: Any = work_rel_d.get("work") or {}
if not isinstance(work_raw, dict):
continue
work_dict = cast(dict[str, Any], work_raw)
work_id = work_dict.get("id")
if isinstance(work_id, str):
track_info.mb_workid = work_id # type: ignore[attr-defined]
work_artist_rels = work_dict.get("artist-relations", ()) or ()
for work_artist_rel in work_artist_rels:
if not isinstance(work_artist_rel, dict):
continue
work_artist_rel_d: dict[str, Any] = cast(
dict[str, Any], work_artist_rel
)
rel_type = work_artist_rel_d.get("type")
work_artist_block_raw: Any = work_artist_rel_d.get("artist") or {}
if not isinstance(work_artist_block_raw, dict):
continue
work_artist_block = cast(dict[str, Any], work_artist_block_raw)
name_val = work_artist_block.get("name")
sort_val = work_artist_block.get("sort-name")
if rel_type == "lyricist" and isinstance(name_val, str):
lyricist_names.append(name_val)
elif rel_type == "composer" and isinstance(name_val, str):
composer_names.append(name_val)
if isinstance(sort_val, str):
composer_sort_names.append(sort_val)
if lyricist_names:
track_info.lyricist = ", ".join(lyricist_names) # type: ignore[attr-defined]
if composer_names:
track_info.composer = ", ".join(composer_names) # type: ignore[attr-defined]
if composer_sort_names:
track_info.composer_sort = ", ".join(composer_sort_names) # type: ignore[attr-defined]
# Remixer / arranger from artist-relations (if present).
remixer_names: list[str] = []
arranger_names: list[str] = []
artist_rels_any = recording.get("artist-relations", ()) or ()
for artist_rel in artist_rels_any:
if not isinstance(artist_rel, dict):
continue
artist_rel_d: dict[str, Any] = cast(dict[str, Any], artist_rel)
rel_type = artist_rel_d.get("type")
artist_block_raw: Any = artist_rel_d.get("artist") or {}
if not isinstance(artist_block_raw, dict):
continue
artist_block = cast(dict[str, Any], artist_block_raw)
name_val = artist_block.get("name")
if not isinstance(name_val, str):
continue
if rel_type == "remixer":
remixer_names.append(name_val)
elif rel_type == "arranger":
arranger_names.append(name_val)
if remixer_names:
track_info.remixer = ", ".join(remixer_names) # type: ignore[attr-defined]
if arranger_names:
track_info.arranger = ", ".join(arranger_names) # type: ignore[attr-defined]
# Add length if available
if "length" in recording:
track_info.length = int(recording["length"]) / 1000.0
# print(track_info.artist, artist)
if fuzz.token_sort_ratio(track_info.artist, artist) > 50:
out.append(track_info)
return out
def _split_file_name(self, path: str) -> tuple[str]:
"""Split up all the parts of file name."""
path_str = str(path)
parts = path_str.split(" ")
return tuple[str](parts)
def item_candidates(
    self, item: "Item", artist: str, title: str
) -> "list[TrackInfo]":
    """Return autotag candidates for ``item`` from IMVDB and MusicBrainz.

    The search artist/title are derived with
    ``_parse_artist_title_for_search`` (e.g. from "Artist - Song.mp4") and
    cleaned with ``_normalize_video_search_query``. If the artist starts
    with "The ", a second query without that prefix is kept as a fallback
    and used only when the first query produced no candidate.

    IMVDB results are kept when their first artist is a fuzzy match
    (token sort ratio of at least 80) for the query artist; with no query
    artist every result is kept. Candidates are de-duplicated by ID.
    """
    search_artist, search_title = self._parse_artist_title_for_search(
        item, artist, title
    )
    search_artist = self._normalize_video_search_query(search_artist)
    search_title = self._normalize_video_search_query(search_title)

    # Query variants, tried in order (e.g. "The Beatles" -> "Beatles").
    query_variants: list[tuple[str, str]] = [(search_artist, search_title)]
    if search_artist and re.match(r"^the\s+", search_artist, re.IGNORECASE):
        without_the = re.sub(
            r"^the\s+", "", search_artist, flags=re.IGNORECASE
        ).strip()
        if without_the:
            query_variants.append((without_the, search_title))

    seen_ids: set[str] = set()
    out: list[TrackInfo] = []

    for q_artist, q_title in query_variants:
        if not " ".join((q_artist, q_title)).strip():
            continue

        imvdb_results = self._search_api(
            "track", {"artist": q_artist}, query_string=q_title
        )
        mb_results = self._search_musicbrainz_videos(q_artist, q_title)

        for result in imvdb_results:
            # The search API may hand back empty entries.
            if not result:
                continue

            artist_names = [
                str(entry.get("name", ""))
                for entry in (result.get("artists") or [])
            ]
            first_artist = artist_names[0] if artist_names else ""

            # Filter on the artist BEFORE recording the ID: an ID marked
            # as seen for a rejected result could never be accepted by a
            # later query variant. With no query artist, nothing is
            # filtered out.
            if q_artist and fuzz.token_sort_ratio(first_artist, q_artist) < 80:
                continue

            raw_id = result.get("id")
            tid = "" if raw_id is None else str(raw_id)
            if tid:
                if tid in seen_ids:
                    continue
                seen_ids.add(tid)

            candidate_title = str(result.get("song_title", ""))
            track_disambig = result.get("version_name")
            if track_disambig:
                candidate_title = f"{candidate_title} - [{track_disambig}]"

            # The year is passed on as an int, or None when unknown.
            try:
                year = int(result.get("year"))
            except (TypeError, ValueError):
                year = None

            out.append(
                TrackInfo(
                    title=candidate_title,
                    artist=first_artist,
                    artists=artist_names,
                    track_id=tid,
                    year=year,
                    data_source="IMVDB",
                    trackdisambig=track_disambig,
                )
            )

        for track_info in mb_results:
            tid = (track_info.track_id or "").strip()
            if tid:
                if tid in seen_ids:
                    continue
                seen_ids.add(tid)
            out.append(track_info)

        # If this variant produced results, don't query later variants
        # (avoids duplicates from alternate spellings).
        if out:
            break

    return out