Files
beets_music_video/beetsplug/beets_music_videos/__init__.py
2026-02-26 19:56:25 -05:00

314 lines
12 KiB
Python

from __future__ import annotations
import os
from pathlib import Path
from beets import logging, plugins, util
from beets.autotag.hooks import TrackInfo
from beets.library import Item
from beets.metadata_plugins import SearchApiMetadataSourcePlugin
from typing import Annotated, Any, Literal, Sequence
from typing import Self
from beets.autotag.hooks import AlbumInfo
from beets.metadata_plugins import SearchFilter
from .types import SearchResponseType, VideoType
from httpx_auth import HeaderApiKey
from lapidary.runtime import Body, ClientBase, get, Query, Response, Responses, UnexpectedResponse
# Module-level logger used by the IMVDB client below; it is obtained under the
# name "beets" through the `logging` module imported from beets.
log = logging.getLogger("beets")
# Earlier requests-based draft of the IMVDB client, kept for reference only.
# The lapidary-based ``IMVDBApi`` class below replaces it.
#
# class IMVDBApi:
#     def __init__(self, api_key: str) -> None:
#         self.api_key = api_key
#         self.api_url = "https://imvdb.com/api/v1"
#
#     def _base_call(self, endpoint: str, params: dict[str, Any]) -> dict[str, Any]:
#         response = requests.get(
#             f"{self.api_url}/{endpoint}",
#             params={**params, "api_key": self.api_key},
#         )
#         return response.json()
#
#     def search(self, query: str) -> list[VideoType]:
#         response: SearchResponseType = self._base_call("search", params={"q": query})
#         return response["results"]
class IMVDBApi(ClientBase):
    """Lapidary-based IMVDB client. Use api_key in security; do not hardcode.

    The key passed to the constructor is sent in the ``IMVDB-APP-KEY`` header
    of every request. `video_search` is the async operation; the plugin calls
    the blocking wrapper `sync_video_search`.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://imvdb.com/api/v1",
        **kwargs: Any,
    ) -> None:  # type: ignore[override, assignment]
        """Create the client.

        Args:
            api_key: IMVDB application key.
            base_url: Root URL of the API.
            **kwargs: Forwarded to ``ClientBase``. A ``headers`` entry, if
                present, is copied and completed with a default User-Agent.
        """
        # Some APIs return 500/502 for non-browser User-Agent; send a browser-like one.
        # `or {}` also tolerates an explicit ``headers=None``.
        headers = dict(kwargs.pop("headers", None) or {})
        headers.setdefault(
            "User-Agent",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        )
        # NOTE(review): the key is passed in the scopes slot of the security
        # requirement; API-key schemes normally take an empty scope list.
        # Left unchanged -- confirm against lapidary before touching it.
        super().__init__(
            base_url=base_url,
            security=[{"api_key": [api_key]}],
            headers=headers,
            **kwargs,
        )
        # Send API key as header IMVDB-APP-KEY on every request.
        self.lapidary_authenticate(
            api_key=HeaderApiKey(api_key, header_name="IMVDB-APP-KEY"),
        )

    @get("/search/videos")  # type: ignore[misc]
    async def video_search(
        self: Self,
        q: Annotated[str, Query()],
    ) -> Annotated[
        SearchResponseType,
        Responses(
            responses={
                "2xx": Response(body=Body({"application/json": SearchResponseType})),
            }
        ),
    ]:
        """Search IMVDB for videos matching ``q``.

        The body is intentionally empty: the ``@get`` decorator supplies the
        implementation. The response carries the matches under "results".
        """
        pass

    @staticmethod
    def _results_from(payload: Any) -> list[VideoType]:
        """Extract the "results" list from a search payload.

        Always returns a list: a missing, null or non-list "results" value
        (and a payload of an unexpected shape) yields ``[]``.
        """
        if isinstance(payload, dict):
            results = payload.get("results")
        else:
            results = getattr(payload, "results", None)
        return results if isinstance(results, list) else []

    @staticmethod
    def _event_loop() -> Any:
        """Return the current event loop, creating and installing a new one
        if none is set or the current one has been closed.
        """
        import asyncio

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop

    def sync_video_search(self, query: str) -> list[VideoType]:
        """Sync wrapper used by the plugin. Runs video_search and returns its results.

        Args:
            query: Free-text search string.

        Returns:
            The list of matching videos. Any failure is logged at debug level
            and reported as an empty list; this method does not raise for
            request or validation errors.
        """
        # Reuse a single event loop to avoid "Event loop is closed" on subsequent calls.
        loop = self._event_loop()
        try:
            resp = loop.run_until_complete(self.video_search(q=query))
            return self._results_from(resp)
        except UnexpectedResponse as e:
            # Lapidary raises UnexpectedResponse when body validation fails
            # (e.g. API shape differs from SearchResponseType). For 200,
            # parse the body ourselves and return its results.
            if e.response.status_code == 200:
                try:
                    return self._results_from(e.response.json())
                except Exception as parse_error:
                    # Best effort only: fall through to the generic failure path.
                    log.debug(
                        "imvdb response body unreadable for %r: %s",
                        query[:50],
                        parse_error,
                    )
            log.debug("imvdb search failed for %r: %s", query[:50], e)
            return []
        except Exception as e:
            log.debug("imvdb search failed for %r: %s", query[:50], e)
            return []
class BeetsMusicVideos(SearchApiMetadataSourcePlugin[VideoType]):  # type: ignore[type-var]
    """Plugin that lets beets import "non-audio" files (e.g. music videos)
    by treating selected extensions as supported and creating library
    `Item`s for them without going through `MediaFile`.

    It also answers track lookups by searching IMVDB and turning the returned
    videos into `TrackInfo` candidates.
    """

    def __init__(self) -> None:  # type: ignore[override, assignment]
        """Register config defaults, install the importer patches and build
        the IMVDB client from the configured key.
        """
        super().__init__()
        # Default set of extensions to treat as importable media, in
        # addition to the formats supported by `mediafile`.
        self.config.add(
            {
                "extensions": [
                    ".mp4",
                    ".m4v",
                    ".mkv",
                    ".avi",
                    ".webm",
                ],
                "imvdb_api_key": "",
            }
        )
        # The key is a secret: keep it out of redacted configuration dumps.
        self.config["imvdb_api_key"].redact = True
        self.imvdb_api_key = self.config["imvdb_api_key"].get(str)
        self._patch_import_task_factory()
        self.imvdb_api = IMVDBApi(self.imvdb_api_key)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _normalized_extensions(self) -> set[str]:
        """Return the configured extensions as a normalized set.

        All values are lowercased and guaranteed to start with a leading
        dot, e.g. ``.mp4``. Blank entries are dropped.
        """
        exts: set[str] = set()
        for raw in self.config["extensions"].as_str_seq():
            ext = raw.strip().lower()
            if ext:
                exts.add(ext if ext.startswith(".") else f".{ext}")
        return exts

    @staticmethod
    def _parse_year(value: Any) -> int | None:
        """Convert an API year value to ``int``; ``None`` when missing or invalid."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def _video_item(self, path: util.PathBytes) -> Item:
        """Build a library `Item` for a video file without using MediaFile.

        We deliberately avoid `Item.from_path()` / `Item.read()` because those
        rely on MediaFile and would reject unsupported formats.
        """
        item = Item(album_id=None)
        # Store a normalized bytes path, as beets expects.
        item.path = util.normpath(path)
        # Initialize mtime from the actual file.
        try:
            item.mtime = item.current_mtime()
        except OSError:
            # If we cannot stat the file for some reason, fall back to 0;
            # the file is still importable.
            item.mtime = 0
        # Derive a simple title from the filename if none is set.
        if not item.get("title"):  # type: ignore[no-any-return]
            item["title"] = Path(util.syspath(path)).stem
        # Mark the item as a music video via a flexible attribute so it can
        # be queried, e.g. `media_type:music_video`.
        item["media_type"] = "music_video"
        return item

    def _patch_import_task_factory(self) -> None:
        """Monkey-patch the importer so video files can be imported.

        `ImportTaskFactory.read_item` is replaced so that paths with one of
        our configured extensions are turned into `Item`s even if `mediafile`
        does not support them. `ImportTask.manipulate_files` is replaced so
        that tags are never written to music-video items. The patches are
        installed at most once per process.
        """
        # Local import to avoid importing importer machinery unless needed.
        from beets.importer.tasks import (  # type: ignore[attr-defined]
            Action,
            ImportTask,
            ImportTaskFactory,
        )

        # Only patch once, even if the plugin is re-instantiated.
        if getattr(ImportTaskFactory, "_beets_music_videos_patched", False):
            return

        original_read_item = ImportTaskFactory.read_item
        original_manipulate_files = ImportTask.manipulate_files
        plugin = self

        def read_item_with_videos(
            self_: "ImportTaskFactory", path: util.PathBytes
        ) -> Item | None:  # type: ignore[override]
            # Determine the file extension of this path.
            ext = os.path.splitext(util.syspath(path))[1].lower()
            if ext in plugin._normalized_extensions():
                return plugin._video_item(path)
            # For all other paths, fall back to the normal behavior,
            # which uses Item.from_path() and MediaFile.
            return original_read_item(self_, path)

        # Patch ImportTask.manipulate_files to skip try_write for music videos
        # (MediaFile does not support WebM etc., and writing could corrupt the file).
        def manipulate_files_with_video_guard(
            self_task: "ImportTask",
            session: Any,
            operation: Any = None,
            write: bool = False,
        ) -> None:
            items = self_task.imported_items()
            if not any(item.get("media_type") == "music_video" for item in items):
                # No video involved: defer to the implementation that was in
                # place before patching instead of our copy of it.
                original_manipulate_files(self_task, session, operation, write)
                return

            # NOTE(review): the rest re-implements the replaced method with a
            # guard around try_write; keep it in sync with the beets version
            # in use.
            self_task.old_paths = [item.path for item in items]  # type: ignore[attr-defined]
            for item in items:
                if operation is not None:
                    old_path = item.path
                    if (
                        operation != util.MoveOperation.MOVE
                        and self_task.replaced_items[item]  # type: ignore[attr-defined]
                        and session.lib.directory in util.ancestry(old_path)
                    ):
                        # Re-import of an in-library file: move it and drop
                        # the now-stale path from old_paths.
                        item.move()
                        self_task.old_paths.remove(old_path)  # type: ignore[attr-defined]
                    else:
                        item.move(operation)
                # Skip writing tags for music videos; MediaFile does not support
                # video formats and could corrupt or truncate the file.
                if write and (self_task.apply or self_task.choice_flag == Action.RETAG):  # type: ignore[attr-defined]
                    if item.get("media_type") != "music_video":
                        item.try_write()

            with session.lib.transaction():
                for item in self_task.imported_items():
                    item.store()

            plugins.send("import_task_files", session=session, task=self_task)

        # Keep the original next to the patch, as is done for read_item.
        ImportTask._beets_music_videos_original_manipulate_files = (  # type: ignore[attr-defined]
            original_manipulate_files
        )
        ImportTask.manipulate_files = manipulate_files_with_video_guard  # type: ignore[assignment]

        # Mark the class as patched and replace the method. These runtime
        # attributes are safe but confuse static type checkers.
        ImportTaskFactory._beets_music_videos_patched = True  # type: ignore[attr-defined]
        ImportTaskFactory._beets_music_videos_original_read_item = (  # type: ignore[attr-defined]
            original_read_item
        )
        ImportTaskFactory.read_item = read_item_with_videos  # type: ignore[assignment]

    def _search_api(
        self,
        query_type: Literal["album", "track"],
        filters: SearchFilter,
        query_string: str = "",
    ) -> Sequence[VideoType]:
        """Required by SearchApiMetadataSourcePlugin. Search IMVDB and return videos.

        Only ``"track"`` queries are served; the search text is the artist
        filter followed by ``query_string``. Album queries and empty search
        text return an empty list without contacting the API.
        """
        if query_type != "track":
            return []
        artist = (filters.get("artist") or "").strip()
        title = (query_string or "").strip()
        query = " ".join((artist, title)).strip()
        if not query:
            return []
        return self.imvdb_api.sync_video_search(query)

    def album_for_id(self, album_id: str) -> AlbumInfo | None:
        """We don't provide album metadata."""
        return None

    def track_for_id(self, track_id: str) -> TrackInfo | None:
        """Look up a single video by ID; not implemented yet."""
        return None

    def item_candidates(self, item: Item, artist: str, title: str) -> list[TrackInfo]:
        """Return IMVDB videos matching ``artist``/``title`` as `TrackInfo`s.

        Missing values are never stringified: absent titles and artists
        become ``""``, an absent or unparsable year becomes ``None``, and
        entries without an id are skipped.
        """
        results = self._search_api("track", {"artist": artist}, query_string=title)
        out: list[TrackInfo] = []
        for result in results:
            video_id = result.get("id")
            if video_id is None:
                # Nothing to identify the candidate by; skip it.
                continue
            names = [str(a.get("name") or "") for a in (result.get("artists") or [])]
            artist_names = [name for name in names if name]
            # Beets' distance code expects title/artist as strings (e.g. .lower()).
            out.append(
                TrackInfo(
                    title=str(result.get("song_title") or ""),
                    artist=artist_names[0] if artist_names else "",
                    artists=artist_names,
                    track_id=str(video_id),
                    year=self._parse_year(result.get("year")),
                )
            )
        return out