Files
beets_music_video_deux/test/helper.py
2026-03-18 16:38:41 -04:00

226 lines
6.0 KiB
Python

# Copyright: Copyright (c) 2020., Adam Jakab
#
# Author: Adam Jakab <adam at jakab dot pro>
# Created: 3/26/20, 4:50 PM
# License: See LICENSE.txt
import os
import shutil
import sys
import tempfile
from contextlib import contextmanager
from unittest import TestCase
import beets
import six
import yaml
from beets import logging
from beets import plugins
from beets import ui
from beets import util
from beets.util import (
syspath,
bytestring_path,
displayable_path,
)
from six import StringIO
from beetsplug import beets_music_videos
from beetsplug.beets_music_videos import common
logging.getLogger("beets").propagate = True
# Values
PLUGIN_NAME = common.plg_ns["__PLUGIN_NAME__"]
PLUGIN_SHORT_DESCRIPTION = common.plg_ns["__PLUGIN_SHORT_DESCRIPTION__"]
PLUGIN_VERSION = common.plg_ns["__version__"]
PACKAGE_NAME = common.plg_ns["__PACKAGE_NAME__"]
PACKAGE_TITLE = common.plg_ns["__PACKAGE_TITLE__"]
class LogCapture(logging.Handler):
def __init__(self):
super().__init__()
self.messages = []
def emit(self, record):
self.messages.append(six.text_type(record.msg))
@contextmanager
def capture_log(logger="beets", suppress_output=True):
capture = LogCapture()
log = logging.getLogger(logger)
log.propagate = True
if suppress_output:
# Is this too violent?
log.handlers = []
log.addHandler(capture)
try:
yield capture.messages
finally:
log.removeHandler(capture)
@contextmanager
def capture_stdout(suppress_output=True):
"""Save stdout in a StringIO.
>>> with capture_stdout() as output:
... print('spam')
...
>>> output.getvalue()
'spam'
"""
org = sys.stdout
sys.stdout = capture = StringIO()
try:
yield sys.stdout
finally:
sys.stdout = org
# if not suppress_output:
print(capture.getvalue())
@contextmanager
def control_stdin(userinput=None):
"""Sends ``input`` to stdin.
>>> with control_stdin('yes'):
... input()
'yes'
"""
org = sys.stdin
sys.stdin = StringIO(userinput)
try:
yield sys.stdin
finally:
sys.stdin = org
def _convert_args(args):
"""Convert args to strings"""
for i, elem in enumerate(args):
if isinstance(elem, bytes):
args[i] = elem.decode(util.arg_encoding())
return args
class Assertions:
def assertIsFile(self: TestCase, path):
self.assertTrue(
os.path.isfile(syspath(path)),
msg=f"Path is not a file: {displayable_path(path)}",
)
class TestHelper(TestCase, Assertions):
_test_config_dir_ = os.path.join(
bytestring_path(os.path.dirname(__file__)), b"config"
)
_test_fixture_dir = os.path.join(
bytestring_path(os.path.dirname(__file__)), b"fixtures"
)
_test_target_dir = bytestring_path("/tmp/beets-autofix")
def setUp(self):
"""Setup required for running test. Must be called before running any
tests.
"""
self.reset_beets(config_file=b"empty.yml")
def tearDown(self):
self.teardown_beets()
def reset_beets(self, config_file: bytes):
self.teardown_beets()
plugins._classes = {beets_music_videos.BeetsMusicVideoPlugin}
self._setup_beets(config_file)
def _setup_beets(self, config_file: bytes):
self.addCleanup(self.teardown_beets)
os.environ["BEETSDIR"] = self.mkdtemp()
self.config = beets.config
self.config.clear()
# add user configuration
config_file = format(os.path.join(self._test_config_dir_, config_file).decode())
shutil.copyfile(config_file, self.config.user_config_path())
self.config.read()
self.config["plugins"] = []
self.config["verbose"] = True
self.config["ui"]["color"] = False
self.config["threaded"] = False
self.config["import"]["copy"] = False
os.makedirs(self._test_target_dir, exist_ok=True)
libdir = self.mkdtemp()
self.config["directory"] = libdir
self.libdir = bytestring_path(libdir)
self.lib = beets.library.Library(":memory:", self.libdir)
# This will initialize (create instance) of the plugins
plugins.find_plugins()
def teardown_beets(self):
self.unload_plugins()
shutil.rmtree(self._test_target_dir, ignore_errors=True)
if hasattr(self, "_tempdirs"):
for tempdir in self._tempdirs:
if os.path.exists(tempdir):
shutil.rmtree(syspath(tempdir), ignore_errors=True)
self._tempdirs = []
if hasattr(self, "lib") and hasattr(self.lib, "_connections"):
del self.lib._connections
if "BEETSDIR" in os.environ:
del os.environ["BEETSDIR"]
if hasattr(self, "config"):
self.config.clear()
# beets.config.read(user=False, defaults=True)
def mkdtemp(self):
# This return a str path, i.e. Unicode on Python 3. We need this in
# order to put paths into the configuration.
path = tempfile.mkdtemp()
self._tempdirs.append(path)
return path
@staticmethod
def unload_plugins():
for plugin in plugins._classes:
plugin.listeners = None
plugins._classes = set()
plugins._instances = {}
def runcli(self, *args):
# TODO mock stdin
with capture_stdout() as out:
try:
ui._raw_main(_convert_args(list(args)), self.lib)
except ui.UserError as u:
# TODO remove this and handle exceptions in tests
print(u.args[0])
return out.getvalue()
def lib_path(self, path):
return os.path.join(self.libdir, path.replace(b"/", bytestring_path(os.sep)))
@staticmethod
def _dump_config(cfg: Subview):
# print(json.dumps(cfg.get(), indent=4, sort_keys=False))
flat = cfg.flatten()
print(
yaml.dump(
flat, Dumper=Dumper, default_flow_style=None, indent=2, width=1000
)
)