mirror of
https://github.com/clinton-hall/nzbToMedia.git
synced 2025-01-23 19:33:01 -08:00
208 lines
5.3 KiB
Python
208 lines
5.3 KiB
Python
import os
|
|
import pathlib
|
|
import tempfile
|
|
import functools
|
|
import contextlib
|
|
import types
|
|
import importlib
|
|
import inspect
|
|
import warnings
|
|
import itertools
|
|
|
|
from typing import Union, Optional, cast
|
|
from .abc import ResourceReader, Traversable
|
|
|
|
from ._compat import wrap_spec
|
|
|
|
Package = Union[types.ModuleType, str]
|
|
Anchor = Package
|
|
|
|
|
|
def package_to_anchor(func):
|
|
"""
|
|
Replace 'package' parameter as 'anchor' and warn about the change.
|
|
|
|
Other errors should fall through.
|
|
|
|
>>> files('a', 'b')
|
|
Traceback (most recent call last):
|
|
TypeError: files() takes from 0 to 1 positional arguments but 2 were given
|
|
"""
|
|
undefined = object()
|
|
|
|
@functools.wraps(func)
|
|
def wrapper(anchor=undefined, package=undefined):
|
|
if package is not undefined:
|
|
if anchor is not undefined:
|
|
return func(anchor, package)
|
|
warnings.warn(
|
|
"First parameter to files is renamed to 'anchor'",
|
|
DeprecationWarning,
|
|
stacklevel=2,
|
|
)
|
|
return func(package)
|
|
elif anchor is undefined:
|
|
return func()
|
|
return func(anchor)
|
|
|
|
return wrapper
|
|
|
|
|
|
@package_to_anchor
|
|
def files(anchor: Optional[Anchor] = None) -> Traversable:
|
|
"""
|
|
Get a Traversable resource for an anchor.
|
|
"""
|
|
return from_package(resolve(anchor))
|
|
|
|
|
|
def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
|
|
"""
|
|
Return the package's loader if it's a ResourceReader.
|
|
"""
|
|
# We can't use
|
|
# a issubclass() check here because apparently abc.'s __subclasscheck__()
|
|
# hook wants to create a weak reference to the object, but
|
|
# zipimport.zipimporter does not support weak references, resulting in a
|
|
# TypeError. That seems terrible.
|
|
spec = package.__spec__
|
|
reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
|
|
if reader is None:
|
|
return None
|
|
return reader(spec.name) # type: ignore
|
|
|
|
|
|
@functools.singledispatch
|
|
def resolve(cand: Optional[Anchor]) -> types.ModuleType:
|
|
return cast(types.ModuleType, cand)
|
|
|
|
|
|
@resolve.register
|
|
def _(cand: str) -> types.ModuleType:
|
|
return importlib.import_module(cand)
|
|
|
|
|
|
@resolve.register
|
|
def _(cand: None) -> types.ModuleType:
|
|
return resolve(_infer_caller().f_globals['__name__'])
|
|
|
|
|
|
def _infer_caller():
|
|
"""
|
|
Walk the stack and find the frame of the first caller not in this module.
|
|
"""
|
|
|
|
def is_this_file(frame_info):
|
|
return frame_info.filename == __file__
|
|
|
|
def is_wrapper(frame_info):
|
|
return frame_info.function == 'wrapper'
|
|
|
|
not_this_file = itertools.filterfalse(is_this_file, inspect.stack())
|
|
# also exclude 'wrapper' due to singledispatch in the call stack
|
|
callers = itertools.filterfalse(is_wrapper, not_this_file)
|
|
return next(callers).frame
|
|
|
|
|
|
def from_package(package: types.ModuleType):
|
|
"""
|
|
Return a Traversable object for the given package.
|
|
|
|
"""
|
|
spec = wrap_spec(package)
|
|
reader = spec.loader.get_resource_reader(spec.name)
|
|
return reader.files()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _tempfile(
|
|
reader,
|
|
suffix='',
|
|
# gh-93353: Keep a reference to call os.remove() in late Python
|
|
# finalization.
|
|
*,
|
|
_os_remove=os.remove,
|
|
):
|
|
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
|
|
# blocks due to the need to close the temporary file to work on Windows
|
|
# properly.
|
|
fd, raw_path = tempfile.mkstemp(suffix=suffix)
|
|
try:
|
|
try:
|
|
os.write(fd, reader())
|
|
finally:
|
|
os.close(fd)
|
|
del reader
|
|
yield pathlib.Path(raw_path)
|
|
finally:
|
|
try:
|
|
_os_remove(raw_path)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def _temp_file(path):
|
|
return _tempfile(path.read_bytes, suffix=path.name)
|
|
|
|
|
|
def _is_present_dir(path: Traversable) -> bool:
|
|
"""
|
|
Some Traversables implement ``is_dir()`` to raise an
|
|
exception (i.e. ``FileNotFoundError``) when the
|
|
directory doesn't exist. This function wraps that call
|
|
to always return a boolean and only return True
|
|
if there's a dir and it exists.
|
|
"""
|
|
with contextlib.suppress(FileNotFoundError):
|
|
return path.is_dir()
|
|
return False
|
|
|
|
|
|
@functools.singledispatch
|
|
def as_file(path):
|
|
"""
|
|
Given a Traversable object, return that object as a
|
|
path on the local file system in a context manager.
|
|
"""
|
|
return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
|
|
|
|
|
|
@as_file.register(pathlib.Path)
|
|
@contextlib.contextmanager
|
|
def _(path):
|
|
"""
|
|
Degenerate behavior for pathlib.Path objects.
|
|
"""
|
|
yield path
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _temp_path(dir: tempfile.TemporaryDirectory):
|
|
"""
|
|
Wrap tempfile.TemporyDirectory to return a pathlib object.
|
|
"""
|
|
with dir as result:
|
|
yield pathlib.Path(result)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _temp_dir(path):
|
|
"""
|
|
Given a traversable dir, recursively replicate the whole tree
|
|
to the file system in a context manager.
|
|
"""
|
|
assert path.is_dir()
|
|
with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
|
|
yield _write_contents(temp_dir, path)
|
|
|
|
|
|
def _write_contents(target, source):
|
|
child = target.joinpath(source.name)
|
|
if source.is_dir():
|
|
child.mkdir()
|
|
for item in source.iterdir():
|
|
_write_contents(child, item)
|
|
else:
|
|
child.open('wb').write(source.read_bytes())
|
|
return child
|