mirror of
https://github.com/clinton-hall/nzbToMedia.git
synced 2025-01-09 04:23:16 -08:00
56c6773c6b
Updates colorama to 0.4.6 Adds confuse version 1.7.0 Updates jellyfish to 0.9.0 Adds mediafile 0.10.1 Updates munkres to 1.1.4 Updates musicbrainzngs to 0.7.1 Updates mutagen to 1.46.0 Updates pyyaml to 6.0 Updates unidecode to 1.3.6
278 lines
11 KiB
Python
278 lines
11 KiB
Python
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
|
|
import re
|
|
import sys
|
|
import os
|
|
|
|
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style, BEL
|
|
from .winterm import enable_vt_processing, WinTerm, WinColor, WinStyle
|
|
from .win32 import windll, winapi_test
|
|
|
|
|
|
winterm = None
|
|
if windll is not None:
|
|
winterm = WinTerm()
|
|
|
|
|
|
class StreamWrapper(object):
|
|
'''
|
|
Wraps a stream (such as stdout), acting as a transparent proxy for all
|
|
attribute access apart from method 'write()', which is delegated to our
|
|
Converter instance.
|
|
'''
|
|
def __init__(self, wrapped, converter):
|
|
# double-underscore everything to prevent clashes with names of
|
|
# attributes on the wrapped stream object.
|
|
self.__wrapped = wrapped
|
|
self.__convertor = converter
|
|
|
|
def __getattr__(self, name):
|
|
return getattr(self.__wrapped, name)
|
|
|
|
def __enter__(self, *args, **kwargs):
|
|
# special method lookup bypasses __getattr__/__getattribute__, see
|
|
# https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit
|
|
# thus, contextlib magic methods are not proxied via __getattr__
|
|
return self.__wrapped.__enter__(*args, **kwargs)
|
|
|
|
def __exit__(self, *args, **kwargs):
|
|
return self.__wrapped.__exit__(*args, **kwargs)
|
|
|
|
def __setstate__(self, state):
|
|
self.__dict__ = state
|
|
|
|
def __getstate__(self):
|
|
return self.__dict__
|
|
|
|
def write(self, text):
|
|
self.__convertor.write(text)
|
|
|
|
def isatty(self):
|
|
stream = self.__wrapped
|
|
if 'PYCHARM_HOSTED' in os.environ:
|
|
if stream is not None and (stream is sys.__stdout__ or stream is sys.__stderr__):
|
|
return True
|
|
try:
|
|
stream_isatty = stream.isatty
|
|
except AttributeError:
|
|
return False
|
|
else:
|
|
return stream_isatty()
|
|
|
|
@property
|
|
def closed(self):
|
|
stream = self.__wrapped
|
|
try:
|
|
return stream.closed
|
|
# AttributeError in the case that the stream doesn't support being closed
|
|
# ValueError for the case that the stream has already been detached when atexit runs
|
|
except (AttributeError, ValueError):
|
|
return True
|
|
|
|
|
|
class AnsiToWin32(object):
|
|
'''
|
|
Implements a 'write()' method which, on Windows, will strip ANSI character
|
|
sequences from the text, and if outputting to a tty, will convert them into
|
|
win32 function calls.
|
|
'''
|
|
ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?') # Control Sequence Introducer
|
|
ANSI_OSC_RE = re.compile('\001?\033\\]([^\a]*)(\a)\002?') # Operating System Command
|
|
|
|
def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
|
|
# The wrapped stream (normally sys.stdout or sys.stderr)
|
|
self.wrapped = wrapped
|
|
|
|
# should we reset colors to defaults after every .write()
|
|
self.autoreset = autoreset
|
|
|
|
# create the proxy wrapping our output stream
|
|
self.stream = StreamWrapper(wrapped, self)
|
|
|
|
on_windows = os.name == 'nt'
|
|
# We test if the WinAPI works, because even if we are on Windows
|
|
# we may be using a terminal that doesn't support the WinAPI
|
|
# (e.g. Cygwin Terminal). In this case it's up to the terminal
|
|
# to support the ANSI codes.
|
|
conversion_supported = on_windows and winapi_test()
|
|
try:
|
|
fd = wrapped.fileno()
|
|
except Exception:
|
|
fd = -1
|
|
system_has_native_ansi = not on_windows or enable_vt_processing(fd)
|
|
have_tty = not self.stream.closed and self.stream.isatty()
|
|
need_conversion = conversion_supported and not system_has_native_ansi
|
|
|
|
# should we strip ANSI sequences from our output?
|
|
if strip is None:
|
|
strip = need_conversion or not have_tty
|
|
self.strip = strip
|
|
|
|
# should we should convert ANSI sequences into win32 calls?
|
|
if convert is None:
|
|
convert = need_conversion and have_tty
|
|
self.convert = convert
|
|
|
|
# dict of ansi codes to win32 functions and parameters
|
|
self.win32_calls = self.get_win32_calls()
|
|
|
|
# are we wrapping stderr?
|
|
self.on_stderr = self.wrapped is sys.stderr
|
|
|
|
def should_wrap(self):
|
|
'''
|
|
True if this class is actually needed. If false, then the output
|
|
stream will not be affected, nor will win32 calls be issued, so
|
|
wrapping stdout is not actually required. This will generally be
|
|
False on non-Windows platforms, unless optional functionality like
|
|
autoreset has been requested using kwargs to init()
|
|
'''
|
|
return self.convert or self.strip or self.autoreset
|
|
|
|
def get_win32_calls(self):
|
|
if self.convert and winterm:
|
|
return {
|
|
AnsiStyle.RESET_ALL: (winterm.reset_all, ),
|
|
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
|
|
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
|
|
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
|
|
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
|
|
AnsiFore.RED: (winterm.fore, WinColor.RED),
|
|
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
|
|
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
|
|
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
|
|
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
|
|
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
|
|
AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
|
|
AnsiFore.RESET: (winterm.fore, ),
|
|
AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True),
|
|
AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True),
|
|
AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True),
|
|
AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True),
|
|
AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True),
|
|
AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True),
|
|
AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True),
|
|
AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True),
|
|
AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
|
|
AnsiBack.RED: (winterm.back, WinColor.RED),
|
|
AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
|
|
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
|
|
AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
|
|
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
|
|
AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
|
|
AnsiBack.WHITE: (winterm.back, WinColor.GREY),
|
|
AnsiBack.RESET: (winterm.back, ),
|
|
AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True),
|
|
AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True),
|
|
AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True),
|
|
AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True),
|
|
AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True),
|
|
AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True),
|
|
AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True),
|
|
AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True),
|
|
}
|
|
return dict()
|
|
|
|
def write(self, text):
|
|
if self.strip or self.convert:
|
|
self.write_and_convert(text)
|
|
else:
|
|
self.wrapped.write(text)
|
|
self.wrapped.flush()
|
|
if self.autoreset:
|
|
self.reset_all()
|
|
|
|
|
|
def reset_all(self):
|
|
if self.convert:
|
|
self.call_win32('m', (0,))
|
|
elif not self.strip and not self.stream.closed:
|
|
self.wrapped.write(Style.RESET_ALL)
|
|
|
|
|
|
def write_and_convert(self, text):
|
|
'''
|
|
Write the given text to our wrapped stream, stripping any ANSI
|
|
sequences from the text, and optionally converting them into win32
|
|
calls.
|
|
'''
|
|
cursor = 0
|
|
text = self.convert_osc(text)
|
|
for match in self.ANSI_CSI_RE.finditer(text):
|
|
start, end = match.span()
|
|
self.write_plain_text(text, cursor, start)
|
|
self.convert_ansi(*match.groups())
|
|
cursor = end
|
|
self.write_plain_text(text, cursor, len(text))
|
|
|
|
|
|
def write_plain_text(self, text, start, end):
|
|
if start < end:
|
|
self.wrapped.write(text[start:end])
|
|
self.wrapped.flush()
|
|
|
|
|
|
def convert_ansi(self, paramstring, command):
|
|
if self.convert:
|
|
params = self.extract_params(command, paramstring)
|
|
self.call_win32(command, params)
|
|
|
|
|
|
def extract_params(self, command, paramstring):
|
|
if command in 'Hf':
|
|
params = tuple(int(p) if len(p) != 0 else 1 for p in paramstring.split(';'))
|
|
while len(params) < 2:
|
|
# defaults:
|
|
params = params + (1,)
|
|
else:
|
|
params = tuple(int(p) for p in paramstring.split(';') if len(p) != 0)
|
|
if len(params) == 0:
|
|
# defaults:
|
|
if command in 'JKm':
|
|
params = (0,)
|
|
elif command in 'ABCD':
|
|
params = (1,)
|
|
|
|
return params
|
|
|
|
|
|
def call_win32(self, command, params):
|
|
if command == 'm':
|
|
for param in params:
|
|
if param in self.win32_calls:
|
|
func_args = self.win32_calls[param]
|
|
func = func_args[0]
|
|
args = func_args[1:]
|
|
kwargs = dict(on_stderr=self.on_stderr)
|
|
func(*args, **kwargs)
|
|
elif command in 'J':
|
|
winterm.erase_screen(params[0], on_stderr=self.on_stderr)
|
|
elif command in 'K':
|
|
winterm.erase_line(params[0], on_stderr=self.on_stderr)
|
|
elif command in 'Hf': # cursor position - absolute
|
|
winterm.set_cursor_position(params, on_stderr=self.on_stderr)
|
|
elif command in 'ABCD': # cursor position - relative
|
|
n = params[0]
|
|
# A - up, B - down, C - forward, D - back
|
|
x, y = {'A': (0, -n), 'B': (0, n), 'C': (n, 0), 'D': (-n, 0)}[command]
|
|
winterm.cursor_adjust(x, y, on_stderr=self.on_stderr)
|
|
|
|
|
|
def convert_osc(self, text):
|
|
for match in self.ANSI_OSC_RE.finditer(text):
|
|
start, end = match.span()
|
|
text = text[:start] + text[end:]
|
|
paramstring, command = match.groups()
|
|
if command == BEL:
|
|
if paramstring.count(";") == 1:
|
|
params = paramstring.split(";")
|
|
# 0 - change title and icon (we will only change title)
|
|
# 1 - change icon (we don't support this)
|
|
# 2 - change title
|
|
if params[0] in '02':
|
|
winterm.set_title(params[1])
|
|
return text
|
|
|
|
|
|
def flush(self):
|
|
self.wrapped.flush()
|