mirror of
https://github.com/clinton-hall/nzbToMedia.git
synced 2024-11-14 17:40:24 -08:00
f05b09f349
Updates rarfile to 3.1 Updates stevedore to 3.5.0 Updates appdirs to 1.4.4 Updates click to 8.1.3 Updates decorator to 5.1.1 Updates dogpile.cache to 1.1.8 Updates pbr to 5.11.0 Updates pysrt to 1.1.2 Updates pytz to 2022.6 Adds importlib-metadata version 3.1.1 Adds typing-extensions version 4.1.1 Adds zipp version 3.11.0
480 lines
16 KiB
Python
480 lines
16 KiB
Python
import contextlib
|
|
import io
|
|
import os
|
|
import shlex
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
import typing as t
|
|
from types import TracebackType
|
|
|
|
from . import formatting
|
|
from . import termui
|
|
from . import utils
|
|
from ._compat import _find_binary_reader
|
|
|
|
if t.TYPE_CHECKING:
|
|
from .core import BaseCommand
|
|
|
|
|
|
class EchoingStdin:
|
|
def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
|
|
self._input = input
|
|
self._output = output
|
|
self._paused = False
|
|
|
|
def __getattr__(self, x: str) -> t.Any:
|
|
return getattr(self._input, x)
|
|
|
|
def _echo(self, rv: bytes) -> bytes:
|
|
if not self._paused:
|
|
self._output.write(rv)
|
|
|
|
return rv
|
|
|
|
def read(self, n: int = -1) -> bytes:
|
|
return self._echo(self._input.read(n))
|
|
|
|
def read1(self, n: int = -1) -> bytes:
|
|
return self._echo(self._input.read1(n)) # type: ignore
|
|
|
|
def readline(self, n: int = -1) -> bytes:
|
|
return self._echo(self._input.readline(n))
|
|
|
|
def readlines(self) -> t.List[bytes]:
|
|
return [self._echo(x) for x in self._input.readlines()]
|
|
|
|
def __iter__(self) -> t.Iterator[bytes]:
|
|
return iter(self._echo(x) for x in self._input)
|
|
|
|
def __repr__(self) -> str:
|
|
return repr(self._input)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:
|
|
if stream is None:
|
|
yield
|
|
else:
|
|
stream._paused = True
|
|
yield
|
|
stream._paused = False
|
|
|
|
|
|
class _NamedTextIOWrapper(io.TextIOWrapper):
|
|
def __init__(
|
|
self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
|
|
) -> None:
|
|
super().__init__(buffer, **kwargs)
|
|
self._name = name
|
|
self._mode = mode
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self._name
|
|
|
|
@property
|
|
def mode(self) -> str:
|
|
return self._mode
|
|
|
|
|
|
def make_input_stream(
|
|
input: t.Optional[t.Union[str, bytes, t.IO]], charset: str
|
|
) -> t.BinaryIO:
|
|
# Is already an input stream.
|
|
if hasattr(input, "read"):
|
|
rv = _find_binary_reader(t.cast(t.IO, input))
|
|
|
|
if rv is not None:
|
|
return rv
|
|
|
|
raise TypeError("Could not find binary reader for input stream.")
|
|
|
|
if input is None:
|
|
input = b""
|
|
elif isinstance(input, str):
|
|
input = input.encode(charset)
|
|
|
|
return io.BytesIO(t.cast(bytes, input))
|
|
|
|
|
|
class Result:
|
|
"""Holds the captured result of an invoked CLI script."""
|
|
|
|
def __init__(
|
|
self,
|
|
runner: "CliRunner",
|
|
stdout_bytes: bytes,
|
|
stderr_bytes: t.Optional[bytes],
|
|
return_value: t.Any,
|
|
exit_code: int,
|
|
exception: t.Optional[BaseException],
|
|
exc_info: t.Optional[
|
|
t.Tuple[t.Type[BaseException], BaseException, TracebackType]
|
|
] = None,
|
|
):
|
|
#: The runner that created the result
|
|
self.runner = runner
|
|
#: The standard output as bytes.
|
|
self.stdout_bytes = stdout_bytes
|
|
#: The standard error as bytes, or None if not available
|
|
self.stderr_bytes = stderr_bytes
|
|
#: The value returned from the invoked command.
|
|
#:
|
|
#: .. versionadded:: 8.0
|
|
self.return_value = return_value
|
|
#: The exit code as integer.
|
|
self.exit_code = exit_code
|
|
#: The exception that happened if one did.
|
|
self.exception = exception
|
|
#: The traceback
|
|
self.exc_info = exc_info
|
|
|
|
@property
|
|
def output(self) -> str:
|
|
"""The (standard) output as unicode string."""
|
|
return self.stdout
|
|
|
|
@property
|
|
def stdout(self) -> str:
|
|
"""The standard output as unicode string."""
|
|
return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
|
|
"\r\n", "\n"
|
|
)
|
|
|
|
@property
|
|
def stderr(self) -> str:
|
|
"""The standard error as unicode string."""
|
|
if self.stderr_bytes is None:
|
|
raise ValueError("stderr not separately captured")
|
|
return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
|
|
"\r\n", "\n"
|
|
)
|
|
|
|
def __repr__(self) -> str:
|
|
exc_str = repr(self.exception) if self.exception else "okay"
|
|
return f"<{type(self).__name__} {exc_str}>"
|
|
|
|
|
|
class CliRunner:
|
|
"""The CLI runner provides functionality to invoke a Click command line
|
|
script for unittesting purposes in a isolated environment. This only
|
|
works in single-threaded systems without any concurrency as it changes the
|
|
global interpreter state.
|
|
|
|
:param charset: the character set for the input and output data.
|
|
:param env: a dictionary with environment variables for overriding.
|
|
:param echo_stdin: if this is set to `True`, then reading from stdin writes
|
|
to stdout. This is useful for showing examples in
|
|
some circumstances. Note that regular prompts
|
|
will automatically echo the input.
|
|
:param mix_stderr: if this is set to `False`, then stdout and stderr are
|
|
preserved as independent streams. This is useful for
|
|
Unix-philosophy apps that have predictable stdout and
|
|
noisy stderr, such that each may be measured
|
|
independently
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
charset: str = "utf-8",
|
|
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
|
|
echo_stdin: bool = False,
|
|
mix_stderr: bool = True,
|
|
) -> None:
|
|
self.charset = charset
|
|
self.env = env or {}
|
|
self.echo_stdin = echo_stdin
|
|
self.mix_stderr = mix_stderr
|
|
|
|
def get_default_prog_name(self, cli: "BaseCommand") -> str:
|
|
"""Given a command object it will return the default program name
|
|
for it. The default is the `name` attribute or ``"root"`` if not
|
|
set.
|
|
"""
|
|
return cli.name or "root"
|
|
|
|
def make_env(
|
|
self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
|
|
) -> t.Mapping[str, t.Optional[str]]:
|
|
"""Returns the environment overrides for invoking a script."""
|
|
rv = dict(self.env)
|
|
if overrides:
|
|
rv.update(overrides)
|
|
return rv
|
|
|
|
@contextlib.contextmanager
|
|
def isolation(
|
|
self,
|
|
input: t.Optional[t.Union[str, bytes, t.IO]] = None,
|
|
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
|
|
color: bool = False,
|
|
) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
|
|
"""A context manager that sets up the isolation for invoking of a
|
|
command line tool. This sets up stdin with the given input data
|
|
and `os.environ` with the overrides from the given dictionary.
|
|
This also rebinds some internals in Click to be mocked (like the
|
|
prompt functionality).
|
|
|
|
This is automatically done in the :meth:`invoke` method.
|
|
|
|
:param input: the input stream to put into sys.stdin.
|
|
:param env: the environment overrides as dictionary.
|
|
:param color: whether the output should contain color codes. The
|
|
application can still override this explicitly.
|
|
|
|
.. versionchanged:: 8.0
|
|
``stderr`` is opened with ``errors="backslashreplace"``
|
|
instead of the default ``"strict"``.
|
|
|
|
.. versionchanged:: 4.0
|
|
Added the ``color`` parameter.
|
|
"""
|
|
bytes_input = make_input_stream(input, self.charset)
|
|
echo_input = None
|
|
|
|
old_stdin = sys.stdin
|
|
old_stdout = sys.stdout
|
|
old_stderr = sys.stderr
|
|
old_forced_width = formatting.FORCED_WIDTH
|
|
formatting.FORCED_WIDTH = 80
|
|
|
|
env = self.make_env(env)
|
|
|
|
bytes_output = io.BytesIO()
|
|
|
|
if self.echo_stdin:
|
|
bytes_input = echo_input = t.cast(
|
|
t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
|
|
)
|
|
|
|
sys.stdin = text_input = _NamedTextIOWrapper(
|
|
bytes_input, encoding=self.charset, name="<stdin>", mode="r"
|
|
)
|
|
|
|
if self.echo_stdin:
|
|
# Force unbuffered reads, otherwise TextIOWrapper reads a
|
|
# large chunk which is echoed early.
|
|
text_input._CHUNK_SIZE = 1 # type: ignore
|
|
|
|
sys.stdout = _NamedTextIOWrapper(
|
|
bytes_output, encoding=self.charset, name="<stdout>", mode="w"
|
|
)
|
|
|
|
bytes_error = None
|
|
if self.mix_stderr:
|
|
sys.stderr = sys.stdout
|
|
else:
|
|
bytes_error = io.BytesIO()
|
|
sys.stderr = _NamedTextIOWrapper(
|
|
bytes_error,
|
|
encoding=self.charset,
|
|
name="<stderr>",
|
|
mode="w",
|
|
errors="backslashreplace",
|
|
)
|
|
|
|
@_pause_echo(echo_input) # type: ignore
|
|
def visible_input(prompt: t.Optional[str] = None) -> str:
|
|
sys.stdout.write(prompt or "")
|
|
val = text_input.readline().rstrip("\r\n")
|
|
sys.stdout.write(f"{val}\n")
|
|
sys.stdout.flush()
|
|
return val
|
|
|
|
@_pause_echo(echo_input) # type: ignore
|
|
def hidden_input(prompt: t.Optional[str] = None) -> str:
|
|
sys.stdout.write(f"{prompt or ''}\n")
|
|
sys.stdout.flush()
|
|
return text_input.readline().rstrip("\r\n")
|
|
|
|
@_pause_echo(echo_input) # type: ignore
|
|
def _getchar(echo: bool) -> str:
|
|
char = sys.stdin.read(1)
|
|
|
|
if echo:
|
|
sys.stdout.write(char)
|
|
|
|
sys.stdout.flush()
|
|
return char
|
|
|
|
default_color = color
|
|
|
|
def should_strip_ansi(
|
|
stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
|
|
) -> bool:
|
|
if color is None:
|
|
return not default_color
|
|
return not color
|
|
|
|
old_visible_prompt_func = termui.visible_prompt_func
|
|
old_hidden_prompt_func = termui.hidden_prompt_func
|
|
old__getchar_func = termui._getchar
|
|
old_should_strip_ansi = utils.should_strip_ansi # type: ignore
|
|
termui.visible_prompt_func = visible_input
|
|
termui.hidden_prompt_func = hidden_input
|
|
termui._getchar = _getchar
|
|
utils.should_strip_ansi = should_strip_ansi # type: ignore
|
|
|
|
old_env = {}
|
|
try:
|
|
for key, value in env.items():
|
|
old_env[key] = os.environ.get(key)
|
|
if value is None:
|
|
try:
|
|
del os.environ[key]
|
|
except Exception:
|
|
pass
|
|
else:
|
|
os.environ[key] = value
|
|
yield (bytes_output, bytes_error)
|
|
finally:
|
|
for key, value in old_env.items():
|
|
if value is None:
|
|
try:
|
|
del os.environ[key]
|
|
except Exception:
|
|
pass
|
|
else:
|
|
os.environ[key] = value
|
|
sys.stdout = old_stdout
|
|
sys.stderr = old_stderr
|
|
sys.stdin = old_stdin
|
|
termui.visible_prompt_func = old_visible_prompt_func
|
|
termui.hidden_prompt_func = old_hidden_prompt_func
|
|
termui._getchar = old__getchar_func
|
|
utils.should_strip_ansi = old_should_strip_ansi # type: ignore
|
|
formatting.FORCED_WIDTH = old_forced_width
|
|
|
|
def invoke(
|
|
self,
|
|
cli: "BaseCommand",
|
|
args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
|
|
input: t.Optional[t.Union[str, bytes, t.IO]] = None,
|
|
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
|
|
catch_exceptions: bool = True,
|
|
color: bool = False,
|
|
**extra: t.Any,
|
|
) -> Result:
|
|
"""Invokes a command in an isolated environment. The arguments are
|
|
forwarded directly to the command line script, the `extra` keyword
|
|
arguments are passed to the :meth:`~clickpkg.Command.main` function of
|
|
the command.
|
|
|
|
This returns a :class:`Result` object.
|
|
|
|
:param cli: the command to invoke
|
|
:param args: the arguments to invoke. It may be given as an iterable
|
|
or a string. When given as string it will be interpreted
|
|
as a Unix shell command. More details at
|
|
:func:`shlex.split`.
|
|
:param input: the input data for `sys.stdin`.
|
|
:param env: the environment overrides.
|
|
:param catch_exceptions: Whether to catch any other exceptions than
|
|
``SystemExit``.
|
|
:param extra: the keyword arguments to pass to :meth:`main`.
|
|
:param color: whether the output should contain color codes. The
|
|
application can still override this explicitly.
|
|
|
|
.. versionchanged:: 8.0
|
|
The result object has the ``return_value`` attribute with
|
|
the value returned from the invoked command.
|
|
|
|
.. versionchanged:: 4.0
|
|
Added the ``color`` parameter.
|
|
|
|
.. versionchanged:: 3.0
|
|
Added the ``catch_exceptions`` parameter.
|
|
|
|
.. versionchanged:: 3.0
|
|
The result object has the ``exc_info`` attribute with the
|
|
traceback if available.
|
|
"""
|
|
exc_info = None
|
|
with self.isolation(input=input, env=env, color=color) as outstreams:
|
|
return_value = None
|
|
exception: t.Optional[BaseException] = None
|
|
exit_code = 0
|
|
|
|
if isinstance(args, str):
|
|
args = shlex.split(args)
|
|
|
|
try:
|
|
prog_name = extra.pop("prog_name")
|
|
except KeyError:
|
|
prog_name = self.get_default_prog_name(cli)
|
|
|
|
try:
|
|
return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
|
|
except SystemExit as e:
|
|
exc_info = sys.exc_info()
|
|
e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)
|
|
|
|
if e_code is None:
|
|
e_code = 0
|
|
|
|
if e_code != 0:
|
|
exception = e
|
|
|
|
if not isinstance(e_code, int):
|
|
sys.stdout.write(str(e_code))
|
|
sys.stdout.write("\n")
|
|
e_code = 1
|
|
|
|
exit_code = e_code
|
|
|
|
except Exception as e:
|
|
if not catch_exceptions:
|
|
raise
|
|
exception = e
|
|
exit_code = 1
|
|
exc_info = sys.exc_info()
|
|
finally:
|
|
sys.stdout.flush()
|
|
stdout = outstreams[0].getvalue()
|
|
if self.mix_stderr:
|
|
stderr = None
|
|
else:
|
|
stderr = outstreams[1].getvalue() # type: ignore
|
|
|
|
return Result(
|
|
runner=self,
|
|
stdout_bytes=stdout,
|
|
stderr_bytes=stderr,
|
|
return_value=return_value,
|
|
exit_code=exit_code,
|
|
exception=exception,
|
|
exc_info=exc_info, # type: ignore
|
|
)
|
|
|
|
@contextlib.contextmanager
|
|
def isolated_filesystem(
|
|
self, temp_dir: t.Optional[t.Union[str, os.PathLike]] = None
|
|
) -> t.Iterator[str]:
|
|
"""A context manager that creates a temporary directory and
|
|
changes the current working directory to it. This isolates tests
|
|
that affect the contents of the CWD to prevent them from
|
|
interfering with each other.
|
|
|
|
:param temp_dir: Create the temporary directory under this
|
|
directory. If given, the created directory is not removed
|
|
when exiting.
|
|
|
|
.. versionchanged:: 8.0
|
|
Added the ``temp_dir`` parameter.
|
|
"""
|
|
cwd = os.getcwd()
|
|
dt = tempfile.mkdtemp(dir=temp_dir) # type: ignore[type-var]
|
|
os.chdir(dt)
|
|
|
|
try:
|
|
yield t.cast(str, dt)
|
|
finally:
|
|
os.chdir(cwd)
|
|
|
|
if temp_dir is None:
|
|
try:
|
|
shutil.rmtree(dt)
|
|
except OSError: # noqa: B014
|
|
pass
|